From 4974842e629c098b0d36bc42e189bff211e7faac Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 21 Sep 2017 17:09:30 -0700 Subject: [PATCH 1/3] KAFKA-5957: Prevent second deallocate if produce response for aborted batch returns --- .../producer/internals/ProducerBatch.java | 6 +- .../clients/producer/internals/Sender.java | 9 +- .../producer/internals/ProducerBatchTest.java | 44 ++++-- .../producer/internals/SenderTest.java | 149 ++++++++---------- 4 files changed, 111 insertions(+), 97 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 93c843b9163a1..ea0f0f7dff953 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -163,8 +163,9 @@ public void abort(RuntimeException exception) { * @param baseOffset The base offset of the messages assigned by the server * @param logAppendTime The log append time or -1 if CreateTime is being used * @param exception The exception that occurred (or null if the request was successful) + * @return true if the batch was completed successfully and false if the batch was previously aborted */ - public void done(long baseOffset, long logAppendTime, RuntimeException exception) { + public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) { final FinalState finalState; if (exception == null) { log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset); @@ -177,13 +178,14 @@ public void done(long baseOffset, long logAppendTime, RuntimeException exception if (!this.finalState.compareAndSet(null, finalState)) { if (this.finalState.get() == FinalState.ABORTED) { log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition); - return; + return false; } else { throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get()); } } completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception); + return true; } private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) { diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index d71046a80cc5d..fe699987f1314 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -183,6 +183,7 @@ public void run() { if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. + log.debug("Aborting incomplete batches due to forced shutdown"); this.accumulator.abortIncompleteBatches(); } try { @@ -589,8 +590,8 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons transactionManager.removeInFlightBatch(batch); } - batch.done(response.baseOffset, response.logAppendTime, null); - this.accumulator.deallocate(batch); + if (batch.done(response.baseOffset, response.logAppendTime, null)) + this.accumulator.deallocate(batch); } private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, boolean adjustSequenceNumbers) { @@ -625,8 +626,8 @@ private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); - batch.done(baseOffset, logAppendTime, exception); - this.accumulator.deallocate(batch); + if (batch.done(baseOffset, logAppendTime, exception)) + this.accumulator.deallocate(batch); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index 41aa5c63e331c..2f89d7949e702 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.clients.producer.internals; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -33,7 +34,6 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Deque; -import java.util.Iterator; import java.util.concurrent.ExecutionException; import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0; @@ -64,15 +64,20 @@ public void testChecksumNullForMagicV2() { @Test public void testBatchAbort() throws Exception { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); - FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); + MockCallback callback = new MockCallback(); + FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now); KafkaException exception = new KafkaException(); batch.abort(exception); assertTrue(future.isDone()); + assertEquals(1, callback.invocations); + assertEquals(exception, callback.exception); + assertNull(callback.metadata); // subsequent completion should be ignored - batch.done(500L, 2342342341L, null); - batch.done(-1, -1, new KafkaException()); + assertFalse(batch.done(500L, 2342342341L, null)); + assertFalse(batch.done(-1, -1, new KafkaException())); + assertEquals(1, callback.invocations); assertTrue(future.isDone()); try { @@ -86,9 +91,13 @@ public void testBatchAbort() throws Exception { @Test public void testBatchCannotAbortTwice() throws Exception { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); - FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); + MockCallback callback = new MockCallback(); + FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now); KafkaException exception = new KafkaException(); batch.abort(exception); + assertEquals(1, callback.invocations); + assertEquals(exception, callback.exception); + assertNull(callback.metadata); try { batch.abort(new KafkaException()); @@ -97,6 +106,7 @@ public void testBatchCannotAbortTwice() throws Exception { // expected } + assertEquals(1, callback.invocations); assertTrue(future.isDone()); try { future.get(); @@ -109,8 +119,12 @@ public void testBatchCannotAbortTwice() throws Exception { @Test public void testBatchCannotCompleteTwice() throws Exception { ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); - FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); + MockCallback callback = new MockCallback(); + FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now); batch.done(500L, 10L, null); + assertEquals(1, callback.invocations); + assertNull(callback.exception); + assertNotNull(callback.metadata); try { batch.done(1000L, 20L, null); @@ -166,9 +180,7 @@ public void testSplitPreservesHeaders() { for (ProducerBatch splitProducerBatch : batches) { for (RecordBatch splitBatch : splitProducerBatch.records().batches()) { - Iterator iter = splitBatch.iterator(); - while (iter.hasNext()) { - Record record = iter.next(); + for (Record record : splitBatch) { assertTrue("Header size should be 1.", record.headers().length == 1); assertTrue("Header key should be 'header-key'.", record.headers()[0].key().equals("header-key")); assertTrue("Header value should be 'header-value'.", new String(record.headers()[0].value()).equals("header-value")); @@ -260,4 +272,18 @@ public void testShouldNotAttemptAppendOnceRecordsBuilderIsClosedForAppends() { assertFalse(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10], Record.EMPTY_HEADERS)); assertEquals(null, batch.tryAppend(now + 1, null, new byte[10], Record.EMPTY_HEADERS, null, now + 1)); } + + private static class MockCallback implements Callback { + private int invocations = 0; + private RecordMetadata metadata; + private Exception exception; + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + invocations++; + this.metadata = metadata; + this.exception = exception; + } + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index e1ea10a76c538..458543fa42bb1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -92,7 +92,6 @@ public class SenderTest { private static final int MAX_REQUEST_SIZE = 1024 * 1024; private static final short ACKS_ALL = -1; - private static final int MAX_RETRIES = 0; private static final String CLIENT_ID = "clientId"; private static final double EPS = 0.0001; private static final int MAX_BLOCK_TIMEOUT = 1000; @@ -679,14 +678,7 @@ public void testIdempotenceWithMultipleInflightsWhereFirstFailsFatallyAndSequenc sendIdempotentProducerResponse(0, tp0, Errors.MESSAGE_TOO_LARGE, -1L); sender.run(time.milliseconds()); // receive response 0, should adjust sequences of future batches. - - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised an error"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof RecordTooLargeException); - } + assertFutureFailure(request1, RecordTooLargeException.class); assertEquals(1, client.inFlightRequestCount()); assertEquals(-1, transactionManager.lastAckedSequence(tp0)); @@ -753,14 +745,7 @@ public void testMustNotRetryOutOfOrderSequenceForNextBatch() throws Exception { sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L); sender.run(time.milliseconds()); - assertTrue(request2.isDone()); - - try { - request2.get(); - fail("Expected an OutOfOrderSequenceException"); - } catch (ExecutionException e) { - assert e.getCause() instanceof OutOfOrderSequenceException; - } + assertFutureFailure(request2, OutOfOrderSequenceException.class); } @Test @@ -932,13 +917,7 @@ public void testExpiryOfUnsentBatchesShouldNotCauseUnresolvedSequences() throws sender.run(time.milliseconds()); - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised timeout exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TimeoutException); - } + assertFutureFailure(request1, TimeoutException.class); assertFalse(transactionManager.hasUnresolvedSequence(tp0)); } @@ -970,13 +949,7 @@ public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatch client.blackout(node, 10); sender.run(time.milliseconds()); // now expire the first batch. - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised timeout exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TimeoutException); - } + assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); // let's enqueue another batch, which should not be dequeued until the unresolved state is clear. Future request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; @@ -1034,13 +1007,7 @@ public void testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail() throws E client.blackout(node, 10); sender.run(time.milliseconds()); // now expire the first batch. - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised timeout exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TimeoutException); - } + assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); // let's enqueue another batch, which should not be dequeued until the unresolved state is clear. Future request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future; @@ -1052,14 +1019,7 @@ public void testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail() throws E sender.run(time.milliseconds()); // send second request sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1); sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state. - assertTrue(request2.isDone()); - - try { - request2.get(); - fail("should have failed with an exception"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof OutOfOrderSequenceException); - } + assertFutureFailure(request2, OutOfOrderSequenceException.class); Deque batches = accumulator.batches().get(tp0); @@ -1098,13 +1058,7 @@ public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Ex sender.run(time.milliseconds()); // now expire the batch. - assertTrue(request1.isDone()); - try { - request1.get(); - fail("Should have raised timeout exception"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof TimeoutException); - } + assertFutureFailure(request1, TimeoutException.class); assertTrue(transactionManager.hasUnresolvedSequence(tp0)); assertFalse(client.hasInFlightRequests()); Deque batches = accumulator.batches().get(tp0); @@ -1410,14 +1364,7 @@ public void testShouldRaiseOutOfOrderSequenceExceptionToUserIfLogWasNotTruncated sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L); sender.run(time.milliseconds()); // receive response 0, should cause a producerId reset since the logStartOffset < lastAckedOffset - - assertTrue(request2.isDone()); - try { - request2.get(); - fail("Should have raised an OutOfOrderSequenceException"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof OutOfOrderSequenceException); - } + assertFutureFailure(request2, OutOfOrderSequenceException.class); } void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) { @@ -1463,19 +1410,56 @@ public boolean matches(AbstractRequest body) { }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0)); sender.run(time.milliseconds()); - assertTrue(future.isDone()); - try { - future.get(); - fail("Future should have raised ClusterAuthorizationException"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof ClusterAuthorizationException); - } + assertFutureFailure(future, ClusterAuthorizationException.class); // cluster authorization errors are fatal, so we should continue seeing it on future sends assertTrue(transactionManager.hasFatalError()); assertSendFailure(ClusterAuthorizationException.class); } + @Test + public void testCancelInFlightRequestAfterFatalError() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + + client.setNode(new Node(1, "localhost", 33343)); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + // cluster authorization is a fatal error for the producer + Future future1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + Future future2 = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + + client.respond(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent(); + } + }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0)); + + sender.run(time.milliseconds()); + assertTrue(transactionManager.hasFatalError()); + assertFutureFailure(future1, ClusterAuthorizationException.class); + + sender.run(time.milliseconds()); + assertFutureFailure(future2, ClusterAuthorizationException.class); + + // Should be fine if the second response eventually returns + client.respond(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent(); + } + }, produceResponse(tp1, 0, Errors.NONE, 0)); + sender.run(time.milliseconds()); + } + @Test public void testUnsupportedForMessageFormatInProduceRequest() throws Exception { final long producerId = 343434L; @@ -1496,13 +1480,7 @@ public boolean matches(AbstractRequest body) { }, produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0)); sender.run(time.milliseconds()); - assertTrue(future.isDone()); - try { - future.get(); - fail("Future should have raised UnsupportedForMessageFormat"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof UnsupportedForMessageFormatException); - } + assertFutureFailure(future, UnsupportedForMessageFormatException.class); // unsupported for message format is not a fatal error assertFalse(transactionManager.hasError()); @@ -1528,13 +1506,7 @@ public boolean matches(AbstractRequest body) { }); sender.run(time.milliseconds()); - assertTrue(future.isDone()); - try { - future.get(); - fail("Future should have raised UnsupportedVersionException"); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof UnsupportedVersionException); - } + assertFutureFailure(future, UnsupportedVersionException.class); // unsupported version errors are fatal, so we should continue seeing it on future sends assertTrue(transactionManager.hasFatalError()); @@ -1867,4 +1839,17 @@ private void prepareInitPidResponse(Errors error, long pid, short epoch) { client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch)); } + + private void assertFutureFailure(Future future, Class expectedExceptionType) + throws InterruptedException { + assertTrue(future.isDone()); + try { + future.get(); + fail("Future should have raised " + expectedExceptionType.getName()); + } catch (ExecutionException e) { + assertTrue("Unexpected cause " + e.getClass(), + expectedExceptionType.isAssignableFrom(e.getCause().getClass())); + } + } + } From 24c99b840f5e97d580cf93d5e56213d7ce8c155e Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 21 Sep 2017 17:24:57 -0700 Subject: [PATCH 2/3] Tweak failure message in to use cause --- .../apache/kafka/clients/producer/internals/SenderTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 458543fa42bb1..1e29f794a4e47 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -1847,8 +1847,8 @@ private void assertFutureFailure(Future future, Class ex future.get(); fail("Future should have raised " + expectedExceptionType.getName()); } catch (ExecutionException e) { - assertTrue("Unexpected cause " + e.getClass(), - expectedExceptionType.isAssignableFrom(e.getCause().getClass())); + Class causeType = e.getCause().getClass(); + assertTrue("Unexpected cause " + causeType, expectedExceptionType.isAssignableFrom(causeType)); } } From c1291c8cd462c43cc3fdba992a4324474d969f0e Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 21 Sep 2017 17:38:33 -0700 Subject: [PATCH 3/3] Some additional cleanup --- .../clients/producer/internals/SenderTest.java | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 1e29f794a4e47..ab0dffba8aaf3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.TimeoutException; @@ -345,7 +346,7 @@ public void testRetries() throws Exception { sender.run(time.milliseconds()); // resend } sender.run(time.milliseconds()); - completedWithError(future, Errors.NETWORK_EXCEPTION); + assertFutureFailure(future, NetworkException.class); } finally { m.close(); } @@ -1758,16 +1759,6 @@ public boolean matches(AbstractRequest body) { }; } - private void completedWithError(Future future, Errors error) throws Exception { - assertTrue("Request should be completed", future.isDone()); - try { - future.get(); - fail("Should have thrown an exception."); - } catch (ExecutionException e) { - assertEquals(error.exception().getClass(), e.getCause().getClass()); - } - } - private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) { ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset); Map partResp = Collections.singletonMap(tp, resp); @@ -1839,7 +1830,6 @@ private void prepareInitPidResponse(Errors error, long pid, short epoch) { client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch)); } - private void assertFutureFailure(Future future, Class expectedExceptionType) throws InterruptedException { assertTrue(future.isDone()); @@ -1848,7 +1838,7 @@ private void assertFutureFailure(Future future, Class ex fail("Future should have raised " + expectedExceptionType.getName()); } catch (ExecutionException e) { Class causeType = e.getCause().getClass(); - assertTrue("Unexpected cause " + causeType, expectedExceptionType.isAssignableFrom(causeType)); + assertTrue("Unexpected cause " + causeType.getName(), expectedExceptionType.isAssignableFrom(causeType)); } }