Commit db8cb96
KAFKA-8325; Remove batch from in-flight requests on MESSAGE_TOO_LARGE errors (#7176)

This patch fixes a bug in the handling of MESSAGE_TOO_LARGE errors. When a batch failed with this error, it was split, the smaller batches were re-enqueued in the accumulator, and the original batch was deallocated, but the original batch was never removed from the list of in-flight batches. When that batch eventually expired from the in-flight list, the producer tried to deallocate it a second time, causing an error. This patch changes the behavior to remove the batch from the in-flight list before deallocating it.
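In outline, the fix changes the MESSAGE_TOO_LARGE branch of Sender.completeBatch as follows (a simplified before/after drawn from the diff below):

    // Before: the split batch was deallocated but stayed in inFlightBatches,
    // so the delivery-timeout path later tried to deallocate it again.
    this.accumulator.splitAndReenqueue(batch);
    this.accumulator.deallocate(batch);

    // After: a shared helper first drops the batch from the in-flight list,
    // then deallocates it exactly once.
    this.accumulator.splitAndReenqueue(batch);
    maybeRemoveAndDeallocateBatch(batch);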

Reviewers: Luke Stephenson, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
bob-barrett authored and hachikuji committed Aug 22, 2019
1 parent e323f4b commit db8cb96
Showing 2 changed files with 51 additions and 8 deletions.
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -159,7 +159,7 @@ public List<ProducerBatch> inFlightBatches(TopicPartition tp) {
         return inFlightBatches.containsKey(tp) ? inFlightBatches.get(tp) : new ArrayList<>();
     }
 
-    public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
+    private void maybeRemoveFromInflightBatches(ProducerBatch batch) {
         List<ProducerBatch> batches = inFlightBatches.get(batch.topicPartition);
         if (batches != null) {
             batches.remove(batch);
@@ -169,6 +169,11 @@ public void maybeRemoveFromInflightBatches(ProducerBatch batch) {
         }
     }
 
+    private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
+        maybeRemoveFromInflightBatches(batch);
+        this.accumulator.deallocate(batch);
+    }
+
     /**
      * Get the in-flight batches that has reached delivery timeout.
      */
@@ -625,7 +630,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse
             if (transactionManager != null)
                 transactionManager.removeInFlightBatch(batch);
             this.accumulator.splitAndReenqueue(batch);
-            this.accumulator.deallocate(batch);
+            maybeRemoveAndDeallocateBatch(batch);
             this.sensors.recordBatchSplit();
         } else if (error != Errors.NONE) {
             if (canRetry(batch, response, now)) {
@@ -700,8 +705,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse
         }
 
         if (batch.done(response.baseOffset, response.logAppendTime, null)) {
-            maybeRemoveFromInflightBatches(batch);
-            this.accumulator.deallocate(batch);
+            maybeRemoveAndDeallocateBatch(batch);
         }
     }
 
@@ -724,8 +728,7 @@ private void failBatch(ProducerBatch batch,
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
         if (batch.done(baseOffset, logAppendTime, exception)) {
-            maybeRemoveFromInflightBatches(batch);
-            this.accumulator.deallocate(batch);
+            maybeRemoveAndDeallocateBatch(batch);
         }
     }
 
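To see why the second deallocation surfaces as an error, consider a minimal sketch of the accumulator's bookkeeping. The names below are hypothetical stand-ins, not the real RecordAccumulator API; the point is only that each batch may be released exactly once, so any stale reference (such as the leftover in-flight entry fixed above) turns a later cleanup pass into a failure:

    import java.util.HashSet;
    import java.util.Set;

    class Batch {}

    class Accumulator {
        // Every appended batch stays here until it is released.
        private final Set<Batch> incomplete = new HashSet<>();

        void append(Batch batch) {
            incomplete.add(batch);
        }

        void deallocate(Batch batch) {
            // A second deallocate means some list still held the batch,
            // e.g. the in-flight list in the bug fixed by this patch.
            if (!incomplete.remove(batch))
                throw new IllegalStateException("batch deallocated twice");
            // ... the batch's buffer would be returned to the pool here ...
        }
    }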
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -2109,15 +2109,16 @@ public void testExpiredBatchDoesNotRetry() throws Exception {
     @Test
     public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
         long deliverTimeoutMs = 1500L;
-        // create a producer batch with more than one record so it is eligible to split
+        // create a producer batch with more than one record so it is eligible for splitting
         Future<RecordMetadata> request1 =
                 accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
                         MAX_BLOCK_TIMEOUT).future;
         Future<RecordMetadata> request2 =
                 accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
                         MAX_BLOCK_TIMEOUT).future;
 
-        sender.runOnce(); // send request
+        // send request
+        sender.runOnce();
         assertEquals(1, client.inFlightRequestCount());
         // return a MESSAGE_TOO_LARGE error
         client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
@@ -2309,6 +2310,45 @@ public void testDoNotPollWhenNoRequestSent() {
         verify(client, times(2)).poll(eq(RETRY_BACKOFF_MS), anyLong());
     }
 
+    @Test
+    public void testTooLargeBatchesAreSafelyRemoved() throws InterruptedException {
+        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+        TransactionManager txnManager = new TransactionManager(logContext, "testSplitBatchAndSend", 60000, 100);
+
+        setupWithTransactionState(txnManager, false, null);
+        doInitTransactions(txnManager, producerIdAndEpoch);
+
+        txnManager.beginTransaction();
+        txnManager.maybeAddPartitionToTransaction(tp0);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE)));
+        sender.runOnce();
+
+        // create a producer batch with more than one record so it is eligible for splitting
+        Future<RecordMetadata> request1 =
+                accumulator.append(tp0, time.milliseconds(), "key1".getBytes(), "value1".getBytes(), null, null,
+                        MAX_BLOCK_TIMEOUT).future;
+        Future<RecordMetadata> request2 =
+                accumulator.append(tp0, time.milliseconds(), "key2".getBytes(), "value2".getBytes(), null, null,
+                        MAX_BLOCK_TIMEOUT).future;
+
+        // send request
+        sender.runOnce();
+        assertEquals(1, sender.inFlightBatches(tp0).size());
+        // return a MESSAGE_TOO_LARGE error
+        client.respond(produceResponse(tp0, -1, Errors.MESSAGE_TOO_LARGE, -1));
+        sender.runOnce();
+
+        // process retried response
+        sender.runOnce();
+        client.respond(produceResponse(tp0, 0, Errors.NONE, 0));
+        sender.runOnce();
+
+        // In-flight batches should be empty. Sleep past the expiration time of the batch and run once, no error should be thrown
+        assertEquals(0, sender.inFlightBatches(tp0).size());
+        time.sleep(2000);
+        sender.runOnce();
+    }
+
     class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
 
         private TransactionResult requiredResult;
