Skip to content

Commit

Permalink
spring-projectsGH-62; commitSync() By Default
Browse files Browse the repository at this point in the history
Fixes spring-projects#62
Resolves spring-projects#72

See the discussion on spring-projectsGH-62 `commitAsync()` is not currenly reliable.
Use `commitSync()` by default; add `syncCommits` property to the containers
(default true).

Also allow a user-injected commit callback (spring-projectsGH-72)
  • Loading branch information
garyrussell committed May 12, 2016
1 parent 964beb6 commit d93d91b
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.core.ConsumerFactory;
Expand Down Expand Up @@ -61,6 +62,10 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis

private ConsumerRebalanceListener consumerRebalanceListener;

private OffsetCommitCallback commitCallback;

private boolean syncCommits;

/**
* Construct an instance with the supplied configuration properties and specific
* topics/partitions - when using this constructor, {@link #setRecentOffset(long)
Expand Down Expand Up @@ -146,6 +151,26 @@ public void setConsumerRebalanceListener(ConsumerRebalanceListener consumerRebal
this.consumerRebalanceListener = consumerRebalanceListener;
}

/**
* Set the commit callback; by default a simple logging callback is used to
* log success at DEBUG level and failures at ERROR level.
* @param commitCallback the callback.
*/
public void setCommitCallback(OffsetCommitCallback commitCallback) {
this.commitCallback = commitCallback;
}

/**
* Set whether or not to call consumer.commitSync() or commitAsync() when
* the container is responsible for commits. Default true. See
* https://github.com/spring-projects/spring-kafka/issues/62
* At the time of writing, async commits are not entirely reliable.
* @param syncCommits true to use commitSync().
*/
public void setSyncCommits(boolean syncCommits) {
this.syncCommits = syncCommits;
}

/**
* Return the list of {@link KafkaMessageListenerContainer}s created by
* this container.
Expand Down Expand Up @@ -180,6 +205,8 @@ protected void doStart() {
container = new KafkaMessageListenerContainer<>(this.consumerFactory, this.consumerRebalanceListener,
this.topics, this.topicPattern, partitionSubset(i));
}
container.setCommitCallback(this.commitCallback);
container.setSyncCommits(this.syncCommits);
container.setAckMode(getAckMode());
container.setAckCount(getAckCount());
container.setAckTime(getAckTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener

private final TopicPartition[] partitions;

private final ConsumerRebalanceListener consumerRebalanceListener;

private ListenerConsumer listenerConsumer;

private long recentOffset;
Expand All @@ -76,7 +78,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener

private AcknowledgingMessageListener<K, V> acknowledgingMessageListener;

private final ConsumerRebalanceListener consumerRebalanceListener;
private OffsetCommitCallback commitCallback;

private boolean syncCommits = true;

/**
* Construct an instance with the supplied configuration properties and specific
Expand Down Expand Up @@ -195,6 +199,25 @@ public void setRecentOffset(long recentOffset) {
this.recentOffset = recentOffset;
}

/**
* Set the commit callback; by default a simple logging callback is used to
* log success at DEBUG level and failures at ERROR level.
* @param commitCallback the callback.
*/
public void setCommitCallback(OffsetCommitCallback commitCallback) {
this.commitCallback = commitCallback;
}

/**
* Set whether or not to call consumer.commitSync() or commitAsync() when
* the container is responsible for commits. Default true. See
* https://github.com/spring-projects/spring-kafka/issues/62
* At the time of writing, async commits are not entirely reliable.
* @param syncCommits true to use commitSync().
*/
public void setSyncCommits(boolean syncCommits) {
this.syncCommits = syncCommits;
}

