/
job_message_queue.go
127 lines (113 loc) · 4.15 KB
/
job_message_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"fmt"
"log"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/awslabs/aws-go-wordfreq-sample"
)
// A JobMessageQueue provides listening to a SQS queue for job messages, and
// providing those job messages as a job channel to workers so the jobs can be
// processed.
type JobMessageQueue struct {
queueURL string
queueVisibility int64
queueWait int64
jobCh chan *wordfreq.Job
msgSvc sqsiface.SQSAPI
}
// NewJobMessageQueue creates a new instance of the JobMessageQueue configuring it
// for the SQS service client it will use. The sqsiface.SQSAPI is used so that
// the code could be unit tested in isolating without also testing the SDK.
func NewJobMessageQueue(url string, visibilityTime, waitTime int64, svc sqsiface.SQSAPI) *JobMessageQueue {
return &JobMessageQueue{
queueURL: url,
queueVisibility: visibilityTime,
queueWait: waitTime,
jobCh: make(chan *wordfreq.Job, 10),
msgSvc: svc,
}
}
// Listen waits for messages to arrive from the SQS queue, parses the JSON
// message and sends the jobs to the job channel to be processed by the worker pool.
func (m *JobMessageQueue) Listen(doneCh <-chan struct{}) {
fmt.Println("Job Message queue starting")
defer close(m.jobCh)
defer fmt.Println("Job Message queue quitting.")
for {
select {
case <-doneCh:
return
default:
msgs, err := m.receiveMsg()
if err != nil {
log.Println("Failed to read from message queue", err)
time.Sleep(5 * time.Second)
continue
}
// Since SQS ReceiveMessage could return multiple messages at once
// we should loop over then instead of assuming only a single message
// message is returned. This is also and easier pattern if we want
// to bump up the number of messages that will be read from SQS at once
// by default only one message is read.
for _, msg := range msgs {
parseErr := parseJobMessage(m.jobCh,
wordfreq.JobMessage{
ID: *msg.MessageId,
ReceiptHandle: *msg.ReceiptHandle,
Body: *msg.Body,
},
m.queueVisibility,
)
if parseErr != nil {
fmt.Println("Failed to parse", *msg.MessageId, "job message,", parseErr)
m.DeleteMessage(*msg.ReceiptHandle)
}
}
}
}
}
// receiveMsg reads a message from the SQS job queue. A visibility timeout is set
// so that no other reader will be able to see the message which this service
// received. Preventing duplication of work. And a wait time provides long pooling
// so the service does not need to micro manage its pooling of SQS.
func (m *JobMessageQueue) receiveMsg() ([]*sqs.Message, error) {
result, err := m.msgSvc.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: aws.String(m.queueURL),
WaitTimeSeconds: aws.Int64(m.queueWait),
VisibilityTimeout: aws.Int64(m.queueVisibility),
})
if err != nil {
return nil, err
}
return result.Messages, nil
}
// DeleteMessage deletes a previously received message from the job message queue
// Once a job is complete it can safely be deleted from the queue so that no
// other service or worker will rerun the job.
func (m *JobMessageQueue) DeleteMessage(receiptHandle string) error {
_, err := m.msgSvc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(m.queueURL),
ReceiptHandle: aws.String(receiptHandle),
})
return err
}
// UpdateMessageVisibility extends the amount of time a job message is hidden from
// other readers of the SQS job queue. This allows a worker to keep processing
// a long running job.
func (m *JobMessageQueue) UpdateMessageVisibility(receiptHandle string) (int64, error) {
_, err := m.msgSvc.ChangeMessageVisibility(&sqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(m.queueURL),
ReceiptHandle: aws.String(receiptHandle),
VisibilityTimeout: aws.Int64(m.queueVisibility),
})
return m.queueVisibility, err
}
// GetJobs returns a read only channel to read jobs from. This channel will
// be closed when the JobMessageQueue no longer is listening for further SQS
// job messages.
func (m *JobMessageQueue) GetJobs() <-chan *wordfreq.Job {
return m.jobCh
}