/
kinesis_stream.go
128 lines (105 loc) · 2.91 KB
/
kinesis_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"context"
"encoding/json"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/awslabs/kinesis-aggregation/go/deaggregator"
"github.com/brexhq/substation"
"github.com/brexhq/substation/internal/aws/kinesis"
"github.com/brexhq/substation/internal/channel"
"github.com/brexhq/substation/message"
"golang.org/x/sync/errgroup"
)
// kinesisStreamMetadata describes the Kinesis record a message was built from.
// It is JSON-encoded with the keys given in the struct tags.
//
// Field order matters: it determines the key order of the encoded JSON, and any
// caller that builds this struct with an unkeyed (positional) composite literal
// would silently assign values to the wrong fields if the fields were reordered.
type kinesisStreamMetadata struct {
	// ApproximateArrivalTimestamp is the record's approximate arrival time;
	// time.Time encodes to JSON as an RFC 3339 string.
	ApproximateArrivalTimestamp time.Time `json:"approximateArrivalTimestamp"`
	// Stream identifies the source stream (the handler fills it with the
	// event source ARN).
	Stream string `json:"stream"`
	// PartitionKey is the record's partition key.
	PartitionKey string `json:"partitionKey"`
	// SequenceNumber is the record's sequence number within its shard.
	SequenceNumber string `json:"sequenceNumber"`
}
// kinesisStreamHandler is the Lambda entry point for Kinesis Data Streams
// events.
//
// It loads the Substation configuration, builds a Substation instance, and
// runs two goroutines under a shared errgroup:
//
//   - ingest: deaggregates the event's records, wraps each one in a message
//     whose metadata is a JSON-encoded kinesisStreamMetadata, sends it to an
//     internal channel, and closes the channel when done;
//   - transform: receives messages from the channel, runs each through
//     sub.Transform on a bounded worker pool, and then sends a control message
//     to flush the pipeline once every data message has been processed.
//
// Transform results are discarded because the invocation is asynchronous. The
// first error from either goroutine (including any transform error) is
// returned.
func kinesisStreamHandler(ctx context.Context, event events.KinesisEvent) error {
	// Retrieve and load configuration.
	conf, err := getConfig(ctx)
	if err != nil {
		return err
	}

	cfg := customConfig{}
	if err := json.NewDecoder(conf).Decode(&cfg); err != nil {
		return err
	}

	sub, err := substation.New(ctx, cfg.Config)
	if err != nil {
		return err
	}

	ch := channel.New[*message.Message]()
	group, ctx := errgroup.WithContext(ctx)

	// Data transformation. Each message is transformed in its own goroutine,
	// bounded by a worker limit.
	group.Go(func() error {
		recv := ch.Recv()

		// If this goroutine stops early (cancellation or a failed transform),
		// keep draining the channel so the ingest goroutine can never block
		// forever in ch.Send. On the normal path the channel is already closed
		// and this loop exits immediately.
		defer func() {
			for range recv {
			}
		}()

		// errgroup's SetLimit(0) admits no goroutines at all, so tfGroup.Go
		// would block forever. Treat an unset (zero) concurrency as sequential
		// processing; negative values keep errgroup's "no limit" meaning.
		limit := cfg.Concurrency
		if limit == 0 {
			limit = 1
		}

		tfGroup, tfCtx := errgroup.WithContext(ctx)
		tfGroup.SetLimit(limit)

		for msg := range recv {
			// tfCtx is derived from ctx, so this stops dispatching both when the
			// invocation is cancelled and when an earlier transform has failed.
			if tfCtx.Err() != nil {
				break
			}

			msg := msg // Per-iteration copy for the closure (needed before Go 1.22).
			tfGroup.Go(func() error {
				// Transformed messages are never returned to the caller because
				// invocation is asynchronous.
				_, err := sub.Transform(tfCtx, msg)
				return err
			})
		}

		// Always wait for in-flight transforms, even after an early stop, so
		// none of them outlive the handler.
		if err := tfGroup.Wait(); err != nil {
			return err
		}
		if err := ctx.Err(); err != nil {
			return err
		}

		// CTRL messages flush the pipeline. This must be done after all
		// messages have been processed.
		ctrl := message.New().AsControl()
		if _, err := sub.Transform(ctx, ctrl); err != nil {
			return err
		}

		return nil
	})

	// Data ingest.
	group.Go(func() error {
		// Closing the channel signals the transform goroutine that no more
		// messages are coming.
		defer ch.Close()

		// Nothing to ingest. This also guards the Records index below, which
		// would panic on an empty slice.
		if len(event.Records) == 0 {
			return nil
		}

		// The ARN of the last record is used as the stream for every message.
		// NOTE(review): this assumes all records in one event come from the
		// same stream — confirm.
		eventSourceArn := event.Records[len(event.Records)-1].EventSourceArn

		converted := kinesis.ConvertEventsRecords(event.Records)
		deaggregated, err := deaggregator.DeaggregateRecords(converted)
		if err != nil {
			return err
		}

		for _, record := range deaggregated {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}

			// Create Message metadata. NOTE(review): the pointer fields are
			// dereferenced without nil checks, which assumes the converter and
			// deaggregator always populate them.
			metadata, err := json.Marshal(kinesisStreamMetadata{
				ApproximateArrivalTimestamp: *record.ApproximateArrivalTimestamp,
				Stream:                      eventSourceArn,
				PartitionKey:                *record.PartitionKey,
				SequenceNumber:              *record.SequenceNumber,
			})
			if err != nil {
				return err
			}

			msg := message.New().SetData(record.Data).SetMetadata(metadata)
			ch.Send(msg)
		}

		return nil
	})

	// Wait for both goroutines. The transform goroutine itself waits for every
	// transform it started.
	return group.Wait()
}