-
Notifications
You must be signed in to change notification settings - Fork 3
/
sqsproducer.go
113 lines (100 loc) · 2.98 KB
/
sqsproducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
Copyright 2021 Adevinta
*/
package queue
import (
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)
var (
// ErrQueueDoesNotExist is returned when the MultipleSQSProducer tries to send
// a message to a queue that is not defined.
ErrQueueDoesNotExist = errors.New("queue does not exist")
)
// SQSProducer reads and consumes sqs messages.
type SQSProducer struct {
sqs sqsiface.SQSAPI
queueURL string
}
// NewSQSProducer creates a new SQSProducer that allows to send messages to
// the given queueARN.
func NewSQSProducer(queueARN string, endpoint string, log log.Logger) (*SQSProducer, error) {
sess, err := session.NewSession()
if err != nil {
_ = level.Error(log).Log("CreatingAWSSession", err)
return nil, err
}
arn, err := arn.Parse(queueARN)
if err != nil {
_ = level.Error(log).Log("ParsingSQSQueueARN", err)
return nil, fmt.Errorf("error parsing SQS queue ARN: %v", err)
}
awsCfg := aws.NewConfig()
if arn.Region != "" {
awsCfg = awsCfg.WithRegion(arn.Region)
}
if endpoint != "" {
awsCfg = awsCfg.WithEndpoint(endpoint)
}
sqsSrv := sqs.New(sess, awsCfg)
params := &sqs.GetQueueUrlInput{
QueueName: aws.String(arn.Resource),
}
if arn.AccountID != "" {
params.SetQueueOwnerAWSAccountId(arn.AccountID)
}
resp, err := sqsSrv.GetQueueUrl(params)
if err != nil {
_ = level.Error(log).Log("ErrorRetrievingSQSURL", err)
return nil, fmt.Errorf("error retrieving SQS queue URL: %v", err)
}
if resp.QueueUrl == nil {
return nil, errors.New("unexpected nill getting SQSProducer queue ARN")
}
return &SQSProducer{
queueURL: *resp.QueueUrl,
sqs: sqsSrv,
}, nil
}
// SendMessage sends a message to the producer defined queue.
func (s *SQSProducer) SendMessage(body string) error {
msg := &sqs.SendMessageInput{
QueueUrl: &s.queueURL,
MessageBody: &body,
}
_, err := s.sqs.SendMessage(msg)
return err
}
// MultiSQSProducer allows to send messages to different named queues.
type MultiSQSProducer struct {
producers map[string]*SQSProducer
}
// NewMultiSQSProducer creates a new MultipleSQSProducer given a map containing
// the name of the queues as keys and the ARN for those queues as values.
func NewMultiSQSProducer(queues map[string]string, endpoint string, log log.Logger) (*MultiSQSProducer, error) {
var m = make(map[string]*SQSProducer)
for n, a := range queues {
producer, err := NewSQSProducer(a, endpoint, log)
if err != nil {
return nil, err
}
m[n] = producer
}
return &MultiSQSProducer{m}, nil
}
// Send send a message to a queue with the given name. If the queue
// is not defined in the producer a QueueDoesNotExistError is returned.
func (m *MultiSQSProducer) Send(queueName string, body string) error {
p, ok := m.producers[queueName]
if !ok {
return ErrQueueDoesNotExist
}
return p.SendMessage(body)
}