/
queue_watcher.go
84 lines (65 loc) · 2.01 KB
/
queue_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package watcher
import (
"encoding/json"
"fmt"
"os"
"github.com/Gamebuildr/Dave/pkg/config"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
)
// QueueWatcher reads and deletes queue messages through an AWS SQS client.
// The client field is the SQS API handle used by every method; it is assigned
// by Setup.
//
// NOTE(review): the previous comment described this as the "aws sns"
// implementation of the NotificationService. The code here talks to SQS, and
// that interface is not visible in this file — confirm the intended contract.
type QueueWatcher struct {
client sqsiface.SQSAPI
}
// messageIDToReceiptHandle maps the MessageID decoded from a received message
// body to the receipt handle that message was delivered with, so that
// DeleteMessage can look the handle up by ID. It is package-level state shared
// by every QueueWatcher and is not guarded by a lock.
var messageIDToReceiptHandle = make(map[string]*string)
// Setup prepares the watcher for use by creating an AWS session, setting its
// region from the environment variable named by config.Region, and building
// the SQS client from that session. It panics if the session cannot be
// created, because the session is wrapped in session.Must.
func (watcher *QueueWatcher) Setup() {
	region := os.Getenv(config.Region)

	awsSession := session.Must(session.NewSession())
	awsSession.Config.Region = aws.String(region)

	watcher.client = sqs.New(awsSession)
}
// ReadNextMessage receives at most one message from the queue at url and
// decodes its JSON body into a MessageInfo.
//
// On success the message's receipt handle is recorded under the decoded
// MessageID so that a later DeleteMessage call can find it.
//
// The returned pointer is never nil. When the queue yields no message the
// result is an empty MessageInfo with a nil error. On failure the result is an
// empty MessageInfo together with the error: the receive error exactly as the
// SQS client reported it, or an error describing a missing or undecodable
// body. A message that fails to decode is not recorded for deletion.
func (watcher QueueWatcher) ReadNextMessage(url string) (*MessageInfo, error) {
	response, err := watcher.getMessageFromSqs(url)
	if err != nil {
		return new(MessageInfo), err
	}
	if response == nil || len(response.Messages) == 0 {
		return new(MessageInfo), nil
	}

	message := response.Messages[0]
	// Both the message and its Body are pointers; report a missing body as an
	// error instead of panicking on the dereference below.
	if message == nil || message.Body == nil {
		return new(MessageInfo), fmt.Errorf("message received from %v has no body", url)
	}

	var messageInfo MessageInfo
	if err := json.Unmarshal([]byte(*message.Body), &messageInfo); err != nil {
		// Do not hand back a partially decoded value or register a handle
		// under an ID that may not have been parsed.
		return new(MessageInfo), fmt.Errorf("decoding message body from %v: %w", url, err)
	}

	messageIDToReceiptHandle[messageInfo.MessageID] = message.ReceiptHandle
	return &messageInfo, nil
}
// DeleteMessage removes the message identified by messageID from the queue at
// url, using the receipt handle that was recorded when the message was read.
//
// It returns an error when no receipt handle is recorded for messageID, or the
// SQS client's error, unwrapped, when the delete call fails. After a
// successful delete the recorded handle is dropped, so the lookup table does
// not keep growing for the lifetime of the process; a second delete of the
// same ID therefore reports that the message does not exist.
func (watcher QueueWatcher) DeleteMessage(messageID string, url string) error {
	handle, exists := messageIDToReceiptHandle[messageID]
	if !exists {
		return fmt.Errorf("Message requested for delete does not exist: %v", messageID)
	}

	deleteMsg := &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(url),
		ReceiptHandle: handle,
	}
	if _, err := watcher.client.DeleteMessage(deleteMsg); err != nil {
		// Keep the handle so the caller can retry the delete.
		return err
	}

	delete(messageIDToReceiptHandle, messageID)
	return nil
}
// getMessageFromSqs issues one ReceiveMessage request against the queue at
// url. The request asks for at most one message and sets both the visibility
// timeout and the wait time to 1. The client's response and error are returned
// unchanged.
func (watcher *QueueWatcher) getMessageFromSqs(url string) (*sqs.ReceiveMessageOutput, error) {
	return watcher.client.ReceiveMessage(&sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(url),
		MaxNumberOfMessages: aws.Int64(1),
		VisibilityTimeout:   aws.Int64(1),
		WaitTimeSeconds:     aws.Int64(1),
	})
}