/
reject.go
102 lines (87 loc) · 3.2 KB
/
reject.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package output
import (
"context"
"errors"
"fmt"
"time"
"github.com/Jeffail/benthos/v3/internal/bloblang/field"
"github.com/Jeffail/benthos/v3/internal/docs"
"github.com/Jeffail/benthos/v3/internal/interop"
"github.com/Jeffail/benthos/v3/lib/log"
"github.com/Jeffail/benthos/v3/lib/metrics"
"github.com/Jeffail/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeReject] = TypeSpec{
constructor: fromSimpleConstructor(func(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
f, err := newRejectWriter(mgr, string(conf.Reject))
if err != nil {
return nil, err
}
return NewAsyncWriter(TypeReject, 1, f, log, stats)
}),
Status: docs.StatusStable,
Summary: `
Rejects all messages, treating them as though the output destination failed to publish them.`,
Description: `
The routing of messages after this output depends on the type of input it came from. For inputs that support propagating nacks upstream such as AMQP or NATS the message will be nacked. However, for inputs that are sequential such as files or Kafka the messages will simply be reprocessed from scratch.
If you're still scratching your head as to when this output could be useful check out [the examples below](#examples).`,
Categories: []Category{
CategoryUtility,
},
Examples: []docs.AnnotatedExample{
{
Title: "Rejecting Failed Messages",
Summary: `
This input is particularly useful for routing messages that have failed during processing, where instead of routing them to some sort of dead letter queue we wish to push the error upstream. We can do this with a switch broker:`,
Config: `
output:
switch:
retry_until_success: false
cases:
- check: '!errored()'
output:
amqp_1:
url: amqps://guest:guest@localhost:5672/
target_address: queue:/the_foos
- output:
reject: "processing failed due to: ${! error() }"
`,
},
},
config: docs.FieldComponent().HasType(docs.FieldTypeString).HasDefault(""),
}
}
//------------------------------------------------------------------------------
// RejectConfig contains configuration fields for the file based output type.
type RejectConfig string
// NewRejectConfig creates a new RejectConfig with default values.
func NewRejectConfig() RejectConfig {
return RejectConfig("")
}
type rejectWriter struct {
errExpr *field.Expression
}
func newRejectWriter(mgr types.Manager, errorString string) (*rejectWriter, error) {
if errorString == "" {
return nil, errors.New("an error message must be provided in order to provide context for the rejection")
}
errExpr, err := interop.NewBloblangField(mgr, errorString)
if err != nil {
return nil, fmt.Errorf("failed to parse error expression: %w", err)
}
return &rejectWriter{errExpr}, nil
}
func (w *rejectWriter) ConnectWithContext(ctx context.Context) error {
return nil
}
func (w *rejectWriter) WriteWithContext(ctx context.Context, msg types.Message) error {
errStr := w.errExpr.String(0, msg)
return errors.New(errStr)
}
func (w *rejectWriter) CloseAsync() {
}
func (w *rejectWriter) WaitForClose(timeout time.Duration) error {
return nil
}