-
Notifications
You must be signed in to change notification settings - Fork 14
/
sqs.go
115 lines (93 loc) · 2.67 KB
/
sqs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/brexhq/substation"
"github.com/brexhq/substation/internal/channel"
"github.com/brexhq/substation/message"
"golang.org/x/sync/errgroup"
)
// sqsMetadata is the JSON document attached as metadata to each message
// created from an SQS record. Its fields are copied from the corresponding
// fields of an events.SQSMessage by sqsHandler.
type sqsMetadata struct {
	// EventSourceArn is serialized under the "eventSourceArn" key.
	EventSourceArn string `json:"eventSourceArn"`
	// MessageID is serialized under the "messageId" key.
	MessageID string `json:"messageId"`
	// BodyMd5 is serialized under the "bodyMd5" key.
	BodyMd5 string `json:"bodyMd5"`
	// Attributes is serialized under the "attributes" key.
	Attributes map[string]string `json:"attributes"`
}
// sqsHandler handles one SQS event.
//
// It loads the configuration with getConfig, decodes it into a customConfig,
// and creates a Substation instance from cfg.Config. Two goroutines managed by
// a single errgroup then cooperate over a channel:
//
//   - the ingest goroutine turns every record of the event into a message whose
//     data is the record body and whose metadata is that record's sqsMetadata
//     encoded as JSON, sends it on the channel, and closes the channel when done;
//   - the transform goroutine receives messages and runs sub.Transform on each
//     one in a nested errgroup limited to cfg.Concurrency goroutines. After all
//     transforms finish it sends a control message to flush the pipeline.
//
// The returned error is nil on success. Any failure is wrapped with the
// "sqs handler: " prefix; the underlying error stays reachable through
// errors.Is / errors.As.
func sqsHandler(ctx context.Context, event events.SQSEvent) error {
	// Retrieve and load configuration.
	conf, err := getConfig(ctx)
	if err != nil {
		return fmt.Errorf("sqs handler: %w", err)
	}

	cfg := customConfig{}
	if err := json.NewDecoder(conf).Decode(&cfg); err != nil {
		return fmt.Errorf("sqs handler: %w", err)
	}

	sub, err := substation.New(ctx, cfg.Config)
	if err != nil {
		return fmt.Errorf("sqs handler: %w", err)
	}

	ch := channel.New[*message.Message]()
	group, ctx := errgroup.WithContext(ctx)

	// Data transformation. Transforms are executed concurrently using a worker pool
	// managed by an errgroup. Each message is processed in a separate goroutine.
	group.Go(func() error {
		tfGroup, tfCtx := errgroup.WithContext(ctx)
		// NOTE(review): errgroup documents that a limit of zero prevents any new
		// goroutine from being added, so a zero cfg.Concurrency would block the
		// first tfGroup.Go call — confirm a default is applied where customConfig
		// is populated.
		tfGroup.SetLimit(cfg.Concurrency)

		// The loop variable is deliberately not named "message" so that it does
		// not shadow the imported message package.
		for item := range ch.Recv() {
			select {
			case <-ctx.Done():
				// NOTE(review): this returns without waiting for transforms that
				// are already running; they observe the cancellation via tfCtx.
				return ctx.Err()
			default:
			}

			// Per-iteration copy so the closure below captures this message even
			// on toolchains where the loop variable is shared across iterations.
			msg := item
			tfGroup.Go(func() error {
				// Transformed messages are never returned to the caller because
				// invocation is asynchronous.
				if _, err := sub.Transform(tfCtx, msg); err != nil {
					return err
				}

				return nil
			})
		}

		if err := tfGroup.Wait(); err != nil {
			return err
		}

		// CTRL messages flush the pipeline. This must be done
		// after all messages have been processed.
		ctrl := message.New().AsControl()
		if _, err := sub.Transform(ctx, ctrl); err != nil {
			return err
		}

		return nil
	})

	// Data ingest.
	group.Go(func() error {
		// Closing the channel ends the receive loop in the transform goroutine.
		defer ch.Close()

		for _, record := range event.Records {
			// Stop producing once the group's context is cancelled (a sibling
			// goroutine failed or the caller's context ended).
			if err := ctx.Err(); err != nil {
				return err
			}

			// Metadata is built per record so that every message carries its own
			// message ID, body MD5, and attributes. Iterating the records also
			// means an event without records produces no messages instead of
			// indexing into an empty slice.
			metadata, err := json.Marshal(sqsMetadata{
				EventSourceArn: record.EventSourceARN,
				MessageID:      record.MessageId,
				BodyMd5:        record.Md5OfBody,
				Attributes:     record.Attributes,
			})
			if err != nil {
				// The "sqs handler" prefix is added once, at group.Wait below.
				return fmt.Errorf("marshal metadata: %w", err)
			}

			msg := message.New().SetData([]byte(record.Body)).SetMetadata(metadata)
			ch.Send(msg)
		}

		return nil
	})

	// Wait for all goroutines to complete. This includes the goroutines that are
	// executing the transform functions.
	if err := group.Wait(); err != nil {
		return fmt.Errorf("sqs handler: %w", err)
	}

	return nil
}