dedupe.go
package processor

import (
	"bytes"
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/Jeffail/benthos/v3/internal/bloblang/field"
	"github.com/Jeffail/benthos/v3/internal/docs"
	"github.com/Jeffail/benthos/v3/internal/interop"
	"github.com/Jeffail/benthos/v3/internal/tracing"
	"github.com/Jeffail/benthos/v3/lib/log"
	"github.com/Jeffail/benthos/v3/lib/metrics"
	"github.com/Jeffail/benthos/v3/lib/response"
	"github.com/Jeffail/benthos/v3/lib/types"
	"github.com/OneOfOne/xxhash"
)
//------------------------------------------------------------------------------

func init() {
	Constructors[TypeDedupe] = TypeSpec{
		constructor: NewDedupe,
		Categories: []Category{
			CategoryUtility,
		},
		Summary: `
Deduplicates message batches by caching selected (and optionally hashed)
messages, dropping batches that are already cached.`,
		Description: `
This processor acts across an entire batch; in order to deduplicate individual
messages within a batch, use this processor with the
` + "[`for_each`](/docs/components/processors/for_each)" + ` processor.

Optionally, the ` + "`key`" + ` field can be populated in order to hash on a
function interpolated string rather than the full contents of messages. This
allows you to deduplicate based on dynamic fields within a message, such as its
metadata, JSON fields, etc. A full list of interpolation functions can be found
[here](/docs/configuration/interpolation#bloblang-queries).

For example, the following config would deduplicate based on the concatenated
values of the metadata field ` + "`kafka_key`" + ` and the value of the JSON
path ` + "`id`" + ` within the message contents:

` + "```yaml" + `
pipeline:
  processors:
    - dedupe:
        cache: foocache
        key: ${! meta("kafka_key") }-${! json("id") }
` + "```" + `

Caches should be configured as a resource; for more information check out the
[documentation here](/docs/components/caches/about).

When using this processor with an output target that might fail you should
always wrap the output within a ` + "[`retry`](/docs/components/outputs/retry)" + `
block. This ensures that during outages your messages aren't reprocessed after
failures, which would result in messages being dropped.

## Delivery Guarantees

Performing deduplication on a stream using a distributed cache voids any
at-least-once guarantees that it previously had. This is because the cache will
preserve message signatures even if the message fails to leave the Benthos
pipeline, which would cause message loss in the event of an outage at the output
sink followed by a restart of the Benthos instance.

If you intend to preserve at-least-once delivery guarantees you can avoid this
problem by using a memory based cache. This is a compromise that can achieve
effective deduplication, but parallel deployments of the pipeline as well as
service restarts increase the chances of duplicates passing undetected.`,
		FieldSpecs: docs.FieldSpecs{
			docs.FieldCommon("cache", "The [`cache` resource](/docs/components/caches/about) to target with this processor."),
			docs.FieldCommon("hash", "The hash type to use.").HasOptions("none", "xxhash"),
			docs.FieldCommon("key", "An optional key to use for deduplication (instead of the entire message contents).").IsInterpolated(),
			docs.FieldCommon("drop_on_err", "Whether messages should be dropped when the cache returns an error."),
			docs.FieldAdvanced("parts", "An array of message indexes within the batch to deduplicate based on. If left empty all messages are included. This field is only applicable when batching messages [at the input level](/docs/configuration/batching).").Array(),
		},
	}
}
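
// A minimal end-to-end sketch of the docs above (illustrative only; assumes an
// in-memory cache resource named "foocache" and an output resource named
// "my_output" exist in the wider config):
//
//	resources:
//	  caches:
//	    foocache:
//	      memory:
//	        ttl: 300
//
//	pipeline:
//	  processors:
//	    - dedupe:
//	        cache: foocache
//	        key: ${! meta("kafka_key") }-${! json("id") }
//
//	output:
//	  retry:
//	    output:
//	      resource: my_output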
//------------------------------------------------------------------------------

// DedupeConfig contains configuration fields for the Dedupe processor.
type DedupeConfig struct {
	Cache          string `json:"cache" yaml:"cache"`
	HashType       string `json:"hash" yaml:"hash"`
	Parts          []int  `json:"parts" yaml:"parts"` // message parts to hash
	Key            string `json:"key" yaml:"key"`
	DropOnCacheErr bool   `json:"drop_on_err" yaml:"drop_on_err"`
}

// NewDedupeConfig returns a DedupeConfig with default values.
func NewDedupeConfig() DedupeConfig {
	return DedupeConfig{
		Cache:          "",
		HashType:       "none",
		Parts:          []int{0}, // only consider the 1st part
		Key:            "",
		DropOnCacheErr: true,
	}
}
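
// For reference, the defaults above correspond to a config of roughly this
// shape (illustrative; cache must be set to a real cache resource):
//
//	dedupe:
//	  cache: ""
//	  hash: none
//	  parts: [0]
//	  key: ""
//	  drop_on_err: true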
//------------------------------------------------------------------------------

// hasher is a minimal interface for accumulating written bytes and extracting
// a signature from them, used as the cache key for deduplication.
type hasher interface {
	Write(str []byte) (int, error)
	Bytes() []byte
}

type hasherFunc func() hasher

//------------------------------------------------------------------------------

// xxhashHasher adapts an xxhash 64-bit hasher to the hasher interface.
type xxhashHasher struct {
	h *xxhash.XXHash64
}

func (x *xxhashHasher) Write(str []byte) (int, error) {
	return x.h.Write(str)
}

func (x *xxhashHasher) Bytes() []byte {
	return []byte(strconv.FormatUint(x.h.Sum64(), 10))
}

//------------------------------------------------------------------------------

func strToHasher(str string) (hasherFunc, error) {
	switch str {
	case "none":
		return func() hasher {
			return bytes.NewBuffer(nil)
		}, nil
	case "xxhash":
		return func() hasher {
			return &xxhashHasher{
				h: xxhash.New64(),
			}
		}, nil
	}
	return nil, fmt.Errorf("hash type not recognised: %v", str)
}
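
// As a quick sketch of the two behaviours (illustrative only): the "none"
// hasher is just a byte buffer, so the cache key is the raw bytes written to
// it, while "xxhash" reduces whatever is written to a decimal digest string:
//
//	hFunc, _ := strToHasher("xxhash")
//	h := hFunc()
//	h.Write([]byte("hello world"))
//	key := string(h.Bytes()) // a stable decimal rendering of the 64-bit sum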
//------------------------------------------------------------------------------

// Dedupe is a processor that deduplicates messages either by hashing the full
// contents of message parts or by hashing the value of an interpolated string.
type Dedupe struct {
	conf  Config
	log   log.Modular
	stats metrics.Type

	key *field.Expression

	mgr        types.Manager
	cacheName  string
	hasherFunc hasherFunc

	mCount     metrics.StatCounter
	mErrHash   metrics.StatCounter
	mErrCache  metrics.StatCounter
	mErr       metrics.StatCounter
	mDropped   metrics.StatCounter
	mSent      metrics.StatCounter
	mBatchSent metrics.StatCounter
}

// NewDedupe returns a Dedupe processor.
func NewDedupe(
	conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
	hFunc, err := strToHasher(conf.Dedupe.HashType)
	if err != nil {
		return nil, err
	}

	key, err := interop.NewBloblangField(mgr, conf.Dedupe.Key)
	if err != nil {
		return nil, fmt.Errorf("failed to parse key expression: %v", err)
	}

	if err := interop.ProbeCache(context.Background(), mgr, conf.Dedupe.Cache); err != nil {
		return nil, err
	}

	return &Dedupe{
		conf:  conf,
		log:   log,
		stats: stats,

		key: key,

		mgr:        mgr,
		cacheName:  conf.Dedupe.Cache,
		hasherFunc: hFunc,

		mCount:     stats.GetCounter("count"),
		mErrHash:   stats.GetCounter("error.hash"),
		mErrCache:  stats.GetCounter("error.cache"),
		mErr:       stats.GetCounter("error"),
		mDropped:   stats.GetCounter("dropped"),
		mSent:      stats.GetCounter("sent"),
		mBatchSent: stats.GetCounter("batch.sent"),
	}, nil
}
//------------------------------------------------------------------------------

// ProcessMessage applies the processor to a message, either creating >0
// resulting messages or a response to be sent back to the message source.
func (d *Dedupe) ProcessMessage(msg types.Message) ([]types.Message, types.Response) {
	d.mCount.Incr(1)

	extractedHash := false
	hasher := d.hasherFunc()

	spans := tracing.CreateChildSpans(TypeDedupe, msg)
	defer func() {
		for _, s := range spans {
			s.Finish()
		}
	}()

	// Prefer the interpolated key when one is configured, otherwise hash the
	// raw contents of the configured message parts.
	key := d.key.Bytes(0, msg)
	if len(key) > 0 {
		hasher.Write(key)
		extractedHash = true
	} else {
		for _, index := range d.conf.Dedupe.Parts {
			// Attempt to add whole part to hash.
			if partBytes := msg.Get(index).Get(); partBytes != nil {
				if _, err := hasher.Write(partBytes); err != nil {
					d.mErrHash.Incr(1)
					d.mErr.Incr(1)
					d.mDropped.Incr(1)
					d.log.Errorf("Hash error: %v\n", err)
				} else {
					extractedHash = true
				}
			}
		}
	}

	if !extractedHash {
		if d.conf.Dedupe.DropOnCacheErr {
			d.mDropped.Incr(1)
			return nil, response.NewAck()
		}
	} else {
		// Attempt to add the signature to the cache; a key-already-exists
		// error indicates a duplicate, which is acked and dropped.
		var err error
		if cerr := interop.AccessCache(context.Background(), d.mgr, d.cacheName, func(cache types.Cache) {
			err = cache.Add(string(hasher.Bytes()), []byte{'t'})
		}); cerr != nil {
			err = cerr
		}
		if err != nil {
			if err == types.ErrKeyAlreadyExists {
				for _, s := range spans {
					s.LogKV(
						"event", "dropped",
						"type", "deduplicated",
					)
				}
				d.mDropped.Incr(1)
				return nil, response.NewAck()
			}

			d.mErrCache.Incr(1)
			d.mErr.Incr(1)
			d.log.Errorf("Cache error: %v\n", err)
			for _, s := range spans {
				s.LogKV(
					"event", "error",
					"type", err.Error(),
				)
			}

			if d.conf.Dedupe.DropOnCacheErr {
				d.mDropped.Incr(1)
				return nil, response.NewAck()
			}
		}
	}

	d.mBatchSent.Incr(1)
	d.mSent.Incr(int64(msg.Len()))
	msgs := [1]types.Message{msg}
	return msgs[:], nil
}
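
// A minimal usage sketch (illustrative; assumes a constructed Dedupe processor
// d and the lib/message package for building a test message):
//
//	msg := message.New([][]byte{[]byte(`{"id":"a"}`)})
//	msgs, res := d.ProcessMessage(msg)
//	if msgs == nil {
//	    // Duplicate (or cache error with drop_on_err): res is an ack and
//	    // the message is dropped.
//	}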
// CloseAsync shuts down the processor and stops processing requests.
func (d *Dedupe) CloseAsync() {
}

// WaitForClose blocks until the processor has closed down.
func (d *Dedupe) WaitForClose(timeout time.Duration) error {
	return nil
}

//------------------------------------------------------------------------------