forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
broker.go
283 lines (237 loc) · 8.81 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package input
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/internal/interop"
"github.com/dafanshu/benthos/v3/lib/broker"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/message/batch"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
"gopkg.in/yaml.v3"
)
//------------------------------------------------------------------------------
// ErrBrokerNoInputs is returned when creating a broker with zero inputs.
var ErrBrokerNoInputs = errors.New("attempting to create broker input type with no inputs")
//------------------------------------------------------------------------------
// init registers the broker input constructor and its documentation under
// TypeBroker. The YAML example and markdown in the doc strings are indented
// here so they render as valid YAML / formatted markdown in generated docs.
func init() {
	Constructors[TypeBroker] = TypeSpec{
		constructor: newBrokerHasBatchProcessor,
		Summary: `
Allows you to combine multiple inputs into a single stream of data, where each input will be read in parallel.`,
		Description: `
A broker type is configured with its own list of input configurations and a field to specify how many copies of the list of inputs should be created.

Adding more input types allows you to combine streams from multiple sources into one. For example, reading from both RabbitMQ and Kafka:

` + "```yaml" + `
input:
  broker:
    copies: 1
    inputs:
      - amqp_0_9:
          url: amqp://guest:guest@localhost:5672/
          consumer_tag: benthos-consumer
          queue: benthos-queue

        # Optional list of input specific processing steps
        processors:
          - bloblang: |
              root.message = this
              root.meta.link_count = this.links.length()
              root.user.age = this.user.age.number()

      - kafka:
          addresses:
            - localhost:9092
          client_id: benthos_kafka_input
          consumer_group: benthos_consumer_group
          topics: [ benthos_stream:0 ]
` + "```" + `

If the number of copies is greater than zero the list will be copied that number
of times. For example, if your inputs were of type foo and bar, with 'copies'
set to '2', you would end up with two 'foo' inputs and two 'bar' inputs.

### Batching

It's possible to configure a [batch policy](/docs/configuration/batching#batch-policy)
with a broker using the ` + "`batching`" + ` fields. When doing this the feeds
from all child inputs are combined. Some inputs do not support broker based
batching and specify this in their documentation.

### Processors

It is possible to configure [processors](/docs/components/processors/about) at
the broker level, where they will be applied to _all_ child inputs, as well as
on the individual child inputs. If you have processors at both the broker level
_and_ on child inputs then the broker processors will be applied _after_ the
child nodes processors.`,
		Categories: []Category{
			CategoryUtility,
		},
		FieldSpecs: docs.FieldSpecs{
			docs.FieldAdvanced("copies", "Whatever is specified within `inputs` will be created this many times."),
			docs.FieldCommon("inputs", "A list of inputs to create.").Array().HasType(docs.FieldTypeInput),
			batch.FieldSpec(),
		},
	}
}
//------------------------------------------------------------------------------
// BrokerConfig contains configuration fields for the Broker input type.
type BrokerConfig struct {
	// Copies is the number of times the list of inputs is replicated.
	Copies int `json:"copies" yaml:"copies"`
	// Inputs is the list of child input configurations to create.
	Inputs brokerInputList `json:"inputs" yaml:"inputs"`
	// Batching is an optional batch policy applied to the combined feed of
	// all child inputs.
	Batching batch.PolicyConfig `json:"batching" yaml:"batching"`
}
// NewBrokerConfig returns a BrokerConfig populated with default values: a
// single copy, an empty input list, and the default batch policy.
func NewBrokerConfig() BrokerConfig {
	var conf BrokerConfig
	conf.Copies = 1
	conf.Inputs = brokerInputList{}
	conf.Batching = batch.NewPolicyConfig()
	return conf
}
//------------------------------------------------------------------------------
// brokerInputList is a slice of child input Configs with custom JSON/YAML
// unmarshalling that applies default values to omitted fields.
type brokerInputList []Config
// UnmarshalJSON ensures that when parsing configs that are in a map or slice
// the default values are still applied.
func (b *brokerInputList) UnmarshalJSON(bytes []byte) error {
	var rawInputs []interface{}
	if err := json.Unmarshal(bytes, &rawInputs); err != nil {
		return err
	}
	// Re-parse each generic config so omitted fields pick up defaults.
	confs, err := parseInputConfsWithDefaults(rawInputs)
	if err != nil {
		return err
	}
	*b = confs
	return nil
}
// UnmarshalYAML ensures that when parsing configs that are in a map or slice
// the default values are still applied.
func (b *brokerInputList) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var rawInputs []interface{}
	if err := unmarshal(&rawInputs); err != nil {
		return err
	}
	// Re-parse each generic config so omitted fields pick up defaults.
	confs, err := parseInputConfsWithDefaults(rawInputs)
	if err != nil {
		return err
	}
	*b = confs
	return nil
}
//------------------------------------------------------------------------------
// parseInputConfsWithDefaults takes a slice of generic input configs and
// returns a slice of input configs with default values in place of omitted
// values. This is necessary because when unmarshalling config files using
// structs you can pre-populate non-reference type struct fields with default
// values, but array objects will lose those defaults.
//
// In order to ensure that omitted values are set to default we initially parse
// the array as interface{} types and then individually apply the defaults by
// marshalling and unmarshalling. The correct way to do this would be to use
// json.RawMessage, but our config files can contain a range of different
// formats that we do not know at this stage, therefore we use the more hacky
// method as performance is not an issue at this stage.
func parseInputConfsWithDefaults(rawInputs []interface{}) ([]Config, error) {
	inputConfs := []Config{}

	// NOTE: Use yaml here as it supports more types than JSON
	// (map[interface{}]interface{}).
	for i, boxedConfig := range rawInputs {
		// By default each raw config yields exactly one input config.
		newConfigs := make([]Config, 1)
		label := broker.GetGenericType(boxedConfig)

		// TODO: V4 Remove ditto
		if i > 0 && strings.HasPrefix(label, "ditto") {
			broker.RemoveGenericType(boxedConfig)

			// Check if there is a ditto multiplier, e.g. "ditto_3".
			if len(label) > 5 && label[5] == '_' {
				if label[6:] == "0" {
					// This is a special case where we are expressing that
					// we want to end up with zero duplicates.
					newConfigs = nil
				} else {
					n, err := strconv.Atoi(label[6:])
					if err != nil {
						return nil, fmt.Errorf("failed to parse ditto multiplier: %v", err)
					}
					newConfigs = make([]Config, n)
				}
			}
			// A bare "ditto" (no multiplier) keeps the default single copy.

			// Fill fields omitted by the ditto config from the previous raw
			// input config.
			broker.ComplementGenericConfig(boxedConfig, rawInputs[i-1])
		}

		if len(newConfigs) > 0 {
			// The marshalled form is invariant across ditto duplicates, so
			// marshal once per raw config rather than once per duplicate.
			rawBytes, err := yaml.Marshal(boxedConfig)
			if err != nil {
				return nil, err
			}
			for _, conf := range newConfigs {
				// Unmarshal into a fresh copy so omitted fields receive
				// default values (see function comment).
				if err := yaml.Unmarshal(rawBytes, &conf); err != nil {
					return nil, err
				}
				inputConfs = append(inputConfs, conf)
			}
		}
	}

	return inputConfs, nil
}
//------------------------------------------------------------------------------
// NewBroker creates a new Broker input type.
func NewBroker(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, pipelines ...types.PipelineConstructorFunc) (Type, error) {
	// Delegate to the batch-aware constructor with batch processing disabled.
	return newBrokerHasBatchProcessor(false, conf, mgr, log, stats, pipelines...)
}
// newBrokerHasBatchProcessor constructs a broker input from conf, creating one
// child input per configured input multiplied by conf.Broker.Copies, and
// fanning their feeds into a single stream. If a batching policy is
// configured it wraps the result in a batcher.
//
// Deprecated: This is a hack for until the batch processor is removed.
// TODO: V4 Remove this.
func newBrokerHasBatchProcessor(
	hasBatchProc bool,
	conf Config,
	mgr types.Manager,
	log log.Modular,
	stats metrics.Type,
	pipelines ...types.PipelineConstructorFunc,
) (Type, error) {
	// Fold config-defined processors into the pipeline constructors, keeping
	// track of legacy batch-processor awareness.
	hasBatchProc, pipelines = appendProcessorsFromConfigBatchAware(hasBatchProc, conf, mgr, log, stats, pipelines...)
	// Total child inputs = configured inputs x copies.
	lInputs := len(conf.Broker.Inputs) * conf.Broker.Copies
	if lInputs <= 0 {
		return nil, ErrBrokerNoInputs
	}
	var err error
	var b Type
	if lInputs == 1 {
		// A single child needs no fan-in broker wrapper.
		if b, err = newHasBatchProcessor(hasBatchProc, conf.Broker.Inputs[0], mgr, log, stats, pipelines...); err != nil {
			return nil, err
		}
	} else {
		inputs := make([]types.Producer, lInputs)
		for j := 0; j < conf.Broker.Copies; j++ {
			for i, iConf := range conf.Broker.Inputs {
				// Children are labelled by index within the broker config;
				// every copy of index i shares the same label.
				iMgr, iLog, iStats := interop.LabelChild(fmt.Sprintf("broker.inputs.%v", i), mgr, log, stats)
				iStats = metrics.Combine(stats, iStats)
				inputs[len(conf.Broker.Inputs)*j+i], err = newHasBatchProcessor(
					hasBatchProc, iConf, iMgr, iLog, iStats,
					pipelines...,
				)
				if err != nil {
					return nil, fmt.Errorf("failed to create input '%v' type '%v': %v", i, iConf.Type, err)
				}
			}
		}
		// Combine all child inputs into one stream.
		if b, err = broker.NewFanIn(inputs, stats); err != nil {
			return nil, err
		}
	}
	if conf.Broker.Batching.IsNoop() {
		// No batching configured; return the broker directly.
		return b, nil
	}
	bMgr, bLog, bStats := interop.LabelChild("batching", mgr, log, stats)
	policy, err := batch.NewPolicy(conf.Broker.Batching, bMgr, bLog, bStats)
	if err != nil {
		return nil, fmt.Errorf("failed to construct batch policy: %v", err)
	}
	return NewBatcher(policy, b, log, stats), nil
}
//------------------------------------------------------------------------------