Skip to content

Commit

Permalink
KAFKA-16699: Have Streams treat InvalidPidMappingException like a Pro…
Browse files Browse the repository at this point in the history
…ducerFencedException (#15919)

KStreams is able to handle the ProducerFenced (among other errors) cleanly. It does this by closing the task dirty and triggering a rebalance amongst the worker threads to rejoin the group. The producer is also recreated. Due to how streams works (writing to and reading from various topics), the application is able to figure out the last thing the fenced producer completed and continue from there. InvalidPidMappingException should be treated the same way.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Justine Olshan <jolshan@confluent.io>
  • Loading branch information
wcarlson5 committed May 14, 2024
1 parent df5735d commit 57d30d3
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
Expand Down Expand Up @@ -296,6 +297,7 @@ private void recordSendError(final String topic, final Exception exception, fina
errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
sendException.set(new StreamsException(errorMessage, exception));
} else if (exception instanceof ProducerFencedException ||
exception instanceof InvalidPidMappingException ||
exception instanceof InvalidProducerEpochException ||
exception instanceof OutOfOrderSequenceException) {
errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
Expand Down Expand Up @@ -239,7 +240,7 @@ private void maybeBeginTransaction() {
try {
producer.beginTransaction();
transactionInFlight = true;
} catch (final ProducerFencedException | InvalidProducerEpochException error) {
} catch (final ProducerFencedException | InvalidProducerEpochException | InvalidPidMappingException error) {
throw new TaskMigratedException(
formatException("Producer got fenced trying to begin a new transaction"),
error
Expand Down Expand Up @@ -278,6 +279,7 @@ Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record,

private static boolean isRecoverable(final KafkaException uncaughtException) {
return uncaughtException.getCause() instanceof ProducerFencedException ||
uncaughtException.getCause() instanceof InvalidPidMappingException ||
uncaughtException.getCause() instanceof InvalidProducerEpochException ||
uncaughtException.getCause() instanceof UnknownProducerIdException;
}
Expand All @@ -299,7 +301,7 @@ protected void commitTransaction(final Map<TopicPartition, OffsetAndMetadata> of
producer.sendOffsetsToTransaction(offsets, maybeDowngradedGroupMetadata);
producer.commitTransaction();
transactionInFlight = false;
} catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException error) {
} catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException | InvalidPidMappingException error) {
throw new TaskMigratedException(
formatException("Producer got fenced trying to commit a transaction"),
error
Expand Down Expand Up @@ -333,7 +335,7 @@ void abortTransaction() {
" Will rely on broker to eventually abort the transaction after the transaction timeout passed.",
logAndSwallow
);
} catch (final ProducerFencedException | InvalidProducerEpochException error) {
} catch (final ProducerFencedException | InvalidProducerEpochException | InvalidPidMappingException error) {
// The producer is aborting the txn when there's still an ongoing one,
// which means that we did not commit the task while closing it, which
// means that it is a dirty close. Therefore it is possible that the dirty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
Expand Down Expand Up @@ -1055,6 +1056,11 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenProducerFencedIn
testThrowTaskMigratedExceptionOnSubsequentSend(new ProducerFencedException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenInvalidPidMappingInCallback() {
testThrowTaskMigratedExceptionOnSubsequentSend(new InvalidPidMappingException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenInvalidEpochInCallback() {
testThrowTaskMigratedExceptionOnSubsequentSend(new InvalidProducerEpochException("KABOOM!"));
Expand Down Expand Up @@ -1085,6 +1091,11 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenProducerFencedI
testThrowTaskMigratedExceptionOnSubsequentFlush(new ProducerFencedException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenInvalidPidMappingInCallback() {
testThrowTaskMigratedExceptionOnSubsequentFlush(new InvalidPidMappingException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenInvalidEpochInCallback() {
testThrowTaskMigratedExceptionOnSubsequentFlush(new InvalidProducerEpochException("KABOOM!"));
Expand Down Expand Up @@ -1112,6 +1123,11 @@ public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenProducerFencedI
testThrowTaskMigratedExceptionOnSubsequentClose(new ProducerFencedException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenInvalidPidMappingInCallback() {
testThrowTaskMigratedExceptionOnSubsequentClose(new InvalidPidMappingException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenInvalidEpochInCallback() {
testThrowTaskMigratedExceptionOnSubsequentClose(new InvalidProducerEpochException("KABOOM!"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeaders;
Expand Down Expand Up @@ -1646,8 +1647,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr
assertThat(producer.commitCount(), equalTo(1L));
}

@Test
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks() throws Exception {
private void testThrowingDurringCommitTransactionException(final RuntimeException e) throws InterruptedException {
final StreamsConfig config = new StreamsConfig(configProps(true));
thread = createStreamThread(CLIENT_ID, config);

Expand Down Expand Up @@ -1683,13 +1683,22 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi
TestUtils.waitForCondition(() -> !producer.uncommittedRecords().isEmpty(), "Processing threads to process record");
}

producer.commitTransactionException = new ProducerFencedException("Producer is fenced");
producer.commitTransactionException = e;
assertThrows(TaskMigratedException.class, () -> thread.rebalanceListener().onPartitionsRevoked(assignedPartitions));
assertFalse(producer.transactionCommitted());
assertFalse(producer.closed());
assertEquals(1, thread.readOnlyActiveTasks().size());
}

@Test
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks() throws Exception {
testThrowingDurringCommitTransactionException(new ProducerFencedException("Producer is fenced"));
}
@Test
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks() throws Exception {
testThrowingDurringCommitTransactionException(new InvalidPidMappingException("PidMapping is invalid"));
}

@Test
public void shouldReinitializeRevivedTasksInAnyState() throws Exception {
final StreamsConfig config = new StreamsConfig(configProps(false));
Expand Down Expand Up @@ -1807,8 +1816,7 @@ public void shouldReinitializeRevivedTasksInAnyState() throws Exception {
}
}

@Test
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() {
private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(final RuntimeException e) {
// only have source but no sink so that we would not get fenced in producer.send
internalTopologyBuilder.addSource(null, "source", null, null, null, topic1);

Expand Down Expand Up @@ -1840,7 +1848,7 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi
assertThat(thread.readOnlyActiveTasks().size(), equalTo(1));
final MockProducer<byte[], byte[]> producer = clientSupplier.producers.get(0);

producer.commitTransactionException = new ProducerFencedException("Producer is fenced");
producer.commitTransactionException = e;
mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 1, new byte[0], new byte[0]));
assertThrows(TaskMigratedException.class,
Expand All @@ -1862,6 +1870,16 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommi
assertEquals(1, thread.readOnlyActiveTasks().size());
}

@Test
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting() {
testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new ProducerFencedException("Producer is fenced"));
}

@Test
public void shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting() {
testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new InvalidPidMappingException("PID Mapping is invalid"));
}

@Test
public void shouldNotCloseTaskProducerWhenSuspending() throws Exception {
final StreamsConfig config = new StreamsConfig(configProps(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
Expand Down Expand Up @@ -880,6 +881,11 @@ public void shouldThrowTaskMigratedExceptionOnEosSendProducerFenced() {
testThrowTaskMigratedExceptionOnEosSend(new ProducerFencedException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnEosSendPInvalidPidMapping() {
testThrowTaskMigratedExceptionOnEosSend(new InvalidPidMappingException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnEosSendInvalidEpoch() {
testThrowTaskMigratedExceptionOnEosSend(new InvalidProducerEpochException("KABOOM!"));
Expand Down Expand Up @@ -928,6 +934,12 @@ public void shouldThrowTaskMigrateExceptionOnEosSendOffsetProducerFenced() {
testThrowTaskMigrateExceptionOnEosSendOffset(new ProducerFencedException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidPidMapping() {
// cannot use `eosMockProducer.fenceProducer()` because this would already trigger in `beginTransaction()`
testThrowTaskMigrateExceptionOnEosSendOffset(new InvalidPidMappingException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidEpoch() {
// cannot use `eosMockProducer.fenceProducer()` because this would already trigger in `beginTransaction()`
Expand Down Expand Up @@ -990,6 +1002,11 @@ public void shouldThrowTaskMigratedExceptionOnEosCommitWithProducerFenced() {
testThrowTaskMigratedExceptionOnEos(new ProducerFencedException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidPidMapping() {
testThrowTaskMigratedExceptionOnEos(new InvalidPidMappingException("KABOOM!"));
}

@Test
public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidEpoch() {
testThrowTaskMigratedExceptionOnEos(new InvalidProducerEpochException("KABOOM!"));
Expand Down Expand Up @@ -1048,6 +1065,11 @@ public void shouldSwallowExceptionOnEosAbortTxProducerFenced() {
testSwallowExceptionOnEosAbortTx(new ProducerFencedException("KABOOM!"));
}

@Test
public void shouldSwallowExceptionOnEosAbortTxInvalidPidMapping() {
testSwallowExceptionOnEosAbortTx(new InvalidPidMappingException("KABOOM!"));
}

@Test
public void shouldSwallowExceptionOnEosAbortTxInvalidEpoch() {
testSwallowExceptionOnEosAbortTx(new InvalidProducerEpochException("KABOOM!"));
Expand Down

0 comments on commit 57d30d3

Please sign in to comment.