// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package changefeedccl
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"hash/fnv"
"math"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Shopify/sarama"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
func isKafkaSink(u *url.URL) bool {
switch u.Scheme {
case changefeedbase.SinkSchemeConfluentKafka, changefeedbase.SinkSchemeAzureKafka,
changefeedbase.SinkSchemeKafka:
return true
default:
return false
}
}
// maybeLocker wraps a Locker so that successive Unlock calls are safe: Unlock
// is a no-op if the lock is not currently held.
type maybeLocker struct {
wrapped sync.Locker
locked bool
}
func (l *maybeLocker) Lock() {
l.wrapped.Lock()
l.locked = true
}
func (l *maybeLocker) Unlock() {
if l.locked {
l.wrapped.Unlock()
l.locked = false
}
}
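// kafkaLogAdapter routes sarama's log output through the CockroachDB log
// package so that producer logs carry the "kafka-producer" tag set up in
// init() below.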
type kafkaLogAdapter struct {
ctx context.Context
}
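// kafkaSinkKnobs holds testing hooks that let tests substitute their own
// client and producer constructors in place of the sarama defaults.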
type kafkaSinkKnobs struct {
OverrideClientInit func(config *sarama.Config) (kafkaClient, error)
OverrideAsyncProducerFromClient func(kafkaClient) (sarama.AsyncProducer, error)
OverrideSyncProducerFromClient func(kafkaClient) (sarama.SyncProducer, error)
}
var _ sarama.StdLogger = (*kafkaLogAdapter)(nil)
func (l *kafkaLogAdapter) Print(v ...interface{}) {
log.InfofDepth(l.ctx, 1, "", v...)
}
func (l *kafkaLogAdapter) Printf(format string, v ...interface{}) {
log.InfofDepth(l.ctx, 1, format, v...)
}
func (l *kafkaLogAdapter) Println(v ...interface{}) {
log.InfofDepth(l.ctx, 1, "", v...)
}
func init() {
// We'd much prefer to make one of these per sink, so we can use the real
// context, but quite unfortunately, sarama only has a global logger hook.
ctx := context.Background()
ctx = logtags.AddTag(ctx, "kafka-producer", nil)
sarama.Logger = &kafkaLogAdapter{ctx: ctx}
// Sarama should not be rejecting messages based on some arbitrary limits.
// This sink already manages its resource usage. Sarama should attempt to deliver
// messages, no matter their size. Of course, the downstream kafka may reject
// those messages, but this rejection should not be done locally.
sarama.MaxRequestSize = math.MaxInt32
}
// kafkaClient is a small interface restricting the functionality in sarama.Client
type kafkaClient interface {
// Partitions returns the sorted list of all partition IDs for the given topic.
Partitions(topic string) ([]int32, error)
// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
RefreshMetadata(topics ...string) error
// Config returns the sarama config used on the client
Config() *sarama.Config
// Close closes the kafka connection.
Close() error
}
// kafkaSink emits to Kafka asynchronously. It is not concurrency-safe; all
// calls to Emit and Flush should be from the same goroutine.
type kafkaSink struct {
ctx context.Context
bootstrapAddrs string
kafkaCfg *sarama.Config
client kafkaClient
producer sarama.AsyncProducer
topics *TopicNamer
lastMetadataRefresh time.Time
stopWorkerCh chan struct{}
worker sync.WaitGroup
scratch bufalloc.ByteAllocator
metrics metricsRecorder
knobs kafkaSinkKnobs
stats kafkaStats
// Only synchronized between the client goroutine and the worker goroutine.
mu struct {
syncutil.Mutex
inflight int64
flushErr error
flushCh chan struct{}
}
disableInternalRetry bool
}
func (s *kafkaSink) getConcreteType() sinkType {
return sinkTypeKafka
}
type compressionCodec sarama.CompressionCodec
var saramaCompressionCodecOptions = map[string]sarama.CompressionCodec{
"NONE": sarama.CompressionNone,
"GZIP": sarama.CompressionGZIP,
"SNAPPY": sarama.CompressionSnappy,
"LZ4": sarama.CompressionLZ4,
"ZSTD": sarama.CompressionZSTD,
}
func getValidCompressionCodecs() (codecs string) {
for codec := range saramaCompressionCodecOptions {
if codecs != "" {
codecs += ", "
}
codecs += codec
}
return codecs
}
func (j *compressionCodec) UnmarshalText(b []byte) error {
var c sarama.CompressionCodec
if err := c.UnmarshalText(bytes.ToLower(b)); err != nil {
return errors.WithHintf(err, "supported compression codecs are %s", getValidCompressionCodecs())
}
*j = compressionCodec(c)
return nil
}
type saramaConfig struct {
// These settings mirror ones in sarama config.
// We just tag them w/ JSON annotations.
// Flush describes settings specific to producer flushing.
// See sarama.Config.Producer.Flush
Flush struct {
Bytes int `json:",omitempty"`
Messages int `json:",omitempty"`
Frequency jsonDuration `json:",omitempty"`
MaxMessages int `json:",omitempty"`
}
Compression compressionCodec `json:",omitempty"`
RequiredAcks string `json:",omitempty"`
Version string `json:",omitempty"`
}
func (c saramaConfig) Validate() error {
// If Flush.Bytes > 0 or Flush.Messages > 1 without
// Flush.Frequency, sarama may wait forever to flush the
// messages to Kafka. We want to guard against such
// configurations to ensure that we don't get into a situation
// where our call to Flush() would block forever.
if (c.Flush.Bytes > 0 || c.Flush.Messages > 1) && c.Flush.Frequency == 0 {
return errors.New("Flush.Frequency must be > 0 when Flush.Bytes > 0 or Flush.Messages > 1")
}
return nil
}
func defaultSaramaConfig() *saramaConfig {
config := &saramaConfig{}
// When we emit messages to sarama, they're placed in a queue
// (as does any reasonable kafka producer client). When our
// sink's Flush is called, we have to wait for all buffered
// and inflight requests to be sent and then
// acknowledged. Quite unfortunately, we have no way to hint
// to the producer that it should immediately send out
// whatever is buffered. This configuration can have a
// dramatic impact on how quickly this happens naturally (and
// some configurations will block forever!).
//
// The default configuration of all 0 values will send
// messages as quickly as possible.
config.Flush.Messages = 0
config.Flush.Frequency = jsonDuration(0)
config.Flush.Bytes = 0
// The default compression protocol is sarama.CompressionNone,
// which is 0.
config.Compression = 0
// This works around what seems to be a bug in sarama where it isn't
// computing the right value to compare against `Producer.MaxMessageBytes`
// and the server sends it back with a "Message was too large, server
// rejected it to avoid allocation" error. The other flush tunings are
// hints, but this one is a hard limit, so it's useful here as a workaround.
//
// This workaround should probably be something like setting
// `Producer.MaxMessageBytes` to 90% of its value for some headroom, but
// this workaround is the one that's been running in roachtests and I'd want
// to test this one more before changing it.
config.Flush.MaxMessages = 1000
return config
}
// Dial implements the Sink interface.
func (s *kafkaSink) Dial() error {
client, err := s.newClient(s.kafkaCfg)
if err != nil {
return err
}
producer, err := s.newAsyncProducer(client)
if err != nil {
return err
}
s.client = client
s.producer = producer
// Start the worker
s.stopWorkerCh = make(chan struct{})
s.worker.Add(1)
go s.workerLoop()
return nil
}
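// newClient creates a kafka client for the configured bootstrap addresses,
// honoring the OverrideClientInit testing knob when it is set.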
func (s *kafkaSink) newClient(config *sarama.Config) (kafkaClient, error) {
// Initialize client and producer
if s.knobs.OverrideClientInit != nil {
client, err := s.knobs.OverrideClientInit(config)
return client, err
}
client, err := sarama.NewClient(strings.Split(s.bootstrapAddrs, `,`), config)
if err != nil {
return nil, pgerror.Wrapf(err, pgcode.CannotConnectNow,
`connecting to kafka: %s`, s.bootstrapAddrs)
}
return client, err
}
func (s *kafkaSink) newAsyncProducer(client kafkaClient) (sarama.AsyncProducer, error) {
var producer sarama.AsyncProducer
var err error
if s.knobs.OverrideAsyncProducerFromClient != nil {
producer, err = s.knobs.OverrideAsyncProducerFromClient(client)
} else {
producer, err = sarama.NewAsyncProducerFromClient(client.(sarama.Client))
}
if err != nil {
return nil, pgerror.Wrapf(err, pgcode.CannotConnectNow,
`connecting to kafka: %s`, s.bootstrapAddrs)
}
return producer, nil
}
func (s *kafkaSink) newSyncProducer(client kafkaClient) (sarama.SyncProducer, error) {
var producer sarama.SyncProducer
var err error
if s.knobs.OverrideSyncProducerFromClient != nil {
producer, err = s.knobs.OverrideSyncProducerFromClient(client)
} else {
producer, err = sarama.NewSyncProducerFromClient(client.(sarama.Client))
}
if err != nil {
return nil, pgerror.Wrapf(err, pgcode.CannotConnectNow,
`connecting to kafka: %s`, s.bootstrapAddrs)
}
return producer, nil
}
// Close implements the Sink interface.
func (s *kafkaSink) Close() error {
if s.stopWorkerCh != nil {
close(s.stopWorkerCh)
s.worker.Wait()
}
if s.producer != nil {
// Ignore errors related to outstanding messages since we're either shutting
// down or beginning to retry regardless
_ = s.producer.Close()
}
// s.client is only nil in tests.
if s.client != nil {
return s.client.Close()
}
return nil
}
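// messageMetadata is attached to every outgoing producer message so that the
// worker goroutine can release the event allocation and record per-message
// metrics once sarama acknowledges (or fails) the message.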
type messageMetadata struct {
alloc kvevent.Alloc
updateMetrics recordOneMessageCallback
mvcc hlc.Timestamp
}
// EmitRow implements the Sink interface.
func (s *kafkaSink) EmitRow(
ctx context.Context,
topicDescr TopicDescriptor,
key, value []byte,
updated, mvcc hlc.Timestamp,
alloc kvevent.Alloc,
) error {
topic, err := s.topics.Name(topicDescr)
if err != nil {
return err
}
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(value),
Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: s.metrics.recordOneMessage()},
}
s.stats.startMessage(int64(msg.Key.Length() + msg.Value.Length()))
return s.emitMessage(ctx, msg)
}
// EmitResolvedTimestamp implements the Sink interface.
func (s *kafkaSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
defer s.metrics.recordResolvedCallback()()
// Periodically ping sarama to refresh its metadata. This means talking to
// zookeeper, so it shouldn't be done too often, but beyond that this
// constant was picked pretty arbitrarily.
//
// TODO(dan): Add a test for this. We can't right now (2018-11-13) because
// we'd need to bump sarama, but that's a bad idea while we're still
// actively working on stability. At the same time, revisit this tuning.
const metadataRefreshMinDuration = time.Minute
if timeutil.Since(s.lastMetadataRefresh) > metadataRefreshMinDuration {
if err := s.client.RefreshMetadata(s.topics.DisplayNamesSlice()...); err != nil {
return err
}
s.lastMetadataRefresh = timeutil.Now()
}
return s.topics.Each(func(topic string) error {
payload, err := encoder.EncodeResolvedTimestamp(ctx, topic, resolved)
if err != nil {
return err
}
s.scratch, payload = s.scratch.Copy(payload, 0 /* extraCap */)
// sarama caches this, which is why we have to periodically refresh the
// metadata above. Staleness here does not impact correctness. Some new
// partitions will miss this resolved timestamp, but they'll eventually
// be picked up and get later ones.
partitions, err := s.client.Partitions(topic)
if err != nil {
return err
}
for _, partition := range partitions {
msg := &sarama.ProducerMessage{
Topic: topic,
Partition: partition,
Key: nil,
Value: sarama.ByteEncoder(payload),
}
if err := s.emitMessage(ctx, msg); err != nil {
return err
}
}
return nil
})
}
// Flush implements the Sink interface.
func (s *kafkaSink) Flush(ctx context.Context) error {
defer s.metrics.recordFlushRequestCallback()()
flushCh := make(chan struct{}, 1)
var inflight int64
var flushErr error
var immediateFlush bool
func() {
s.mu.Lock()
defer s.mu.Unlock()
inflight = s.mu.inflight
flushErr = s.mu.flushErr
s.mu.flushErr = nil
immediateFlush = inflight == 0 || flushErr != nil
if !immediateFlush {
s.mu.flushCh = flushCh
}
}()
if immediateFlush {
return flushErr
}
if log.V(1) {
log.Infof(ctx, "flush waiting for %d inflight messages", inflight)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-flushCh:
s.mu.Lock()
defer s.mu.Unlock()
flushErr := s.mu.flushErr
s.mu.flushErr = nil
return flushErr
}
}
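// startInflightMessage increments the inflight message count, returning any
// error already recorded by the worker goroutine so that callers stop
// emitting once the sink has failed.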
func (s *kafkaSink) startInflightMessage(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.flushErr != nil {
return s.mu.flushErr
}
s.mu.inflight++
if log.V(2) {
log.Infof(ctx, "emitting %d inflight records to kafka", s.mu.inflight)
}
return nil
}
func (s *kafkaSink) emitMessage(ctx context.Context, msg *sarama.ProducerMessage) error {
if err := s.startInflightMessage(ctx); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case s.producer.Input() <- msg:
}
return nil
}
// isInternalRetryable returns true if the sink should attempt to re-emit the
// messages with a non-batching config first rather than surfacing the error to
// the overarching feed.
func (s *kafkaSink) isInternalRetryable(err error) bool {
if s.disableInternalRetry || err == nil { // Avoid allocating a KError if we don't need to
return false
}
var kError sarama.KError
return errors.As(err, &kError) && kError == sarama.ErrMessageSizeTooLarge
}
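// workerLoop runs in its own goroutine and drains the async producer's
// Successes and Errors channels. It maintains the inflight count, signals a
// pending Flush once that count reaches zero, and drives the internal retry
// of ErrMessageSizeTooLarge failures using progressively smaller batches.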
func (s *kafkaSink) workerLoop() {
defer s.worker.Done()
// Locking/unlocking of s.mu during the retry process must be done through a
// maybeLocker with a deferred Unlock so that mu is always released when the
// worker exits, even if it stops mid-internal-retry.
muLocker := &maybeLocker{&s.mu, false}
defer muLocker.Unlock()
// For an error like ErrMessageSizeTooLarge, we freeze incoming messages and
// retry internally with a more finely grained client until one is found that
// can successfully send the errored messages.
var retryBuf []*sarama.ProducerMessage
var retryErr error
startInternalRetry := func(err error) {
s.mu.AssertHeld()
log.Infof(
s.ctx,
"kafka sink with flush config (%+v) beginning internal retry with %d inflight messages due to error: %s",
s.kafkaCfg.Producer.Flush,
s.mu.inflight,
err.Error(),
)
retryErr = err
// Note that even if we reserve mu.inflight space, it may not be filled as
// some inflight messages won't error (ex: they're for a different topic
// than the one with the original message that errored).
retryBuf = make([]*sarama.ProducerMessage, 0, s.mu.inflight)
}
endInternalRetry := func() {
retryErr = nil
retryBuf = nil
}
isRetrying := func() bool {
return retryErr != nil
}
for {
var ackMsg *sarama.ProducerMessage
var ackError error
select {
case <-s.ctx.Done():
return
case <-s.stopWorkerCh:
return
case m := <-s.producer.Successes():
ackMsg = m
case err := <-s.producer.Errors():
ackMsg, ackError = err.Msg, err.Err
if ackError != nil {
// Msg should never be nil but we're being defensive around a vendor library.
// Msg.Key is nil for sentinel errors (e.g. producer shutting down)
// and errors sending dummy messages used to prefetch metadata.
if err.Msg != nil && err.Msg.Key != nil && err.Msg.Value != nil {
ackError = errors.Wrapf(ackError,
"while sending message with key=%s, size=%d, stats=%s",
err.Msg.Key, err.Msg.Key.Length()+err.Msg.Value.Length(), s.stats.String())
}
}
}
// If we're in a retry we already hold the lock.
if !isRetrying() {
muLocker.Lock()
}
s.mu.inflight--
if !isRetrying() && s.mu.flushErr == nil && s.isInternalRetryable(ackError) {
startInternalRetry(ackError)
}
// If we're retrying and it's a valid but errored message, buffer it to be retried.
isValidMessage := ackMsg != nil && ackMsg.Key != nil && ackMsg.Value != nil
if isRetrying() && isValidMessage {
retryBuf = append(retryBuf, ackMsg)
} else {
s.finishProducerMessage(ackMsg, ackError)
}
// Once inflight messages to retry are done buffering, find a new client
// that successfully resends and continue on with it.
if isRetrying() && s.mu.inflight == 0 {
if err := s.handleBufferedRetries(retryBuf, retryErr); err != nil {
s.mu.flushErr = err
}
endInternalRetry()
}
// While retrying, inflight can be 0 even though messages in retryBuf have yet
// to be resent, so only signal the flush channel once we're no longer retrying.
if !isRetrying() && s.mu.inflight == 0 && s.mu.flushCh != nil {
s.mu.flushCh <- struct{}{}
s.mu.flushCh = nil
}
// If we're in a retry we keep hold of the lock to stop all other operations
// until the retry has completed.
if !isRetrying() {
muLocker.Unlock()
}
}
}
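// finishProducerMessage releases the allocation attached to a producer
// message, records its metrics on success, and remembers the first error so
// it can be surfaced by the next Flush.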
func (s *kafkaSink) finishProducerMessage(ackMsg *sarama.ProducerMessage, ackError error) {
s.mu.AssertHeld()
if m, ok := ackMsg.Metadata.(messageMetadata); ok {
if ackError == nil {
sz := ackMsg.Key.Length() + ackMsg.Value.Length()
s.stats.finishMessage(int64(sz))
m.updateMetrics(m.mvcc, sz, sinkDoesNotCompress)
}
m.alloc.Release(s.ctx)
}
if s.mu.flushErr == nil && ackError != nil {
s.mu.flushErr = ackError
}
}
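// handleBufferedRetries re-sends the buffered messages using a dedicated sync
// producer, halving the batching settings between attempts until the send
// succeeds, the error stops being retryable, or the config can't be reduced
// any further.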
func (s *kafkaSink) handleBufferedRetries(msgs []*sarama.ProducerMessage, retryErr error) error {
lastSendErr := retryErr
activeConfig := s.kafkaCfg
log.Infof(s.ctx, "kafka sink handling %d buffered messages for internal retry", len(msgs))
// Ensure memory for messages is always cleaned up.
defer func() {
for _, msg := range msgs {
s.finishProducerMessage(msg, lastSendErr)
}
}()
for {
select {
case <-s.stopWorkerCh:
log.Infof(s.ctx, "kafka sink ending retries due to worker close")
return lastSendErr
default:
}
newConfig, wasReduced := reduceBatchingConfig(activeConfig)
// Surface the error if it's not retryable or we weren't able to reduce the
// batching config any further
if !s.isInternalRetryable(lastSendErr) {
log.Infof(s.ctx, "kafka sink abandoning internal retry due to error: %s", lastSendErr.Error())
return lastSendErr
} else if !wasReduced {
log.Infof(s.ctx, "kafka sink abandoning internal retry due to being unable to reduce batching size")
return lastSendErr
}
log.Infof(s.ctx, "kafka sink retrying %d messages with reduced flush config: (%+v)", len(msgs), newConfig.Producer.Flush)
activeConfig = newConfig
newClient, err := s.newClient(newConfig)
if err != nil {
return err
}
newProducer, err := s.newSyncProducer(newClient)
if err != nil {
return err
}
s.metrics.recordInternalRetry(int64(len(msgs)), true /* reducedBatchSize */)
// SendMessages will attempt to send all messages into an AsyncProducer with
// the client's config and then block until the results come in.
lastSendErr = newProducer.SendMessages(msgs)
if lastSendErr != nil {
// nolint:errcmp
if sendErrs, ok := lastSendErr.(sarama.ProducerErrors); ok && len(sendErrs) > 0 {
// Just check the first error since all these messages being retried
// were likely from a single partition and therefore would've been
// marked with the same error.
lastSendErr = sendErrs[0].Err
}
}
if err := newProducer.Close(); err != nil {
log.Errorf(s.ctx, "closing of previous sarama producer for retry failed with: %s", err.Error())
}
if err := newClient.Close(); err != nil {
log.Errorf(s.ctx, "closing of previous sarama client for retry failed with: %s", err.Error())
}
if lastSendErr == nil {
log.Infof(s.ctx, "kafka sink internal retry succeeded")
return nil
}
}
}
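// reduceBatchingConfig returns a copy of the config with the producer flush
// settings roughly halved, along with a boolean indicating whether any
// reduction actually occurred. For example, Flush.Messages=1000 and
// Flush.MaxMessages=1000 become 500 and 500; repeated calls eventually bottom
// out at values of 0 or 1, at which point wasReduced is false.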
func reduceBatchingConfig(c *sarama.Config) (*sarama.Config, bool) {
flooredHalve := func(num int) int {
if num < 2 {
return num
}
return num / 2
}
newConfig := *c
newConfig.Producer.Flush.Messages = flooredHalve(c.Producer.Flush.Messages)
// MaxMessages of 0 means unlimited, so treat "halving" it as reducing it to
// 250 (an arbitrary number)
if c.Producer.Flush.MaxMessages == 0 {
newConfig.Producer.Flush.MaxMessages = 250
} else {
newConfig.Producer.Flush.MaxMessages = flooredHalve(c.Producer.Flush.MaxMessages)
}
wasReduced := newConfig.Producer.Flush != c.Producer.Flush
return &newConfig, wasReduced
}
// Topics gives the names of all topics that have been initialized
// and will receive resolved timestamps.
func (s *kafkaSink) Topics() []string {
return s.topics.DisplayNamesSlice()
}
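// changefeedPartitioner assigns partitions using an fnv-1a hash of the message
// key. Messages with a nil key (the resolved timestamp messages emitted by
// EmitResolvedTimestamp) are sent to whichever partition is already set on the
// message.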
type changefeedPartitioner struct {
hash sarama.Partitioner
}
var _ sarama.Partitioner = &changefeedPartitioner{}
var _ sarama.PartitionerConstructor = newChangefeedPartitioner
func newChangefeedPartitioner(topic string) sarama.Partitioner {
return sarama.NewCustomHashPartitioner(fnv.New32a)(topic)
}
func (p *changefeedPartitioner) RequiresConsistency() bool { return true }
func (p *changefeedPartitioner) Partition(
message *sarama.ProducerMessage, numPartitions int32,
) (int32, error) {
if message.Key == nil {
return message.Partition, nil
}
return p.hash.Partition(message, numPartitions)
}
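// jsonDuration wraps time.Duration so that flush frequencies can be written as
// duration strings (e.g. "1s" or "500ms") in the JSON sink config.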
type jsonDuration time.Duration
func (j *jsonDuration) UnmarshalJSON(b []byte) error {
var s string
if err := json.Unmarshal(b, &s); err != nil {
return err
}
dur, err := time.ParseDuration(s)
if err != nil {
return err
}
*j = jsonDuration(dur)
return nil
}
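// tokenProvider adapts an oauth2.TokenSource to sarama's AccessTokenProvider
// interface for SASL OAuth (sarama.SASLTypeOAuth) authentication.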
type tokenProvider struct {
tokenSource oauth2.TokenSource
}
var _ sarama.AccessTokenProvider = (*tokenProvider)(nil)
// Token implements the sarama.AccessTokenProvider interface. This is called by
// Sarama when connecting to the broker.
func (t *tokenProvider) Token() (*sarama.AccessToken, error) {
token, err := t.tokenSource.Token()
if err != nil {
// Errors will result in Sarama retrying the broker connection and logging
// the transient error, with a Broker connection error surfacing after retry
// attempts have been exhausted.
return nil, err
}
return &sarama.AccessToken{Token: token.AccessToken}, nil
}
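// newTokenProvider builds a sarama.AccessTokenProvider backed by the OAuth2
// client-credentials flow described by the dial config.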
func newTokenProvider(
ctx context.Context, dialConfig kafkaDialConfig,
) (sarama.AccessTokenProvider, error) {
// The clientcredentials library sets grant_type to 'client_credentials' by
// default, as defined by the spec; however, non-compliant auth server
// implementations may require a custom grant type.
var endpointParams url.Values
if dialConfig.saslGrantType != `` {
endpointParams = url.Values{"grant_type": {dialConfig.saslGrantType}}
}
tokenURL, err := url.Parse(dialConfig.saslTokenURL)
if err != nil {
return nil, errors.Wrap(err, "malformed token url")
}
// clientcredentials.Config's TokenSource method creates an oauth2.TokenSource
// implementation that returns tokens for the given endpoint, serving the same
// cached result until it expires and then requesting a new token from the
// endpoint.
cfg := clientcredentials.Config{
ClientID: dialConfig.saslClientID,
ClientSecret: dialConfig.saslClientSecret,
TokenURL: tokenURL.String(),
Scopes: dialConfig.saslScopes,
EndpointParams: endpointParams,
}
return &tokenProvider{
tokenSource: cfg.TokenSource(ctx),
}, nil
}
// Apply configures provided kafka configuration struct based on this config.
func (c *saramaConfig) Apply(kafka *sarama.Config) error {
// Sarama limits the size of each message to MaxMessageSize (1MB). This is
// silly: this sink already manages its own memory, so if we had enough
// resources to ingest and process a message, sarama shouldn't get in the way.
// Set this limit to just under the maximum request size.
kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)
kafka.Producer.Flush.Bytes = c.Flush.Bytes
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages
if c.Version != "" {
parsedVersion, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
return err
}
kafka.Version = parsedVersion
}
if c.RequiredAcks != "" {
parsedAcks, err := parseRequiredAcks(c.RequiredAcks)
if err != nil {
return err
}
kafka.Producer.RequiredAcks = parsedAcks
}
kafka.Producer.Compression = sarama.CompressionCodec(c.Compression)
return nil
}
func parseRequiredAcks(a string) (sarama.RequiredAcks, error) {
switch strings.ToUpper(a) {
case "0", "NONE":
return sarama.NoResponse, nil
case "1", "ONE":
return sarama.WaitForLocal, nil
case "-1", "ALL":
return sarama.WaitForAll, nil
default:
return sarama.WaitForLocal,
fmt.Errorf(`invalid acks value "%s", must be "NONE"/"0", "ONE"/"1", or "ALL"/"-1"`, a)
}
}
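// getSaramaConfig parses the user-provided JSON sink configuration (if any) on
// top of the defaults. A minimal example blob, using the field names of
// saramaConfig above, might look like:
//
//	{"Flush": {"Messages": 1000, "Frequency": "1s"}, "RequiredAcks": "ALL"}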
func getSaramaConfig(
jsonStr changefeedbase.SinkSpecificJSONConfig,
) (config *saramaConfig, err error) {
config = defaultSaramaConfig()
if jsonStr != `` {
err = json.Unmarshal([]byte(jsonStr), config)
}
return
}
type kafkaDialConfig struct {
tlsEnabled bool
tlsSkipVerify bool
caCert []byte
clientCert []byte
clientKey []byte
saslEnabled bool
saslHandshake bool
saslUser string
saslPassword string
saslMechanism string
saslTokenURL string
saslClientID string
saslClientSecret string
saslScopes []string
saslGrantType string
}
// buildDefaultKafkaConfig parses the given sinkURL and constructs its
// corresponding dialConfig for kafka.
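// An example URI using SASL/PLAIN over TLS might look roughly like the
// following (the parameter names here are illustrative; the authoritative
// names are the changefeedbase.SinkParam* constants consumed below):
//
//	kafka://broker:9092?tls_enabled=true&sasl_enabled=true&sasl_user=u&sasl_password=p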
func buildDefaultKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
dialConfig := kafkaDialConfig{}
if _, err := u.consumeBool(changefeedbase.SinkParamTLSEnabled, &dialConfig.tlsEnabled); err != nil {
return kafkaDialConfig{}, err
}
if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil {
return kafkaDialConfig{}, err
}
if err := u.decodeBase64(changefeedbase.SinkParamCACert, &dialConfig.caCert); err != nil {
return kafkaDialConfig{}, err
}
if err := u.decodeBase64(changefeedbase.SinkParamClientCert, &dialConfig.clientCert); err != nil {
return kafkaDialConfig{}, err
}
if err := u.decodeBase64(changefeedbase.SinkParamClientKey, &dialConfig.clientKey); err != nil {
return kafkaDialConfig{}, err
}
if _, err := u.consumeBool(changefeedbase.SinkParamSASLEnabled, &dialConfig.saslEnabled); err != nil {
return kafkaDialConfig{}, err
}
if wasSet, err := u.consumeBool(changefeedbase.SinkParamSASLHandshake, &dialConfig.saslHandshake); !wasSet && err == nil {
dialConfig.saslHandshake = true
} else {
if err != nil {
return kafkaDialConfig{}, err
}
if !dialConfig.saslEnabled {
return kafkaDialConfig{}, errors.Errorf(`%s must be enabled to configure SASL handshake behavior`, changefeedbase.SinkParamSASLEnabled)
}
}
dialConfig.saslMechanism = u.consumeParam(changefeedbase.SinkParamSASLMechanism)
if dialConfig.saslMechanism != `` && !dialConfig.saslEnabled {
return kafkaDialConfig{}, errors.Errorf(`%s must be enabled to configure SASL mechanism`,
changefeedbase.SinkParamSASLEnabled)
}
if dialConfig.saslMechanism == `` {
dialConfig.saslMechanism = sarama.SASLTypePlaintext
}
switch dialConfig.saslMechanism {
case sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512, sarama.SASLTypeOAuth, sarama.SASLTypePlaintext:
default:
return kafkaDialConfig{}, errors.Errorf(`param %s must be one of %s, %s, %s, or %s`,
changefeedbase.SinkParamSASLMechanism,
sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512, sarama.SASLTypeOAuth, sarama.SASLTypePlaintext)
}
var requiredSASLParams []string
if dialConfig.saslMechanism == sarama.SASLTypeOAuth {
requiredSASLParams = []string{changefeedbase.SinkParamSASLClientID, changefeedbase.SinkParamSASLClientSecret,
changefeedbase.SinkParamSASLTokenURL}
} else {
requiredSASLParams = []string{changefeedbase.SinkParamSASLUser, changefeedbase.SinkParamSASLPassword}
}
for _, param := range requiredSASLParams {
if dialConfig.saslEnabled {
if len(u.q[param]) == 0 {
errStr := fmt.Sprintf(`%s must be provided when SASL is enabled`, param)
if dialConfig.saslMechanism != sarama.SASLTypePlaintext {
errStr += fmt.Sprintf(` using mechanism %s`, dialConfig.saslMechanism)
}
return kafkaDialConfig{}, errors.Errorf("%s", errStr)
}
} else {
if len(u.q[param]) > 0 {
return kafkaDialConfig{}, errors.Errorf(`%s must be enabled if %s is provided`,
changefeedbase.SinkParamSASLEnabled, param)
}
}
}
if dialConfig.saslMechanism != sarama.SASLTypeOAuth {
oauthParams := []string{changefeedbase.SinkParamSASLClientID, changefeedbase.SinkParamSASLClientSecret,
changefeedbase.SinkParamSASLTokenURL, changefeedbase.SinkParamSASLGrantType,
changefeedbase.SinkParamSASLScopes}
for _, param := range oauthParams {
if len(u.q[param]) > 0 {
return kafkaDialConfig{}, errors.Errorf(`%s is only a valid parameter for %s=%s`, param,
changefeedbase.SinkParamSASLMechanism, sarama.SASLTypeOAuth)
}
}
}
dialConfig.saslUser = u.consumeParam(changefeedbase.SinkParamSASLUser)
dialConfig.saslPassword = u.consumeParam(changefeedbase.SinkParamSASLPassword)
dialConfig.saslTokenURL = u.consumeParam(changefeedbase.SinkParamSASLTokenURL)
dialConfig.saslClientID = u.consumeParam(changefeedbase.SinkParamSASLClientID)
dialConfig.saslScopes = u.Query()[changefeedbase.SinkParamSASLScopes]
dialConfig.saslGrantType = u.consumeParam(changefeedbase.SinkParamSASLGrantType)
var decodedClientSecret []byte
if err := u.decodeBase64(changefeedbase.SinkParamSASLClientSecret, &decodedClientSecret); err != nil {
return kafkaDialConfig{}, err
}
dialConfig.saslClientSecret = string(decodedClientSecret)
return dialConfig, nil
}
// newMissingParameterError returns an error message for missing parameters in
// sinkURL.
func newMissingParameterError(scheme string, param string) error {
return errors.Newf("scheme %s requires parameter %s", scheme, param)
}
// newRequiredValueError returns an error message for using unsupported values
// in sinkURL.
func newRequiredValueError(param string, unsupportedValue, allowedValue string) error {
return errors.Newf("unsupported value %s for parameter %s, please use %s instead",
unsupportedValue, param, allowedValue)
}
// validateAndConsumeParamsIfSet consumes the given parameters from the sinkURL
// and validates that any values provided match the accepted values in
// paramsWithAcceptedValues.
func validateAndConsumeParamsIfSet(u *sinkURL, paramsWithAcceptedValues map[string]string) error {
for param, allowedValue := range paramsWithAcceptedValues {