/**
* Return the {@link TopicPartition}s currently assigned to this container,
Expand Down Expand Up @@ -275,7 +298,9 @@ private class ListenerConsumer implements SchedulingAwareRunnable {

private final Log logger = LogFactory.getLog(ListenerConsumer.class);

private final CommitCallback callback = new CommitCallback();
private final OffsetCommitCallback commitCallback = KafkaMessageListenerContainer.this.commitCallback != null
? KafkaMessageListenerContainer.this.commitCallback
: new LoggingCommitCallback();

private final Consumer<K, V> consumer;

Expand All @@ -293,6 +318,8 @@ private class ListenerConsumer implements SchedulingAwareRunnable {

private final AckMode ackMode = getAckMode();

private final boolean syncCommits = KafkaMessageListenerContainer.this.syncCommits;

private Thread consumerThread;

private volatile Collection<TopicPartition> definedPartitions;
Expand Down Expand Up @@ -376,7 +403,7 @@ public void run() {
if (!this.autoCommit && this.ackMode.equals(AckMode.RECORD)) {
this.consumer.commitAsync(
Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)), this.callback);
new OffsetAndMetadata(record.offset() + 1)), this.commitCallback);
}
}
if (!this.autoCommit) {
Expand Down Expand Up @@ -446,7 +473,7 @@ private void ackImmediate(final ConsumerRecord<K, V> record) {
}
if (ListenerConsumer.this.ackMode.equals(AckMode.MANUAL_IMMEDIATE)) {
ListenerConsumer.this.consumer.commitAsync(commits,
ListenerConsumer.this.callback);
ListenerConsumer.this.commitCallback);
}
else {
ListenerConsumer.this.consumer.commitSync(commits);
Expand Down Expand Up @@ -485,10 +512,15 @@ private void processCommits(final AckMode ackMode, ConsumerRecords<K, V> records
long now;
if (ackMode.equals(AckMode.BATCH)) {
if (!records.isEmpty()) {
this.consumer.commitAsync(this.callback);
if (this.syncCommits) {
this.consumer.commitSync();
}
else {
this.consumer.commitAsync(this.commitCallback);
}
}
}
else if (!ackMode.equals(AckMode.MANUAL_IMMEDIATE)) {
else if (!ackMode.equals(AckMode.MANUAL_IMMEDIATE) && !ackMode.equals(AckMode.MANUAL_IMMEDIATE_SYNC)) {
if (!ackMode.equals(AckMode.MANUAL)) {
updatePendingOffsets(records);
}
Expand Down Expand Up @@ -575,14 +607,19 @@ private void commitIfNecessary() {
this.logger.debug("Committing: " + commits);
}
if (!commits.isEmpty()) {
this.consumer.commitAsync(commits, this.callback);
if (this.syncCommits) {
this.consumer.commitSync(commits);
}
else {
this.consumer.commitAsync(commits, this.commitCallback);
}
}
}
}

private static final class CommitCallback implements OffsetCommitCallback {
private static final class LoggingCommitCallback implements OffsetCommitCallback {

private static final Log logger = LogFactory.getLog(OffsetCommitCallback.class);
private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class);

@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

Expand Down Expand Up @@ -332,10 +333,10 @@ public void onMessage(ConsumerRecord<Integer, String> message) {
@Test
public void testManualCommit() throws Exception {
testManualCommitGuts(AckMode.MANUAL, topic4);
testManualCommitGuts(AckMode.MANUAL_IMMEDIATE, topic5);
testManualCommitGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic5);
// to be sure the commits worked ok so run the tests again and the second tests start at the committed offset.
testManualCommitGuts(AckMode.MANUAL, topic4);
testManualCommitGuts(AckMode.MANUAL_IMMEDIATE, topic5);
testManualCommitGuts(AckMode.MANUAL_IMMEDIATE_SYNC, topic5);
}

private void testManualCommitGuts(AckMode ackMode, String topic) throws Exception {
Expand Down Expand Up @@ -375,6 +376,7 @@ public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ac
}

@Test
@Ignore // TODO https://github.com/spring-projects/spring-kafka/issues/62 using SYNC for avoidance
public void testManualCommitExisting() throws Exception {
logger.info("Start MANUAL_IMMEDIATE with Existing");
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
Expand Down Expand Up @@ -405,6 +407,19 @@ public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ac
container.setConcurrency(1);
container.setAckMode(AckMode.MANUAL_IMMEDIATE);
container.setBeanName("testManualExisting");
final CountDownLatch commits = new CountDownLatch(8);
final AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>();
container.setCommitCallback(new OffsetCommitCallback() {

@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
commits.countDown();
if (exception != null) {
exceptionRef.compareAndSet(null, exception);
}
}

});
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
template.send(0, "fooo");
Expand All @@ -413,6 +428,8 @@ public void onMessage(ConsumerRecord<Integer, String> message, Acknowledgment ac
template.send(2, "quxx");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(commits.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(exceptionRef.get()).isNull();
container.stop();
logger.info("Stop MANUAL_IMMEDIATE with Existing");
}
Expand Down

0 comments on commit d93d91b

Please sign in to comment.