-
Notifications
You must be signed in to change notification settings - Fork 786
/
message.go
632 lines (565 loc) · 21.9 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
package service
import (
"context"
"errors"
"github.com/benthosdev/benthos/v4/internal/bloblang/mapping"
"github.com/benthosdev/benthos/v4/internal/bloblang/query"
"github.com/benthosdev/benthos/v4/internal/message"
"github.com/benthosdev/benthos/v4/internal/transaction"
"github.com/benthosdev/benthos/v4/internal/value"
"github.com/benthosdev/benthos/v4/public/bloblang"
)
// MessageHandlerFunc is the signature of a component that consumes a single
// Benthos message. Implementations must return an error if the context is
// cancelled, or if the message could not be delivered or processed.
type MessageHandlerFunc func(ctx context.Context, msg *Message) error
// MessageBatchHandlerFunc is the signature of a component that consumes a
// batch of Benthos messages. Implementations must return an error if the
// context is cancelled, or if the messages could not be delivered or
// processed.
type MessageBatchHandlerFunc func(ctx context.Context, batch MessageBatch) error
// Message represents a single discrete message passing through a Benthos
// pipeline. It is safe to mutate the message via Set methods, but the
// underlying byte data should not be edited directly.
type Message struct {
	// part is the internal message representation that the accessors and
	// mutators of Message delegate to.
	part *message.Part
	// onErr, when non-nil, is invoked by SetError with the error being set,
	// before that error is recorded on part.
	onErr func(err error)
}
// MessageBatch describes a collection of one or more messages. It is a plain
// slice of message pointers, so len, indexing and range work on it directly.
type MessageBatch []*Message
// Copy returns a new batch holding a shallow copy (see Message.Copy) of every
// message in b. The new batch, and each message within it, can be modified
// without altering the contents of the original batch.
func (b MessageBatch) Copy() MessageBatch {
	out := make(MessageBatch, 0, len(b))
	for _, msg := range b {
		out = append(out, msg.Copy())
	}
	return out
}
// DeepCopy returns a new batch holding a deep copy (see Message.DeepCopy) of
// every message in b. The copies can be modified without altering the
// original batch, and deep mutations later performed on the source messages
// do not affect the copies.
//
// Use this when a component needs to keep hold of a batch beyond the
// boundaries of a process or write command. Buffer implementations that
// operate by keeping a reference to messages are the prime example.
func (b MessageBatch) DeepCopy() MessageBatch {
	clone := make(MessageBatch, len(b))
	for i := range b {
		clone[i] = b[i].DeepCopy()
	}
	return clone
}
// WalkWithBatchedErrors calls fn for every message in the batch along with its
// index. An error returned by fn does not stop the walk; instead it is recorded
// against that index in a *BatchError, which is returned once every message
// has been visited.
//
// Two exceptions apply. First, a batch of exactly one message returns fn's
// error as-is, without wrapping it in a *BatchError. Second, an error matching
// ErrNotConnected is treated as fatal: the walk stops immediately and that
// error is returned unwrapped.
//
// This is a handy pattern for batched outputs that deliver messages one at a
// time.
func (b MessageBatch) WalkWithBatchedErrors(fn func(int, *Message) error) error {
	if len(b) == 1 {
		return fn(0, b[0])
	}

	var bErr *BatchError
	for idx := range b {
		err := fn(idx, b[idx])
		if err == nil {
			continue
		}
		if errors.Is(err, ErrNotConnected) {
			// Fatal: abandon the rest of the batch.
			return err
		}
		if bErr == nil {
			bErr = NewBatchError(b, err)
		}
		_ = bErr.Failed(idx, err)
	}

	// Return an untyped nil explicitly so callers never receive a non-nil
	// error interface that wraps a nil *BatchError.
	if bErr == nil {
		return nil
	}
	return bErr
}
// Index tags every message of the batch, in place, with knowledge of its
// current position, and returns an Indexer. The Indexer can later recover the
// original position of messages in batches derived from this one, even after
// other components have filtered, duplicated or reordered them.
//
// This is useful when a batch is about to be mutated outside of this
// component's control (by processors, for example) in ways that may change the
// order or presence of its messages. An Indexer created beforehand lets the
// resulting batch be joined back to the messages we started with.
func (b MessageBatch) Index() *Indexer {
	src := make(message.Batch, 0, len(b))
	for _, msg := range b {
		src = append(src, msg.part)
	}

	group, tagged := message.NewSortGroup(src)

	// Swap each message's part for the one returned by NewSortGroup, since
	// IndexOf later resolves parts through the returned sort group.
	for i := range tagged {
		b[i].part = tagged[i]
	}

	return &Indexer{
		wrapped:     group,
		sourceBatch: b.Copy(),
	}
}
// Indexer encapsulates the ability to acquire the original index of a message
// from a derivative batch as it was when the indexer was created. This can be
// useful in situations where a batch is being dispatched to processors or
// outputs and a derivative batch needs to be associated with the origin.
type Indexer struct {
	// wrapped is the sort group created by MessageBatch.Index; IndexOf looks
	// message parts up through it.
	wrapped *message.SortGroup
	// sourceBatch is a shallow copy of the batch taken when the indexer was
	// created.
	// NOTE(review): not read by any code visible here; confirm whether other
	// files in the package depend on it.
	sourceBatch MessageBatch
}
// IndexOf reports the position that m, or the message m derives from,
// occupied in the batch from which the indexer was created. Orphan messages
// that do not originate from that batch yield -1. Because of filtering and
// duplication, any given origin index may be yielded by zero, one or several
// derivative messages.
func (ix *Indexer) IndexOf(m *Message) int {
	group := ix.wrapped
	return group.GetIndex(m.part)
}
// NewMessage creates a message whose initial raw content is the given byte
// slice. Passing nil is valid, and is recommended when the contents are going
// to be set in structured form afterwards.
func NewMessage(content []byte) *Message {
	p := message.NewPart(content)
	return &Message{part: p}
}
// NewInternalMessage wraps a part from the internal message package in a
// public Message. It is for internal use only, as scaffolding for internal
// components that are migrating to the new APIs.
func NewInternalMessage(imsg *message.Part) *Message {
	wrapped := new(Message)
	wrapped.part = imsg
	return wrapped
}
// Copy creates a shallow copy of a message that is safe to mutate with Set
// methods without mutating the original. Both messages will share a context,
// and therefore a tracing ID, if one has been associated with them.
//
// The error hook of the original message, if any, is carried over to the
// copy. SetError calls made on the copy are therefore observed in the same way
// as calls made on the original.
func (m *Message) Copy() *Message {
	return &Message{
		part:  m.part.ShallowCopy(),
		onErr: m.onErr,
	}
}
// DeepCopy creates a deep copy of a message and its contents that is safe to
// mutate with Set methods without mutating the original, and mutations on the
// inner (deep) contents of the source message will not mutate the copy.
//
// This is required in situations where a component wishes to retain a copy of a
// message beyond the boundaries of a process or write command. This is
// specifically required for buffer implementations that operate by keeping a
// reference to the message.
//
// The error hook of the original message, if any, is carried over to the
// copy. SetError calls made on the copy are therefore observed in the same way
// as calls made on the original.
func (m *Message) DeepCopy() *Message {
	return &Message{
		part:  m.part.DeepCopy(),
		onErr: m.onErr,
	}
}
// Context returns the context associated with the message. When no context
// has been associated, a background context is returned instead.
func (m *Message) Context() context.Context {
	ctx := message.GetContext(m.part)
	return ctx
}
// WithContext returns a new message with a provided context associated with it.
//
// The error hook of the original message, if any, is carried over to the
// returned message. SetError calls made on it are therefore observed in the
// same way as calls made on the original.
func (m *Message) WithContext(ctx context.Context) *Message {
	return &Message{
		part:  message.WithContext(ctx, m.part),
		onErr: m.onErr,
	}
}
// AsBytes returns the raw byte contents of the message. When the contents are
// structured, they are converted to bytes by the underlying part (as a JSON
// document, per this method's contract). The error result is currently always
// nil.
//
// The returned slice must NOT be mutated.
func (m *Message) AsBytes() ([]byte, error) {
	raw := m.part.AsBytes()
	// TODO: Escalate errors in marshalling once we're able.
	return raw, nil
}
// AsStructured returns the structured contents of the message. When the
// contents are raw bytes, they are parsed as a JSON document, and a failure to
// parse is reported as an error.
//
// If the returned value is a reference type (slice or map), it must NOT be
// mutated. Use AsStructuredMut to obtain contents that are safe to modify.
func (m *Message) AsStructured() (any, error) {
	v, err := m.part.AsStructured()
	return v, err
}
// AsStructuredMut returns the structured contents of the message. When the
// contents are raw bytes, they are parsed as a JSON document, and a failure to
// parse is reported as an error with a nil value.
//
// The returned value may be mutated freely, even when it is a reference type
// (slice or map). The structured contents are lazily deep cloned if they are
// still owned by an upstream component.
func (m *Message) AsStructuredMut() (any, error) {
	v, err := m.part.AsStructuredMut()
	if err == nil {
		return v, nil
	}
	return nil, err
}
// SetBytes replaces the contents of the message with the given raw byte slice.
func (m *Message) SetBytes(b []byte) {
	p := m.part
	p.SetBytes(b)
}
// SetStructured replaces the contents of the message with a structured value.
// The value should be a scalar Go type, or a map[string]interface{} or
// []interface{} built from the same kinds of values all the way down. This
// keeps the contents usable by other processors and JSON-marshallable when
// they are coerced into bytes.
//
// The value is treated as read-only. Subsequent processors must therefore
// clone it fully before performing any mutations.
func (m *Message) SetStructured(i any) {
	p := m.part
	p.SetStructured(i)
}
// SetStructuredMut replaces the contents of the message with a structured
// value. The value should be a scalar Go type, or a map[string]interface{} or
// []interface{} built from the same kinds of values all the way down. This
// keeps the contents usable by other processors and JSON-marshallable when
// they are coerced into bytes.
//
// The value is treated as mutable, so subsequent processors may modify it in
// place without performing a deep copy first.
func (m *Message) SetStructuredMut(i any) {
	p := m.part
	p.SetStructuredMut(i)
}
// SetError flags the message as having failed a processing step and attaches
// err to it as context. If an error hook is registered on the message, the
// hook is invoked with err first. Messages flagged with errors can be handled
// using the methods outlined in https://www.benthos.dev/docs/configuration/error_handling.
func (m *Message) SetError(err error) {
	if hook := m.onErr; hook != nil {
		hook(err)
	}
	m.part.ErrorSet(err)
}
// GetError returns the error attached to the message, or nil when there is
// none. Messages flagged with errors can be handled using the methods outlined
// in https://www.benthos.dev/docs/configuration/error_handling.
func (m *Message) GetError() error {
	err := m.part.ErrorGet()
	return err
}
// MetaGet looks up a metadata key and returns its value converted to a
// string, along with whether the key was present. A missing key yields an
// empty string and false.
//
// Strong advice: Use MetaGetMut instead.
func (m *Message) MetaGet(key string) (string, bool) {
	if v, ok := m.part.MetaGetMut(key); ok {
		return value.IToString(v), true
	}
	return "", false
}
// MetaGetMut attempts to find a metadata key from the message and returns the
// value if found, and a boolean indicating whether it was found. When the key
// does not exist, the returned value is nil. The value returned is mutable, and
// so it is safe to modify even though it may be a reference type such as a
// slice or map.
func (m *Message) MetaGetMut(key string) (any, bool) {
	v, exists := m.part.MetaGetMut(key)
	if !exists {
		// Return a true nil rather than an empty string. Otherwise a missing
		// key looks like a present string value to type switches and nil
		// checks.
		return nil, false
	}
	return v, true
}
// MetaSet assigns a string value to a metadata key. Assigning an empty string
// removes the key instead.
//
// Strong advice: Use MetaSetMut instead.
func (m *Message) MetaSet(key, value string) {
	if value != "" {
		m.part.MetaSetMut(key, value)
		return
	}
	m.part.MetaDelete(key)
}
// MetaSetMut assigns an arbitrary value to a metadata key. The value is stored
// as mutable. If it is a reference type such as a slice or map, a downstream
// component may therefore modify it.
func (m *Message) MetaSetMut(key string, value any) {
	p := m.part
	p.MetaSetMut(key, value)
}
// MetaDelete removes the given key from the message metadata.
func (m *Message) MetaDelete(key string) {
	p := m.part
	p.MetaDelete(key)
}
// MetaWalk calls fn once for each metadata key/value pair, with values
// presented as strings. Returning an error from fn stops the walk, and that
// error is then returned by MetaWalk.
//
// Strong advice: Use MetaWalkMut instead.
func (m *Message) MetaWalk(fn func(string, string) error) error {
	err := m.part.MetaIterStr(fn)
	return err
}
// MetaWalkMut calls fn once for each metadata key/value pair, passing values
// in their mutable form. Returning an error from fn stops the walk, and that
// error is then returned by MetaWalkMut.
func (m *Message) MetaWalkMut(fn func(key string, value any) error) error {
	err := m.part.MetaIterMut(fn)
	return err
}
//------------------------------------------------------------------------------

