Skip to content

Commit

Permalink
KAFKA-16903: Consider produce error of different task (apache#16222)
Browse files Browse the repository at this point in the history
A task does not know anything about a produce error thrown
by a different task. That might lead to an InvalidTxnStateException
when a task attempts a transactional operation on a producer
that has already failed because of a different task.

This commit stores the produce exception in the streams producer
on completion of a send instead of in the record collector, since the
record collector is on task level whereas the streams producer
is on stream thread level. Since all tasks use the same streams
producer, the error should be correctly propagated across tasks
of the same stream thread.

For EOS alpha, this commit does not change anything because
each task uses its own producer. The send error is still
on task level, but so is the transaction.

Reviewers: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
cadonna authored Jun 6, 2024
1 parent 7d832cf commit 8a2bc3a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class RecordCollectorImpl implements RecordCollector {
private final Sensor droppedRecordsSensor;
private final Map<String, Sensor> producedSensorByTopic = new HashMap<>();

private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);
private final AtomicReference<KafkaException> sendException;

/**
* @throws StreamsException fatal error that should cause the thread to die (from producer.initTxn)
Expand All @@ -88,6 +88,7 @@ public RecordCollectorImpl(final LogContext logContext,
this.log = logContext.logger(getClass());
this.taskId = taskId;
this.streamsProducer = streamsProducer;
this.sendException = streamsProducer.sendException();
this.productionExceptionHandler = productionExceptionHandler;
this.eosEnabled = streamsProducer.eosEnabled();
this.streamsMetrics = streamsMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class StreamsProducer {
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
private double oldProducerTotalBlockedTime = 0;
private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);

public StreamsProducer(final StreamsConfig config,
final String threadId,
Expand Down Expand Up @@ -254,6 +256,10 @@ private void maybeBeginTransaction() {
}
}

/**
 * Exposes the holder for the exception recorded when a send fails.
 *
 * <p>The very same {@link AtomicReference} instance is handed out on every call.
 * {@code RecordCollectorImpl} fetches it in its constructor, so all record collectors
 * created on top of this streams producer observe and update one shared error slot.
 *
 * @return the shared reference; its value is {@code null} until an exception is stored
 */
AtomicReference<KafkaException> sendException() {
    return this.sendException;
}

Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record,
final Callback callback) {
maybeBeginTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -754,7 +755,7 @@ public void shouldForwardFlushToStreamsProducer() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(false);
doNothing().when(streamsProducer).flush();

when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
final ProcessorTopology topology = mock(ProcessorTopology.class);
when(topology.sinkTopics()).thenReturn(Collections.emptySet());

Expand All @@ -774,6 +775,7 @@ public void shouldForwardFlushToStreamsProducer() {
public void shouldForwardFlushToStreamsProducerEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
doNothing().when(streamsProducer).flush();
final ProcessorTopology topology = mock(ProcessorTopology.class);

Expand Down Expand Up @@ -802,6 +804,7 @@ public void shouldClearOffsetsOnCloseDirty() {
private void shouldClearOffsetsOnClose(final boolean clean) {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
final long offset = 1234L;
final RecordMetadata metadata = new RecordMetadata(
new TopicPartition(topic, 0),
Expand Down Expand Up @@ -853,7 +856,7 @@ private void shouldClearOffsetsOnClose(final boolean clean) {
public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);

when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
final ProcessorTopology topology = mock(ProcessorTopology.class);

final RecordCollector collector = new RecordCollectorImpl(
Expand All @@ -872,8 +875,8 @@ public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
public void shouldAbortTxOnCloseDirtyIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
doNothing().when(streamsProducer).abortTransaction();

final ProcessorTopology topology = mock(ProcessorTopology.class);

final RecordCollector collector = new RecordCollectorImpl(
Expand Down Expand Up @@ -1514,6 +1517,64 @@ public void shouldNotCallProductionExceptionHandlerOnClassCastException() {
}
}

// Two record collectors (one per task) are built on the same mocked streams producer.
// A send failure reported through the producer callback for the first task must make
// the second task's send attempt fail with a StreamsException.
@Test
public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
    final StreamsProducer sharedProducer = mock(StreamsProducer.class);
    when(sharedProducer.eosEnabled()).thenReturn(true);
    // thenReturn hands the same AtomicReference to every caller, so both collectors share it
    when(sharedProducer.sendException()).thenReturn(new AtomicReference<>(null));
    // each send immediately completes its callback with a fencing error
    when(sharedProducer.send(any(), any())).thenAnswer(invocation -> {
        final Callback sendCallback = invocation.getArgument(1);
        sendCallback.onCompletion(null, new ProducerFencedException("KABOOM!"));
        return null;
    });

    final RecordCollector failingCollector = new RecordCollectorImpl(
        logContext,
        new TaskId(0, 0),
        sharedProducer,
        productionExceptionHandler,
        streamsMetrics,
        topology
    );
    failingCollector.initialize();

    final RecordCollector otherCollector = new RecordCollectorImpl(
        logContext,
        new TaskId(0, 1),
        sharedProducer,
        productionExceptionHandler,
        streamsMetrics,
        topology
    );
    otherCollector.initialize();

    // first task: the send itself returns normally, the error only arrives via the callback
    failingCollector.send(
        topic, "key", "val", null, 0, null,
        stringSerializer, stringSerializer, sinkNodeName, context
    );

    // second task: must surface the error recorded by the first task's callback
    assertThrows(
        StreamsException.class,
        () -> otherCollector.send(
            topic, "key", "val", null, 1, null,
            stringSerializer, stringSerializer, sinkNodeName, context
        )
    );
}

private RecordCollector newRecordCollector(final ProductionExceptionHandler productionExceptionHandler) {
return new RecordCollectorImpl(
logContext,
Expand Down

0 comments on commit 8a2bc3a

Please sign in to comment.