// Copyright (c) 2018 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
/*
Package stream creates and manages a full Benthos stream pipeline, consisting
of an input layer of consumers, an optional buffer layer, a layer of processing
pipelines, and an output layer of producers:

	Inputs -> Buffer -> Processing Pipelines -> Outputs

The number of parallel input consumers, processing pipelines, and output
producers depends on the configuration of the stream.

Custom Stream Processors

It is possible to construct a stream with your own custom processor
implementations embedded within it. Your processors are then executed on each
discrete message batch, after any other configured processors.

Your custom processors are constructed once per pipeline processing thread.
For example, with four processing threads the pipeline looks like this:

	Inputs -> Buffer -> Processing Pipeline -> Custom Processor -> Outputs
	                 \ Processing Pipeline -> Custom Processor /
	                 \ Processing Pipeline -> Custom Processor /
	                 \ Processing Pipeline -> Custom Processor /

Message Batches

In Benthos every message is a batch, and the configuration of a stream
determines the size of each batch (usually 1). Therefore all processors,
including your custom implementations, support batches.

Sometimes your custom processors will require batches of a certain size in
order to function. It is recommended that you perform message batching using
the standard Benthos batch or combine processors, as this ensures resiliency
throughout the stream pipeline. For example, you can add a batch processor to
your input layer:

	conf := NewConfig()
	conf.Input.Type = input.TypeKafka
	conf.Input.Kafka.Addresses = []string{"localhost:9092"}
	conf.Input.Kafka.Topic = "example_topic_one"
	conf.Input.Processors = append(conf.Input.Processors, processor.NewConfig())
	conf.Input.Processors[0].Type = processor.TypeBatch
	conf.Input.Processors[0].Batch.ByteSize = 10000000 // 10MB
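
To make the size threshold concrete, here is a minimal sketch of byte-size
batching, assuming a batch is released once the combined size of its buffered
messages reaches or exceeds the configured limit. The byteBatcher type and its
Add and Flush methods are hypothetical; this is not the Benthos batch processor
itself, and it ignores acknowledgements and any other flush conditions.

```go
package main

import "fmt"

// byteBatcher is a hypothetical sketch of size-based batching. It buffers
// messages and releases them as one batch once their combined size in bytes
// reaches or exceeds byteSize.
type byteBatcher struct {
	byteSize int
	size     int
	pending  [][]byte
}

// Add buffers msg and returns a complete batch once the threshold is met.
func (b *byteBatcher) Add(msg []byte) ([][]byte, bool) {
	b.pending = append(b.pending, msg)
	b.size += len(msg)
	if b.size < b.byteSize {
		return nil, false
	}
	return b.Flush(), true
}

// Flush returns whatever is buffered (possibly a short final batch) and resets.
func (b *byteBatcher) Flush() [][]byte {
	out := b.pending
	b.pending = nil
	b.size = 0
	return out
}

func main() {
	bb := &byteBatcher{byteSize: 10} // 10 bytes here instead of 10MB
	for _, m := range []string{"aaaa", "bbbb", "cccc", "dd"} {
		if batch, ok := bb.Add([]byte(m)); ok {
			fmt.Println("batch of", len(batch), "messages") // prints "batch of 3 messages"
		}
	}
	fmt.Println("left over:", len(bb.Flush())) // prints "left over: 1"
}
```

The final Flush call stands in for draining a partial batch, for example at
shutdown.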

Horizontal Scaling

The standard processors of a Benthos stream are stateless and can therefore be
scaled horizontally without affecting the results. As a result, horizontal
scaling depends only on the data sources of a stream.

Most message queues and protocols provide mechanisms for automatically
distributing messages across consumers. Scaling Benthos horizontally is
therefore as simple as using those mechanisms.

Kafka, for example, allows you to distribute messages across partitions, which
can either be statically assigned to consumers or, using the kafka_balanced
input type, dynamically distributed across consumers.
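
As an illustration of static distribution, the sketch below assigns partitions
to a fixed set of consumers round-robin. The assignPartitions function is
hypothetical and is not Kafka's consumer group protocol. With dynamic
distribution, Kafka's consumer group protocol computes an assignment for you
and recomputes it when consumers join or leave, which the second call in main
loosely mimics.

```go
package main

import "fmt"

// assignPartitions is a hypothetical round-robin assignment of partition IDs
// to consumers. Every partition is owned by exactly one consumer.
func assignPartitions(partitions, consumers int) [][]int {
	if consumers <= 0 {
		return nil
	}
	out := make([][]int, consumers)
	for p := 0; p < partitions; p++ {
		c := p % consumers
		out[c] = append(out[c], p)
	}
	return out
}

func main() {
	// Eight partitions shared by three consumers.
	for i, parts := range assignPartitions(8, 3) {
		fmt.Printf("consumer %d: partitions %v\n", i, parts)
	}
	// If one consumer leaves, the assignment must be recomputed.
	for i, parts := range assignPartitions(8, 2) {
		fmt.Printf("consumer %d: partitions %v\n", i, parts)
	}
}
```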

Vertical Scaling

Vertical scaling of message processing can be achieved in Benthos with
parallel processing pipelines, where the number of threads is configured in the
pipeline section of a stream configuration. However, in order to saturate those
processing threads your configuration needs one of two things: multiple
parallel inputs or a memory buffer.

Adding a memory buffer is a simple way of scaling a single input consumer
across processing threads, but it removes the automatic delivery guarantees
that Benthos provides.

Instead, it is recommended that you create parallel input sources, at least as
many as there are processing threads. This retains the delivery guarantees of
your sources and sinks by keeping them tightly coupled. Parallel inputs are
configured with a broker input type; for example, to process across four
threads with eight parallel consumers:

	// Create a Kafka input with automatic partition balancing
	inputConf := input.NewConfig()
	inputConf.Type = input.TypeKafkaBalanced
	inputConf.KafkaBalanced.Addresses = []string{"localhost:9092"}
	inputConf.KafkaBalanced.Topics = []string{"example_topic_one"}

	// Create a decompression processor (default gzip)
	processorConf := processor.NewConfig()
	processorConf.Type = processor.TypeDecompress

	// Create a stream with eight parallel consumers and four processing threads
	conf := NewConfig()
	conf.Input.Type = input.TypeBroker
	conf.Input.Broker.Inputs = append(conf.Input.Broker.Inputs, inputConf)
	conf.Input.Broker.Copies = 8
	conf.Pipeline.Processors = append(conf.Pipeline.Processors, processorConf)
	conf.Pipeline.Threads = 4
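
The reason for matching consumers to threads can be seen in a small conceptual
model, assuming each consumer keeps at most one unacknowledged message in
flight and only commits it after processing completes. The transaction type and
runStream function are hypothetical, and the unbuffered channel plays the role
of a stream with no buffer; this is not how the broker input is implemented.
Since each consumer blocks until its message is processed, at most
min(consumers, threads) messages are processed at once, so fewer consumers than
threads would leave some threads idle.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// transaction is a hypothetical pairing of a message with a channel that the
// processing side closes once the message has been fully handled.
type transaction struct {
	payload string
	done    chan struct{}
}

// runStream models `consumers` parallel inputs feeding `threads` processing
// goroutines through an unbuffered channel (no buffer layer). Each consumer
// waits for its message to be processed before committing it and reading the
// next one. It returns the number of committed messages. threads must be > 0.
func runStream(consumers, threads, perConsumer int) int {
	txs := make(chan transaction)
	var mu sync.Mutex
	committed := 0

	var consumerWG sync.WaitGroup
	for c := 0; c < consumers; c++ {
		consumerWG.Add(1)
		go func(id int) {
			defer consumerWG.Done()
			for i := 0; i < perConsumer; i++ {
				tx := transaction{
					payload: fmt.Sprintf("consumer-%d-msg-%d", id, i),
					done:    make(chan struct{}),
				}
				txs <- tx
				<-tx.done // block until processed: one message in flight per consumer
				mu.Lock()
				committed++ // stand-in for committing the offset at the source
				mu.Unlock()
			}
		}(c)
	}

	var threadWG sync.WaitGroup
	for t := 0; t < threads; t++ {
		threadWG.Add(1)
		go func() {
			defer threadWG.Done()
			for tx := range txs {
				_ = strings.ToUpper(tx.payload) // stand-in for real processing
				close(tx.done)
			}
		}()
	}

	consumerWG.Wait()
	close(txs)
	threadWG.Wait()
	return committed
}

func main() {
	// Eight consumers and four processing threads, as in the configuration above.
	fmt.Println("committed:", runStream(8, 4, 10)) // prints "committed: 80"
}
```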

Delivery Guarantees

Without a buffer (the default), a Benthos stream guarantees at-least-once
message delivery, matching the guarantees of the source and sink protocols
used. This means that if you consume from Kafka and produce to Kafka, Benthos
matches the at-least-once delivery guarantees of Kafka.

If you configure a stream with a buffer, your delivery guarantees depend on the
resiliency of the buffer type you have chosen.
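
The following sketch shows why at-least-once delivery can produce duplicates,
assuming a source message is only acknowledged once the sink confirms the
write. The flakySink type and deliver function are hypothetical: the sink
stores each write but reports failure for its first attempts, as if the
acknowledgement were lost after the write landed, so each retry writes the same
message again.

```go
package main

import (
	"errors"
	"fmt"
)

// flakySink is a hypothetical sink that stores every write it receives but
// reports an error for the first failFirst attempts.
type flakySink struct {
	failFirst int
	attempts  int
	stored    []string
}

func (s *flakySink) Write(msg string) error {
	s.attempts++
	s.stored = append(s.stored, msg) // the write lands even when the ack is lost
	if s.attempts <= s.failFirst {
		return errors.New("acknowledgement lost")
	}
	return nil
}

// deliver retries until the sink acknowledges, and only then reports the
// message as safe to acknowledge at the source. The message is never dropped,
// but it may be written more than once.
func deliver(s *flakySink, msg string, maxAttempts int) bool {
	for i := 0; i < maxAttempts; i++ {
		if err := s.Write(msg); err == nil {
			return true // safe to acknowledge (commit) at the source
		}
	}
	return false // leave unacknowledged so the source redelivers it later
}

func main() {
	sink := &flakySink{failFirst: 1}
	acked := deliver(sink, "hello", 3)
	fmt.Println("acked:", acked, "stored:", sink.stored) // prints "acked: true stored: [hello hello]"
}
```

Nothing is lost, but the sink receives the message twice.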

Processor Idempotency

Benthos processors are usually stateless operations that are idempotent by
nature, meaning that duplicate messages travelling through the pipeline do not
affect the result of the processor itself.

If your custom processors are stateful and have side effects, you will need to
implement your own tooling in order to guarantee exactly-once processing of
messages.
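
One common form of such tooling is deduplication by message ID. The sketch
below assumes each message carries a unique ID; the dedupProcessor type and its
Apply method are hypothetical. Its seen set is held in memory, so it is lost on
restart and, because custom processors are constructed once per thread, it is
not shared between threads. A production version would likely need a durable,
shared store, and would have to record the ID and apply the side effect
atomically.

```go
package main

import "fmt"

// dedupProcessor is a hypothetical stateful processor with a side effect
// (adding to a running total). It remembers the IDs it has already applied so
// that redelivered duplicates do not repeat the side effect. The seen set is
// in memory only, so on its own this does not provide exactly-once processing.
type dedupProcessor struct {
	seen  map[string]bool
	total int
}

func newDedupProcessor() *dedupProcessor {
	return &dedupProcessor{seen: make(map[string]bool)}
}

// Apply performs the side effect once per unique message ID and reports
// whether it was applied.
func (p *dedupProcessor) Apply(id string, amount int) bool {
	if p.seen[id] {
		return false // duplicate delivery: skip the side effect
	}
	p.seen[id] = true
	p.total += amount
	return true
}

func main() {
	p := newDedupProcessor()
	// "order-1" is delivered twice, as can happen under at-least-once delivery.
	deliveries := []struct {
		id     string
		amount int
	}{{"order-1", 5}, {"order-2", 7}, {"order-1", 5}}
	for _, d := range deliveries {
		p.Apply(d.id, d.amount)
	}
	fmt.Println("total:", p.total) // prints "total: 12"
}
```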
*/
package stream