// BloblangQuery runs a parsed Bloblang mapping against the message and returns
// the resulting message, or an error if the mapping fails. If the mapping
// deletes the root, a nil message and a nil error are returned, which
// indicates that the message has been filtered.
func (m *Message) BloblangQuery(blobl *bloblang.Executor) (*Message, error) {
	exec := blobl.XUnwrapper().(interface {
		Unwrap() *mapping.Executor
	}).Unwrap()

	res, err := exec.MapPart(0, message.Batch{m.part})
	switch {
	case err != nil:
		return nil, err
	case res == nil:
		return nil, nil
	default:
		return NewInternalMessage(res), nil
	}
}
// BloblangQueryValue runs a parsed Bloblang mapping against the message and
// returns the raw value it produces, or an error if the mapping fails. If the
// mapping deletes the root, bloblang.ErrRootDeleted is returned, so that a
// deleted root can be told apart from a genuine nil value. A mapping that
// produces nothing yields a nil value and a nil error.
func (m *Message) BloblangQueryValue(blobl *bloblang.Executor) (any, error) {
	exec := blobl.XUnwrapper().(interface {
		Unwrap() *mapping.Executor
	}).Unwrap()

	res, err := exec.Exec(query.FunctionContext{
		Maps:     exec.Maps(),
		Vars:     map[string]any{},
		Index:    0,
		MsgBatch: message.Batch{m.part},
	})
	if err != nil {
		return nil, err
	}

	if _, deleted := res.(value.Delete); deleted {
		return nil, bloblang.ErrRootDeleted
	}
	if _, nothing := res.(value.Nothing); nothing {
		return nil, nil
	}
	return res, nil
}
// BloblangMutate runs a parsed Bloblang mapping onto the message, mutating its
// contents directly instead of building an entirely new object.
//
// The mutated message is returned, or an error if the mapping fails. If the
// mapping deletes the root, a nil message is returned, which indicates that
// it has been filtered.
//
// Because this is a Mutate execution, some functions within the Bloblang
// mapping behave differently. In the root of the mapping, the right-hand
// keywords `root` and `this` both refer to the same mutable root of the output
// document.
func (m *Message) BloblangMutate(blobl *bloblang.Executor) (*Message, error) {
	exec := blobl.XUnwrapper().(interface {
		Unwrap() *mapping.Executor
	}).Unwrap()

	res, err := exec.MapOnto(m.part, 0, message.Batch{m.part})
	if err != nil {
		return nil, err
	}
	if res == nil {
		// Root deleted: the message has been filtered.
		return nil, nil
	}
	return NewInternalMessage(res), nil
}
// BloblangMutateFrom runs a parsed Bloblang mapping onto the message. The
// mapping reads its reference material from the separate message from,
// rather than from the target message. The target's contents are mutated
// directly instead of building an entirely new object.
//
// The mutated message is returned, or an error if the mapping fails. If the
// mapping deletes the root, a nil message is returned, which indicates that
// it has been filtered.
//
// Because this is a MutateFrom execution, some functions within the Bloblang
// mapping behave differently. In the root of the mapping, the right-hand
// keyword `root` refers to the mutable root of the output document, whereas
// `this` refers to the message passed as from.
func (m *Message) BloblangMutateFrom(blobl *bloblang.Executor, from *Message) (*Message, error) {
	exec := blobl.XUnwrapper().(interface {
		Unwrap() *mapping.Executor
	}).Unwrap()

	res, err := exec.MapOnto(m.part, 0, message.Batch{from.part})
	if err != nil {
		return nil, err
	}
	if res == nil {
		// Root deleted: the message has been filtered.
		return nil, nil
	}
	return NewInternalMessage(res), nil
}
// BloblangQuery runs a parsed Bloblang mapping against the batch, from the
// point of view of the message at index, and returns the resulting message or
// an error if the mapping fails. If the mapping deletes the root, a nil
// message is returned, which indicates that it has been filtered.
//
// Because the whole batch is visible to the mapping, this allows windowed
// aggregations across message batches.
func (b MessageBatch) BloblangQuery(index int, blobl *bloblang.Executor) (*Message, error) {
	exec := blobl.XUnwrapper().(interface {
		Unwrap() *mapping.Executor
	}).Unwrap()

	parts := make(message.Batch, 0, len(b))
	for _, msg := range b {
		parts = append(parts, msg.part)
	}

	res, err := exec.MapPart(index, parts)
	switch {
	case err != nil:
		return nil, err
	case res == nil:
		return nil, nil
	default:
		return NewInternalMessage(res), nil
	}
}
// BloblangQueryValue runs a parsed Bloblang mapping against the batch, from
// the point of view of the message at index, and returns the raw value it
// produces or an error if the mapping fails. If the mapping deletes the root,
// bloblang.ErrRootDeleted is returned, so that a deleted root can be told
// apart from a genuine nil value. A mapping that produces nothing yields a nil
// value and a nil error.
//
// Because the whole batch is visible to the mapping, this allows windowed
// aggregations across message batches.
func (b MessageBatch) BloblangQueryValue(index int, blobl *bloblang.Executor) (any, error) {
	exec := blobl.XUnwrapper().(interface {
		Unwrap() *mapping.Executor
	}).Unwrap()

	parts := make(message.Batch, 0, len(b))
	for _, msg := range b {
		parts = append(parts, msg.part)
	}

	res, err := exec.Exec(query.FunctionContext{
		Maps:     exec.Maps(),
		Vars:     map[string]any{},
		Index:    index,
		MsgBatch: parts,
	})
	if err != nil {
		return nil, err
	}

	if _, deleted := res.(value.Delete); deleted {
		return nil, bloblang.ErrRootDeleted
	}
	if _, nothing := res.(value.Nothing); nothing {
		return nil, nil
	}
	return res, nil
}
// BloblangMutate executes a parsed Bloblang mapping onto the message at the
// given index of the batch. The contents of that message are mutated directly
// rather than creating an entirely new object.
//
// Returns the same message back in a mutated form, or an error if the mapping
// fails or if index does not refer to a message within the batch. If the
// mapping results in the root being deleted, the returned message will be nil,
// which indicates it has been filtered.
//
// This method allows mappings to perform windowed aggregations across message
// batches.
//
// Note that using overlay means certain functions within the Bloblang mapping
// will behave differently. In the root of the mapping the right-hand keywords
// `root` and `this` refer to the same mutable root of the output document.
func (b MessageBatch) BloblangMutate(index int, blobl *bloblang.Executor) (*Message, error) {
	// Guard the direct b[index] access below, which would otherwise panic for
	// an out of range index.
	if index < 0 || index >= len(b) {
		return nil, errors.New("message index out of range of batch")
	}

	uw := blobl.XUnwrapper().(interface {
		Unwrap() *mapping.Executor
	}).Unwrap()

	msg := make(message.Batch, len(b))
	for i, m := range b {
		msg[i] = m.part
	}

	res, err := uw.MapOnto(b[index].part, index, msg)
	if err != nil {
		return nil, err
	}
	if res != nil {
		return NewInternalMessage(res), nil
	}
	// Root deleted: the message has been filtered.
	return nil, nil
}
// TryInterpolatedString resolves an interpolated string expression on a message
// batch, from the perspective of a particular message index. It returns an
// error if the expression cannot be resolved.
//
// This method allows interpolation functions to perform windowed aggregations
// across message batches, and is a more powerful way to interpolate strings
// than the standard .String method.
func (b MessageBatch) TryInterpolatedString(index int, i *InterpolatedString) (string, error) {
	parts := make(message.Batch, len(b))
	// Use a distinct loop variable so the parameter i (the expression) is not
	// shadowed.
	for j, m := range b {
		parts[j] = m.part
	}
	return i.expr.String(index, parts)
}
// TryInterpolatedBytes resolves an interpolated string expression on a message
// batch, from the perspective of a particular message index, and returns the
// result as a byte slice. It returns an error if the expression cannot be
// resolved.
//
// This method allows interpolation functions to perform windowed aggregations
// across message batches, and is a more powerful way to interpolate than
// resolving the expression against a single message.
func (b MessageBatch) TryInterpolatedBytes(index int, i *InterpolatedString) ([]byte, error) {
	parts := make(message.Batch, len(b))
	// Use a distinct loop variable so the parameter i (the expression) is not
	// shadowed.
	for j, m := range b {
		parts[j] = m.part
	}
	return i.expr.Bytes(index, parts)
}
// InterpolatedString resolves an interpolated string expression on a message
// batch, from the perspective of a particular message index.
//
// This method allows interpolation functions to perform windowed aggregations
// across message batches, and is a more powerful way to interpolate strings
// than the standard .String method.
//
// Any error encountered while resolving the expression is discarded. Use
// TryInterpolatedString in order to observe it.
//
// Deprecated: Use TryInterpolatedString instead.
func (b MessageBatch) InterpolatedString(index int, i *InterpolatedString) string {
	parts := make(message.Batch, len(b))
	// Use a distinct loop variable so the parameter i (the expression) is not
	// shadowed.
	for j, m := range b {
		parts[j] = m.part
	}
	// The error is deliberately dropped: this deprecated signature has no way
	// to report it.
	s, _ := i.expr.String(index, parts)
	return s
}
// InterpolatedBytes resolves an interpolated string expression on a message
// batch, from the perspective of a particular message index, and returns the
// result as a byte slice.
//
// This method allows interpolation functions to perform windowed aggregations
// across message batches, and is a more powerful way to interpolate than
// resolving the expression against a single message.
//
// Any error encountered while resolving the expression is discarded. Use
// TryInterpolatedBytes in order to observe it.
//
// Deprecated: Use TryInterpolatedBytes instead.
func (b MessageBatch) InterpolatedBytes(index int, i *InterpolatedString) []byte {
	parts := make(message.Batch, len(b))
	// Use a distinct loop variable so the parameter i (the expression) is not
	// shadowed.
	for j, m := range b {
		parts[j] = m.part
	}
	// The error is deliberately dropped: this deprecated signature has no way
	// to report it.
	bRes, _ := i.expr.Bytes(index, parts)
	return bRes
}
// AddSyncResponse tries to add this batch, exactly as it stands at the time of
// the call, to the synchronous response destined for the input that
// originally produced the data. Not every input supports synchronous
// responses, so marking a batch for one may return an error.
func (b MessageBatch) AddSyncResponse() error {
	parts := make([]*message.Part, 0, len(b))
	for _, msg := range b {
		parts = append(parts, msg.part)
	}
	return transaction.SetAsResponse(parts)
}