RecordAccumulator.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer.internals;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
/**
* This class acts as a queue that accumulates records into {@link MemoryRecords}
* instances to be sent to the server.
* <p>
* The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
* this behavior is explicitly disabled.
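* <p>
* A rough usage sketch (hedged: the surrounding producer/sender wiring shown here is illustrative;
* only the accumulator calls themselves come from this class):
* <pre>{@code
* // Producer threads append records; this may block up to maxBlockMs waiting for buffer memory.
* accumulator.append(topic, partition, timestamp, key, value, headers,
*     callbacks, maxBlockMs, false, time.milliseconds(), cluster);
*
* // The sender thread asks which nodes have sendable data ...
* ReadyCheckResult result = accumulator.ready(cluster, time.milliseconds());
* // ... and drains up to maxRequestSize bytes of batches per ready node.
* Map<Integer, List<ProducerBatch>> batches =
*     accumulator.drain(cluster, result.readyNodes, maxRequestSize, time.milliseconds());
* }</pre>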
*/
public class RecordAccumulator {
private final LogContext logContext;
private final Logger log;
private volatile boolean closed;
private final AtomicInteger flushesInProgress;
private final AtomicInteger appendsInProgress;
private final int batchSize;
private final CompressionType compression;
private final int lingerMs;
private final long retryBackoffMs;
private final int deliveryTimeoutMs;
private final long partitionAvailabilityTimeoutMs; // latency threshold for marking a partition temporarily unavailable
private final boolean enableAdaptivePartitioning;
private final BufferPool free;
private final Time time;
private final ApiVersions apiVersions;
private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();
private final ConcurrentMap<Integer /*nodeId*/, NodeLatencyStats> nodeStats = new CopyOnWriteMap<>();
private final IncompleteBatches incomplete;
// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
private final Map<String, Integer> nodesDrainIndex;
private final TransactionManager transactionManager;
private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
/**
* Create a new record accumulator
*
* @param logContext The log context used for logging
* @param batchSize The size to use when allocating {@link MemoryRecords} instances
* @param compression The compression codec for the records
* @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
* sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
* latency for potentially better throughput due to more batching (and hence fewer, larger requests).
* @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
* exhausting all retries in a short period of time.
* @param deliveryTimeoutMs An upper bound on the time to report success or failure on record delivery
* @param partitionerConfig Partitioner config
* @param metrics The metrics
* @param metricGrpName The metric group name
* @param time The time instance to use
* @param apiVersions Request API versions for current connected brokers
* @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
* numbers per partition.
* @param bufferPool The buffer pool
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
int lingerMs,
long retryBackoffMs,
int deliveryTimeoutMs,
PartitionerConfig partitionerConfig,
Metrics metrics,
String metricGrpName,
Time time,
ApiVersions apiVersions,
TransactionManager transactionManager,
BufferPool bufferPool) {
this.logContext = logContext;
this.log = logContext.logger(RecordAccumulator.class);
this.closed = false;
this.flushesInProgress = new AtomicInteger(0);
this.appendsInProgress = new AtomicInteger(0);
this.batchSize = batchSize;
this.compression = compression;
this.lingerMs = lingerMs;
this.retryBackoffMs = retryBackoffMs;
this.deliveryTimeoutMs = deliveryTimeoutMs;
this.enableAdaptivePartitioning = partitionerConfig.enableAdaptivePartitioning;
this.partitionAvailabilityTimeoutMs = partitionerConfig.partitionAvailabilityTimeoutMs;
this.free = bufferPool;
this.incomplete = new IncompleteBatches();
this.muted = new HashSet<>();
this.time = time;
this.apiVersions = apiVersions;
nodesDrainIndex = new HashMap<>();
this.transactionManager = transactionManager;
registerMetrics(metrics, metricGrpName);
}
/**
* Create a new record accumulator with default partitioner config
*
* @param logContext The log context used for logging
* @param batchSize The size to use when allocating {@link MemoryRecords} instances
* @param compression The compression codec for the records
* @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
* sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
* latency for potentially better throughput due to more batching (and hence fewer, larger requests).
* @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
* exhausting all retries in a short period of time.
* @param deliveryTimeoutMs An upper bound on the time to report success or failure on record delivery
* @param metrics The metrics
* @param metricGrpName The metric group name
* @param time The time instance to use
* @param apiVersions Request API versions for current connected brokers
* @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
* numbers per partition.
* @param bufferPool The buffer pool
*/
public RecordAccumulator(LogContext logContext,
int batchSize,
CompressionType compression,
int lingerMs,
long retryBackoffMs,
int deliveryTimeoutMs,
Metrics metrics,
String metricGrpName,
Time time,
ApiVersions apiVersions,
TransactionManager transactionManager,
BufferPool bufferPool) {
this(logContext,
batchSize,
compression,
lingerMs,
retryBackoffMs,
deliveryTimeoutMs,
new PartitionerConfig(),
metrics,
metricGrpName,
time,
apiVersions,
transactionManager,
bufferPool);
}
private void registerMetrics(Metrics metrics, String metricGrpName) {
metrics.addMetric(
metrics.metricName("waiting-threads", metricGrpName,
"The number of user threads blocked waiting for buffer memory to enqueue their records"),
(config, now) -> free.queued());
metrics.addMetric(
metrics.metricName("buffer-total-bytes", metricGrpName,
"The maximum amount of buffer memory the client can use (whether or not it is currently used)."),
(config, now) -> free.totalMemory());
metrics.addMetric(
metrics.metricName("buffer-available-bytes", metricGrpName,
"The total amount of buffer memory that is not being used (either unallocated or in the free list)."),
(config, now) -> free.availableMemory());
}
private void setPartition(AppendCallbacks callbacks, int partition) {
if (callbacks != null)
callbacks.setPartition(partition);
}
/**
* Check if the partition has changed concurrently, or if we need to complete a previously disabled partition change.
*
* @param topic The topic
* @param topicInfo The topic info
* @param partitionInfo The built-in partitioner's partition info
* @param deque The partition queue
* @param nowMs The current time, in milliseconds
* @param cluster The cluster metadata
* @return 'true' if partition changed and we need to get new partition info and retry,
* 'false' otherwise
*/
private boolean partitionChanged(String topic,
TopicInfo topicInfo,
BuiltInPartitioner.StickyPartitionInfo partitionInfo,
Deque<ProducerBatch> deque, long nowMs,
Cluster cluster) {
if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
partitionInfo.partition(), topic);
return true;
}
// We might have disabled partition switch if the queue had incomplete batches.
// Check if all batches are full now and switch.
if (allBatchesFull(deque)) {
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, 0, cluster, true);
if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
log.trace("Completed previously disabled switch for topic {} partition {}, retrying",
topic, partitionInfo.partition());
return true;
}
}
return false;
}
/**
* Add a record to the accumulator and return the append result.
* <p>
* The append result will contain the future metadata, and flags for whether the appended batch is full or a new batch was created.
* <p>
*
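* A hedged sketch of the two-pass pattern enabled by {@code abortOnNewBatch} (the retry wiring here is
* illustrative; in the producer it lives in KafkaProducer's send path, and {@code newPartition} stands
* for whatever partition the user's partitioner picks in onNewBatch):
* <pre>{@code
* RecordAppendResult result = accumulator.append(topic, partition, timestamp, key, value,
*     headers, callbacks, maxBlockMs, true, nowMs, cluster);
* if (result.abortForNewBatch) {
*     // A new batch would have been needed: give the partitioner's onNewBatch() a chance
*     // to pick a different partition, then retry with abortOnNewBatch = false.
*     result = accumulator.append(topic, newPartition, timestamp, key, value,
*         headers, callbacks, maxBlockMs, false, time.milliseconds(), cluster);
* }
* }</pre>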
* @param topic The topic to which this record is being sent
* @param partition The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION
* if any partition could be used
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callbacks The callbacks to execute
* @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
* @param abortOnNewBatch If true, return before creating a new batch so that the partitioner's
* onNewBatch method can be run before trying to append again
* @param nowMs The current time, in milliseconds
* @param cluster The cluster metadata
*/
public RecordAppendResult append(String topic,
int partition,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));
// We keep track of the number of appending threads to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// Loop to retry in case we encounter partitioner's race conditions.
while (true) {
// If the message doesn't have any partition affinity, we pick a partition based on broker
// availability and performance. Note that we peek at the current partition before we hold the
// deque lock, so we'll need to make sure that it hasn't changed while we were waiting for
// the lock.
final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
final int effectivePartition;
if (partition == RecordMetadata.UNKNOWN_PARTITION) {
partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
effectivePartition = partitionInfo.partition();
} else {
partitionInfo = null;
effectivePartition = partition;
}
// Now that we know the effective partition, let the caller know.
setPartition(callbacks, effectivePartition);
// check if we have an in-progress batch
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
// After taking the lock, check that the partition hasn't changed; if it has, retry.
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
if (appendResult != null) {
// If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
// We don't have an in-progress record batch, so try to allocate a new batch.
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true, 0);
}
if (buffer == null) {
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
// This call may block if we exhausted buffer space.
buffer = free.allocate(size, maxTimeToBlock);
// Update the current time in case the buffer allocation blocked above.
// NOTE: getting time may be expensive, so calling it under a lock
// should be avoided.
nowMs = time.milliseconds();
}
synchronized (dq) {
// After taking the lock, check that the partition hasn't changed; if it has, retry.
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;
RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
// Set buffer to null, so that deallocate doesn't return it to the free pool, since it's used in the batch.
if (appendResult.newBatchCreated)
buffer = null;
// If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
} finally {
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
/**
* Append a new batch to the queue
*
* @param topic The topic
* @param partition The partition (cannot be RecordMetadata.UNKNOWN_PARTITION)
* @param dq The queue
* @param timestamp The timestamp of the record
* @param key The key for the record
* @param value The value for the record
* @param headers the Headers for the record
* @param callbacks The callbacks to execute
* @param buffer The buffer for the new batch
* @param nowMs The current time, in milliseconds
*/
private RecordAppendResult appendNewBatch(String topic,
int partition,
Deque<ProducerBatch> dq,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
AppendCallbacks callbacks,
ByteBuffer buffer,
long nowMs) {
assert partition != RecordMetadata.UNKNOWN_PARTITION;
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callbacks, nowMs));
dq.addLast(batch);
incomplete.add(batch);
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false, batch.estimatedSizeInBytes());
}
private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
"support the required message format (v2). The broker must be version 0.11 or later.");
}
return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
}
/**
* Check if all batches in the queue are full.
*/
private boolean allBatchesFull(Deque<ProducerBatch> deque) {
// Only the last batch may be incomplete, so we just check that.
ProducerBatch last = deque.peekLast();
return last == null || last.isFull();
}
/**
* Try to append to a ProducerBatch.
*
* If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
* resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
ProducerBatch last = deque.peekLast();
if (last != null) {
int initialBytes = last.estimatedSizeInBytes();
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
if (future == null) {
last.closeForRecordAppends();
} else {
int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
}
}
return null;
}
private boolean isMuted(TopicPartition tp) {
return muted.contains(tp);
}
public void resetNextBatchExpiryTime() {
nextBatchExpiryTimeMs = Long.MAX_VALUE;
}
public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) {
if (batch.createdMs + deliveryTimeoutMs > 0) {
// The positive check guards against potential overflow of batch.createdMs + deliveryTimeoutMs
// when a very large value is set for deliveryTimeoutMs.
nextBatchExpiryTimeMs = Math.min(nextBatchExpiryTimeMs, batch.createdMs + deliveryTimeoutMs);
} else {
log.warn("Skipping next batch expiry time update due to addition overflow: "
+ "batch.createMs={}, deliveryTimeoutMs={}", batch.createdMs, deliveryTimeoutMs);
}
}
/**
* Get a list of batches which have been sitting in the accumulator too long and need to be expired.
*/
public List<ProducerBatch> expiredBatches(long now) {
List<ProducerBatch> expiredBatches = new ArrayList<>();
for (TopicInfo topicInfo : topicInfoMap.values()) {
for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {
// expire the batches in the order of sending
synchronized (deque) {
while (!deque.isEmpty()) {
ProducerBatch batch = deque.getFirst();
if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {
deque.poll();
batch.abortRecordAppends();
expiredBatches.add(batch);
} else {
maybeUpdateNextBatchExpiryTime(batch);
break;
}
}
}
}
}
return expiredBatches;
}
public long getDeliveryTimeoutMs() {
return deliveryTimeoutMs;
}
/**
* Re-enqueue the given record batch in the accumulator. The Sender.completeBatch method already checks
* whether the batch has reached deliveryTimeoutMs, so we do not repeat the delivery timeout check here.
*/
public void reenqueue(ProducerBatch batch, long now) {
batch.reenqueued(now);
Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
synchronized (deque) {
if (transactionManager != null)
insertInSequenceOrder(deque, batch);
else
deque.addFirst(batch);
}
}
/**
* Split the big batch that has been rejected and re-enqueue the split batches into the accumulator.
* @return the number of split batches.
*/
public int splitAndReenqueue(ProducerBatch bigBatch) {
// Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever
// is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure
// the split doesn't happen too often.
CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression,
Math.max(1.0f, (float) bigBatch.compressionRatio()));
Deque<ProducerBatch> dq = bigBatch.split(this.batchSize);
int numSplitBatches = dq.size();
Deque<ProducerBatch> partitionDequeue = getOrCreateDeque(bigBatch.topicPartition);
while (!dq.isEmpty()) {
ProducerBatch batch = dq.pollLast();
incomplete.add(batch);
// We treat the newly split batches as if they had never been tried.
synchronized (partitionDequeue) {
if (transactionManager != null) {
// We should track the newly created batches since they already have assigned sequences.
transactionManager.addInFlightBatch(batch);
insertInSequenceOrder(partitionDequeue, batch);
} else {
partitionDequeue.addFirst(batch);
}
}
}
return numSplitBatches;
}
// We will have to do extra work to ensure the queue is in order when requests are being retried and there are
// multiple requests in flight to that partition. If the first in flight request fails to append, then all the
// subsequent in flight requests will also fail because the sequence numbers will not be accepted.
//
// Further, once batches are being retried, we are reduced to a single in flight request for that partition. So when
// the subsequent batches come back in sequence order, they will have to be placed further back in the queue.
//
// Note that this assumes that all the batches in the queue which have an assigned sequence also have the current
// producer id. We will not attempt to reorder messages if the producer id has changed, we will throw an
// IllegalStateException instead.
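//
// Worked example (sequence numbers are illustrative): if the deque holds batches with base
// sequences [3, 5] and a batch with base sequence 4 is re-enqueued, the loop below pops batch 3
// into orderedBatches, adds batch 4 at the front, then re-inserts batch 3 ahead of it,
// leaving the deque ordered as [3, 4, 5].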
private void insertInSequenceOrder(Deque<ProducerBatch> deque, ProducerBatch batch) {
// When we are re-enqueueing and have enabled idempotence, the re-enqueued batch must always have a sequence.
if (batch.baseSequence() == RecordBatch.NO_SEQUENCE)
throw new IllegalStateException("Trying to re-enqueue a batch which doesn't have a sequence even " +
"though idempotency is enabled.");
if (!transactionManager.hasInflightBatches(batch.topicPartition))
throw new IllegalStateException("We are re-enqueueing a batch which is not tracked as part of the in flight " +
"requests. batch.topicPartition: " + batch.topicPartition + "; batch.baseSequence: " + batch.baseSequence());
ProducerBatch firstBatchInQueue = deque.peekFirst();
if (firstBatchInQueue != null && firstBatchInQueue.hasSequence() && firstBatchInQueue.baseSequence() < batch.baseSequence()) {
// The incoming batch can't be inserted at the front of the queue without violating the sequence ordering.
// This means that the incoming batch should be placed somewhere further back.
// We need to find the right place for the incoming batch and insert it there.
// We will only enter this branch if we have multiple inflights sent to different brokers and we need to retry
// the inflight batches.
//
// Since we re-enqueue exactly one batch at a time and ensure that the queue is always ordered by
// sequence, a simple linear scan over a subset of the in flight batches finds the right place in the queue each time.
List<ProducerBatch> orderedBatches = new ArrayList<>();
while (deque.peekFirst() != null && deque.peekFirst().hasSequence() && deque.peekFirst().baseSequence() < batch.baseSequence())
orderedBatches.add(deque.pollFirst());
log.debug("Reordered incoming batch with sequence {} for partition {}. It was placed in the queue at " +
"position {}", batch.baseSequence(), batch.topicPartition, orderedBatches.size());
// Either we have reached a point where there are batches without a sequence (ie. never been drained
// and are hence in order by default), or the batch at the front of the queue has a sequence greater
// than the incoming batch. This is the right place to add the incoming batch.
deque.addFirst(batch);
// Now we have to re-insert the previously queued batches in the right order.
for (int i = orderedBatches.size() - 1; i >= 0; --i) {
deque.addFirst(orderedBatches.get(i));
}
// At this point, the incoming batch has been queued in the correct place according to its sequence.
} else {
deque.addFirst(batch);
}
}
/**
* Add the leader to the ready nodes if the batch is ready
*
* @param nowMs The current time
* @param exhausted 'true' if the buffer pool is exhausted
* @param part The partition
* @param leader The leader for the partition
* @param waitedTimeMs How long the batch has waited
* @param backingOff Whether the batch is backing off from a retry
* @param full Whether the batch is full
* @param nextReadyCheckDelayMs The delay for next check
* @param readyNodes The set of ready nodes (to be filled in)
* @return The delay for next check
*/
private long batchReady(long nowMs, boolean exhausted, TopicPartition part, Node leader,
long waitedTimeMs, boolean backingOff, boolean full,
long nextReadyCheckDelayMs, Set<Node> readyNodes) {
if (!readyNodes.contains(leader) && !isMuted(part)) {
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
boolean sendable = full
|| expired
|| exhausted
|| closed
|| flushInProgress()
|| transactionCompleting;
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
return nextReadyCheckDelayMs;
}
/**
* Iterate over partitions to see which ones have batches ready and collect leaders of those partitions
* into the set of ready nodes. If a partition has no leader, add the topic to the set of topics with
* no leader. This function also calculates stats for adaptive partitioning.
*
* @param cluster The cluster metadata
* @param nowMs The current time
* @param topic The topic
* @param topicInfo The topic info
* @param nextReadyCheckDelayMs The delay for next check
* @param readyNodes The set of ready nodes (to be filled in)
* @param unknownLeaderTopics The set of topics with no leader (to be filled in)
* @return The delay for next check
*/
private long partitionReady(Cluster cluster, long nowMs, String topic,
TopicInfo topicInfo,
long nextReadyCheckDelayMs, Set<Node> readyNodes, Set<String> unknownLeaderTopics) {
ConcurrentMap<Integer, Deque<ProducerBatch>> batches = topicInfo.batches;
// Collect the queue sizes for available partitions to be used in adaptive partitioning.
int[] queueSizes = null;
int[] partitionIds = null;
if (enableAdaptivePartitioning && batches.size() >= cluster.partitionsForTopic(topic).size()) {
// We don't do adaptive partitioning until we have scheduled at least one batch for every
// partition (i.e. we have the corresponding entries in the batches map); until then we
// just do uniform partitioning. The reason is that we build queue sizes from the batches map,
// and if an entry is missing in the batches map, then the adaptive partitioning logic
// won't know about it and won't switch to it.
queueSizes = new int[batches.size()];
partitionIds = new int[queueSizes.length];
}
int queueSizesIndex = -1;
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<Integer, Deque<ProducerBatch>> entry : batches.entrySet()) {
TopicPartition part = new TopicPartition(topic, entry.getKey());
// Advance queueSizesIndex so that we properly index available
// partitions. Do it here so that it's done for all code paths.
Node leader = cluster.leaderFor(part);
if (leader != null && queueSizes != null) {
++queueSizesIndex;
assert queueSizesIndex < queueSizes.length;
partitionIds[queueSizesIndex] = part.partition();
}
Deque<ProducerBatch> deque = entry.getValue();
final long waitedTimeMs;
final boolean backingOff;
final int dequeSize;
final boolean full;
// This loop is especially hot with large partition counts.
// We are careful to only perform the minimum required inside the
// synchronized block, as this lock is also used to synchronize producer threads
// attempting to append() to a partition/batch.
synchronized (deque) {
// Deques are often empty in this path, esp with large partition counts,
// so we exit early if we can.
ProducerBatch batch = deque.peekFirst();
if (batch == null) {
continue;
}
waitedTimeMs = batch.waitedTimeMs(nowMs);
backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
dequeSize = deque.size();
full = dequeSize > 1 || batch.isFull();
}
if (leader == null) {
// This is a partition for which the leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else {
if (queueSizes != null)
queueSizes[queueSizesIndex] = dequeSize;
if (partitionAvailabilityTimeoutMs > 0) {
// Check if we want to exclude the partition from the list of available partitions
// if the broker hasn't responded for some time.
NodeLatencyStats nodeLatencyStats = nodeStats.get(leader.id());
if (nodeLatencyStats != null) {
// NOTE: there is no synchronization between reading metrics,
// so we read the ready time first to avoid accidentally marking a partition
// unavailable if we read while the metrics are being updated.
long readyTimeMs = nodeLatencyStats.readyTimeMs;
if (readyTimeMs - nodeLatencyStats.drainTimeMs > partitionAvailabilityTimeoutMs)
--queueSizesIndex;
}
}
nextReadyCheckDelayMs = batchReady(nowMs, exhausted, part, leader, waitedTimeMs, backingOff,
full, nextReadyCheckDelayMs, readyNodes);
}
}
// We've collected the queue sizes for partitions of this topic, now we can calculate
// load stats. NOTE: the stats are calculated in place, modifying the
// queueSizes array.
topicInfo.builtInPartitioner.updatePartitionLoadStats(queueSizes, partitionIds, queueSizesIndex + 1);
return nextReadyCheckDelayMs;
}
/**
* Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
* partition will be ready; also return the set of topics whose leaders are unknown for the accumulated
* partition batches.
* <p>
* A destination node is ready to send data if:
* <ol>
* <li>There is at least one partition that is not backing off its send
* <li><b>and</b> those partitions are not muted (to prevent reordering if
* {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
* is set to one)</li>
* <li><b>and <i>any</i></b> of the following are true</li>
* <ul>
* <li>The record set is full</li>
* <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
* <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
* are immediately considered ready).</li>
* <li>The accumulator has been closed</li>
* </ul>
* </ol>
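* <p>
* A minimal sender-style polling sketch (hedged: the metadata-refresh and sleep handling shown
* are illustrative; only the {@code ready} call and its result fields come from this class):
* <pre>{@code
* ReadyCheckResult check = accumulator.ready(cluster, time.milliseconds());
* if (!check.unknownLeaderTopics.isEmpty()) {
*     // Request a metadata refresh for topics whose leaders we don't know yet.
* }
* // check.readyNodes can now be drained; check.nextReadyCheckDelayMs bounds how long
* // the sender may sleep before polling ready() again.
* }</pre>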
*/
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
// Go topic by topic so that we can get queue sizes for partitions in a topic and calculate
// cumulative frequency table (used in partitioner).
for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) {
final String topic = topicInfoEntry.getKey();
nextReadyCheckDelayMs = partitionReady(cluster, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
/**
* Check whether there are any batches which haven't been drained
*/
public boolean hasUndrained() {
for (TopicInfo topicInfo : topicInfoMap.values()) {
for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {
synchronized (deque) {
if (!deque.isEmpty())
return true;
}
}
}
return false;
}
private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) {
ProducerIdAndEpoch producerIdAndEpoch = null;
if (transactionManager != null) {
if (!transactionManager.isSendToPartitionAllowed(tp))
return true;
producerIdAndEpoch = transactionManager.producerIdAndEpoch();
if (!producerIdAndEpoch.isValid())
// we cannot send the batch until we have refreshed the producer id
return true;
if (!first.hasSequence()) {
if (transactionManager.hasInflightBatches(tp) && transactionManager.hasStaleProducerIdAndEpoch(tp)) {
// Don't drain any new batches while the partition has in-flight batches with a different epoch
// and/or producer ID. Otherwise, a batch with a new epoch and sequence number
// 0 could be written before earlier batches complete, which would cause out of sequence errors
return true;
}
if (transactionManager.hasUnresolvedSequence(first.topicPartition))
// Don't drain any new batches while the state of previous sequence numbers
// is unknown. The previous batches would be unknown if they were aborted
// on the client after being sent to the broker at least once.
return true;
}
int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
&& first.baseSequence() != firstInFlightSequence)
// If the queued batch already has an assigned sequence, then it is being retried.
// In this case, we wait until the next immediate batch is ready and drain that.
// We only move on when the next in line batch is complete (either successfully or due to
// a fatal broker error). This effectively reduces our in flight request count to 1.
return true;
}
return false;
}
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<ProducerBatch> ready = new ArrayList<>();
/* to make starvation less likely each node has its own drainIndex */
int drainIndex = getDrainIndex(node.idString());
int start = drainIndex = drainIndex % parts.size();
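// For example, with parts.size() == 3 and a persisted drainIndex of 4 (values are illustrative),
// we start at partition index 1 and the do/while loop below visits indices 1, 2, 0 before
// wrapping back around to start.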
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
updateDrainIndex(node.idString(), drainIndex);
drainIndex = (drainIndex + 1) % parts.size();
// Only proceed if the partition has no in-flight batches.
if (isMuted(tp))
continue;
Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
continue;
final ProducerBatch batch;
synchronized (deque) {
// invariant: !isMuted(tp,now) && deque != null
ProducerBatch first = deque.peekFirst();
if (first == null)
continue;
// first != null
boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
// Only drain the batch if it is not during backoff period.
if (backoff)
continue;
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
// there is a rare case that a single batch size is larger than the request size due to
// compression; in this case we will still eventually send this batch in a single request
break;
} else {
if (shouldStopDrainBatchesForPartition(first, tp))
break;
}
batch = deque.pollFirst();
boolean isTransactional = transactionManager != null && transactionManager.isTransactional();
ProducerIdAndEpoch producerIdAndEpoch =
transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
if (producerIdAndEpoch != null && !batch.hasSequence()) {
// If the producer id/epoch of the partition does not match the latest one
// of the producer, we update it and reset the sequence. This should only be
// done when all of its in-flight batches have completed. This is guaranteed
// in `shouldStopDrainBatchesForPartition`.
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
// If the batch already has an assigned sequence, then we should not change the producer id and
// sequence number, since this may introduce duplicates. In particular, the previous attempt
// may actually have been accepted, and if we change the producer id and sequence here, this
// attempt will also be accepted, causing a duplicate.
//
// Additionally, we update the next sequence number bound for the partition, and also have
// the transaction manager track the batch so as to ensure that sequence ordering is maintained
// even if we receive out of order responses.
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
"{} being sent to partition {}", producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, batch.baseSequence(), tp);
transactionManager.addInFlightBatch(batch);
}
}
// Do the rest of the work outside the lock; close() is particularly expensive.
batch.close();
size += batch.records().sizeInBytes();
ready.add(batch);
batch.drained(now);
} while (start != drainIndex);
return ready;
}
private int getDrainIndex(String idString) {
return nodesDrainIndex.computeIfAbsent(idString, s -> 0);
}
private void updateDrainIndex(String idString, int drainIndex) {
nodesDrainIndex.put(idString, drainIndex);
}
/**
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
*
* @param cluster The current cluster metadata
* @param nodes The list of nodes to drain
* @param maxSize The maximum number of bytes to drain
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified, with a total size less than the requested maxSize.
*/
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
public void updateNodeLatencyStats(Integer nodeId, long nowMs, boolean canDrain) {
// Don't bother with updating stats if the feature is turned off.
if (partitionAvailabilityTimeoutMs <= 0)
return;
// When the sender gets a node (returned by the ready() function) that has data to send
// but the node is not ready (and so we cannot drain the data), we only update the
// ready time; the difference then reflects how long the node hasn't been ready
// to send the data. We can then temporarily remove partitions that are handled by the
// node from the list of available partitions so that the partitioner wouldn't pick
// this partition.
// NOTE: there is no synchronization for metric updates, so drainTimeMs is updated
// first to avoid accidentally marking a partition unavailable if the reader gets
// values between updates.
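// For example, if a reader in partitionReady() saw readyTimeMs from before this update but
// drainTimeMs from after it, the computed readyTimeMs - drainTimeMs could only shrink, so a
// torn read cannot spuriously mark the partition unavailable.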
NodeLatencyStats nodeLatencyStats = nodeStats.computeIfAbsent(nodeId, id -> new NodeLatencyStats(nowMs));
if (canDrain)
nodeLatencyStats.drainTimeMs = nowMs;
nodeLatencyStats.readyTimeMs = nowMs;
}
/* Visible for testing */
public NodeLatencyStats getNodeLatencyStats(Integer nodeId) {
return nodeStats.get(nodeId);
}
/* Visible for testing */
public BuiltInPartitioner getBuiltInPartitioner(String topic) {
return topicInfoMap.get(topic).builtInPartitioner;
}
/**
* The earliest absolute time a batch will expire (in milliseconds)
*/
public long nextExpiryTimeMs() {
return this.nextBatchExpiryTimeMs;
}
/* Visible for testing */
public Deque<ProducerBatch> getDeque(TopicPartition tp) {
TopicInfo topicInfo = topicInfoMap.get(tp.topic());
if (topicInfo == null)
return null;
return topicInfo.batches.get(tp.partition());
}
/**
* Get the deque for the given topic-partition, creating it if necessary.
*/
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(), k -> new TopicInfo(logContext, k, batchSize));
return topicInfo.batches.computeIfAbsent(tp.partition(), k -> new ArrayDeque<>());
}
/**
* Deallocate the record batch
*/
public void deallocate(ProducerBatch batch) {
incomplete.remove(batch);