-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.go
116 lines (103 loc) · 2.81 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package sqs
import (
"errors"
"strings"
"github.com/PacktPublishing/Cloud-Native-programming-with-Golang/chapter06/src/lib/msgqueue"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
// SQSListener receives messages from a single Amazon SQS queue and turns
// them into msgqueue events.
type SQSListener struct {
	// mapper converts an event name plus raw message body into a msgqueue.Event.
	mapper msgqueue.EventMapper
	// sqsSvc is the SQS client used for the ReceiveMessage and DeleteMessage calls.
	sqsSvc *sqs.SQS
	// queueURL is the URL of the queue, resolved from the queue name at construction.
	queueURL *string

	// The values below are sent unchanged with every ReceiveMessage request.

	// maxNumberOfMessages is passed as MaxNumberOfMessages.
	maxNumberOfMessages int64
	// waitTime is passed as WaitTimeSeconds.
	waitTime int64
	// visibilityTimeOut is passed as VisibilityTimeout.
	visibilityTimeOut int64
}
// NewSQSListener creates an SQS-backed msgqueue.EventListener for the queue
// named queueName.
//
// If s is nil a new session is created with session.NewSession. The queue
// name is resolved to its URL through GetQueueUrl. maxMsgs, wtTime and visTO
// are stored and later sent with every ReceiveMessage request as
// MaxNumberOfMessages, WaitTimeSeconds and VisibilityTimeout respectively.
//
// A nil listener and a non-nil error are returned when either the session
// cannot be created or the queue URL cannot be resolved.
func NewSQSListener(s *session.Session, queueName string, maxMsgs, wtTime, visTO int64) (listener msgqueue.EventListener, err error) {
	if s == nil {
		if s, err = session.NewSession(); err != nil {
			return nil, err
		}
	}

	client := sqs.New(s)
	urlOut, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: aws.String(queueName)})
	if err != nil {
		return nil, err
	}

	return &SQSListener{
		mapper:              msgqueue.NewEventMapper(),
		sqsSvc:              client,
		queueURL:            urlOut.QueueUrl,
		maxNumberOfMessages: maxMsgs,
		waitTime:            wtTime,
		visibilityTimeOut:   visTO,
	}, nil
}
// Listen starts polling the queue in a background goroutine and returns the
// channels on which results are delivered.
//
// events lists the event names the caller wants to receive; they are
// forwarded to receiveMessage, which only delivers messages whose
// "event_name" attribute matches one of them (case-insensitively).
//
// Both returned channels are unbuffered, so the caller must keep draining
// the event channel and the error channel or the polling goroutine blocks.
// The goroutine loops forever; this method offers no way to stop it.
//
// An error is returned only when Listen is called on a nil *SQSListener.
func (sqsListener *SQSListener) Listen(events ...string) (<-chan msgqueue.Event, <-chan error, error) {
	if sqsListener == nil {
		return nil, nil, errors.New("SQSListener: the Listen() method was called on a nil pointer")
	}

	eventCh := make(chan msgqueue.Event)
	errorCh := make(chan error)

	go func() {
		for {
			// The requested names must be passed on: without them the filter
			// in receiveMessage sees an empty list and skips every message.
			sqsListener.receiveMessage(eventCh, errorCh, events...)
		}
	}()

	return eventCh, errorCh, nil
}
// receiveMessage issues one ReceiveMessage request against the queue and
// processes the returned batch.
//
// For every message it reads the "event_name" message attribute and compares
// it case-insensitively with the names in events. Messages without that
// attribute, or whose name is not in events, are skipped and not deleted.
// A matching message is mapped to a msgqueue.Event through the listener's
// mapper, sent on eventCh, and then deleted from the queue.
//
// Failures of ReceiveMessage, MapEvent and DeleteMessage are sent on errorCh.
// A message that fails to map is neither delivered nor deleted. Sends on both
// channels block until the receiver takes the value.
func (sqsListener *SQSListener) receiveMessage(eventCh chan msgqueue.Event, errorCh chan error, events ...string) {
	recvMsgResult, err := sqsListener.sqsSvc.ReceiveMessage(&sqs.ReceiveMessageInput{
		// NOTE(review): this is the queue-attribute constant reused as a
		// message-attribute selector; presumably its value requests all
		// message attributes — confirm against the SDK.
		MessageAttributeNames: []*string{
			aws.String(sqs.QueueAttributeNameAll),
		},
		QueueUrl:            sqsListener.queueURL,
		MaxNumberOfMessages: aws.Int64(sqsListener.maxNumberOfMessages),
		WaitTimeSeconds:     aws.Int64(sqsListener.waitTime),
		VisibilityTimeout:   aws.Int64(sqsListener.visibilityTimeOut),
	})
	if err != nil {
		errorCh <- err
		return
	}

	for _, msg := range recvMsgResult.Messages {
		value, ok := msg.MessageAttributes["event_name"]
		if !ok || value == nil {
			// No usable event name: nothing to match against.
			continue
		}
		eventName := aws.StringValue(value.StringValue)

		// The flag is scoped to this message. Sharing one flag across the
		// whole batch would let a single match make every later message
		// pass the filter, whatever its event name.
		matched := false
		for _, wanted := range events {
			if strings.EqualFold(eventName, wanted) {
				matched = true
				break
			}
		}
		if !matched {
			continue
		}

		body := aws.StringValue(msg.Body)
		event, err := sqsListener.mapper.MapEvent(eventName, []byte(body))
		if err != nil {
			errorCh <- err
			continue
		}
		eventCh <- event

		// Delete only after the event has been handed to the consumer.
		_, err = sqsListener.sqsSvc.DeleteMessage(&sqs.DeleteMessageInput{
			QueueUrl:      sqsListener.queueURL,
			ReceiptHandle: msg.ReceiptHandle,
		})
		if err != nil {
			errorCh <- err
		}
	}
}
// Mapper returns the event mapper this listener uses to turn received
// messages into msgqueue events.
func (sqsListener *SQSListener) Mapper() msgqueue.EventMapper {
	return sqsListener.mapper
}