-
Notifications
You must be signed in to change notification settings - Fork 818
/
buffer.go
118 lines (105 loc) · 4.49 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package service
import (
"context"
"errors"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/buffer"
"github.com/benthosdev/benthos/v4/internal/message"
"github.com/benthosdev/benthos/v4/internal/shutdown"
)
// BatchBuffer is an interface implemented by Buffers able to read and write
// message batches. Buffers are a component type that are placed after inputs,
// and decouples the acknowledgement system of the inputs from the rest of the
// pipeline.
//
// Buffers are useful when implementing buffers intended to relieve back
// pressure from upstream components, or when implementing message aggregators
// where the concept of discrete messages running through a pipeline no longer
// applies (such as with windowing algorithms).
//
// Buffers are advanced component types that weaken delivery guarantees of a
// Benthos pipeline. Therefore, if you aren't absolutely sure that a component
// you wish to build should be a buffer type then it likely shouldn't be.
type BatchBuffer interface {
	// Write a batch of messages to the buffer, the batch is accompanied with an
	// acknowledge function. A non-nil error should be returned if it is not
	// possible to store the given message batch in the buffer.
	//
	// If a nil error is returned the buffer assumes responsibility for calling
	// the acknowledge function at least once during the lifetime of the
	// message.
	//
	// This could be at the point where the message is written to the buffer,
	// which weakens delivery guarantees but can be useful for decoupling the
	// input from downstream components. Alternatively, this could be when the
	// associated batch has been read from the buffer and acknowledged
	// downstream, which preserves delivery guarantees.
	WriteBatch(context.Context, MessageBatch, AckFunc) error
	// Read a batch of messages from the buffer. This call should block until
	// either a batch is ready to consume, the provided context is cancelled or
	// EndOfInput has been called which indicates that the buffer is no longer
	// being populated with new messages.
	//
	// The returned acknowledge function will be called when a consumed message
	// batch has been processed and sent downstream. It is up to the buffer
	// implementation whether the ack function is used, it might be used in
	// order to "commit" the removal of a message from the buffer in cases where
	// the buffer is a persisted storage solution, or in cases where the output
	// of the buffer is temporal (a windowing algorithm, etc) it might be
	// considered correct to simply drop message batches that are not acked.
	//
	// When the buffer is closed (EndOfInput has been called and no more
	// messages are available) this method should return an ErrEndOfBuffer in
	// order to indicate the end of the buffered stream.
	//
	// It is valid to return a batch of only one message.
	ReadBatch(context.Context) (MessageBatch, AckFunc, error)
	// EndOfInput indicates to the buffer that the input has ended and that once
	// the buffer is depleted it should return ErrEndOfBuffer from ReadBatch in
	// order to gracefully shut down the pipeline.
	//
	// EndOfInput should be idempotent as it may be called more than once.
	EndOfInput()

	// Closer provides Close for releasing the buffer's resources on shutdown.
	Closer
}
//------------------------------------------------------------------------------
// Implements buffer.ReaderWriter
// airGapBatchBuffer adapts a public BatchBuffer implementation to the internal
// buffer.ReaderWriter interface, translating between public and internal
// message/ack types at the boundary.
type airGapBatchBuffer struct {
	// b is the wrapped public buffer implementation that all calls delegate to.
	b BatchBuffer
	// sig is assigned at construction but is not referenced by any method in
	// this file — NOTE(review): possibly vestigial; confirm against other
	// usages before removing.
	sig *shutdown.Signaller
}
// newAirGapBatchBuffer wraps a public BatchBuffer so that it satisfies the
// internal buffer.ReaderWriter interface.
func newAirGapBatchBuffer(b BatchBuffer) buffer.ReaderWriter {
	return &airGapBatchBuffer{
		b:   b,
		sig: shutdown.NewSignaller(),
	}
}
// Write converts an internal message batch into its public representation and
// hands it to the wrapped BatchBuffer along with the acknowledge function.
func (a *airGapBatchBuffer) Write(ctx context.Context, msg *message.Batch, aFn buffer.AckFunc) error {
	batch := make(MessageBatch, msg.Len())
	_ = msg.Iter(func(i int, p *message.Part) error {
		// The caller may ack (and reuse) the batch once Write returns, so we
		// no longer own the underlying parts — take deep copies.
		batch[i] = newMessageFromPart(p).Copy()
		return nil
	})
	return a.b.WriteBatch(ctx, batch, AckFunc(aFn))
}
// Read pulls the next batch from the wrapped BatchBuffer and converts it back
// into the internal message representation, along with an ack function that
// forwards to the buffer's own.
func (a *airGapBatchBuffer) Read(ctx context.Context) (*message.Batch, buffer.AckFunc, error) {
	batch, ackFn, err := a.b.ReadBatch(ctx)
	if errors.Is(err, ErrEndOfBuffer) {
		// The public end-of-buffer sentinel maps onto the internal
		// type-closed error expected by downstream components.
		return nil, nil, component.ErrTypeClosed
	}
	if err != nil {
		return nil, nil, err
	}

	out := message.QuickBatch(nil)
	for _, m := range batch {
		out.Append(m.part)
	}
	return out, func(ctx context.Context, aerr error) error {
		return ackFn(ctx, aerr)
	}, nil
}
// EndOfInput forwards the end-of-input signal to the wrapped BatchBuffer,
// allowing it to drain and eventually return ErrEndOfBuffer from ReadBatch.
func (a *airGapBatchBuffer) EndOfInput() {
	a.b.EndOfInput()
}
// Close delegates shutdown to the wrapped BatchBuffer, returning any error it
// reports while releasing its resources.
func (a *airGapBatchBuffer) Close(ctx context.Context) error {
	return a.b.Close(ctx)
}