Only create and add records when the Deque was pre-created. The Deque will remain null when the partition is encountered for the first time, when batches are cleaned up after expiration, or when the sender clears the deque.

sudeshwasnik committed Sep 5, 2022
1 parent a0e9020 commit 772ffd8
Showing 3 changed files with 103 additions and 29 deletions.
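In short, the change makes RecordAccumulator.append abort only when no deque existed yet for the chosen partition, instead of whenever a new batch would be created. A minimal sketch of the two-call flow this relies on, paraphrased from the producer's send path (the doSend wiring below is an assumption for illustration, not quoted from this commit):

    // First attempt: allow the accumulator to abort instead of creating a batch.
    RecordAppendResult result = accumulator.append(topic, partition, timestamp, key, value, headers,
            callbacks, maxBlockTimeMs, /* abortOnNewBatch */ true, nowMs, cluster);
    if (result.abortForNewBatch) {
        // With this commit, this branch is taken only when no Deque existed for the partition
        // (first encounter, or the deque was removed after expiration or after a drain).
        partitioner.onNewBatch(topic, cluster, partition);
        int newPartition = partitioner.partition(record, cluster); // pick again (signature simplified)
        result = accumulator.append(topic, newPartition, timestamp, key, value, headers,
                callbacks, maxBlockTimeMs, /* abortOnNewBatch */ false, nowMs, cluster);
    }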
RecordAccumulator.java
@@ -25,6 +25,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@@ -271,27 +272,31 @@ public RecordAppendResult append(String topic,
        // Now that we know the effective partition, let the caller know.
        setPartition(callbacks, effectivePartition);

+       // boolean to check whether a new Deque<ProducerBatch> would get created for this partition.
+       boolean noDqForPartition = !topicInfo.batches.containsKey(effectivePartition);
+
        // check if we have an in-progress batch
        Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
-       RecordAppendResult appendResult;
        synchronized (dq) {
            // After taking the lock, validate that the partition hasn't changed and retry.
            if (topicInfo.builtInPartitioner.isPartitionChanged(partitionInfo)) {
                log.trace("Partition {} for topic {} switched by a concurrent append, retrying",
                    partitionInfo.partition(), topic);
                continue;
            }
-           appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
-           if (appendResult != null && !appendResult.newBatchCreated) {
+           RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
+           if (appendResult != null) {
                topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster);
                return appendResult;
            }
        }

-       // either 1. the current topicPartition's producerBatch is full: return and prepare for another batch/partition,
-       // or 2. no producerBatch existed for this topicPartition: create a new producerBatch.
-       if (appendResult == null && abortOnNewBatch) {
-           // existing batch is full, return a result that will cause another call to append.
+       // noDqForPartition is true when either 1. the partition was encountered for the first time, so no Deque existed previously,
+       // or 2. the Deque was removed because all batches were cleared after expiration, or the sender cleared batches after draining.
+       // If so, abort and let the caller invoke partitioner.onNewBatch() to select another partition.
+       // This prevents a single partition from being re-selected right after a recent drain.
+       if (abortOnNewBatch && noDqForPartition) {
+           // existing batch is full or no batch exists; return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true, 0);
        }
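Concretely, under the new predicate a partition is skipped at most once between deque creations. An illustrative sequence, assuming abortOnNewBatch is true on every call (mirroring the tests further down):

    accum.append(topic, 0, ...); // no deque yet -> returns abortForNewBatch = true; an empty deque now exists
    accum.append(topic, 0, ...); // deque exists -> creates a batch and appends the record
    accum.append(topic, 0, ...); // batch has room -> appends to the existing batch, no skip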

@@ -315,7 +320,7 @@ public RecordAppendResult append(String topic,
                    partitionInfo.partition(), topic);
                continue;
            }
-           appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
+           RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
// Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
if (appendResult.newBatchCreated)
buffer = null;
@@ -356,7 +361,7 @@ private RecordAppendResult appendNewBatch(String topic,
        assert partition != RecordMetadata.UNKNOWN_PARTITION;

        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
-       if (appendResult != null && !appendResult.newBatchCreated) {
+       if (appendResult != null) {
            // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
            return appendResult;
        }
@@ -397,15 +402,13 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
            int initialBytes = last.estimatedSizeInBytes();
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
            if (future == null) {
-               // producerBatch doesn't have any more room; return, then try to create a new batch.
                last.closeForRecordAppends();
-               return null;
            } else {
                int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
            }
        }
-       return new RecordAppendResult(null, false, true, false, 0);
+       return null;
    }
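A note for readers decoding the boolean arguments: from the way results are constructed in this file, the RecordAppendResult constructor appears to take (future, batchIsFull, newBatchCreated, abortForNewBatch, appendedBytes). This field order is inferred from usage, not quoted from the class. Under that reading:

    new RecordAppendResult(null, false, true, false, 0);  // removed sentinel: "no batch existed, a new one would be created"
    new RecordAppendResult(null, false, false, true, 0);  // abort signal in append(): caller retries via onNewBatch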

private boolean isMuted(TopicPartition tp) {
@@ -432,19 +435,26 @@ public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch) {
     */
    public List<ProducerBatch> expiredBatches(long now) {
        List<ProducerBatch> expiredBatches = new ArrayList<>();
-       for (TopicInfo topicInfo : topicInfoMap.values()) {
-           for (Deque<ProducerBatch> deque : topicInfo.batches.values()) {
+       for (Entry<String, TopicInfo> entryTopicInfo : topicInfoMap.entrySet()) {
+           TopicInfo topicInfo = entryTopicInfo.getValue();
+           for (Entry<Integer, Deque<ProducerBatch>> entry : topicInfo.batches.entrySet()) {
                // expire the batches in the order of sending
+               Deque<ProducerBatch> deque = entry.getValue();
                synchronized (deque) {
-                   while (!deque.isEmpty()) {
-                       ProducerBatch batch = deque.getFirst();
-                       if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {
-                           deque.poll();
-                           batch.abortRecordAppends();
-                           expiredBatches.add(batch);
-                       } else {
-                           maybeUpdateNextBatchExpiryTime(batch);
-                           break;
-                       }
-                   }
+                   if (!deque.isEmpty()) {
+                       while (!deque.isEmpty()) {
+                           ProducerBatch batch = deque.getFirst();
+                           if (batch.hasReachedDeliveryTimeout(deliveryTimeoutMs, now)) {
+                               deque.poll();
+                               batch.abortRecordAppends();
+                               expiredBatches.add(batch);
+                           } else {
+                               maybeUpdateNextBatchExpiryTime(batch);
+                               break;
+                           }
+                       }
+                       if (deque.isEmpty()) {
+                           clearDeque(new TopicPartition(entryTopicInfo.getKey(), entry.getKey()));
+                       }
+                   }
                }
            }
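Removing the now-empty deque here is what re-arms the noDqForPartition check in append(): once every batch for a partition has expired, the next record for that partition is again eligible to be skipped. An illustrative sequence (timeout values are arbitrary; see testProducerBatchIsNotCreatedIfNoDequeExistsAfterExpire below):

    accum.append(topic, p, ...);                        // deque exists, batch created
    accum.expiredBatches(now + deliveryTimeoutMs + 1);  // batch expired; deque emptied and removed
    accum.append(topic, p, ...);                        // noDqForPartition is true again -> abortForNewBatch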
@@ -809,8 +819,10 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
        synchronized (deque) {
            // invariant: !isMuted(tp,now) && deque != null
            ProducerBatch first = deque.peekFirst();
-           if (first == null)
+           if (first == null) {
+               clearDeque(tp);
                continue;
+           }

            // first != null
            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
@@ -944,6 +956,13 @@ public Deque<ProducerBatch> getDeque(TopicPartition tp) {
        return topicInfo.batches.get(tp.partition());
    }

+   /* Visible for testing */
+   public void clearDeque(TopicPartition tp) {
+       TopicInfo topicInfo = topicInfoMap.get(tp.topic());
+       if (topicInfo != null)
+           topicInfo.batches.remove(tp.partition());
+   }

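clearDeque() removes the per-partition deque from the topic's batches map (a ConcurrentMap, per the imports at the top of this file), which is exactly the state the noDqForPartition check in append() looks for. A hypothetical usage sketch from a test's point of view, assuming the helpers seen in RecordAccumulatorTest below:

    TopicPartition tp = new TopicPartition("topic", 0);
    assertNull(accum.getDeque(tp));   // nothing tracked yet
    accum.append("topic", 0, ...);    // first encounter: skipped, but an empty deque now exists
    accum.clearDeque(tp);             // back to the "no deque" state
    assertNull(accum.getDeque(tp));   // the next append will be skipped again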
/**
* Get the deque for the given topic-partition, creating it if necessary.
*/
KafkaProducerTest.java
@@ -160,6 +160,16 @@ public class KafkaProducerTest {
            new PartitionInfo(topic, 2, null, null, null)),
        Collections.emptySet(),
        Collections.emptySet());
+   private final Cluster fourPartitionCluster = new Cluster(
+       "dummy",
+       nodes,
+       Arrays.asList(
+           new PartitionInfo(topic, 0, null, null, null),
+           new PartitionInfo(topic, 1, null, null, null),
+           new PartitionInfo(topic, 2, null, null, null),
+           new PartitionInfo(topic, 3, null, null, null)),
+       Collections.emptySet(),
+       Collections.emptySet());
private TestInfo testInfo;

private static final int DEFAULT_METADATA_IDLE_MS = 5 * 60 * 1000;
@@ -2050,15 +2060,24 @@ public void testRoundRobinPartitionerDoesNotSkipPartition() throws InterruptedException {
        RoundRobinPartitioner roundRobinPartitioner = new RoundRobinPartitioner();
        ProducerMetadata metadata = mock(ProducerMetadata.class);
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
-       when(metadata.fetch()).thenReturn(threePartitionCluster);
+       when(metadata.fetch()).thenReturn(fourPartitionCluster);
        KafkaProducer<String, String> producer = producerWithOverrideNewPartitioner(configs, metadata, roundRobinPartitioner);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
+       // the first time, skip partition-0 and send to partition-1.
        producer.send(record);
+       // skip partition-2 and send to partition-3.
+       producer.send(record);

-       // calling partition() again should result 1. This shows producer.send() invoked partitioner.partition() only once.
+       // calling partition() again should return 0: the next partition we get is partition-0.
        int newPartitionCount = roundRobinPartitioner.partition(topic, null,
-           null, null, null, threePartitionCluster);
-       assertEquals(1, newPartitionCount);
+           null, null, null, fourPartitionCluster);
+       assertEquals(0, newPartitionCount);
+
+       producer.send(record);
+       // calling partition() again should return 2: the record went to partition-1, whose deque already exists, so that send was not skipped.
+       newPartitionCount = roundRobinPartitioner.partition(topic, null,
+           null, null, null, fourPartitionCluster);
+       assertEquals(2, newPartitionCount);
        producer.close(Duration.ofMillis(0));
    }
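The assertions follow from counting calls to RoundRobinPartitioner.partition(), which cycles 0, 1, 2, 3, 0, ... per call; a skipped send consumes two calls (the aborted pick plus the re-pick after onNewBatch). An illustrative trace of the test above:

    // call #0 -> 0 (send 1: aborted, no deque for partition 0)
    // call #1 -> 1 (send 1: record lands on partition 1)
    // call #2 -> 2 (send 2: aborted, no deque for partition 2)
    // call #3 -> 3 (send 2: record lands on partition 3)
    // call #4 -> 0 (direct partition() call in the test: asserts 0)
    // call #5 -> 1 (send 3: deque for partition 1 exists, so no skip)
    // call #6 -> 2 (direct partition() call in the test: asserts 2)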

RecordAccumulatorTest.java
@@ -1249,12 +1249,48 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
}

    @Test
-   public void testFirstRecordCreatesProducerBatchIfNoneExists() throws InterruptedException {
+   public void testProducerBatchIsNotCreatedIfNoDequeExists() throws InterruptedException {
        RecordAccumulator accum = createTestRecordAccumulator(DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
            10L * DefaultRecordBatch.RECORD_BATCH_OVERHEAD, CompressionType.NONE, 10);
        Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1);
        assertEquals(null, partitionBatches);

+       // accum.append should skip the partition when the partition is encountered for the first time.
        accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster);
        partitionBatches = accum.getDeque(tp1);
+       assertEquals(0, partitionBatches.size());
+
+       // on the second call the (empty) deque exists, so accum.append creates a new ProducerBatch even though abortOnNewBatch is true.
+       accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster);
+       partitionBatches = accum.getDeque(tp1);
        assertEquals(1, partitionBatches.size());
    }
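The first append leaves an empty deque behind because noDqForPartition is computed before the computeIfAbsent call that creates the deque, so the skip decision and the deque creation happen in the same pass. In sketch form, paraphrasing the append() hunk above:

    boolean noDqForPartition = !topicInfo.batches.containsKey(effectivePartition); // true on first encounter
    Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>()); // deque now exists
    // ...
    if (abortOnNewBatch && noDqForPartition)
        return new RecordAppendResult(null, false, false, true, 0); // skip, but the empty deque remains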

+   @Test
+   public void testProducerBatchIsNotCreatedIfNoDequeExistsAfterExpire() throws InterruptedException {
+       long createdTimeMs = System.currentTimeMillis();
+       int deliveryTimeoutMs = 10;
+       RecordAccumulator accum = createTestRecordAccumulator(deliveryTimeoutMs, DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
+           10L * DefaultRecordBatch.RECORD_BATCH_OVERHEAD, CompressionType.NONE, 10);
+       // accum.append should skip the partition when the partition is encountered for the first time.
+       accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster);
+       Deque<ProducerBatch> partitionBatches = accum.getDeque(tp1);
+       assertEquals(0, partitionBatches.size());
+
+       // on the second call the (empty) deque exists, so accum.append creates a new ProducerBatch even though abortOnNewBatch is true.
+       accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster);
+       partitionBatches = accum.getDeque(tp1);
+       assertEquals(1, partitionBatches.size());
+
+       accum.expiredBatches(createdTimeMs + deliveryTimeoutMs + 10);
+       // after the batch expired and the deque was removed, the partition is skipped again as if seen for the first time.
+       accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster);
+       partitionBatches = accum.getDeque(tp1);
+       assertEquals(0, partitionBatches.size());
+
+       // and once more, the second append creates a new ProducerBatch even though abortOnNewBatch is true.
+       accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, true, time.milliseconds(), cluster);
+       partitionBatches = accum.getDeque(tp1);
+       assertEquals(1, partitionBatches.size());
+   }
