// Package kgo provides a pure Go efficient Kafka client for Kafka 0.8+ with
// support for transactions, regex topic consuming, the latest partition
// strategies, and more. This client supports all client-related KIPs.
//
// This client aims to be simple to use while still interacting with Kafka in a
// near-ideal way. For a more complete overview of the client itself, please
// see the README on the project's GitHub page.
package kgo

import (
"context"
"crypto/tls"
"errors"
"fmt"
"hash/crc32"
"math/rand"
"net"
"reflect"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/sasl"
)
var crc32c = crc32.MakeTable(crc32.Castagnoli) // record CRCs use the Castagnoli table; for consuming/producing
// Client issues requests and handles responses to a Kafka cluster.
type Client struct {
cfg cfg
opts []Opt
ctx context.Context
ctxCancel func()
rng func(func(*rand.Rand))
brokersMu sync.RWMutex
brokers []*broker // ordered by broker ID
seeds atomic.Value // []*broker, seed brokers, also ordered by ID
anyBrokerOrd []int32 // shuffled brokers, for random ordering
anySeedIdx int32
stopBrokers bool // set to true on close to stop updateBrokers
// A sink and a source are created once per node ID and persist
// forever. We expect the list to be small.
//
// The mutex only exists to allow consumer session stopping to read
// sources to notify when starting a session; all writes happen in the
// metadata loop.
sinksAndSourcesMu sync.Mutex
sinksAndSources map[int32]sinkAndSource
reqFormatter *kmsg.RequestFormatter
connTimeouter connTimeouter
bufPool bufPool // for brokers to share underlying reusable request buffers
prsPool prsPool // for sinks to reuse []promisedNumberedRecord
controllerIDMu sync.Mutex
controllerID int32
// The following two ensure that we only have one fetchBrokerMetadata
// at once. This avoids unnecessary broker metadata requests and
// metadata trampling.
fetchingBrokersMu sync.Mutex
fetchingBrokers *struct {
done chan struct{}
err error
}
producer producer
consumer consumer
compressor *compressor
decompressor *decompressor
coordinatorsMu sync.Mutex
coordinators map[coordinatorKey]*coordinatorLoad
updateMetadataCh chan string
updateMetadataNowCh chan string // like above, but with high priority
blockingMetadataFnCh chan func()
metawait metawait
metadone chan struct{}
mappedMetaMu sync.Mutex
mappedMeta map[string]mappedMetadataTopic
}
func (cl *Client) idempotent() bool { return !cl.cfg.disableIdempotency }
type sinkAndSource struct {
sink *sink
source *source
}
func (cl *Client) allSinksAndSources(fn func(sns sinkAndSource)) {
cl.sinksAndSourcesMu.Lock()
defer cl.sinksAndSourcesMu.Unlock()
for _, sns := range cl.sinksAndSources {
fn(sns)
}
}
type hostport struct {
host string
port int32
}
// ValidateOpts returns an error if the options are invalid.
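//
// A minimal sketch of pre-validating options before constructing a client;
// the option values here are illustrative only:
//
// if err := ValidateOpts(
// SeedBrokers("localhost:9092"),
// ConsumeTopics("foo"),
// ); err != nil {
// // handle the invalid configuration before ever dialing a broker
// }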
func ValidateOpts(opts ...Opt) error {
_, _, _, err := validateCfg(opts...)
return err
}
func parseSeeds(addrs []string) ([]hostport, error) {
seeds := make([]hostport, 0, len(addrs))
for _, seedBroker := range addrs {
hp, err := parseBrokerAddr(seedBroker)
if err != nil {
return nil, err
}
seeds = append(seeds, hp)
}
return seeds, nil
}
// This function validates the configuration and returns a few things that we
// initialize while validating. The difference between this and NewClient
// initialization is that all remaining NewClient initialization is infallible.
func validateCfg(opts ...Opt) (cfg, []hostport, *compressor, error) {
cfg := defaultCfg()
for _, opt := range opts {
opt.apply(&cfg)
}
if err := cfg.validate(); err != nil {
return cfg, nil, nil, err
}
seeds, err := parseSeeds(cfg.seedBrokers)
if err != nil {
return cfg, nil, nil, err
}
compressor, err := newCompressor(cfg.compression...)
if err != nil {
return cfg, nil, nil, err
}
return cfg, seeds, compressor, nil
}
func namefn(fn any) string {
v := reflect.ValueOf(fn)
if v.Type().Kind() != reflect.Func {
return ""
}
name := runtime.FuncForPC(v.Pointer()).Name()
dot := strings.LastIndexByte(name, '.')
if dot >= 0 {
return name[dot+1:]
}
return name
}
// OptValue returns the value for the given configuration option. If the
// given option does not exist, this returns nil. This function takes either a
// raw Opt, or an Opt function name.
//
// If a configuration option has multiple inputs, this function returns only
// the first input. If the function is a boolean function (such as
// BlockRebalanceOnPoll), this function returns the value of the internal bool.
// Variadic option inputs are returned as a single slice. Options that are
// internally stored as a pointer (ClientID, TransactionalID, and InstanceID)
// are returned as their string input; you can see if the option is internally
// nil by looking at the second value returned from OptValues.
//
// var (
// cl, _ = NewClient(
// InstanceID("foo"),
// ConsumeTopics("foo", "bar"),
// )
// iid = cl.OptValue(InstanceID) // iid is "foo"
// gid = cl.OptValue(ConsumerGroup) // gid is "" since groups are not used
// topics = cl.OptValue("ConsumeTopics") // topics is []string{"foo", "bar"}; string lookup for the option works
// bpoll = cl.OptValue(BlockRebalanceOnPoll) // bpoll is false
// t = cl.OptValue(SessionTimeout) // t is 45s, the internal default
// td = t.(time.Duration) // safe conversion since SessionTimeout's input is a time.Duration
// unk = cl.OptValue("Unknown"), // unk is nil
// )
func (cl *Client) OptValue(opt any) any {
vs := cl.OptValues(opt)
if len(vs) > 0 {
return vs[0]
}
return nil
}
// OptValues returns all values for options. This method is useful for
// options that have multiple inputs (notably, SoftwareNameAndVersion). This is
// also useful for options that are internally stored as a pointer (ClientID,
// TransactionalID, and InstanceID) -- this function will return the string
// value of the option but also whether the option is non-nil. Boolean options
// are returned as a single-element slice with the bool value. Variadic inputs
// are returned as a single slice. If the input option does not exist, this
// returns nil.
//
// var (
// cl, _ = NewClient(
// InstanceID("foo"),
// ConsumeTopics("foo", "bar"),
// )
// idValues = cl.OptValues(InstanceID) // idValues is []any{"foo", true}
// tValues = cl.OptValues(SessionTimeout) // tValues is []any{45 * time.Second}
// topics = cl.OptValues(ConsumeTopics) // topics is []any{[]string{"foo", "bar"}}
// bpoll = cl.OptValues(BlockRebalanceOnPoll) // bpoll is []any{false}
// unknown = cl.OptValues("Unknown") // unknown is nil
// )
func (cl *Client) OptValues(opt any) []any {
name := namefn(opt)
if s, ok := opt.(string); ok {
name = s
}
cfg := &cl.cfg
switch name {
case namefn(ClientID):
if cfg.id != nil {
return []any{*cfg.id, true}
}
return []any{"", false}
case namefn(SoftwareNameAndVersion):
return []any{cfg.softwareName, cfg.softwareVersion}
case namefn(WithLogger):
if cfg.logger != nil {
return []any{cfg.logger.(*wrappedLogger).inner}
}
return []any{nil}
case namefn(RequestTimeoutOverhead):
return []any{cfg.requestTimeoutOverhead}
case namefn(ConnIdleTimeout):
return []any{cfg.connIdleTimeout}
case namefn(Dialer):
return []any{cfg.dialFn}
case namefn(DialTLSConfig):
return []any{cfg.dialTLS}
case namefn(DialTLS):
return []any{cfg.dialTLS != nil}
case namefn(SeedBrokers):
return []any{cfg.seedBrokers}
case namefn(MaxVersions):
return []any{cfg.maxVersions}
case namefn(MinVersions):
return []any{cfg.minVersions}
case namefn(RetryBackoffFn):
return []any{cfg.retryBackoff}
case namefn(RequestRetries):
return []any{cfg.retries}
case namefn(RetryTimeout):
return []any{cfg.retryTimeout(0)}
case namefn(RetryTimeoutFn):
return []any{cfg.retryTimeout}
case namefn(AllowAutoTopicCreation):
return []any{cfg.allowAutoTopicCreation}
case namefn(BrokerMaxWriteBytes):
return []any{cfg.maxBrokerWriteBytes}
case namefn(BrokerMaxReadBytes):
return []any{cfg.maxBrokerReadBytes}
case namefn(MetadataMaxAge):
return []any{cfg.metadataMaxAge}
case namefn(MetadataMinAge):
return []any{cfg.metadataMinAge}
case namefn(SASL):
return []any{cfg.sasls}
case namefn(WithHooks):
return []any{cfg.hooks}
case namefn(ConcurrentTransactionsBackoff):
return []any{cfg.txnBackoff}
case namefn(ConsiderMissingTopicDeletedAfter):
return []any{cfg.missingTopicDelete}
case namefn(DefaultProduceTopic):
return []any{cfg.defaultProduceTopic}
case namefn(RequiredAcks):
return []any{cfg.acks}
case namefn(DisableIdempotentWrite):
return []any{cfg.disableIdempotency}
case namefn(MaxProduceRequestsInflightPerBroker):
return []any{cfg.maxProduceInflight}
case namefn(ProducerBatchCompression):
return []any{cfg.compression}
case namefn(ProducerBatchMaxBytes):
return []any{cfg.maxRecordBatchBytes}
case namefn(MaxBufferedRecords):
return []any{cfg.maxBufferedRecords}
case namefn(MaxBufferedBytes):
return []any{cfg.maxBufferedBytes}
case namefn(RecordPartitioner):
return []any{cfg.partitioner}
case namefn(ProduceRequestTimeout):
return []any{cfg.produceTimeout}
case namefn(RecordRetries):
return []any{cfg.recordRetries}
case namefn(UnknownTopicRetries):
return []any{cfg.maxUnknownFailures}
case namefn(StopProducerOnDataLossDetected):
return []any{cfg.stopOnDataLoss}
case namefn(ProducerOnDataLossDetected):
return []any{cfg.onDataLoss}
case namefn(ProducerLinger):
return []any{cfg.linger}
case namefn(ManualFlushing):
return []any{cfg.manualFlushing}
case namefn(RecordDeliveryTimeout):
return []any{cfg.recordTimeout}
case namefn(TransactionalID):
if cfg.txnID != nil {
return []any{*cfg.txnID, true}
}
return []any{"", false}
case namefn(TransactionTimeout):
return []any{cfg.txnTimeout}
case namefn(ConsumePartitions):
return []any{cfg.partitions}
case namefn(ConsumePreferringLagFn):
return []any{cfg.preferLagFn}
case namefn(ConsumeRegex):
return []any{cfg.regex}
case namefn(ConsumeResetOffset):
return []any{cfg.resetOffset}
case namefn(ConsumeTopics):
return []any{cfg.topics}
case namefn(DisableFetchSessions):
return []any{cfg.disableFetchSessions}
case namefn(FetchIsolationLevel):
return []any{cfg.isolationLevel}
case namefn(FetchMaxBytes):
return []any{int32(cfg.maxBytes)}
case namefn(FetchMaxPartitionBytes):
return []any{int32(cfg.maxPartBytes)}
case namefn(FetchMaxWait):
return []any{time.Duration(cfg.maxWait) * time.Millisecond}
case namefn(FetchMinBytes):
return []any{cfg.minBytes}
case namefn(KeepControlRecords):
return []any{cfg.keepControl}
case namefn(MaxConcurrentFetches):
return []any{cfg.maxConcurrentFetches}
case namefn(Rack):
return []any{cfg.rack}
case namefn(KeepRetryableFetchErrors):
return []any{cfg.keepRetryableFetchErrors}
case namefn(AdjustFetchOffsetsFn):
return []any{cfg.adjustOffsetsBeforeAssign}
case namefn(AutoCommitCallback):
return []any{cfg.commitCallback}
case namefn(AutoCommitInterval):
return []any{cfg.autocommitInterval}
case namefn(AutoCommitMarks):
return []any{cfg.autocommitMarks}
case namefn(Balancers):
return []any{cfg.balancers}
case namefn(BlockRebalanceOnPoll):
return []any{cfg.blockRebalanceOnPoll}
case namefn(ConsumerGroup):
return []any{cfg.group}
case namefn(DisableAutoCommit):
return []any{cfg.autocommitDisable}
case namefn(GreedyAutoCommit):
return []any{cfg.autocommitGreedy}
case namefn(GroupProtocol):
return []any{cfg.protocol}
case namefn(HeartbeatInterval):
return []any{cfg.heartbeatInterval}
case namefn(InstanceID):
if cfg.instanceID != nil {
return []any{*cfg.instanceID, true}
}
return []any{"", false}
case namefn(OnOffsetsFetched):
return []any{cfg.onFetched}
case namefn(OnPartitionsAssigned):
return []any{cfg.onAssigned}
case namefn(OnPartitionsLost):
return []any{cfg.onLost}
case namefn(OnPartitionsRevoked):
return []any{cfg.onRevoked}
case namefn(RebalanceTimeout):
return []any{cfg.rebalanceTimeout}
case namefn(RequireStableFetchOffsets):
return []any{cfg.requireStable}
case namefn(SessionTimeout):
return []any{cfg.sessionTimeout}
default:
return nil
}
}
// NewClient returns a new Kafka client with the given options or an error if
// the options are invalid. Connections to brokers are lazily created only when
// requests are written to them.
//
// By default, the client uses the latest stable request versions when talking
// to Kafka. If you use a broker older than 0.10.0, then you need to manually
// set a MaxVersions option. Otherwise, there is usually no harm in defaulting
// to the latest API versions, although occasionally Kafka introduces new
// required parameters that do not have zero value defaults.
//
// NewClient also launches a goroutine which periodically updates the cached
// topic metadata.
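//
// A minimal sketch of typical construction; the seed broker address is
// illustrative only:
//
// cl, err := NewClient(
// SeedBrokers("localhost:9092"),
// )
// if err != nil {
// // handle err
// }
// defer cl.Close()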
func NewClient(opts ...Opt) (*Client, error) {
cfg, seeds, compressor, err := validateCfg(opts...)
if err != nil {
return nil, err
}
if cfg.retryTimeout == nil {
cfg.retryTimeout = func(key int16) time.Duration {
switch key {
case ((*kmsg.JoinGroupRequest)(nil)).Key(),
((*kmsg.SyncGroupRequest)(nil)).Key(),
((*kmsg.HeartbeatRequest)(nil)).Key():
return cfg.sessionTimeout
}
return 30 * time.Second
}
}
if cfg.dialFn == nil {
dialer := &net.Dialer{Timeout: cfg.dialTimeout}
cfg.dialFn = dialer.DialContext
if cfg.dialTLS != nil {
cfg.dialFn = func(ctx context.Context, network, host string) (net.Conn, error) {
c := cfg.dialTLS.Clone()
if c.ServerName == "" {
server, _, err := net.SplitHostPort(host)
if err != nil {
return nil, fmt.Errorf("unable to split host:port for dialing: %w", err)
}
c.ServerName = server
}
return (&tls.Dialer{
NetDialer: dialer,
Config: c,
}).DialContext(ctx, network, host)
}
}
}
ctx, cancel := context.WithCancel(context.Background())
cl := &Client{
cfg: cfg,
opts: opts,
ctx: ctx,
ctxCancel: cancel,
rng: func() func(func(*rand.Rand)) {
var mu sync.Mutex
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
return func(fn func(*rand.Rand)) {
mu.Lock()
defer mu.Unlock()
fn(rng)
}
}(),
controllerID: unknownControllerID,
sinksAndSources: make(map[int32]sinkAndSource),
reqFormatter: kmsg.NewRequestFormatter(),
connTimeouter: connTimeouter{def: cfg.requestTimeoutOverhead},
bufPool: newBufPool(),
prsPool: newPrsPool(),
compressor: compressor,
decompressor: newDecompressor(),
coordinators: make(map[coordinatorKey]*coordinatorLoad),
updateMetadataCh: make(chan string, 1),
updateMetadataNowCh: make(chan string, 1),
blockingMetadataFnCh: make(chan func()),
metadone: make(chan struct{}),
}
// Before we start any goroutines below, we must notify any interested
// hooks of our existence.
cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookNewClient); ok {
h.OnNewClient(cl)
}
})
cl.producer.init(cl)
cl.consumer.init(cl)
cl.metawait.init()
if cfg.id != nil {
cl.reqFormatter = kmsg.NewRequestFormatter(kmsg.FormatterClientID(*cfg.id))
}
seedBrokers := make([]*broker, 0, len(seeds))
for i, seed := range seeds {
b := cl.newBroker(unknownSeedID(i), seed.host, seed.port, nil)
seedBrokers = append(seedBrokers, b)
}
cl.seeds.Store(seedBrokers)
go cl.updateMetadataLoop()
go cl.reapConnectionsLoop()
return cl, nil
}
// Opts returns the options that were used to create this client. This can be
// used as a base to generate a new client, where you can add override options
// to the end of the original input list. If you want to know a specific
// option value, you can use OptValue or OptValues.
func (cl *Client) Opts() []Opt {
return cl.opts
}
func (cl *Client) loadSeeds() []*broker {
return cl.seeds.Load().([]*broker)
}
// Ping returns whether any broker is reachable, iterating over any discovered
// broker or seed broker until one returns a successful response to an
// ApiVersions request. No discovered broker nor seed broker is attempted more
// than once. If all requests fail, this returns the final error.
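//
// A minimal sketch of using Ping to verify connectivity before proceeding;
// the timeout is illustrative only:
//
// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// defer cancel()
// if err := cl.Ping(ctx); err != nil {
// // no discovered broker or seed broker was reachable
// }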
func (cl *Client) Ping(ctx context.Context) error {
req := kmsg.NewPtrApiVersionsRequest()
req.ClientSoftwareName = cl.cfg.softwareName
req.ClientSoftwareVersion = cl.cfg.softwareVersion
cl.brokersMu.RLock()
brokers := append([]*broker(nil), cl.brokers...)
cl.brokersMu.RUnlock()
var lastErr error
for _, brs := range [2][]*broker{
brokers,
cl.loadSeeds(),
} {
for _, br := range brs {
_, err := br.waitResp(ctx, req)
if lastErr = err; lastErr == nil {
return nil
}
}
}
return lastErr
}
// PurgeTopicsFromClient internally removes all internal information about the
// input topics. If you want to purge information for only consuming or
// only producing, see the related functions [PurgeTopicsFromConsuming] and
// [PurgeTopicsFromProducing].
//
// For producing, this clears all knowledge that these topics have ever been
// produced to. Producing to the topic again may result in out of order
// sequence number errors, or, if idempotency is disabled and the sequence
// numbers align, may result in invisibly discarded records at the broker.
// Purging a topic that was previously produced to may be useful to free up
// resources if you are producing to many disparate and short-lived topics in
// the lifetime of this client and you do not plan to produce to the topic
// anymore. You may want to flush buffered records before purging if records
// for a topic you are purging are currently in flight.
//
// For consuming, this removes all concept of the topic from being consumed.
// This is different from PauseFetchTopics, which literally pauses the fetching
// of topics but keeps the topic information around for resuming fetching
// later. Purging a topic that was being consumed can be useful if you know the
// topic no longer exists, or if you are consuming via regex and know that some
// previously consumed topics no longer exist, or if you simply do not want to
// ever consume from a topic again. If you are group consuming, this function
// will likely cause a rebalance.
//
// For admin requests, this deletes the topic from the cached metadata map for
// sharded requests. Metadata for sharded admin requests is only cached for
// MetadataMinAge anyway, but the map is not cleaned up once the metadata
// expires. This function ensures the map is purged.
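//
// A minimal sketch of flushing in-flight records before purging, per the
// guidance above; the topic name is illustrative only:
//
// if err := cl.Flush(ctx); err == nil {
// cl.PurgeTopicsFromClient("short-lived-topic")
// }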
func (cl *Client) PurgeTopicsFromClient(topics ...string) {
if len(topics) == 0 {
return
}
sort.Strings(topics) // for logging in the functions
cl.blockingMetadataFn(func() { // make reasoning about concurrency easier
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
cl.producer.purgeTopics(topics)
}()
go func() {
defer wg.Done()
cl.consumer.purgeTopics(topics)
}()
wg.Wait()
})
cl.mappedMetaMu.Lock()
for _, t := range topics {
delete(cl.mappedMeta, t)
}
cl.mappedMetaMu.Unlock()
}
// PurgeTopicsFromProducing internally removes all internal information for
// producing about the input topics. This runs the producer bit of logic that
// is documented in [PurgeTopicsFromClient]; see that function for more
// details.
func (cl *Client) PurgeTopicsFromProducing(topics ...string) {
if len(topics) == 0 {
return
}
sort.Strings(topics)
cl.blockingMetadataFn(func() {
cl.producer.purgeTopics(topics)
})
}
// PurgeTopicsFromConsuming internally removes all internal information for
// consuming about the input topics. This runs the consumer bit of logic that
// is documented in [PurgeTopicsFromClient]; see that function for more
// details.
func (cl *Client) PurgeTopicsFromConsuming(topics ...string) {
if len(topics) == 0 {
return
}
sort.Strings(topics)
cl.blockingMetadataFn(func() {
cl.consumer.purgeTopics(topics)
})
}
// Parse broker IP/host and port from a string, using the default Kafka port if
// unspecified. Supported address formats:
//
// - IPv4 host/IP without port: "127.0.0.1", "localhost"
// - IPv4 host/IP with port: "127.0.0.1:1234", "localhost:1234"
// - IPv6 IP without port: "[2001:1000:2000::1]", "::1"
// - IPv6 IP with port: "[2001:1000:2000::1]:1234"
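//
// For illustration, expected results:
//
// parseBrokerAddr("localhost") // {localhost 9092}
// parseBrokerAddr("localhost:1234") // {localhost 1234}
// parseBrokerAddr("::1") // {::1 9092}
// parseBrokerAddr("[2001:1000:2000::1]:1234") // {2001:1000:2000::1 1234}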
func parseBrokerAddr(addr string) (hostport, error) {
const defaultKafkaPort = 9092
// Bracketed IPv6
if strings.IndexByte(addr, '[') == 0 {
parts := strings.Split(addr[1:], "]")
if len(parts) != 2 {
return hostport{}, fmt.Errorf("invalid addr: %s", addr)
}
// No port specified -> use default
if len(parts[1]) == 0 {
return hostport{parts[0], defaultKafkaPort}, nil
}
port, err := strconv.ParseInt(parts[1][1:], 10, 32)
if err != nil {
return hostport{}, fmt.Errorf("unable to parse port from addr: %w", err)
}
return hostport{parts[0], int32(port)}, nil
}
// IPv4 with no port
if strings.IndexByte(addr, ':') == -1 {
return hostport{addr, defaultKafkaPort}, nil
}
// Either an IPv6 literal ("::1"), IP:port, or host:port.
// Try to parse as IP:port or host:port
h, p, err := net.SplitHostPort(addr)
if err != nil {
return hostport{addr, defaultKafkaPort}, nil //nolint:nilerr // ipv6 literal -- use default kafka port
}
port, err := strconv.ParseInt(p, 10, 32)
if err != nil {
return hostport{}, fmt.Errorf("unable to parse port from addr: %w", err)
}
return hostport{h, int32(port)}, nil
}
type connTimeouter struct {
def time.Duration
joinMu sync.Mutex
lastRebalanceTimeout time.Duration
}
func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) {
def := c.def
millis := func(m int32) time.Duration { return time.Duration(m) * time.Millisecond }
switch t := req.(type) {
default:
if timeoutRequest, ok := req.(kmsg.TimeoutRequest); ok {
timeoutMillis := timeoutRequest.Timeout()
return def + millis(timeoutMillis), def
}
return def, def
case *produceRequest:
return def + millis(t.timeout), def
case *fetchRequest:
return def + millis(t.maxWait), def
case *kmsg.FetchRequest:
return def + millis(t.MaxWaitMillis), def
// Join and sync can take a long time. Sync has no notion of
// timeouts, but since the flow of requests should be first
// join, then sync, we can stash the timeout from the join.
case *kmsg.JoinGroupRequest:
c.joinMu.Lock()
c.lastRebalanceTimeout = millis(t.RebalanceTimeoutMillis)
c.joinMu.Unlock()
return def + millis(t.RebalanceTimeoutMillis), def
case *kmsg.SyncGroupRequest:
read := def
c.joinMu.Lock()
if c.lastRebalanceTimeout != 0 {
read = c.lastRebalanceTimeout
}
c.joinMu.Unlock()
return read, def
}
}
func (cl *Client) reinitAnyBrokerOrd() {
cl.anyBrokerOrd = append(cl.anyBrokerOrd[:0], make([]int32, len(cl.brokers))...)
for i := range cl.anyBrokerOrd {
cl.anyBrokerOrd[i] = int32(i)
}
cl.rng(func(r *rand.Rand) {
r.Shuffle(len(cl.anyBrokerOrd), func(i, j int) {
cl.anyBrokerOrd[i], cl.anyBrokerOrd[j] = cl.anyBrokerOrd[j], cl.anyBrokerOrd[i]
})
})
}
// broker returns a random broker from all brokers ever known.
func (cl *Client) broker() *broker {
cl.brokersMu.Lock()
defer cl.brokersMu.Unlock()
// Every time we loop through all discovered brokers, we issue one
// request to the next seed. This ensures that if all discovered
// brokers are down, we will *eventually* loop through seeds and
// hopefully have a reachable seed.
var b *broker
if len(cl.anyBrokerOrd) > 0 {
b = cl.brokers[cl.anyBrokerOrd[0]]
cl.anyBrokerOrd = cl.anyBrokerOrd[1:]
return b
}
seeds := cl.loadSeeds()
cl.anySeedIdx %= int32(len(seeds))
b = seeds[cl.anySeedIdx]
cl.anySeedIdx++
// If we have brokers, we ranged past discovered brokers.
// We now reset the anyBrokerOrd to begin ranging through
// discovered brokers again. If there are still no brokers,
// this reinit will do nothing and we will keep looping seeds.
cl.reinitAnyBrokerOrd()
return b
}
func (cl *Client) waitTries(ctx context.Context, backoff time.Duration) bool {
after := time.NewTimer(backoff)
defer after.Stop()
select {
case <-ctx.Done():
return false
case <-cl.ctx.Done():
return false
case <-after.C:
return true
}
}
// A broker may sometimes indicate it supports OffsetForLeaderEpoch v2+ when
// it does not. We need to catch that and avoid issuing OffsetForLeaderEpoch,
// because we would otherwise loop failing continuously. We do not catch every
// case, such as when a person explicitly assigns offsets with epochs, but we
// catch a few areas that would be returned from a broker itself.
//
// This function is always used *after* at least one request has been issued.
//
// NOTE: This is a weak check; we check if any broker in the cluster supports
// the request. We use this function in three locations:
//
// 1. When using the LeaderEpoch returned in a metadata response. This guards
// against buggy brokers that return 0 rather than -1 even if they do not
// support OffsetForLeaderEpoch. If any support, the cluster is in the
// middle of an upgrade and we can start using the epoch.
// 2. When deciding whether to keep LeaderEpoch from fetched offsets.
// Realistically, clients should only commit epochs if the cluster supports
// them.
// 3. When receiving OffsetOutOfRange when follower fetching and we fetched
// past the end.
//
// In any of these cases, if we issue OffsetForLeaderEpoch against a broker
// that does not support it (even though one in the cluster does), we will fail
// in a loop until the rest of the cluster is upgraded and supports the request.
func (cl *Client) supportsOffsetForLeaderEpoch() bool {
return cl.supportsKeyVersion(int16(kmsg.OffsetForLeaderEpoch), 2)
}
// A broker may not support some requests we want to make. This function checks
// support. This should only be used *after* at least one successful response.
func (cl *Client) supportsKeyVersion(key, version int16) bool {
cl.brokersMu.RLock()
defer cl.brokersMu.RUnlock()
for _, brokers := range [][]*broker{
cl.brokers,
cl.loadSeeds(),
} {
for _, b := range brokers {
if v := b.loadVersions(); v != nil && v.versions[key] >= version {
return true
}
}
}
return false
}
// fetchBrokerMetadata issues a metadata request solely for broker information.
func (cl *Client) fetchBrokerMetadata(ctx context.Context) error {
cl.fetchingBrokersMu.Lock()
wait := cl.fetchingBrokers
if wait != nil {
cl.fetchingBrokersMu.Unlock()
<-wait.done
return wait.err
}
wait = &struct {
done chan struct{}
err error
}{done: make(chan struct{})}
cl.fetchingBrokers = wait
cl.fetchingBrokersMu.Unlock()
defer func() {
cl.fetchingBrokersMu.Lock()
defer cl.fetchingBrokersMu.Unlock()
cl.fetchingBrokers = nil
close(wait.done)
}()
_, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest(), true)
return wait.err
}
func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics []string) (*broker, *kmsg.MetadataResponse, error) {
req := kmsg.NewPtrMetadataRequest()
req.AllowAutoTopicCreation = cl.cfg.allowAutoTopicCreation
if all {
req.Topics = nil
} else if len(topics) == 0 {
req.Topics = []kmsg.MetadataRequestTopic{}
} else {
for _, topic := range topics {
reqTopic := kmsg.NewMetadataRequestTopic()
reqTopic.Topic = kmsg.StringPtr(topic)
req.Topics = append(req.Topics, reqTopic)
}
}
return cl.fetchMetadata(ctx, req, true)
}
func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest, limitRetries bool) (*broker, *kmsg.MetadataResponse, error) {
r := cl.retryable()
// We limit retries for internal metadata refreshes, because these do
// not need to retry forever and are usually blocking *other* requests.
// e.g., producing bumps load errors when metadata returns, so 3
// failures here will correspond to 1 bumped error count. To make the
// number more accurate, we should *never* retry here, but this is
// pretty intolerant of immediately-temporary network issues. Rather,
// we use a small count of 3 retries, which with the default backoff,
// will be <2s of retrying. This is still intolerant of temporary
// failures, but it does allow recovery from a dns issue / bad path.
if limitRetries {
r.limitRetries = 3
}
meta, err := req.RequestWith(ctx, r)
if err == nil {
if meta.ControllerID >= 0 {
cl.controllerIDMu.Lock()
cl.controllerID = meta.ControllerID
cl.controllerIDMu.Unlock()
}
cl.updateBrokers(meta.Brokers)
}
return r.last, meta, err
}
// updateBrokers is called with the broker portion of every metadata response.
// All metadata responses contain all known live brokers, so we can always
// use the response.
func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
sort.Slice(brokers, func(i, j int) bool { return brokers[i].NodeID < brokers[j].NodeID })
newBrokers := make([]*broker, 0, len(brokers))
cl.brokersMu.Lock()
defer cl.brokersMu.Unlock()
if cl.stopBrokers {
return
}
for len(brokers) > 0 && len(cl.brokers) > 0 {
ob := cl.brokers[0]
nb := brokers[0]
switch {
case ob.meta.NodeID < nb.NodeID:
ob.stopForever()
cl.brokers = cl.brokers[1:]
case ob.meta.NodeID == nb.NodeID:
if !ob.meta.equals(nb) {
ob.stopForever()
ob = cl.newBroker(nb.NodeID, nb.Host, nb.Port, nb.Rack)
}
newBrokers = append(newBrokers, ob)
cl.brokers = cl.brokers[1:]
brokers = brokers[1:]
case ob.meta.NodeID > nb.NodeID:
newBrokers = append(newBrokers, cl.newBroker(nb.NodeID, nb.Host, nb.Port, nb.Rack))
brokers = brokers[1:]
}
}
for len(cl.brokers) > 0 {
ob := cl.brokers[0]
ob.stopForever()
cl.brokers = cl.brokers[1:]
}
for len(brokers) > 0 {
nb := brokers[0]
newBrokers = append(newBrokers, cl.newBroker(nb.NodeID, nb.Host, nb.Port, nb.Rack))
brokers = brokers[1:]
}
cl.brokers = newBrokers
cl.reinitAnyBrokerOrd()
}
// CloseAllowingRebalance allows rebalances, leaves any group, and closes all
// connections and goroutines. This function is only useful if you are using
// the BlockRebalanceOnPoll option. Close itself does not allow rebalances and
// will hang if you polled, did not allow rebalances, and want to close. Close
// does not automatically allow rebalances because leaving a group causes a
// revoke, and the client does not assume that the final revoke is concurrency
// safe. The CloseAllowingRebalance function exists as a shortcut to opt into
// allowing rebalance while closing.
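//
// A minimal sketch of the shutdown pattern this function is meant for, when
// the client was created with BlockRebalanceOnPoll:
//
// fetches := cl.PollFetches(ctx)
// // ... process fetches while rebalancing is blocked ...
// cl.CloseAllowingRebalance() // allow the final rebalance and close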
func (cl *Client) CloseAllowingRebalance() {
cl.AllowRebalance()
cl.Close()
}
// Close leaves any group and closes all connections and goroutines. This
// function waits for the group to be left. If you want to force leave a group
// immediately and ensure a speedy shutdown you can use LeaveGroupContext first
// (and then Close will be immediate).
//
// If you are group consuming and have overridden the default
// OnPartitionsRevoked, you must manually commit offsets before closing the
// client.
//
// If you are using the BlockRebalanceOnPoll option and have polled, this
// function does not automatically allow rebalancing. You must AllowRebalance
// before calling this function. Internally, this function leaves the group,
// and leaving a group causes a rebalance so that you can get one final
// notification of revoked partitions. If you want to automatically allow
// rebalancing, use CloseAllowingRebalance.
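//
// A minimal sketch of the speedy-shutdown pattern described above; the
// timeout is illustrative only:
//
// lctx, cancel := context.WithTimeout(context.Background(), time.Second)
// _ = cl.LeaveGroupContext(lctx) // force-leave the group first
// cancel()
// cl.Close() // now immediate, since the group was already left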
func (cl *Client) Close() {
cl.close(cl.ctx)
}
func (cl *Client) close(ctx context.Context) (rerr error) {
defer cl.cfg.hooks.each(func(h Hook) {
if h, ok := h.(HookClientClosed); ok {
h.OnClientClosed(cl)
}