package pure
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/Jeffail/shutdown"
"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/interop"
"github.com/benthosdev/benthos/v4/internal/component/output"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/message"
"github.com/benthosdev/benthos/v4/public/service"
)
const (
crboFieldMaxRetries = "max_retries"
crboFieldBackOff = "backoff"
crboFieldInitInterval = "initial_interval"
crboFieldMaxInterval = "max_interval"
crboFieldMaxElapsedTime = "max_elapsed_time"
)
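// CommonRetryBackOffFields returns the config fields shared by components that
// expose retry behaviour: a max_retries count and a backoff object containing
// initial_interval, max_interval and max_elapsed_time durations, populated with
// the provided defaults.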
func CommonRetryBackOffFields(
defaultMaxRetries int,
defaultInitInterval string,
defaultMaxInterval string,
defaultMaxElapsed string,
) []*service.ConfigField {
return []*service.ConfigField{
service.NewIntField(crboFieldMaxRetries).
Description("The maximum number of retries before giving up on the request. If set to zero there is no discrete limit.").
Default(defaultMaxRetries).
Advanced(),
service.NewObjectField(crboFieldBackOff,
service.NewDurationField(crboFieldInitInterval).
Description("The initial period to wait between retry attempts.").
Default(defaultInitInterval),
service.NewDurationField(crboFieldMaxInterval).
Description("The maximum period to wait between retry attempts.").
Default(defaultMaxInterval),
service.NewDurationField(crboFieldMaxElapsedTime).
Description("The maximum period to wait before retry attempts are abandoned. If zero then no limit is used.").
Default(defaultMaxElapsed),
).
Description("Control time intervals between retry attempts.").
Advanced(),
}
}
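// fieldDurationOrEmptyStr parses a duration field, treating an empty string as
// a zero duration rather than a parse error.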
func fieldDurationOrEmptyStr(pConf *service.ParsedConfig, path ...string) (time.Duration, error) {
if dStr, err := pConf.FieldString(path...); err == nil && dStr == "" {
return 0, nil
}
return pConf.FieldDuration(path...)
}
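// CommonRetryBackOffCtorFromParsed extracts the common retry and backoff
// fields from a parsed config and returns a constructor for exponential
// backoff policies, capped at max_retries attempts when that field is greater
// than zero.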
func CommonRetryBackOffCtorFromParsed(pConf *service.ParsedConfig) (ctor func() backoff.BackOff, err error) {
var maxRetries int
if maxRetries, err = pConf.FieldInt(crboFieldMaxRetries); err != nil {
return
}
var initInterval, maxInterval, maxElapsed time.Duration
if pConf.Contains(crboFieldBackOff) {
bConf := pConf.Namespace(crboFieldBackOff)
if initInterval, err = fieldDurationOrEmptyStr(bConf, crboFieldInitInterval); err != nil {
return
}
if maxInterval, err = fieldDurationOrEmptyStr(bConf, crboFieldMaxInterval); err != nil {
return
}
if maxElapsed, err = fieldDurationOrEmptyStr(bConf, crboFieldMaxElapsedTime); err != nil {
return
}
}
return func() backoff.BackOff {
boff := backoff.NewExponentialBackOff()
boff.InitialInterval = initInterval
boff.MaxInterval = maxInterval
boff.MaxElapsedTime = maxElapsed
if maxRetries > 0 {
return backoff.WithMaxRetries(boff, uint64(maxRetries))
}
return boff
}, nil
}
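// The constructor returned above yields a fresh policy on each call, so
// callers typically construct one per retry sequence. A minimal illustrative
// sketch (pConf and attemptSend are hypothetical placeholders supplied by the
// caller, not part of this package):
//
//	boffCtor, err := CommonRetryBackOffCtorFromParsed(pConf)
//	if err != nil {
//		return err
//	}
//	// attemptSend is a func() error performing a single send attempt.
//	if err := backoff.Retry(attemptSend, boffCtor()); err != nil {
//		return err
//	}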
//------------------------------------------------------------------------------
const roFieldOutput = "output"
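// retryOutputSpec returns the config spec for the retry output plugin.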
func retryOutputSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Categories("Utility").
Stable().
Summary("Attempts to write messages to a child output and if the write fails for any reason the message is retried either until success or, if the retries or max elapsed time fields are non-zero, either is reached.").
Description(`
All messages in Benthos are always retried on an output error, but this would usually involve propagating the error back to the source of the message, whereby it would be reprocessed before reaching the output layer once again.
This output type is useful whenever we wish to avoid reprocessing a message in the event of a failed send. We might, for example, have a dedupe processor that we want to avoid reapplying to the same message more than once in the pipeline.
Rather than retrying the same output you may wish to retry the send using a different output target (a dead letter queue), in which case you should instead use the ` + "[`fallback`](/docs/components/outputs/fallback)" + ` output type.`).
Fields(CommonRetryBackOffFields(0, "500ms", "3s", "0s")...).
Fields(
service.NewOutputField(roFieldOutput).
Description("A child output."),
)
}
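// An illustrative config for this output, shown only as a sketch; the field
// names and defaults are taken from the spec above and the child output is a
// placeholder:
//
//	output:
//	  retry:
//	    max_retries: 0
//	    backoff:
//	      initial_interval: 500ms
//	      max_interval: 3s
//	      max_elapsed_time: 0s
//	    output:
//	      ... # any child output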
func init() {
err := service.RegisterBatchOutput(
"retry", retryOutputSpec(),
func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error) {
maxInFlight = 1
var s output.Streamed
if s, err = retryOutputFromConfig(conf, interop.UnwrapManagement(mgr)); err != nil {
return
}
out = interop.NewUnwrapInternalOutput(s)
return
})
if err != nil {
panic(err)
}
}
//------------------------------------------------------------------------------
// RetryOutputIndefinitely returns a wrapped variant of the provided output
// where send errors downstream are automatically caught and retried rather than
// propagated upstream as nacks.
func RetryOutputIndefinitely(mgr bundle.NewManagement, wrapped output.Streamed) (output.Streamed, error) {
return newIndefiniteRetry(mgr, nil, wrapped)
}
func retryOutputFromConfig(conf *service.ParsedConfig, mgr bundle.NewManagement) (output.Streamed, error) {
pOut, err := conf.FieldOutput(roFieldOutput)
if err != nil {
return nil, err
}
var boffCtor func() backoff.BackOff
if boffCtor, err = CommonRetryBackOffCtorFromParsed(conf); err != nil {
return nil, err
}
return newIndefiniteRetry(mgr, boffCtor, interop.UnwrapOwnedOutput(pOut))
}
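// newIndefiniteRetry wraps a child output with retry behaviour. When no
// backoff constructor is provided a default exponential policy is used
// (500ms initial interval, 3s max interval, no elapsed time limit).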
func newIndefiniteRetry(mgr bundle.NewManagement, backoffCtor func() backoff.BackOff, wrapped output.Streamed) (*indefiniteRetry, error) {
if backoffCtor == nil {
backoffCtor = func() backoff.BackOff {
boff := backoff.NewExponentialBackOff()
boff.InitialInterval = time.Millisecond * 500
boff.MaxInterval = time.Second * 3
boff.MaxElapsedTime = 0
return boff
}
}
return &indefiniteRetry{
log: mgr.Logger(),
wrapped: wrapped,
backoffCtor: backoffCtor,
transactionsOut: make(chan message.Transaction),
shutSig: shutdown.NewSignaller(),
}, nil
}
// indefiniteRetry is an output type that continuously writes a message to a
// child output until the send is successful.
type indefiniteRetry struct {
wrapped output.Streamed
backoffCtor func() backoff.BackOff
log log.Modular
transactionsIn <-chan message.Transaction
transactionsOut chan message.Transaction
shutSig *shutdown.Signaller
}
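// loop reads transactions from the registered consumer channel, forwards a
// shallow copy of each payload to the wrapped output, and spawns a goroutine
// that keeps resending a failed transaction until it succeeds, the backoff
// policy gives up, or a hard stop is signalled. While any transaction is in
// its retry loop, consumption of new transactions is paused.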
func (r *indefiniteRetry) loop() {
wg := sync.WaitGroup{}
defer func() {
wg.Wait()
close(r.transactionsOut)
r.wrapped.TriggerCloseNow()
_ = r.wrapped.WaitForClose(context.Background())
r.shutSig.TriggerHasStopped()
}()
cnCtx, cnDone := r.shutSig.HardStopCtx(context.Background())
defer cnDone()
errInterruptChan := make(chan struct{})
var errLooped int64
for !r.shutSig.IsSoftStopSignalled() {
// Do not consume another message while pending messages are being
// reattempted.
for atomic.LoadInt64(&errLooped) > 0 {
select {
case <-errInterruptChan:
case <-time.After(time.Millisecond * 100):
// Just in case an interrupt doesn't arrive.
case <-r.shutSig.HardStopChan():
return
}
}
var tran message.Transaction
var open bool
select {
case tran, open = <-r.transactionsIn:
if !open {
return
}
case <-r.shutSig.HardStopChan():
return
}
rChan := make(chan error)
select {
case r.transactionsOut <- message.NewTransaction(tran.Payload.ShallowCopy(), rChan):
case <-r.shutSig.HardStopChan():
return
}
wg.Add(1)
go func(ts message.Transaction, resChan chan error) {
var backOff backoff.BackOff
var resOut error
var inErrLoop bool
defer func() {
wg.Done()
if inErrLoop {
atomic.AddInt64(&errLooped, -1)
// We're exiting our error loop, so (attempt to) interrupt the
// consumer.
select {
case errInterruptChan <- struct{}{}:
default:
}
}
}()
for !r.shutSig.IsHardStopSignalled() {
var res error
select {
case res = <-resChan:
case <-r.shutSig.HardStopChan():
return
}
if res != nil {
if !inErrLoop {
inErrLoop = true
atomic.AddInt64(&errLooped, 1)
}
if backOff == nil {
backOff = r.backoffCtor()
}
nextBackoff := backOff.NextBackOff()
if nextBackoff == backoff.Stop {
r.log.Error("Failed to send message: %v\n", res)
resOut = errors.New("message failed to reach a target destination")
break
}
r.log.Warn("Failed to send message: %v\n", res)
select {
case <-time.After(nextBackoff):
case <-r.shutSig.HardStopChan():
return
}
select {
case r.transactionsOut <- message.NewTransaction(ts.Payload.ShallowCopy(), resChan):
case <-r.shutSig.HardStopChan():
return
}
} else {
resOut = nil
break
}
}
if err := ts.Ack(cnCtx, resOut); err != nil && cnCtx.Err() != nil {
return
}
}(tran, rChan)
}
}
// Consume assigns a channel of transactions for the output to read from.
func (r *indefiniteRetry) Consume(ts <-chan message.Transaction) error {
if r.transactionsIn != nil {
return component.ErrAlreadyStarted
}
if err := r.wrapped.Consume(r.transactionsOut); err != nil {
return err
}
r.transactionsIn = ts
go r.loop()
return nil
}
// Connected returns a boolean indicating whether this output is currently
// connected to its target.
func (r *indefiniteRetry) Connected() bool {
return r.wrapped.Connected()
}
// TriggerCloseNow shuts down the retry output and stops processing transactions.
func (r *indefiniteRetry) TriggerCloseNow() {
r.shutSig.TriggerHardStop()
}
// WaitForClose blocks until the retry output has closed down.
func (r *indefiniteRetry) WaitForClose(ctx context.Context) error {
select {
case <-r.shutSig.HasStoppedChan():
case <-ctx.Done():
return ctx.Err()
}
return nil
}