From 57d30d3450998465177f92516a41218dbe8d4340 Mon Sep 17 00:00:00 2001 From: Walker Carlson <18128741+wcarlson5@users.noreply.github.com> Date: Tue, 14 May 2024 11:07:55 -0500 Subject: [PATCH] KAFKA-16699: Have Streams treat InvalidPidMappingException like a ProducerFencedException (#15919) KStreams is able to handle a ProducerFencedException (among other errors) cleanly. It does this by closing the task dirty and triggering a rebalance amongst the worker threads to rejoin the group. The producer is also recreated. Because of how Streams works (writing to and reading from various topics), the application is able to figure out the last thing the fenced producer completed and continue from there. InvalidPidMappingException should be treated the same way. Reviewers: Anna Sophie Blee-Goldman, Justine Olshan --- .../internals/RecordCollectorImpl.java | 2 ++ .../processor/internals/StreamsProducer.java | 8 +++-- .../internals/RecordCollectorTest.java | 16 ++++++++++ .../processor/internals/StreamThreadTest.java | 30 +++++++++++++++---- .../internals/StreamsProducerTest.java | 22 ++++++++++++++ 5 files changed, 69 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 8abf5e897623..6b559180484a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.InvalidPidMappingException; import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.errors.InvalidTopicException; import 
org.apache.kafka.common.errors.OffsetMetadataTooLarge; @@ -296,6 +297,7 @@ private void recordSendError(final String topic, final Exception exception, fina errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error."; sendException.set(new StreamsException(errorMessage, exception)); } else if (exception instanceof ProducerFencedException || + exception instanceof InvalidPidMappingException || exception instanceof InvalidProducerEpochException || exception instanceof OutOfOrderSequenceException) { errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java index bc8a3118fe7b..21acb407afaf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidPidMappingException; import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; @@ -239,7 +240,7 @@ private void maybeBeginTransaction() { try { producer.beginTransaction(); transactionInFlight = true; - } catch (final ProducerFencedException | InvalidProducerEpochException error) { + } catch (final ProducerFencedException | InvalidProducerEpochException | InvalidPidMappingException error) { throw new TaskMigratedException( formatException("Producer got fenced trying to begin a new transaction"), error @@ -278,6 +279,7 @@ Future send(final ProducerRecord 
record, private static boolean isRecoverable(final KafkaException uncaughtException) { return uncaughtException.getCause() instanceof ProducerFencedException || + uncaughtException.getCause() instanceof InvalidPidMappingException || uncaughtException.getCause() instanceof InvalidProducerEpochException || uncaughtException.getCause() instanceof UnknownProducerIdException; } @@ -299,7 +301,7 @@ protected void commitTransaction(final Map of producer.sendOffsetsToTransaction(offsets, maybeDowngradedGroupMetadata); producer.commitTransaction(); transactionInFlight = false; - } catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException error) { + } catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException | InvalidPidMappingException error) { throw new TaskMigratedException( formatException("Producer got fenced trying to commit a transaction"), error @@ -333,7 +335,7 @@ void abortTransaction() { " Will rely on broker to eventually abort the transaction after the transaction timeout passed.", logAndSwallow ); - } catch (final ProducerFencedException | InvalidProducerEpochException error) { + } catch (final ProducerFencedException | InvalidProducerEpochException | InvalidPidMappingException error) { // The producer is aborting the txn when there's still an ongoing one, // which means that we did not commit the task while closing it, which // means that it is a dirty close. 
Therefore it is possible that the dirty diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index e1c73d579ecd..253b44f5d9ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.InvalidPidMappingException; import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.SerializationException; @@ -1055,6 +1056,11 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenProducerFencedIn testThrowTaskMigratedExceptionOnSubsequentSend(new ProducerFencedException("KABOOM!")); } + @Test + public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenInvalidPidMappingInCallback() { + testThrowTaskMigratedExceptionOnSubsequentSend(new InvalidPidMappingException("KABOOM!")); + } + @Test public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenInvalidEpochInCallback() { testThrowTaskMigratedExceptionOnSubsequentSend(new InvalidProducerEpochException("KABOOM!")); @@ -1085,6 +1091,11 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenProducerFencedI testThrowTaskMigratedExceptionOnSubsequentFlush(new ProducerFencedException("KABOOM!")); } + @Test + public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenInvalidPidMappingInCallback() { + testThrowTaskMigratedExceptionOnSubsequentFlush(new InvalidPidMappingException("KABOOM!")); + } + @Test public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenInvalidEpochInCallback() { 
testThrowTaskMigratedExceptionOnSubsequentFlush(new InvalidProducerEpochException("KABOOM!")); @@ -1112,6 +1123,11 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenProducerFencedI testThrowTaskMigratedExceptionOnSubsequentClose(new ProducerFencedException("KABOOM!")); } + @Test + public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenInvalidPidMappingInCallback() { + testThrowTaskMigratedExceptionOnSubsequentClose(new InvalidPidMappingException("KABOOM!")); + } + @Test public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenInvalidEpochInCallback() { testThrowTaskMigratedExceptionOnSubsequentClose(new InvalidProducerEpochException("KABOOM!")); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 44644ba1577b..b6c28376c46b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InvalidPidMappingException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -1646,8 +1647,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr assertThat(producer.commitCount(), equalTo(1L)); } - @Test - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks() throws Exception { + private void testThrowingDurringCommitTransactionException(final RuntimeException e) throws InterruptedException { final StreamsConfig config = new StreamsConfig(configProps(true)); thread = 
createStreamThread(CLIENT_ID, config); @@ -1683,13 +1683,22 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi TestUtils.waitForCondition(() -> !producer.uncommittedRecords().isEmpty(), "Processing threads to process record"); } - producer.commitTransactionException = new ProducerFencedException("Producer is fenced"); + producer.commitTransactionException = e; assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener().onPartitionsRevoked(assignedPartitions)); assertFalse(producer.transactionCommitted()); assertFalse(producer.closed()); assertEquals(1, thread.readOnlyActiveTasks().size()); } + @Test + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks() throws Exception { + testThrowingDurringCommitTransactionException(new ProducerFencedException("Producer is fenced")); + } + @Test + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks() throws Exception { + testThrowingDurringCommitTransactionException(new InvalidPidMappingException("PidMapping is invalid")); + } + @Test public void shouldReinitializeRevivedTasksInAnyState() throws Exception { final StreamsConfig config = new StreamsConfig(configProps(false)); @@ -1807,8 +1816,7 @@ public void shouldReinitializeRevivedTasksInAnyState() throws Exception { } } - @Test - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() { + private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(final RuntimeException e) { // only have source but no sink so that we would not get fenced in producer.send internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); @@ -1840,7 +1848,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi assertThat(thread.readOnlyActiveTasks().size(), equalTo(1)); final MockProducer producer = 
clientSupplier.producers.get(0); - producer.commitTransactionException = new ProducerFencedException("Producer is fenced"); + producer.commitTransactionException = e; mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); consumer.addRecord(new ConsumerRecord<>(topic1, 1, 1, new byte[0], new byte[0])); assertThrows(TaskMigratedException.class, @@ -1862,6 +1870,16 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi assertEquals(1, thread.readOnlyActiveTasks().size()); } + @Test + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() { + testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new ProducerFencedException("Producer is fenced")); + } + + @Test + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting() { + testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new InvalidPidMappingException("PID Mapping is invalid")); + } + @Test public void shouldNotCloseTaskProducerWhenSuspending() throws Exception { final StreamsConfig config = new StreamsConfig(configProps(true)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java index 1c239f1da9e3..2cbaec7666e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidPidMappingException; import org.apache.kafka.common.errors.InvalidProducerEpochException; import org.apache.kafka.common.errors.ProducerFencedException; import 
org.apache.kafka.common.errors.TimeoutException; @@ -880,6 +881,11 @@ public void shouldThrowTaskMigratedExceptionOnEosSendProducerFenced() { testThrowTaskMigratedExceptionOnEosSend(new ProducerFencedException("KABOOM!")); } + @Test + public void shouldThrowTaskMigratedExceptionOnEosSendPInvalidPidMapping() { + testThrowTaskMigratedExceptionOnEosSend(new InvalidPidMappingException("KABOOM!")); + } + @Test public void shouldThrowTaskMigratedExceptionOnEosSendInvalidEpoch() { testThrowTaskMigratedExceptionOnEosSend(new InvalidProducerEpochException("KABOOM!")); @@ -928,6 +934,12 @@ public void shouldThrowTaskMigrateExceptionOnEosSendOffsetProducerFenced() { testThrowTaskMigrateExceptionOnEosSendOffset(new ProducerFencedException("KABOOM!")); } + @Test + public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidPidMapping() { + // cannot use `eosMockProducer.fenceProducer()` because this would already trigger in `beginTransaction()` + testThrowTaskMigrateExceptionOnEosSendOffset(new InvalidPidMappingException("KABOOM!")); + } + @Test public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidEpoch() { // cannot use `eosMockProducer.fenceProducer()` because this would already trigger in `beginTransaction()` @@ -990,6 +1002,11 @@ public void shouldThrowTaskMigratedExceptionOnEosCommitWithProducerFenced() { testThrowTaskMigratedExceptionOnEos(new ProducerFencedException("KABOOM!")); } + @Test + public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidPidMapping() { + testThrowTaskMigratedExceptionOnEos(new InvalidPidMappingException("KABOOM!")); + } + @Test public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidEpoch() { testThrowTaskMigratedExceptionOnEos(new InvalidProducerEpochException("KABOOM!")); @@ -1048,6 +1065,11 @@ public void shouldSwallowExceptionOnEosAbortTxProducerFenced() { testSwallowExceptionOnEosAbortTx(new ProducerFencedException("KABOOM!")); } + @Test + public void shouldSwallowExceptionOnEosAbortTxInvalidPidMapping() 
{ + testSwallowExceptionOnEosAbortTx(new InvalidPidMappingException("KABOOM!")); + } + @Test public void shouldSwallowExceptionOnEosAbortTxInvalidEpoch() { testSwallowExceptionOnEosAbortTx(new InvalidProducerEpochException("KABOOM!"));