Skip to content

Commit

Permalink
KAFKA-4473: RecordCollector should handle retriable exceptions more s…
Browse files Browse the repository at this point in the history
…trictly

The `RecordCollectorImpl` currently drops messages on the floor if an exception is non-null in the producer callback. This will result in message loss and violates at-least-once processing.
Rather than just log an error in the callback, save the exception in a field. On subsequent calls to `send`, `flush`, `close`, first check for the existence of an exception and throw a `StreamsException` if it is non-null. Also, in the callback, if an exception has already occurred, the `offsets` map should not be updated.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2249 from dguy/kafka-4473
  • Loading branch information
dguy authored and guozhangwang committed Dec 20, 2016
1 parent a5c15ba commit 0321bf5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
Expand Up @@ -44,6 +44,7 @@ public class RecordCollectorImpl implements RecordCollector {
private final Producer<byte[], byte[]> producer;
private final Map<TopicPartition, Long> offsets;
private final String logPrefix;
private volatile Exception sendException;


public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
Expand All @@ -60,6 +61,7 @@ public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer
@Override
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
StreamPartitioner<K, V> partitioner) {
checkForException();
byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
Integer partition = record.partition();
Expand All @@ -79,10 +81,14 @@ public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
if (sendException != null) {
return;
}
TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
offsets.put(tp, metadata.offset());
} else {
log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
sendException = exception;
log.error("{} Error sending record to topic {}. No more offsets will be recorded for this task and the exception will eventually be thrown", logPrefix, topic, exception);
}
}
});
Expand All @@ -98,10 +104,17 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
}
}

private void checkForException() {
if (sendException != null) {
throw new StreamsException(String.format("%s exception caught when producing", logPrefix), sendException);
}
}

@Override
public void flush() {
log.debug("{} Flushing producer", logPrefix);
this.producer.flush();
checkForException();
}

/**
Expand All @@ -110,6 +123,7 @@ public void flush() {
@Override
public void close() {
producer.close();
checkForException();
}

/**
Expand Down
Expand Up @@ -161,4 +161,53 @@ public synchronized Future<RecordMetadata> send(final ProducerRecord record, fin
collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);

}

@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
callback.onCompletion(null, new Exception());
return null;
}
},
"test");
collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
}

@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
callback.onCompletion(null, new Exception());
return null;
}
},
"test");
collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
collector.flush();
}

@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
final RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
callback.onCompletion(null, new Exception());
return null;
}
},
"test");
collector.send(new ProducerRecord<>("topic1", "3", "0"), stringSerializer, stringSerializer, streamPartitioner);
collector.close();
}

}

0 comments on commit 0321bf5

Please sign in to comment.