forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka.go
547 lines (485 loc) · 19 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
package input
import (
"context"
"crypto/tls"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/Jeffail/gabs/v2"
"github.com/Shopify/sarama"
"github.com/dafanshu/benthos/v3/internal/checkpoint"
"github.com/dafanshu/benthos/v3/internal/component/input"
"github.com/dafanshu/benthos/v3/internal/docs"
"github.com/dafanshu/benthos/v3/lib/input/reader"
"github.com/dafanshu/benthos/v3/lib/log"
"github.com/dafanshu/benthos/v3/lib/message"
"github.com/dafanshu/benthos/v3/lib/message/batch"
"github.com/dafanshu/benthos/v3/lib/metrics"
"github.com/dafanshu/benthos/v3/lib/types"
"github.com/dafanshu/benthos/v3/lib/util/kafka/sasl"
btls "github.com/dafanshu/benthos/v3/lib/util/tls"
)
//------------------------------------------------------------------------------
// init registers the kafka input constructor with the global Constructors
// map along with its full documentation and configuration field spec.
func init() {
	Constructors[TypeKafka] = TypeSpec{
		constructor: fromSimpleConstructor(NewKafka),
		Summary: `
Connects to Kafka brokers and consumes one or more topics.`,
		Description: `
Offsets are managed within Kafka under the specified consumer group, and partitions for each topic are automatically balanced across members of the consumer group.
The Kafka input allows parallel processing of messages from different topic partitions, but by default messages of the same topic partition are processed in lockstep in order to enforce ordered processing. This protection often means that batching messages at the output level can stall, in which case it can be tuned by increasing the field ` + "[`checkpoint_limit`](#checkpoint_limit)" + `, ideally to a value greater than the number of messages you expect to batch.
Alternatively, if you perform batching at the input level using the ` + "[`batching`](#batching)" + ` field it is done per-partition and therefore avoids stalling.
### Metadata
This input adds the following metadata fields to each message:
` + "``` text" + `
- kafka_key
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_unix
- All existing message headers (version 0.11+)
` + "```" + `
The field ` + "`kafka_lag`" + ` is the calculated difference between the high water mark offset of the partition at the time of ingestion and the current message offset.
You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#metadata).
### Troubleshooting
If you're seeing issues writing to or reading from Kafka with this component then it's worth trying out the newer ` + "[`kafka_franz` input](/docs/components/inputs/kafka_franz)" + `.
- I'm seeing logs that report ` + "`Failed to connect to kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)`" + `, but the brokers are definitely reachable.
Unfortunately this error message will appear for a wide range of connection problems even when the broker endpoint can be reached. Double check your authentication configuration and also ensure that you have [enabled TLS](#tlsenabled) if applicable.`,
		FieldSpecs: docs.FieldSpecs{
			docs.FieldString(
				"addresses", "A list of broker addresses to connect to. If an item of the list contains commas it will be expanded into multiple addresses.",
				[]string{"localhost:9092"}, []string{"localhost:9041,localhost:9042"}, []string{"localhost:9041", "localhost:9042"},
			).Array(),
			docs.FieldString(
				"topics",
				"A list of topics to consume from. Multiple comma separated topics can be listed in a single element. Partitions are automatically distributed across consumers of a topic. Alternatively, it's possible to specify explicit partitions to consume from with a colon after the topic name, e.g. `foo:0` would consume the partition 0 of the topic foo. This syntax supports ranges, e.g. `foo:0-10` would consume partitions 0 through to 10 inclusive.",
				[]string{"foo", "bar"},
				[]string{"foo,bar"},
				[]string{"foo:0", "bar:1", "bar:3"},
				[]string{"foo:0,bar:1,bar:3"},
				[]string{"foo:0-5"},
			).AtVersion("3.33.0").Array(),
			docs.FieldString("target_version", "The version of the Kafka protocol to use. This limits the capabilities used by the client and should ideally match the version of your brokers."),
			btls.FieldSpec(),
			sasl.FieldSpec(),
			docs.FieldCommon("consumer_group", "An identifier for the consumer group of the connection. This field can be explicitly made empty in order to disable stored offsets for the consumed topic partitions."),
			docs.FieldCommon("client_id", "An identifier for the client connection."),
			docs.FieldAdvanced("rack_id", "A rack identifier for this client."),
			docs.FieldAdvanced("start_from_oldest", "If an offset is not found for a topic partition, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset."),
			docs.FieldCommon(
				"checkpoint_limit", "The maximum number of messages of the same topic and partition that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level to work on individual partitions. Any given offset will not be committed unless all messages under that offset are delivered in order to preserve at least once delivery guarantees.",
			).AtVersion("3.33.0"),
			docs.FieldAdvanced("commit_period", "The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown."),
			docs.FieldAdvanced("max_processing_period", "A maximum estimate for the time taken to process a message, this is used for tuning consumer group synchronization."),
			input.ExtractTracingSpanMappingDocs,
			docs.FieldAdvanced("group", "Tuning parameters for consumer group synchronization.").WithChildren(
				docs.FieldAdvanced("session_timeout", "A period after which a consumer of the group is kicked after no heartbeats."),
				docs.FieldAdvanced("heartbeat_interval", "A period in which heartbeats should be sent out."),
				docs.FieldAdvanced("rebalance_timeout", "A period after which rebalancing is abandoned if unresolved."),
			),
			docs.FieldAdvanced("fetch_buffer_cap", "The maximum number of unprocessed messages to fetch at a given time."),
			// The shared batching field spec is marked advanced for this input.
			func() docs.FieldSpec {
				b := batch.FieldSpec()
				b.IsAdvanced = true
				return b
			}(),
			// TODO: Remove V4
			docs.FieldDeprecated("max_batch_count"),
			// The deprecated topic/partition fields are omitted from rendered
			// docs whenever the modern `topics` field is populated.
			docs.FieldDeprecated("topic").OmitWhen(func(field, parent interface{}) (string, bool) {
				return "field topic is deprecated and should be omitted when topics is used",
					len(gabs.Wrap(parent).S("topics").Children()) > 0
			}),
			docs.FieldDeprecated("partition").OmitWhen(func(field, parent interface{}) (string, bool) {
				return "field partition is deprecated and should be omitted when topics is used",
					len(gabs.Wrap(parent).S("topics").Children()) > 0
			}),
		},
		Categories: []Category{
			CategoryServices,
		},
	}
}
//------------------------------------------------------------------------------
// NewKafka creates a new Kafka input type. When the config uses the modern
// `topics` field (or contains no deprecated fields) the revamped async
// reader is constructed, otherwise construction falls back to the
// deprecated reader implementation.
func NewKafka(conf Config, mgr types.Manager, log log.Modular, stats metrics.Type) (Type, error) {
	useModernReader := !conf.Kafka.IsDeprecated() || len(conf.Kafka.Topics) > 0
	if useModernReader {
		rdr, err := newKafkaReader(conf.Kafka, mgr, log, stats)
		if err != nil {
			return nil, err
		}
		var async reader.Async = rdr
		// Optionally wrap the reader so that tracing spans are extracted from
		// consumed messages.
		if conf.Kafka.ExtractTracingMap != "" {
			if async, err = input.NewSpanReader(TypeKafka, conf.Kafka.ExtractTracingMap, async, mgr, log); err != nil {
				return nil, err
			}
		}
		return NewAsyncReader(TypeKafka, false, reader.NewAsyncPreserver(async), log, stats)
	}

	// TODO: V4 Remove this.
	if conf.Kafka.MaxBatchCount > 1 {
		log.Warnf("Field '%v.max_batch_count' is deprecated, use '%v.batching.count' instead.\n", conf.Type, conf.Type)
		conf.Kafka.Batching.Count = conf.Kafka.MaxBatchCount
	}
	log.Warnln("The kafka input has been revamped, falling back to the deprecated version. In order to use the new version use the field `topics`.")

	deprecatedReader, err := reader.NewKafka(conf.Kafka, mgr, log, stats)
	if err != nil {
		return nil, err
	}
	var wrapped reader.Type = deprecatedReader
	if !conf.Kafka.Batching.IsNoop() {
		if wrapped, err = reader.NewSyncBatcher(conf.Kafka.Batching, deprecatedReader, mgr, log, stats); err != nil {
			return nil, err
		}
	}
	return NewReader(TypeKafka, reader.NewPreserver(wrapped), log, stats)
}
//------------------------------------------------------------------------------
// asyncMessage pairs a consumed message with the acknowledgement function
// used to resolve its offset once downstream delivery has completed.
type asyncMessage struct {
	msg   types.Message
	ackFn reader.AsyncAckFn
}
// offsetMarker abstracts the single offset-marking method this reader needs
// from an active session (presumably satisfied by sarama's
// ConsumerGroupSession — confirm against where k.session is assigned).
type offsetMarker interface {
	MarkOffset(topic string, partition int32, offset int64, metadata string)
}
// kafkaReader consumes one or more Kafka topics, either balanced across a
// consumer group or from explicitly listed topic partitions.
type kafkaReader struct {
	version   sarama.KafkaVersion
	tlsConf   *tls.Config
	addresses []string

	// At most one of the following is populated; mixing explicit partitions
	// with balanced topics is rejected during construction.
	topicPartitions map[string][]int32
	balancedTopics  []string

	// Durations parsed from config strings; zero when the field was unset.
	commitPeriod      time.Duration
	sessionTimeout    time.Duration
	heartbeatInterval time.Duration
	rebalanceTimeout  time.Duration
	maxProcPeriod     time.Duration

	// Connection resources
	cMut            sync.Mutex          // guards the connection fields below
	consumerCloseFn context.CancelFunc  // cancels running consumers
	consumerDoneCtx context.Context     // done once consumers have stopped
	msgChan         chan asyncMessage   // non-nil while connected
	session         offsetMarker        // non-nil while a session is active

	mRebalanced metrics.StatCounter

	conf  reader.KafkaConfig
	stats metrics.Type
	log   log.Modular
	mgr   types.Manager

	closeOnce  sync.Once
	closedChan chan struct{}
}
// errCannotMixBalanced is returned when a config lists both balanced topics
// and explicit topic:partition entries in the same input.
var errCannotMixBalanced = errors.New("it is not currently possible to include balanced and explicit partition topics in the same kafka input")
// parsePartitions parses a partition expression, which is either a single
// partition number (e.g. "5") or an inclusive range (e.g. "0-10"), and
// returns the expanded list of partitions. An error is returned for
// malformed numbers, multiple ranges, or an inverted range.
func parsePartitions(expr string) ([]int32, error) {
	rangeExpr := strings.Split(expr, "-")
	if len(rangeExpr) > 2 {
		return nil, fmt.Errorf("partition '%v' is invalid, only one range can be specified", expr)
	}

	if len(rangeExpr) == 1 {
		partition, err := strconv.ParseInt(expr, 10, 32)
		if err != nil {
			return nil, fmt.Errorf("failed to parse partition number: %w", err)
		}
		return []int32{int32(partition)}, nil
	}

	start, err := strconv.ParseInt(rangeExpr[0], 10, 32)
	if err != nil {
		return nil, fmt.Errorf("failed to parse start of range: %w", err)
	}
	end, err := strconv.ParseInt(rangeExpr[1], 10, 32)
	if err != nil {
		return nil, fmt.Errorf("failed to parse end of range: %w", err)
	}
	// Previously an inverted range (e.g. "5-2") silently produced an empty
	// partition list, meaning the topic was never consumed. Reject it.
	if end < start {
		return nil, fmt.Errorf("partition range '%v' is invalid, the start must not be greater than the end", expr)
	}

	parts := make([]int32, 0, end-start+1)
	for i := start; i <= end; i++ {
		parts = append(parts, int32(i))
	}
	return parts, nil
}
// parseDurationField parses a non-empty duration string into out, wrapping
// any error with the human readable field name. Empty values are ignored,
// leaving out at its zero value.
func parseDurationField(val, name string, out *time.Duration) error {
	if len(val) == 0 {
		return nil
	}
	var err error
	if *out, err = time.ParseDuration(val); err != nil {
		return fmt.Errorf("failed to parse %v string: %v", name, err)
	}
	return nil
}

