forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.go
318 lines (287 loc) · 9 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
package processor
import (
"github.com/dafanshu/benthos/v3/internal/tracing"
"github.com/dafanshu/benthos/v3/lib/message"
"github.com/dafanshu/benthos/v3/lib/response"
"github.com/dafanshu/benthos/v3/lib/types"
"github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
)
//------------------------------------------------------------------------------
// ExecuteAll attempts to execute a slice of processors to a message. Returns
// N resulting messages or a response. The response may indicate either a NoAck
// in the event of the message being buffered or an unrecoverable error.
func ExecuteAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response) {
resultMsgs := make([]types.Message, len(msgs))
copy(resultMsgs, msgs)
var resultRes types.Response
for i := 0; len(resultMsgs) > 0 && i < len(procs); i++ {
var nextResultMsgs []types.Message
for _, m := range resultMsgs {
var rMsgs []types.Message
if rMsgs, resultRes = procs[i].ProcessMessage(m); resultRes != nil && resultRes.Error() != nil {
// We immediately return if a processor hits an unrecoverable
// error on a message.
return nil, resultRes
}
nextResultMsgs = append(nextResultMsgs, rMsgs...)
}
resultMsgs = nextResultMsgs
}
if len(resultMsgs) == 0 {
if resultRes == nil {
resultRes = response.NewUnack()
}
return nil, resultRes
}
return resultMsgs, nil
}
// ExecuteTryAll attempts to execute a slice of processors to messages, if a
// message has failed a processing step it is prevented from being sent to
// subsequent processors. Returns N resulting messages or a response. The
// response may indicate either a NoAck in the event of the message being
// buffered or an unrecoverable error.
func ExecuteTryAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response) {
resultMsgs := make([]types.Message, len(msgs))
copy(resultMsgs, msgs)
var resultRes types.Response
for i := 0; len(resultMsgs) > 0 && i < len(procs); i++ {
var nextResultMsgs []types.Message
for _, m := range resultMsgs {
// Skip messages that failed a prior stage.
if HasFailed(m.Get(0)) {
nextResultMsgs = append(nextResultMsgs, m)
continue
}
var rMsgs []types.Message
if rMsgs, resultRes = procs[i].ProcessMessage(m); resultRes != nil && resultRes.Error() != nil {
// We immediately return if a processor hits an unrecoverable
// error on a message.
return nil, resultRes
}
nextResultMsgs = append(nextResultMsgs, rMsgs...)
}
resultMsgs = nextResultMsgs
}
if len(resultMsgs) == 0 {
if resultRes == nil {
resultRes = response.NewUnack()
}
return nil, resultRes
}
return resultMsgs, nil
}
type catchMessage struct {
batches []types.Message
caught bool
}
// ExecuteCatchAll attempts to execute a slice of processors to only messages
// that have failed a processing step. Returns N resulting messages or a
// response.
func ExecuteCatchAll(procs []types.Processor, msgs ...types.Message) ([]types.Message, types.Response) {
// Preserves the original order of messages before entering the catch block.
// Only processors that have failed a previous stage are "caught", and will
// remain caught until all catch processors are executed.
catchBatches := make([]catchMessage, len(msgs))
for i, m := range msgs {
catchBatches[i] = catchMessage{
batches: []types.Message{m},
caught: HasFailed(m.Get(0)),
}
}
var resultRes types.Response
for i := 0; i < len(procs); i++ {
for j := 0; j < len(catchBatches); j++ {
if !catchBatches[j].caught || len(catchBatches[j].batches) == 0 {
continue
}
var nextResultBatches []types.Message
for _, m := range catchBatches[j].batches {
var rMsgs []types.Message
if rMsgs, resultRes = procs[i].ProcessMessage(m); resultRes != nil && resultRes.Error() != nil {
// We immediately return if a processor hits an unrecoverable
// error on a message.
return nil, resultRes
}
nextResultBatches = append(nextResultBatches, rMsgs...)
}
catchBatches[j].batches = nextResultBatches
}
}
var resultBatches []types.Message
for _, b := range catchBatches {
resultBatches = append(resultBatches, b.batches...)
}
if len(resultBatches) == 0 {
if resultRes == nil {
resultRes = response.NewUnack()
}
return nil, resultRes
}
return resultBatches, nil
}
//------------------------------------------------------------------------------
// FailFlagKey is a metadata key used for flagging processor errors in Benthos.
// If a message part has any non-empty value for this metadata key then it will
// be interpretted as having failed a processor step somewhere in the pipeline.
var FailFlagKey = types.FailFlagKey
// FlagFail marks a message part as having failed at a processing step.
func FlagFail(part types.Part) {
part.Metadata().Set(FailFlagKey, "true")
}
// FlagErr marks a message part as having failed at a processing step with an
// error message. If the error is nil the message part remains unchanged.
func FlagErr(part types.Part, err error) {
if err != nil {
part.Metadata().Set(FailFlagKey, err.Error())
}
}
// GetFail returns an error string for a message part if it has failed, or an
// empty string if not.
func GetFail(part types.Part) string {
return part.Metadata().Get(FailFlagKey)
}
// HasFailed checks whether a message part has failed a processing step.
func HasFailed(part types.Part) bool {
return len(part.Metadata().Get(FailFlagKey)) > 0
}
// ClearFail removes any existing failure flags from a message part.
func ClearFail(part types.Part) {
part.Metadata().Delete(FailFlagKey)
}
//------------------------------------------------------------------------------
func iterateParts(
parts []int, msg types.Message,
iter func(int, types.Part) error,
) error {
exec := func(i int) error {
return iter(i, msg.Get(i))
}
if len(parts) == 0 {
for i := 0; i < msg.Len(); i++ {
if err := exec(i); err != nil {
return err
}
}
} else {
for _, i := range parts {
if err := exec(i); err != nil {
return err
}
}
}
return nil
}
// IteratePartsWithSpanV2 iterates the parts of a message according to a slice
// of indexes (if empty all parts are iterated) and calls a func for each part
// along with a tracing span for that part. If an error is returned the part is
// flagged as failed and the span has the error logged.
func IteratePartsWithSpanV2(
operationName string, parts []int, msg types.Message,
iter func(int, *tracing.Span, types.Part) error,
) {
exec := func(i int) {
part := msg.Get(i)
span := tracing.CreateChildSpan(operationName, part)
if err := iter(i, span, part); err != nil {
FlagErr(part, err)
span.SetTag("error", true)
span.LogKV(
"event", "error",
"type", err.Error(),
)
}
span.Finish()
}
if len(parts) == 0 {
for i := 0; i < msg.Len(); i++ {
exec(i)
}
} else {
for _, i := range parts {
exec(i)
}
}
}
// IteratePartsWithSpan iterates the parts of a message according to a slice of
// indexes (if empty all parts are iterated) and calls a func for each part
// along with a tracing span for that part. If an error is returned the part is
// flagged as failed and the span has the error logged.
//
// Deprecated: use IteratePartsWithSpanV2 instead.
func IteratePartsWithSpan(
operationName string, parts []int, msg types.Message,
iter func(int, opentracing.Span, types.Part) error,
) {
exec := func(i int) {
part := msg.Get(i)
span := opentracing.SpanFromContext(message.GetContext(part))
if span == nil {
span = opentracing.StartSpan(operationName)
} else {
span = opentracing.StartSpan(
operationName,
opentracing.ChildOf(span.Context()),
)
}
if err := iter(i, span, part); err != nil {
FlagErr(part, err)
span.SetTag("error", true)
span.LogFields(
olog.String("event", "error"),
olog.String("type", err.Error()),
)
}
span.Finish()
}
if len(parts) == 0 {
for i := 0; i < msg.Len(); i++ {
exec(i)
}
} else {
for _, i := range parts {
exec(i)
}
}
}
// Iterate the parts of a message, mutate them as required, and return either a
// boolean or an error. If the error is nil and the boolean is false then the
// message part is removed.
func iteratePartsFilterableWithSpan(
operationName string, parts []int, msg types.Message,
iter func(int, *tracing.Span, types.Part) (bool, error),
) {
newParts := make([]types.Part, 0, msg.Len())
exec := func(i int) bool {
part := msg.Get(i)
span := tracing.CreateChildSpan(operationName, part)
var keep bool
var err error
if keep, err = iter(i, span, part); err != nil {
FlagErr(part, err)
span.SetTag("error", true)
span.LogKV(
"event", "error",
"type", err.Error(),
)
keep = true
}
span.Finish()
return keep
}
if len(parts) == 0 {
for i := 0; i < msg.Len(); i++ {
if exec(i) {
newParts = append(newParts, msg.Get(i))
}
}
} else {
for _, i := range parts {
if exec(i) {
newParts = append(newParts, msg.Get(i))
}
}
}
msg.SetAll(newParts)
}
//------------------------------------------------------------------------------