forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
aws_kinesis_record_batcher.go
121 lines (103 loc) · 2.94 KB
/
aws_kinesis_record_batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package input
import (
"context"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/dafanshu/benthos/v3/internal/checkpoint"
"github.com/dafanshu/benthos/v3/lib/message"
"github.com/dafanshu/benthos/v3/lib/message/batch"
"github.com/dafanshu/benthos/v3/lib/types"
)
// awsKinesisRecordBatcher accumulates raw Kinesis records for a single
// stream/shard pair into batches according to a batch policy, and tracks
// which sequence numbers have been acknowledged downstream so that the
// shard checkpoint can be advanced safely.
type awsKinesisRecordBatcher struct {
	streamID string // Kinesis stream this batcher consumes from (set as metadata on each part)
	shardID  string // shard within the stream (set as metadata on each part)

	batchPolicy  *batch.Policy      // decides when accumulated records form a flushable batch
	checkpointer *checkpoint.Capped // caps in-flight messages and resolves the highest safe sequence

	flushedMessage types.Message // batch flushed but not yet dispatched/acked; nil when none pending

	batchedSequence string // sequence number of the most recently added record

	ackedSequence string         // highest sequence confirmed acked; guarded by ackedMut
	ackedMut      sync.Mutex     // protects ackedSequence
	ackedWG       sync.WaitGroup // counts outstanding ack callbacks; waited on during blocked Close
}
// newAWSKinesisRecordBatcher creates a record batcher for a single shard
// consumer. The batch policy is built from the reader's configured batching
// settings, the checkpointer is capped at the configured checkpoint limit,
// and sequence seeds the acknowledged sequence number (the position the
// consumer is resuming from).
func (k *kinesisReader) newAWSKinesisRecordBatcher(streamID, shardID, sequence string) (*awsKinesisRecordBatcher, error) {
	policy, err := batch.NewPolicy(k.conf.Batching, k.mgr, k.log, k.stats)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize batch policy for shard consumer: %w", err)
	}

	batcher := &awsKinesisRecordBatcher{
		streamID:      streamID,
		shardID:       shardID,
		batchPolicy:   policy,
		checkpointer:  checkpoint.NewCapped(int64(k.conf.CheckpointLimit)),
		ackedSequence: sequence,
	}
	return batcher, nil
}
// AddRecord converts a raw Kinesis record into a message part, tags it with
// stream/shard/sequence metadata, and feeds it to the batch policy. It
// returns true when the policy (or a still-pending flushed message) now
// holds a batch ready to be flushed.
//
// NOTE(review): r.SequenceNumber is dereferenced without a nil check while
// r.PartitionKey is guarded — presumably the SDK always populates the
// sequence number; confirm against the AWS API contract.
func (a *awsKinesisRecordBatcher) AddRecord(r *kinesis.Record) bool {
	part := message.NewPart(r.Data)

	meta := part.Metadata()
	meta.Set("kinesis_stream", a.streamID)
	meta.Set("kinesis_shard", a.shardID)
	if key := r.PartitionKey; key != nil {
		meta.Set("kinesis_partition_key", *key)
	}
	meta.Set("kinesis_sequence_number", *r.SequenceNumber)

	// Remember the latest sequence so a subsequent flush checkpoints it.
	a.batchedSequence = *r.SequenceNumber

	if a.flushedMessage == nil {
		return a.batchPolicy.Add(part)
	}

	// Upstream shouldn't really be adding records if a prior flush was
	// unsuccessful. However, we can still accommodate this by appending it
	// to the flushed message.
	a.flushedMessage.Append(part)
	return true
}
// HasPendingMessage reports whether a previously flushed batch is still
// waiting to be dispatched (i.e. FlushMessage produced a message that has
// not yet been handed off).
func (a *awsKinesisRecordBatcher) HasPendingMessage() bool {
	pending := a.flushedMessage != nil
	return pending
}
// FlushMessage attempts to produce the next batch as an asyncMessage. If no
// batch is pending it asks the batch policy to flush one; an empty
// asyncMessage with nil error means there is currently nothing to send.
// The returned message's ackFn records the highest fully-acknowledged
// sequence number via the capped checkpointer so GetSequence reflects a
// safe checkpoint position.
func (a *awsKinesisRecordBatcher) FlushMessage(ctx context.Context) (asyncMessage, error) {
	if a.flushedMessage == nil {
		// No batch left over from a previous failed dispatch; pull one
		// from the policy if it has anything ready.
		if a.flushedMessage = a.batchPolicy.Flush(); a.flushedMessage == nil {
			return asyncMessage{}, nil
		}
	}
	// Register this batch with the checkpointer; Track blocks when the
	// in-flight cap is reached, hence the context.
	resolveFn, err := a.checkpointer.Track(ctx, a.batchedSequence, int64(a.flushedMessage.Len()))
	if err != nil {
		// A timeout is not fatal: the pending message is retained (see
		// HasPendingMessage) and the caller can retry the flush.
		if err == types.ErrTimeout {
			err = nil
		}
		return asyncMessage{}, err
	}
	// Count the outstanding ack so Close(blocked=true) can wait for it.
	a.ackedWG.Add(1)
	aMsg := asyncMessage{
		msg: a.flushedMessage,
		ackFn: func(ctx context.Context, res types.Response) error {
			// resolveFn yields the highest contiguous resolved sequence,
			// or nil if earlier batches are still unacked.
			topSequence := resolveFn()
			if topSequence != nil {
				a.ackedMut.Lock()
				a.ackedSequence = topSequence.(string)
				a.ackedMut.Unlock()
			}
			a.ackedWG.Done()
			// NOTE(review): this returns the captured outer err, which is
			// necessarily nil at this point (non-nil err returned above) —
			// presumably intended as `return nil`; confirm upstream.
			return err
		},
	}
	// Hand ownership of the batch to the caller; clear the pending slot.
	a.flushedMessage = nil
	return aMsg, nil
}
// UntilNext returns the duration until the batch policy's next scheduled
// (time-based) flush is due.
func (a *awsKinesisRecordBatcher) UntilNext() time.Duration {
	next := a.batchPolicy.UntilNext()
	return next
}
// GetSequence returns the most recently acknowledged sequence number,
// suitable for persisting as the shard checkpoint. Safe for concurrent use
// with in-flight ack callbacks.
func (a *awsKinesisRecordBatcher) GetSequence() string {
	a.ackedMut.Lock()
	defer a.ackedMut.Unlock()
	return a.ackedSequence
}
// Close shuts the batcher down. When blocked is true it first waits for
// every outstanding ack callback issued by FlushMessage to complete, so no
// acknowledgement is lost; either way the batch policy is then closed
// asynchronously.
func (a *awsKinesisRecordBatcher) Close(blocked bool) {
	if blocked {
		// Block until all in-flight messages have been acked or rejected.
		a.ackedWG.Wait()
	}
	a.batchPolicy.CloseAsync()
}