forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bounds_check.go
154 lines (133 loc) · 4.6 KB
/
bounds_check.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package processor
import (
"errors"
"time"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/response"
"github.com/dafanshu/benthos/v3/lib/types"
)
//------------------------------------------------------------------------------
func init() {
Constructors[TypeBoundsCheck] = TypeSpec{
constructor: NewBoundsCheck,
Categories: []Category{
CategoryUtility,
},
Summary: `
Removes messages (and batches) that do not fit within certain size boundaries.`,
FieldSpecs: docs.FieldSpecs{
docs.FieldCommon("max_part_size", "The maximum size of a message to allow (in bytes)"),
docs.FieldCommon("min_part_size", "The minimum size of a message to allow (in bytes)"),
docs.FieldAdvanced("max_parts", "The maximum size of message batches to allow (in message count)"),
docs.FieldAdvanced("min_parts", "The minimum size of message batches to allow (in message count)"),
},
}
}
//------------------------------------------------------------------------------
// BoundsCheckConfig contains configuration fields for the BoundsCheck
// processor.
type BoundsCheckConfig struct {
MaxParts int `json:"max_parts" yaml:"max_parts"`
MinParts int `json:"min_parts" yaml:"min_parts"`
MaxPartSize int `json:"max_part_size" yaml:"max_part_size"`
MinPartSize int `json:"min_part_size" yaml:"min_part_size"`
}
// NewBoundsCheckConfig returns a BoundsCheckConfig with default values.
func NewBoundsCheckConfig() BoundsCheckConfig {
return BoundsCheckConfig{
MaxParts: 100,
MinParts: 1,
MaxPartSize: 1 * 1024 * 1024 * 1024, // 1GB
MinPartSize: 1,
}
}
//------------------------------------------------------------------------------
// BoundsCheck is a processor that checks each message against a set of bounds
// and rejects messages if they aren't within them.
type BoundsCheck struct {
conf Config
log log.Modular
stats metrics.Type
mCount metrics.StatCounter
mDropped metrics.StatCounter
mDroppedEmpty metrics.StatCounter
mDroppedNumParts metrics.StatCounter
mDroppedPartSize metrics.StatCounter
mSent metrics.StatCounter
mBatchSent metrics.StatCounter
}
// NewBoundsCheck returns a BoundsCheck processor.
func NewBoundsCheck(
conf Config, mgr types.Manager, log log.Modular, stats metrics.Type,
) (Type, error) {
return &BoundsCheck{
conf: conf,
log: log,
stats: stats,
mCount: stats.GetCounter("count"),
mDropped: stats.GetCounter("dropped"),
mDroppedEmpty: stats.GetCounter("dropped_empty"),
mDroppedNumParts: stats.GetCounter("dropped_num_parts"),
mDroppedPartSize: stats.GetCounter("dropped_part_size"),
mSent: stats.GetCounter("sent"),
mBatchSent: stats.GetCounter("batch.sent"),
}, nil
}
//------------------------------------------------------------------------------
// ProcessMessage applies the processor to a message, either creating >0
// resulting messages or a response to be sent back to the message source.
func (m *BoundsCheck) ProcessMessage(msg types.Message) ([]types.Message, types.Response) {
m.mCount.Incr(1)
lParts := msg.Len()
if lParts < m.conf.BoundsCheck.MinParts {
m.log.Debugf(
"Rejecting message due to message parts below minimum (%v): %v\n",
m.conf.BoundsCheck.MinParts, lParts,
)
m.mDropped.Incr(1)
m.mDroppedEmpty.Incr(1)
return nil, response.NewAck()
} else if lParts > m.conf.BoundsCheck.MaxParts {
m.log.Debugf(
"Rejecting message due to message parts exceeding limit (%v): %v\n",
m.conf.BoundsCheck.MaxParts, lParts,
)
m.mDropped.Incr(1)
m.mDroppedNumParts.Incr(1)
return nil, response.NewAck()
}
var reject bool
msg.Iter(func(i int, p types.Part) error {
if size := len(p.Get()); size > m.conf.BoundsCheck.MaxPartSize ||
size < m.conf.BoundsCheck.MinPartSize {
m.log.Debugf(
"Rejecting message due to message part size (%v -> %v): %v\n",
m.conf.BoundsCheck.MinPartSize,
m.conf.BoundsCheck.MaxPartSize,
size,
)
reject = true
return errors.New("exit")
}
return nil
})
if reject {
m.mDropped.Incr(1)
m.mDroppedPartSize.Incr(1)
return nil, response.NewAck()
}
m.mBatchSent.Incr(1)
m.mSent.Incr(int64(msg.Len()))
msgs := [1]types.Message{msg}
return msgs[:], nil
}
// CloseAsync shuts down the processor and stops processing requests.
func (m *BoundsCheck) CloseAsync() {
}
// WaitForClose blocks until the processor has closed down.
func (m *BoundsCheck) WaitForClose(timeout time.Duration) error {
return nil
}
//------------------------------------------------------------------------------