// newKafkaReader constructs a kafkaReader from a parsed config, validating
// broker addresses, topic expressions and duration fields. No connection is
// established here; see ConnectWithContext.
func newKafkaReader(
	conf reader.KafkaConfig, mgr types.Manager, log log.Modular, stats metrics.Type,
) (*kafkaReader, error) {
	// A no-op batching policy still needs a count of one so that messages
	// are emitted individually.
	if conf.Batching.IsNoop() {
		conf.Batching.Count = 1
	}
	k := kafkaReader{
		conf:            conf,
		stats:           stats,
		consumerCloseFn: nil,
		log:             log,
		mgr:             mgr,
		mRebalanced:     stats.GetCounter("rebalanced"),
		closedChan:      make(chan struct{}),
		topicPartitions: map[string][]int32{},
	}
	if conf.TLS.Enabled {
		var err error
		if k.tlsConf, err = conf.TLS.Get(); err != nil {
			return nil, err
		}
	}
	// Each address entry may itself contain a comma separated list.
	for _, addr := range conf.Addresses {
		for _, splitAddr := range strings.Split(addr, ",") {
			if trimmed := strings.TrimSpace(splitAddr); len(trimmed) > 0 {
				k.addresses = append(k.addresses, trimmed)
			}
		}
	}
	if len(conf.Topics) == 0 {
		return nil, errors.New("must specify at least one topic in the topics field")
	}
	// Topics are either balanced (plain names) or explicit (name:partition,
	// with optional ranges); the two styles cannot be mixed.
	for _, t := range conf.Topics {
		for _, splitTopics := range strings.Split(t, ",") {
			if trimmed := strings.TrimSpace(splitTopics); len(trimmed) > 0 {
				if withParts := strings.Split(trimmed, ":"); len(withParts) > 1 {
					if len(k.balancedTopics) > 0 {
						return nil, errCannotMixBalanced
					}
					if len(withParts) > 2 {
						return nil, fmt.Errorf("topic '%v' is invalid, only one partition should be specified and the same topic can be listed multiple times, e.g. use `foo:0,foo:1` not `foo:0:1`", trimmed)
					}
					topic := strings.TrimSpace(withParts[0])
					parts, err := parsePartitions(withParts[1])
					if err != nil {
						return nil, err
					}
					k.topicPartitions[topic] = append(k.topicPartitions[topic], parts...)
				} else {
					if len(k.topicPartitions) > 0 {
						return nil, errCannotMixBalanced
					}
					k.balancedTopics = append(k.balancedTopics, trimmed)
				}
			}
		}
	}
	// Optional duration fields, all left at zero when unset.
	if err := parseDurationField(conf.CommitPeriod, "commit period", &k.commitPeriod); err != nil {
		return nil, err
	}
	if err := parseDurationField(conf.Group.SessionTimeout, "session timeout", &k.sessionTimeout); err != nil {
		return nil, err
	}
	if err := parseDurationField(conf.Group.HeartbeatInterval, "heartbeat interval", &k.heartbeatInterval); err != nil {
		return nil, err
	}
	if err := parseDurationField(conf.Group.RebalanceTimeout, "rebalance timeout", &k.rebalanceTimeout); err != nil {
		return nil, err
	}
	if err := parseDurationField(conf.MaxProcessingPeriod, "max processing period", &k.maxProcPeriod); err != nil {
		return nil, err
	}
	if conf.ConsumerGroup == "" && len(k.balancedTopics) > 0 {
		return nil, errors.New("a consumer group must be specified when consuming balanced topics")
	}
	var err error
	if k.version, err = sarama.ParseKafkaVersion(conf.TargetVersion); err != nil {
		return nil, err
	}
	return &k, nil
}
//------------------------------------------------------------------------------
// asyncCheckpointer returns a checkpointing function for the given topic and
// partition that tracks in-flight offsets through a capped checkpointer,
// allowing up to checkpoint_limit messages in flight while only ever marking
// offsets whose predecessors have all been delivered.
func (k *kafkaReader) asyncCheckpointer(topic string, partition int32) func(context.Context, chan<- asyncMessage, types.Message, int64) bool {
	cp := checkpoint.NewCapped(int64(k.conf.CheckpointLimit))
	return func(ctx context.Context, c chan<- asyncMessage, msg types.Message, offset int64) bool {
		// A nil message is a no-op and reports success.
		if msg == nil {
			return true
		}
		// Track blocks when the cap is reached; a timeout here is expected
		// back-pressure rather than a hard failure, so it is not logged.
		resolveFn, err := cp.Track(ctx, offset, int64(msg.Len()))
		if err != nil {
			if err != types.ErrTimeout {
				k.log.Errorf("Failed to checkpoint offset: %v\n", err)
			}
			return false
		}
		select {
		case c <- asyncMessage{
			msg: msg,
			ackFn: func(ctx context.Context, res types.Response) error {
				// resolveFn yields the highest fully-resolved offset, or nil
				// when earlier offsets are still in flight.
				maxOffset := resolveFn()
				if maxOffset == nil {
					return nil
				}
				k.cMut.Lock()
				if k.session != nil {
					k.log.Debugf("Marking offset for topic '%v' partition '%v'.\n", topic, partition)
					k.session.MarkOffset(topic, partition, maxOffset.(int64), "")
				} else {
					// No active session: the offset cannot be committed.
					k.log.Debugf("Unable to mark offset for topic '%v' partition '%v'.\n", topic, partition)
				}
				k.cMut.Unlock()
				return nil
			},
		}:
		case <-ctx.Done():
			return false
		}
		return true
	}
}
// syncCheckpointer returns a checkpointing function for the given topic and
// partition that delivers one message at a time, blocking until the
// acknowledgement arrives before allowing the next message through.
func (k *kafkaReader) syncCheckpointer(topic string, partition int32) func(context.Context, chan<- asyncMessage, types.Message, int64) bool {
	ackedChan := make(chan error)
	return func(ctx context.Context, c chan<- asyncMessage, msg types.Message, offset int64) bool {
		// A nil message is a no-op and reports success.
		if msg == nil {
			return true
		}
		select {
		case c <- asyncMessage{
			msg: msg,
			ackFn: func(ctx context.Context, res types.Response) error {
				resErr := res.Error()
				// Only mark the offset on successful delivery.
				if resErr == nil {
					k.cMut.Lock()
					if k.session != nil {
						k.log.Debugf("Marking offset for topic '%v' partition '%v'.\n", topic, partition)
						k.session.MarkOffset(topic, partition, offset, "")
					} else {
						k.log.Debugf("Unable to mark offset for topic '%v' partition '%v'.\n", topic, partition)
					}
					k.cMut.Unlock()
				}
				// Relay the delivery result back to the blocked checkpoint
				// call below.
				select {
				case ackedChan <- resErr:
				case <-ctx.Done():
				}
				return nil
			},
		}:
			// Wait for the acknowledgement before permitting the next message.
			select {
			case resErr := <-ackedChan:
				if resErr != nil {
					k.log.Errorf("Received error from message batch: %v, shutting down consumer.\n", resErr)
					return false
				}
			case <-ctx.Done():
				return false
			}
		case <-ctx.Done():
			return false
		}
		return true
	}
}
// dataToPart converts a sarama consumer message into a benthos message part,
// copying record headers and the standard kafka_* fields into metadata.
// highestOffset is the partition high water mark at ingestion time, used to
// compute kafka_lag.
func dataToPart(highestOffset int64, data *sarama.ConsumerMessage) types.Part {
	part := message.NewPart(data.Value)

	meta := part.Metadata()
	// Headers are copied first so the reserved kafka_* keys set below always
	// take precedence over any same-named record headers.
	for _, hdr := range data.Headers {
		meta.Set(string(hdr.Key), string(hdr.Value))
	}

	// Lag relative to the high water mark, clamped at zero.
	lag := highestOffset - data.Offset - 1
	if lag < 0 {
		lag = 0
	}

	meta.Set("kafka_key", string(data.Key))
	meta.Set("kafka_partition", strconv.Itoa(int(data.Partition)))
	meta.Set("kafka_topic", data.Topic)
	// Format the int64 offset directly: the previous strconv.Itoa(int(...))
	// conversion could truncate offsets beyond 2^31 on 32-bit platforms.
	meta.Set("kafka_offset", strconv.FormatInt(data.Offset, 10))
	meta.Set("kafka_lag", strconv.FormatInt(lag, 10))
	meta.Set("kafka_timestamp_unix", strconv.FormatInt(data.Timestamp.Unix(), 10))
	return part
}
//------------------------------------------------------------------------------
// closeGroupAndConsumers signals any running topic consumers to stop, waits
// for them to finish, and then marks the reader as fully closed. It is safe
// to call multiple times; closedChan is only ever closed once.
func (k *kafkaReader) closeGroupAndConsumers() {
	k.cMut.Lock()
	consumerCloseFn := k.consumerCloseFn
	consumerDoneCtx := k.consumerDoneCtx
	k.cMut.Unlock()
	if consumerCloseFn != nil {
		k.log.Debugln("Waiting for topic consumers to close.")
		consumerCloseFn()
		// consumerDoneCtx is done once the consumers have fully stopped.
		<-consumerDoneCtx.Done()
		k.log.Debugln("Topic consumers are closed.")
	}
	k.closeOnce.Do(func() {
		close(k.closedChan)
	})
}
//------------------------------------------------------------------------------
// ConnectWithContext establishes a kafkaReader connection, building the
// sarama client config from the parsed reader settings and dispatching to
// either explicit-partition or balanced consumer-group consumption.
func (k *kafkaReader) ConnectWithContext(ctx context.Context) error {
	k.cMut.Lock()
	defer k.cMut.Unlock()
	// A non-nil message channel indicates an existing active connection.
	if k.msgChan != nil {
		return nil
	}
	config := sarama.NewConfig()
	config.ClientID = k.conf.ClientID
	config.RackID = k.conf.RackID
	config.Net.DialTimeout = time.Second
	config.Version = k.version
	config.Consumer.Return.Errors = true
	config.Consumer.MaxProcessingTime = k.maxProcPeriod
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = k.commitPeriod
	config.Consumer.Group.Session.Timeout = k.sessionTimeout
	config.Consumer.Group.Heartbeat.Interval = k.heartbeatInterval
	config.Consumer.Group.Rebalance.Timeout = k.rebalanceTimeout
	config.ChannelBufferSize = k.conf.FetchBufferCap
	// Ensure the network read timeout comfortably exceeds the session and
	// rebalance timeouts so reads don't fail mid-negotiation.
	if config.Net.ReadTimeout <= k.sessionTimeout {
		config.Net.ReadTimeout = k.sessionTimeout * 2
	}
	if config.Net.ReadTimeout <= k.rebalanceTimeout {
		config.Net.ReadTimeout = k.rebalanceTimeout * 2
	}
	config.Net.TLS.Enable = k.conf.TLS.Enabled
	if k.conf.TLS.Enabled {
		config.Net.TLS.Config = k.tlsConf
	}
	if k.conf.StartFromOldest {
		config.Consumer.Offsets.Initial = sarama.OffsetOldest
	}
	if err := k.conf.SASL.Apply(k.mgr, config); err != nil {
		return err
	}
	// Explicit topic:partition entries use direct partition consumers,
	// otherwise a balanced consumer group is used.
	if len(k.topicPartitions) > 0 {
		return k.connectExplicitTopics(ctx, config)
	}
	return k.connectBalancedTopics(ctx, config)
}
// ReadWithContext attempts to read a message from a kafkaReader topic. It
// returns types.ErrNotConnected when no connection is active, and
// types.ErrTimeout when the context expires before a message arrives.
func (k *kafkaReader) ReadWithContext(ctx context.Context) (types.Message, reader.AsyncAckFn, error) {
	k.cMut.Lock()
	msgChan := k.msgChan
	k.cMut.Unlock()

	if msgChan == nil {
		return nil, nil, types.ErrNotConnected
	}

	select {
	case m, open := <-msgChan:
		if open {
			return m.msg, m.ackFn, nil
		}
		// A closed channel means the consumers have shut down.
		return nil, nil, types.ErrNotConnected
	case <-ctx.Done():
		return nil, nil, types.ErrTimeout
	}
}
// CloseAsync shuts down the kafkaReader input and stops processing requests.
// The shutdown runs in a background goroutine; use WaitForClose to block
// until it has completed.
func (k *kafkaReader) CloseAsync() {
	go k.closeGroupAndConsumers()
}
// WaitForClose blocks until the kafkaReader input has closed down, or
// returns types.ErrTimeout once the provided timeout elapses.
func (k *kafkaReader) WaitForClose(timeout time.Duration) error {
	select {
	case <-time.After(timeout):
		return types.ErrTimeout
	case <-k.closedChan:
		return nil
	}
}
//------------------------------------------------------------------------------