-
Notifications
You must be signed in to change notification settings - Fork 7
/
consumer.go
613 lines (512 loc) · 19.6 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
package kafka
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import (
"fmt"
"math"
"time"
"unsafe"
)
/*
#include <stdlib.h>
#include <librdkafka/rdkafka.h>
static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
}
*/
import "C"
// RebalanceCb provides a per-Subscribe*() rebalance event callback.
// The passed Event will be either AssignedPartitions or RevokedPartitions
type RebalanceCb func(*Consumer, Event) error
// Consumer implements a High-level Apache Kafka Consumer instance
type Consumer struct {
events chan Event
handle handle
eventsChanEnable bool
readerTermChan chan bool
rebalanceCb RebalanceCb
appReassigned bool
appRebalanceEnable bool // config setting
}
// Strings returns a human readable name for a Consumer instance
func (c *Consumer) String() string {
return c.handle.String()
}
// getHandle implements the Handle interface
func (c *Consumer) gethandle() *handle {
return &c.handle
}
// Subscribe to a single topic
// This replaces the current subscription
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error {
return c.SubscribeTopics([]string{topic}, rebalanceCb)
}
// SubscribeTopics subscribes to the provided list of topics.
// This replaces the current subscription.
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) {
ctopics := C.rd_kafka_topic_partition_list_new(C.int(len(topics)))
defer C.rd_kafka_topic_partition_list_destroy(ctopics)
for _, topic := range topics {
ctopic := C.CString(topic)
defer C.free(unsafe.Pointer(ctopic))
C.rd_kafka_topic_partition_list_add(ctopics, ctopic, C.RD_KAFKA_PARTITION_UA)
}
e := C.rd_kafka_subscribe(c.handle.rk, ctopics)
if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(e)
}
c.rebalanceCb = rebalanceCb
c.handle.currAppRebalanceEnable = c.rebalanceCb != nil || c.appRebalanceEnable
return nil
}
// Unsubscribe from the current subscription, if any.
func (c *Consumer) Unsubscribe() (err error) {
C.rd_kafka_unsubscribe(c.handle.rk)
return nil
}
// Assign an atomic set of partitions to consume.
// This replaces the current assignment.
func (c *Consumer) Assign(partitions []TopicPartition) (err error) {
c.appReassigned = true
cparts := newCPartsFromTopicPartitions(partitions)
defer C.rd_kafka_topic_partition_list_destroy(cparts)
e := C.rd_kafka_assign(c.handle.rk, cparts)
if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(e)
}
return nil
}
// Unassign the current set of partitions to consume.
func (c *Consumer) Unassign() (err error) {
c.appReassigned = true
e := C.rd_kafka_assign(c.handle.rk, nil)
if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(e)
}
return nil
}
// commit offsets for specified offsets.
// If offsets is nil the currently assigned partitions' offsets are committed.
// This is a blocking call, caller will need to wrap in go-routine to
// get async or throw-away behaviour.
func (c *Consumer) commit(offsets []TopicPartition) (committedOffsets []TopicPartition, err error) {
var rkqu *C.rd_kafka_queue_t
rkqu = C.rd_kafka_queue_new(c.handle.rk)
defer C.rd_kafka_queue_destroy(rkqu)
var coffsets *C.rd_kafka_topic_partition_list_t
if offsets != nil {
coffsets = newCPartsFromTopicPartitions(offsets)
defer C.rd_kafka_topic_partition_list_destroy(coffsets)
}
cErr := C.rd_kafka_commit_queue(c.handle.rk, coffsets, rkqu, nil, nil)
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil, newError(cErr)
}
rkev := C.rd_kafka_queue_poll(rkqu, C.int(-1))
if rkev == nil {
// shouldn't happen
return nil, newError(C.RD_KAFKA_RESP_ERR__DESTROY)
}
defer C.rd_kafka_event_destroy(rkev)
if C.rd_kafka_event_type(rkev) != C.RD_KAFKA_EVENT_OFFSET_COMMIT {
panic(fmt.Sprintf("Expected OFFSET_COMMIT, got %s",
C.GoString(C.rd_kafka_event_name(rkev))))
}
cErr = C.rd_kafka_event_error(rkev)
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil, newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
}
cRetoffsets := C.rd_kafka_event_topic_partition_list(rkev)
if cRetoffsets == nil {
// no offsets, no error
return nil, nil
}
committedOffsets = newTopicPartitionsFromCparts(cRetoffsets)
return committedOffsets, nil
}
// Commit offsets for currently assigned partitions
// This is a blocking call.
// Returns the committed offsets on success.
func (c *Consumer) Commit() ([]TopicPartition, error) {
return c.commit(nil)
}
// CommitMessage commits offset based on the provided message.
// This is a blocking call.
// Returns the committed offsets on success.
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) {
if m.TopicPartition.Error != nil {
return nil, newErrorFromString(ErrInvalidArg, "Can't commit errored message")
}
offsets := []TopicPartition{m.TopicPartition}
offsets[0].Offset++
return c.commit(offsets)
}
// CommitOffsets commits the provided list of offsets
// This is a blocking call.
// Returns the committed offsets on success.
func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error) {
return c.commit(offsets)
}
// StoreOffsets stores the provided list of offsets that will be committed
// to the offset store according to `auto.commit.interval.ms` or manual
// offset-less Commit().
//
// Returns the stored offsets on success. If at least one offset couldn't be stored,
// an error and a list of offsets is returned. Each offset can be checked for
// specific errors via its `.Error` member.
func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error) {
coffsets := newCPartsFromTopicPartitions(offsets)
defer C.rd_kafka_topic_partition_list_destroy(coffsets)
cErr := C.rd_kafka_offsets_store(c.handle.rk, coffsets)
// coffsets might be annotated with an error
storedOffsets = newTopicPartitionsFromCparts(coffsets)
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return storedOffsets, newError(cErr)
}
return storedOffsets, nil
}
// Seek seeks the given topic partitions using the offset from the TopicPartition.
//
// If timeoutMs is not 0 the call will wait this long for the
// seek to be performed. If the timeout is reached the internal state
// will be unknown and this function returns ErrTimedOut.
// If timeoutMs is 0 it will initiate the seek but return
// immediately without any error reporting (e.g., async).
//
// Seek() may only be used for partitions already being consumed
// (through Assign() or implicitly through a self-rebalanced Subscribe()).
// To set the starting offset it is preferred to use Assign() and provide
// a starting offset for each partition.
//
// Returns an error on failure or nil otherwise.
func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error {
rkt := c.handle.getRkt(*partition.Topic)
cErr := C.rd_kafka_seek(rkt,
C.int32_t(partition.Partition),
C.int64_t(partition.Offset),
C.int(timeoutMs))
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(cErr)
}
return nil
}
// Poll the consumer for messages or events.
//
// Will block for at most timeoutMs milliseconds
//
// The following callbacks may be triggered:
// Subscribe()'s rebalanceCb
//
// Returns nil on timeout, else an Event
func (c *Consumer) Poll(timeoutMs int) (event Event) {
ev, _ := c.handle.eventPoll(nil, timeoutMs, 1, nil)
return ev
}
// Events returns the Events channel (if enabled)
func (c *Consumer) Events() chan Event {
return c.events
}
// ReadMessage polls the consumer for a message.
//
// This is a conveniance API that wraps Poll() and only returns
// messages or errors. All other event types are discarded.
//
// The call will block for at most `timeout` waiting for
// a new message or error. `timeout` may be set to -1 for
// indefinite wait.
//
// Timeout is returned as (nil, err) where err is `kafka.(Error).Code == Kafka.ErrTimedOut`.
//
// Messages are returned as (msg, nil),
// while general errors are returned as (nil, err),
// and partition-specific errors are returned as (msg, err) where
// msg.TopicPartition provides partition-specific information (such as topic, partition and offset).
//
// All other event types, such as PartitionEOF, AssignedPartitions, etc, are silently discarded.
//
func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) {
var absTimeout time.Time
var timeoutMs int
if timeout > 0 {
absTimeout = time.Now().Add(timeout)
timeoutMs = (int)(timeout.Seconds() * 1000.0)
} else {
timeoutMs = (int)(timeout)
}
for {
ev := c.Poll(timeoutMs)
switch e := ev.(type) {
case *Message:
if e.TopicPartition.Error != nil {
return e, e.TopicPartition.Error
}
return e, nil
case Error:
return nil, e
default:
// Ignore other event types
}
if timeout > 0 {
// Calculate remaining time
timeoutMs = int(math.Max(0.0, absTimeout.Sub(time.Now()).Seconds()*1000.0))
}
if timeoutMs == 0 && ev == nil {
return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
}
}
}
// Close Consumer instance.
// The object is no longer usable after this call.
func (c *Consumer) Close() (err error) {
if c.eventsChanEnable {
// Wait for consumerReader() to terminate (by closing readerTermChan)
close(c.readerTermChan)
c.handle.waitTerminated(1)
close(c.events)
}
C.rd_kafka_queue_destroy(c.handle.rkq)
c.handle.rkq = nil
e := C.rd_kafka_consumer_close(c.handle.rk)
if e != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(e)
}
c.handle.cleanup()
C.rd_kafka_destroy(c.handle.rk)
return nil
}
// NewConsumer creates a new high-level Consumer instance.
//
// Supported special configuration properties:
// go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
// If set to true the app must handle the AssignedPartitions and
// RevokedPartitions events and call Assign() and Unassign()
// respectively.
// go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
// go.events.channel.size (int, 1000) - Events() channel size
//
// WARNING: Due to the buffering nature of channels (and queues in general) the
// use of the events channel risks receiving outdated events and
// messages. Minimizing go.events.channel.size reduces the risk
// and number of outdated events and messages but does not eliminate
// the factor completely. With a channel size of 1 at most one
// event or message may be outdated.
func NewConsumer(conf *ConfigMap) (*Consumer, error) {
err := versionCheck()
if err != nil {
return nil, err
}
// before we do anything with the configuration, create a copy such that
// the original is not mutated.
confCopy := conf.clone()
groupid, _ := confCopy.get("group.id", nil)
if groupid == nil {
// without a group.id the underlying cgrp subsystem in librdkafka wont get started
// and without it there is no way to consume assigned partitions.
// So for now require the group.id, this might change in the future.
return nil, newErrorFromString(ErrInvalidArg, "Required property group.id not set")
}
c := &Consumer{}
v, err := confCopy.extract("go.application.rebalance.enable", false)
if err != nil {
return nil, err
}
c.appRebalanceEnable = v.(bool)
v, err = confCopy.extract("go.events.channel.enable", false)
if err != nil {
return nil, err
}
c.eventsChanEnable = v.(bool)
v, err = confCopy.extract("go.events.channel.size", 1000)
if err != nil {
return nil, err
}
eventsChanSize := v.(int)
cConf, err := confCopy.convert()
if err != nil {
return nil, err
}
cErrstr := (*C.char)(C.malloc(C.size_t(256)))
defer C.free(unsafe.Pointer(cErrstr))
C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS|C.RD_KAFKA_EVENT_ERROR|C.RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH)
c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
if c.handle.rk == nil {
return nil, newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
}
C.rd_kafka_poll_set_consumer(c.handle.rk)
c.handle.c = c
c.handle.setup()
c.handle.rkq = C.rd_kafka_queue_get_consumer(c.handle.rk)
if c.handle.rkq == nil {
// no cgrp (no group.id configured), revert to main queue.
c.handle.rkq = C.rd_kafka_queue_get_main(c.handle.rk)
}
if c.eventsChanEnable {
c.events = make(chan Event, eventsChanSize)
c.readerTermChan = make(chan bool)
/* Start rdkafka consumer queue reader -> events writer goroutine */
go consumerReader(c, c.readerTermChan)
}
return c, nil
}
// rebalance calls the application's rebalance callback, if any.
// Returns true if the underlying assignment was updated, else false.
func (c *Consumer) rebalance(ev Event) bool {
c.appReassigned = false
if c.rebalanceCb != nil {
c.rebalanceCb(c, ev)
}
return c.appReassigned
}
// consumerReader reads messages and events from the librdkafka consumer queue
// and posts them on the consumer channel.
// Runs until termChan closes
func consumerReader(c *Consumer, termChan chan bool) {
out:
for true {
select {
case _ = <-termChan:
break out
default:
_, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
if term {
break out
}
}
}
c.handle.terminatedChan <- "consumerReader"
return
}
// GetMetadata queries broker for cluster and topic metadata.
// If topic is non-nil only information about that topic is returned, else if
// allTopics is false only information about locally used topics is returned,
// else information about all topics is returned.
// GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API.
func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) {
return getMetadata(c, topic, allTopics, timeoutMs)
}
// QueryWatermarkOffsets queries the broker for the low and high offsets for the given topic and partition.
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
return queryWatermarkOffsets(c, topic, partition, timeoutMs)
}
// GetWatermarkOffsets returns the cached low and high offsets for the given topic
// and partition. The high offset is populated on every fetch response or via calling QueryWatermarkOffsets.
// The low offset is populated every statistics.interval.ms if that value is set.
// OffsetInvalid will be returned if there is no cached offset for either value.
func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error) {
return getWatermarkOffsets(c, topic, partition)
}
// OffsetsForTimes looks up offsets by timestamp for the given partitions.
//
// The returned offset for each partition is the earliest offset whose
// timestamp is greater than or equal to the given timestamp in the
// corresponding partition.
//
// The timestamps to query are represented as `.Offset` in the `times`
// argument and the looked up offsets are represented as `.Offset` in the returned
// `offsets` list.
//
// The function will block for at most timeoutMs milliseconds.
//
// Duplicate Topic+Partitions are not supported.
// Per-partition errors may be returned in the `.Error` field.
func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
return offsetsForTimes(c, times, timeoutMs)
}
// Subscription returns the current subscription as set by Subscribe()
func (c *Consumer) Subscription() (topics []string, err error) {
var cTopics *C.rd_kafka_topic_partition_list_t
cErr := C.rd_kafka_subscription(c.handle.rk, &cTopics)
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil, newError(cErr)
}
defer C.rd_kafka_topic_partition_list_destroy(cTopics)
topicCnt := int(cTopics.cnt)
topics = make([]string, topicCnt)
for i := 0; i < topicCnt; i++ {
crktpar := C._c_rdkafka_topic_partition_list_entry(cTopics,
C.int(i))
topics[i] = C.GoString(crktpar.topic)
}
return topics, nil
}
// Assignment returns the current partition assignments
func (c *Consumer) Assignment() (partitions []TopicPartition, err error) {
var cParts *C.rd_kafka_topic_partition_list_t
cErr := C.rd_kafka_assignment(c.handle.rk, &cParts)
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil, newError(cErr)
}
defer C.rd_kafka_topic_partition_list_destroy(cParts)
partitions = newTopicPartitionsFromCparts(cParts)
return partitions, nil
}
// Committed retrieves committed offsets for the given set of partitions
func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error) {
cparts := newCPartsFromTopicPartitions(partitions)
defer C.rd_kafka_topic_partition_list_destroy(cparts)
cerr := C.rd_kafka_committed(c.handle.rk, cparts, C.int(timeoutMs))
if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return nil, newError(cerr)
}
return newTopicPartitionsFromCparts(cparts), nil
}
// Pause consumption for the provided list of partitions
//
// Note that messages already enqueued on the consumer's Event channel
// (if `go.events.channel.enable` has been set) will NOT be purged by
// this call, set `go.events.channel.size` accordingly.
func (c *Consumer) Pause(partitions []TopicPartition) (err error) {
cparts := newCPartsFromTopicPartitions(partitions)
defer C.rd_kafka_topic_partition_list_destroy(cparts)
cerr := C.rd_kafka_pause_partitions(c.handle.rk, cparts)
if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(cerr)
}
return nil
}
// Resume consumption for the provided list of partitions
func (c *Consumer) Resume(partitions []TopicPartition) (err error) {
cparts := newCPartsFromTopicPartitions(partitions)
defer C.rd_kafka_topic_partition_list_destroy(cparts)
cerr := C.rd_kafka_resume_partitions(c.handle.rk, cparts)
if cerr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
return newError(cerr)
}
return nil
}
// SetOAuthBearerToken sets the the data to be transmitted
// to a broker during SASL/OAUTHBEARER authentication. It will return nil
// on success, otherwise an error if:
// 1) the token data is invalid (meaning an expiration time in the past
// or either a token value or an extension key or value that does not meet
// the regular expression requirements as per
// https://tools.ietf.org/html/rfc7628#section-3.1);
// 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
// 3) SASL/OAUTHBEARER is supported but is not configured as the client's
// authentication mechanism.
func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error {
return c.handle.setOAuthBearerToken(oauthBearerToken)
}
// SetOAuthBearerTokenFailure sets the error message describing why token
// retrieval/setting failed; it also schedules a new token refresh event for 10
// seconds later so the attempt may be retried. It will return nil on
// success, otherwise an error if:
// 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
// 2) SASL/OAUTHBEARER is supported but is not configured as the client's
// authentication mechanism.
func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error {
return c.handle.setOAuthBearerTokenFailure(errstr)
}