/
drop_on_error.go
256 lines (211 loc) · 6.49 KB
/
drop_on_error.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package output
import (
"encoding/json"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/Jeffail/benthos/v3/internal/component/output"
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/internal/shutdown"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/response"
"github.com/Jeffail/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeDropOnError] = TypeSpec{
constructor: fromSimpleConstructor(NewDropOnError),
Status: docs.StatusDeprecated,
Summary: `
Attempts to write messages to a child output and if the write fails for any
reason the message is dropped instead of being reattempted.`,
Description: `
## Alternatives
This output has been replaced with the more explicit and configurable ` + "[`drop_on`](/docs/components/outputs/drop_on)" + ` output.
This output can be combined with a child [` + "`retry`" + `](/docs/components/outputs/retry)
output in order to set an explicit number of retry attempts before dropping a
message.
For example, the following configuration attempts to send to a hypothetical
output type ` + "`foo`" + ` three times, but if all three attempts fail the
message is dropped entirely:
` + "```yaml" + `
output:
drop_on_error:
retry:
max_retries: 2
output:
type: foo
` + "```" + ``,
config: docs.FieldComponent().HasType(docs.FieldTypeObject),
}
}
//------------------------------------------------------------------------------
// DropOnErrorConfig contains configuration values for the DropOnError output
// type.
type DropOnErrorConfig struct {
*Config `yaml:",inline" json:",inline"`
}
// NewDropOnErrorConfig creates a new DropOnErrorConfig with default values.
func NewDropOnErrorConfig() DropOnErrorConfig {
return DropOnErrorConfig{
Config: nil,
}
}
//------------------------------------------------------------------------------
// MarshalJSON prints an empty object instead of nil.
func (d DropOnErrorConfig) MarshalJSON() ([]byte, error) {
if d.Config != nil {
return json.Marshal(d.Config)
}
return json.Marshal(struct{}{})
}
// MarshalYAML prints an empty object instead of nil.
func (d DropOnErrorConfig) MarshalYAML() (interface{}, error) {
if d.Config != nil {
return *d.Config, nil
}
return struct{}{}, nil
}
//------------------------------------------------------------------------------
// UnmarshalJSON ensures that when parsing child config it is initialised.
func (d *DropOnErrorConfig) UnmarshalJSON(bytes []byte) error {
if d.Config == nil {
nConf := NewConfig()
d.Config = &nConf
}
return json.Unmarshal(bytes, d.Config)
}
// UnmarshalYAML ensures that when parsing child config it is initialised.
func (d *DropOnErrorConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
if d.Config == nil {
nConf := NewConfig()
d.Config = &nConf
}
return unmarshal(d.Config)
}
//------------------------------------------------------------------------------
// DropOnError is an output type that continuously writes a message to a child output
// until the send is successful.
type DropOnError struct {
running int32
wrapped Type
stats metrics.Type
log log.Modular
transactionsIn <-chan types.Transaction
transactionsOut chan types.Transaction
closeChan chan struct{}
closedChan chan struct{}
}
// NewDropOnError creates a new DropOnError input type.
func NewDropOnError(
conf Config,
mgr types.Manager,
log log.Modular,
stats metrics.Type,
) (Type, error) {
if conf.DropOnError.Config == nil {
return nil, errors.New("cannot create a drop_on_error output without a child")
}
wrapped, err := New(*conf.DropOnError.Config, mgr, log, stats)
if err != nil {
return nil, fmt.Errorf("failed to create output '%v': %v", conf.DropOnError.Config.Type, err)
}
return &DropOnError{
running: 1,
log: log,
stats: stats,
wrapped: wrapped,
transactionsOut: make(chan types.Transaction),
closeChan: make(chan struct{}),
closedChan: make(chan struct{}),
}, nil
}
//------------------------------------------------------------------------------
func (d *DropOnError) loop() {
// Metrics paths
var (
mDropped = d.stats.GetCounter("drop_on_error.dropped")
mDroppedBatch = d.stats.GetCounter("drop_on_error.batch.dropped")
)
defer func() {
close(d.transactionsOut)
d.wrapped.CloseAsync()
_ = d.wrapped.WaitForClose(shutdown.MaximumShutdownWait())
close(d.closedChan)
}()
resChan := make(chan types.Response)
for atomic.LoadInt32(&d.running) == 1 {
var ts types.Transaction
var open bool
select {
case ts, open = <-d.transactionsIn:
if !open {
return
}
case <-d.closeChan:
return
}
select {
case d.transactionsOut <- types.NewTransaction(ts.Payload, resChan):
case <-d.closeChan:
return
}
var res types.Response
select {
case res = <-resChan:
case <-d.closeChan:
return
}
if res.Error() != nil {
mDropped.Incr(int64(ts.Payload.Len()))
mDroppedBatch.Incr(1)
d.log.Warnf("Message dropped due to: %v\n", res.Error())
}
select {
case ts.ResponseChan <- response.NewAck():
case <-d.closeChan:
return
}
}
}
// Consume assigns a messages channel for the output to read.
func (d *DropOnError) Consume(ts <-chan types.Transaction) error {
if d.transactionsIn != nil {
return types.ErrAlreadyStarted
}
if err := d.wrapped.Consume(d.transactionsOut); err != nil {
return err
}
d.transactionsIn = ts
go d.loop()
return nil
}
// Connected returns a boolean indicating whether this output is currently
// connected to its target.
func (d *DropOnError) Connected() bool {
return d.wrapped.Connected()
}
// MaxInFlight returns the maximum number of in flight messages permitted by the
// output. This value can be used to determine a sensible value for parent
// outputs, but should not be relied upon as part of dispatcher logic.
func (d *DropOnError) MaxInFlight() (int, bool) {
return output.GetMaxInFlight(d.wrapped)
}
// CloseAsync shuts down the DropOnError input and stops processing requests.
func (d *DropOnError) CloseAsync() {
if atomic.CompareAndSwapInt32(&d.running, 1, 0) {
close(d.closeChan)
}
}
// WaitForClose blocks until the DropOnError input has closed down.
func (d *DropOnError) WaitForClose(timeout time.Duration) error {
select {
case <-d.closedChan:
case <-time.After(timeout):
return types.ErrTimeout
}
return nil
}
//------------------------------------------------------------------------------