// File: sqs.go

package main
import (
"log"
"sort"
"strconv"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
const (
// awsRegion is the AWS region the SQS session in this file is configured for.
awsRegion = "us-east-1"
// queueNamePrefix is the queue-name prefix passed to ListQueues to find the
// queue that messages are sent to.
queueNamePrefix = "gaia-sim-"
)
var (
// sessionSQS is the package-wide SQS client. It is built once, at package
// initialization, from a new AWS session configured for awsRegion.
// session.Must panics if the session cannot be created.
sessionSQS = sqs.New(session.Must(session.NewSession(&aws.Config{
Region: aws.String(awsRegion),
})))
)
// sendBatch sends batchRequestEntries to the first queue URL listed in queues
// using a single SendMessageBatch call.
//
// Sending is best-effort: every failure is logged and the function returns
// normally, because the simulation can still run without these messages (the
// output will just be uglier).
//
// An empty batch is skipped, since SQS rejects batch requests without entries.
// A nil queues value or an empty QueueUrls list is logged and the batch is
// dropped instead of panicking on the index expression.
func sendBatch(batchRequestEntries []*sqs.SendMessageBatchRequestEntry, queues *sqs.ListQueuesOutput) {
	if len(batchRequestEntries) == 0 {
		// Nothing to send; an empty request would only produce an error.
		return
	}
	if queues == nil || len(queues.QueueUrls) == 0 {
		log.Print("sendBatch: no SQS queue URL available, dropping batch")
		return
	}

	out, err := sessionSQS.SendMessageBatch(&sqs.SendMessageBatchInput{
		Entries:  batchRequestEntries,
		QueueUrl: queues.QueueUrls[0],
	})
	if err != nil {
		// If this fails, simulation can still run, just output will be uglier
		log.Print(err.Error())
		return
	}

	// A batch call can succeed as a whole while individual entries fail, so
	// report every rejected entry as well.
	for _, failed := range out.Failed {
		if failed == nil {
			continue
		}
		log.Print("sendBatch: message " + aws.StringValue(failed.Id) +
			" failed: " + aws.StringValue(failed.Code) +
			": " + aws.StringValue(failed.Message))
	}
}
// removeEmpties returns the non-nil entries of batch, in their original
// order. The input slice is not modified. When batch holds no non-nil entry
// the result is a nil slice.
func removeEmpties(batch []*sqs.SendMessageBatchRequestEntry) []*sqs.SendMessageBatchRequestEntry {
	var kept []*sqs.SendMessageBatchRequestEntry
	for i := 0; i < len(batch); i++ {
		entry := batch[i]
		if entry == nil {
			// Skip unused slots.
			continue
		}
		kept = append(kept, entry)
	}
	return kept
}
func sendSqsMsg(instanceIndex []int) {
var (
maxMessages = 0
batchRequestEntries = make([]*sqs.SendMessageBatchRequestEntry, 10)
queues *sqs.ListQueuesOutput
)
if queues, err = sessionSQS.ListQueues(&sqs.ListQueuesInput{
QueueNamePrefix: aws.String(queueNamePrefix),
}); err != nil {
log.Fatal(err.Error())
}
sort.Ints(instanceIndex)
for index := range instanceIndex {
batchRequestEntries[maxMessages] = &sqs.SendMessageBatchRequestEntry{
Id: aws.String(strconv.Itoa(index)),
MessageBody: aws.String("Instance " + strconv.Itoa(index)), // Required field, we don't care about the body right now
}
maxMessages++
// SQS only accepts batches of max 10 messages
if maxMessages == 10 {
sendBatch(batchRequestEntries, queues)
batchRequestEntries = make([]*sqs.SendMessageBatchRequestEntry, 10)
maxMessages = 0
}
// We want the queue length to be one less than the number of instances that were created
// An empty queue will prompt the last running instance to send the simulation finished message
if index == len(instanceIndex)-2 {
// Can't have nil elements in the list or the sqs send function will segfault
batchRequestEntries = removeEmpties(batchRequestEntries)
sendBatch(batchRequestEntries, queues)
break
}
}
}