forked from aws/amazon-kinesis-streams-for-fluent-bit
/
aggregator.go
232 lines (188 loc) · 6.63 KB
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package aggregate
import (
"crypto/md5"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/encoding/protowire"
"google.golang.org/protobuf/proto"
"github.com/canva/amazon-kinesis-streams-for-fluent-bit/compress"
"github.com/canva/amazon-kinesis-streams-for-fluent-bit/util"
)
var (
	// Magic number for KCL aggregated records. See this for details: https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
	kclMagicNumber    = []byte{0xF3, 0x89, 0x9A, 0xC2}
	// Cached length of the magic number; included in getSize() projections.
	kclMagicNumberLen = len(kclMagicNumber)
)
const (
	// Upper bound for one aggregated Kinesis record's serialized size.
	defaultMaximumRecordSize = 1024 * 1024 // 1 MB
	// Records at or above this size bypass aggregation and are sent as-is.
	defaultMaxAggRecordSize = 20 * 1024 // 20K
	// Starting value for the aggregate-size accumulator.
	initialAggRecordSize = 0
	// All field numbers are below 16, meaning they will only take up 1 byte
	fieldNumberSize = 1
)
// Aggregator kinesis aggregator.
// Buffers records and their partition keys, tracking the projected
// serialized size so a flush can be triggered before the Kinesis limit.
type Aggregator struct {
	// partitionKeys maps each partition key to its index in the
	// KCL aggregated record's partition key table (insertion order).
	partitionKeys map[string]uint64
	// records buffered since the last flush.
	records []*Record
	aggSize int // Size of both records, and partitionKeys in bytes
	// maximumRecordSize is the flush threshold for the aggregate.
	maximumRecordSize int
	// maxAggRecordSize: records this large or larger are not aggregated.
	maxAggRecordSize int
	// stringGen produces random partition keys when the caller supplies none.
	stringGen *util.RandomStringGenerator
}
// Config is for aggregation customizations.
// Nil fields fall back to the package defaults.
type Config struct {
	// MaximumRecordSize overrides defaultMaximumRecordSize when non-nil.
	MaximumRecordSize *int
	// MaxAggRecordSize overrides defaultMaxAggRecordSize when non-nil.
	MaxAggRecordSize *int
}
// NewAggregator creates a new aggregator.
// cfg may be nil, or have nil fields, in which case the package defaults
// (defaultMaximumRecordSize / defaultMaxAggRecordSize) are used.
func NewAggregator(stringGen *util.RandomStringGenerator, cfg *Config) *Aggregator {
	a := &Aggregator{
		partitionKeys:     make(map[string]uint64),
		records:           make([]*Record, 0),
		maximumRecordSize: defaultMaximumRecordSize,
		maxAggRecordSize:  defaultMaxAggRecordSize,
		aggSize:           initialAggRecordSize,
		stringGen:         stringGen,
	}

	// Guard against a nil config: the original dereferenced cfg
	// unconditionally and would panic on NewAggregator(gen, nil).
	if cfg != nil {
		if cfg.MaximumRecordSize != nil {
			a.maximumRecordSize = *cfg.MaximumRecordSize
		}
		if cfg.MaxAggRecordSize != nil {
			a.maxAggRecordSize = *cfg.MaxAggRecordSize
		}
	}

	return a
}
// AddRecord to the aggregate buffer.
// Will return a kinesis PutRecordsRequestEntry once buffer is full, or if the data exceeds the aggregate limit.
// A (nil, nil) return means the record was buffered for a later flush.
// When hasPartitionKey is false, an existing or freshly generated random
// partition key is used.
func (a *Aggregator) AddRecord(partitionKey string, hasPartitionKey bool, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) {
	if hasPartitionKey {
		partitionKeySize := len([]byte(partitionKey))
		if partitionKeySize < 1 {
			return nil, fmt.Errorf("Invalid partition key provided")
		}
	}

	dataSize := len(data)

	// If this is a very large record, then don't aggregate it.
	if dataSize >= a.maxAggRecordSize {
		if !hasPartitionKey {
			partitionKey = a.stringGen.RandomString()
		}
		return &kinesis.PutRecordsRequestEntry{
			Data:         data,
			PartitionKey: aws.String(partitionKey),
		}, nil
	}

	if !hasPartitionKey {
		if len(a.partitionKeys) > 0 {
			// Take any partition key from the map, as long as one exists.
			// (Reusing a key avoids growing the partition key table.)
			// NOTE: `for k := range` replaces the non-idiomatic `for k, _ := range`.
			for k := range a.partitionKeys {
				partitionKey = k
				break
			}
		} else {
			partitionKey = a.stringGen.RandomString()
		}
	}

	// Check if we need to add a new partition key, and if we do how much space it will take
	pKeyIdx, pKeyAddedSize := a.checkPartitionKey(partitionKey)

	// data field size is proto size of data + data field number size
	// partition key field size is varint of index size + field number size
	dataFieldSize := protowire.SizeBytes(dataSize) + fieldNumberSize
	pkeyFieldSize := protowire.SizeVarint(pKeyIdx) + fieldNumberSize

	// Total size is byte size of data + pkey field + field number of parent proto
	if a.getSize()+protowire.SizeBytes(dataFieldSize+pkeyFieldSize)+fieldNumberSize+pKeyAddedSize >= a.maximumRecordSize {
		// Aggregate records, and return if error
		entry, err = a.AggregateRecords()
		if err != nil {
			return entry, err
		}

		if !hasPartitionKey {
			// choose a new partition key if needed now that we've aggregated the previous records
			partitionKey = a.stringGen.RandomString()
		}
		// Recompute field size, since it changed
		pKeyIdx, _ = a.checkPartitionKey(partitionKey)
		pkeyFieldSize = protowire.SizeVarint(pKeyIdx) + fieldNumberSize
	}

	// Add new record, and update aggSize
	partitionKeyIndex := a.addPartitionKey(partitionKey)

	a.records = append(a.records, &Record{
		Data:              data,
		PartitionKeyIndex: &partitionKeyIndex,
	})
	a.aggSize += protowire.SizeBytes(dataFieldSize+pkeyFieldSize) + fieldNumberSize

	// entry is nil unless a flush happened above.
	return entry, err
}
// AggregateRecords will flush proto-buffered records into a put request.
// The payload layout is: KCL magic number + protobuf-encoded AggregatedRecord
// + MD5 checksum of the protobuf bytes, then compressed.
// Returns (nil, nil) when nothing is buffered.
func (a *Aggregator) AggregateRecords() (entry *kinesis.PutRecordsRequestEntry, err error) {
	if len(a.records) == 0 {
		return nil, nil
	}

	pkeys := a.getPartitionKeys()

	agg := &AggregatedRecord{
		PartitionKeyTable: pkeys,
		Records:           a.records,
	}

	protoBufData, err := proto.Marshal(agg)
	if err != nil {
		logrus.Errorf("Failed to encode record: %v", err)
		return nil, err
	}

	// Assemble magic number + payload + MD5 trailer in one preallocated
	// buffer. The previous code appended directly to the package-level
	// kclMagicNumber slice, which only avoids clobbering it because a
	// slice literal's cap equals its len; building a fresh buffer is safe
	// by construction and avoids an extra grow-copy.
	checksum := md5.Sum(protoBufData)
	kclData := make([]byte, 0, kclMagicNumberLen+len(protoBufData)+md5.Size)
	kclData = append(kclData, kclMagicNumber...)
	kclData = append(kclData, protoBufData...)
	kclData = append(kclData, checksum[:]...)

	logrus.Debugf("[kinesis ] Aggregated (%d) records of size (%d) with total size (%d), partition key (%s)\n", len(a.records), a.getSize(), len(kclData), pkeys[0])

	compressedData, err := compress.Compress(kclData)
	if err != nil {
		logrus.Warnf("Failed to compress KCL data: %v", err)
		// This should not result in dropping records/increasing retries.
		// Compressor will return original data if it fails to compress them.
	}

	// Clear buffer if aggregation didn't fail
	a.clearBuffer()

	return &kinesis.PutRecordsRequestEntry{
		Data:         compressedData,
		PartitionKey: aws.String(pkeys[0]),
	}, nil
}
// GetRecordCount reports how many records are currently buffered and
// awaiting aggregation.
func (a *Aggregator) GetRecordCount() int {
	return len(a.records)
}
// addPartitionKey registers partitionKey in the key table and returns its
// table index. A key already present keeps its existing index; a new key is
// assigned the next sequential index and its encoded size is charged to
// aggSize.
func (a *Aggregator) addPartitionKey(partitionKey string) uint64 {
	if existing, found := a.partitionKeys[partitionKey]; found {
		return existing
	}

	next := uint64(len(a.partitionKeys))
	a.partitionKeys[partitionKey] = next

	// Account for the key's bytes-field encoding in the key table.
	a.aggSize += protowire.SizeBytes(len(partitionKey)) + fieldNumberSize
	return next
}
// checkPartitionKey reports the table index partitionKey occupies (or would
// occupy) and how many extra bytes adding it would cost. A key that is
// already registered costs zero additional bytes. The table is not modified.
func (a *Aggregator) checkPartitionKey(partitionKey string) (uint64, int) {
	if existing, found := a.partitionKeys[partitionKey]; found {
		return existing, 0
	}

	added := protowire.SizeBytes(len(partitionKey)) + fieldNumberSize
	return uint64(len(a.partitionKeys)), added
}
// getPartitionKeys returns the partition key table ordered by the index
// stored in a.partitionKeys. Iterating the map directly (as the previous
// implementation did) yields Go's randomized map order, which can disagree
// with the Record.PartitionKeyIndex values assigned at insertion time and
// corrupt the index->key mapping that deaggregating consumers rely on
// whenever more than one partition key is buffered.
func (a *Aggregator) getPartitionKeys() []string {
	keys := make([]string, len(a.partitionKeys))
	for pk, idx := range a.partitionKeys {
		keys[idx] = pk
	}
	return keys
}
// getSize returns the projected size in bytes of the flushed aggregate:
// the buffered protobuf payload (records + key table) plus the KCL magic
// number prefix and the trailing MD5 checksum.
func (a *Aggregator) getSize() int {
	return a.aggSize + kclMagicNumberLen + md5.Size
}
// clearBuffer resets the aggregator to its empty state after a flush:
// empty key table, no buffered records, size accumulator back to initial.
func (a *Aggregator) clearBuffer() {
	a.aggSize = initialAggRecordSize
	a.records = make([]*Record, 0)
	a.partitionKeys = make(map[string]uint64)
}