/
sqs.go
79 lines (68 loc) · 1.86 KB
/
sqs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package sqs
import (
"context"
"fmt"
"strconv"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/rs/zerolog"
)
const (
batchSize = 10
)
type Client struct {
*sqs.Client
log zerolog.Logger
}
func New(cfg aws.Config, log zerolog.Logger) *Client {
return &Client{
Client: sqs.NewFromConfig(cfg),
log: log.With().Str("module", "sqs").Logger(),
}
}
func (c *Client) NewPublisher(ctx context.Context, queue string, msgs <-chan string) error {
batch := make([]*string, 0, batchSize)
for msg := range msgs {
msg := msg
batch = append(batch, &msg)
if len(batch) == batchSize {
if err := c.publishBatch(ctx, queue, batch); err != nil {
return fmt.Errorf("failed to publish batch: %w", err)
}
batch = make([]*string, 0, batchSize)
}
}
if len(batch) > 0 {
if err := c.publishBatch(ctx, queue, batch); err != nil {
return fmt.Errorf("failed to publish batch: %w", err)
}
}
return nil
}
func (c *Client) publishBatch(ctx context.Context, queue string, batch []*string) error {
log := c.log.With().Str("queue", queue).Logger()
log.Debug().Int("batch_size", len(batch)).Msg("publishing batch")
result, err := c.SendMessageBatch(ctx, &sqs.SendMessageBatchInput{
QueueUrl: aws.String(queue),
Entries: toEntries(batch),
})
if err != nil {
return err
}
for _, f := range result.Failed {
log.Error().Str("id", *f.Id).Str("code", *f.Code).Str("message", *f.Message).Msg("failed to publish message")
}
log.Debug().Msg("published batch")
return nil
}
func toEntries(batch []*string) []types.SendMessageBatchRequestEntry {
entries := make([]types.SendMessageBatchRequestEntry, len(batch))
for i, m := range batch {
entries[i] = types.SendMessageBatchRequestEntry{
Id: aws.String(strconv.Itoa(i)),
MessageBody: m,
}
}
return entries
}