Skip to content

Commit

Permalink
KAFKA-3412: multiple asynchronous commits cause send failures
Browse files Browse the repository at this point in the history
Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1108 from hachikuji/KAFKA-3412
  • Loading branch information
hachikuji authored and ewencp committed Mar 22, 2016
1 parent 4f04179 commit 8d8e3aa
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 19 deletions.
Expand Up @@ -870,7 +870,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
// must return these records to users to process before being interrupted or
// auto-committing offsets
fetcher.sendFetches(metadata.fetch());
client.quickPoll();
client.quickPoll(false);
return this.interceptors == null
? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
}
Expand Down
Expand Up @@ -345,6 +345,10 @@ public void onFailure(RuntimeException e) {
cb.onComplete(offsets, e);
}
});

// ensure commit has a chance to be transmitted (without blocking on its completion)
// note that we allow delayed tasks to be executed in case heartbeats need to be sent
client.quickPoll(true);
}

/**
Expand Down
Expand Up @@ -196,10 +196,11 @@ public void poll(long timeout) {
/**
* Poll for network IO and return immediately. This will not trigger wakeups.
* Delayed tasks are executed only if executeDelayedTasks is true.
* @param executeDelayedTasks Whether to allow delayed task execution (true allows)
*/
public void quickPoll() {
public void quickPoll(boolean executeDelayedTasks) {
disableWakeups();
poll(0, time.milliseconds(), false);
poll(0, time.milliseconds(), executeDelayedTasks);
enableWakeups();
}

Expand Down
Expand Up @@ -627,7 +627,6 @@ public void testCommitOffsetOnly() {

AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
consumerClient.poll(0);
assertTrue(success.get());

assertEquals(100L, subscriptions.committed(tp).offset());
Expand All @@ -644,7 +643,6 @@ public void testCommitOffsetMetadata() {

AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
consumerClient.poll(0);
assertTrue(success.get());

assertEquals(100L, subscriptions.committed(tp).offset());
Expand All @@ -658,7 +656,6 @@ public void testCommitOffsetAsyncWithDefaultCallback() {
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
consumerClient.poll(0);
assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
assertNull(defaultOffsetCommitCallback.exception);
}
Expand Down Expand Up @@ -693,7 +690,6 @@ public boolean matches(ClientRequest request) {

AtomicBoolean success = new AtomicBoolean(false);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
consumerClient.poll(0);
assertTrue(success.get());
}

Expand All @@ -704,7 +700,6 @@ public void testCommitOffsetAsyncFailedWithDefaultCallback() {
coordinator.ensureCoordinatorKnown();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
consumerClient.poll(0);
assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
}
Expand All @@ -718,7 +713,6 @@ public void testCommitOffsetAsyncCoordinatorNotAvailable() {
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
consumerClient.poll(0);

assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
Expand All @@ -734,7 +728,6 @@ public void testCommitOffsetAsyncNotCoordinator() {
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
consumerClient.poll(0);

assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
Expand All @@ -750,7 +743,6 @@ public void testCommitOffsetAsyncDisconnected() {
MockCommitCallback cb = new MockCommitCallback();
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
consumerClient.poll(0);

assertTrue(coordinator.coordinatorUnknown());
assertEquals(1, cb.invoked);
Expand Down
24 changes: 16 additions & 8 deletions core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
Expand Up @@ -81,7 +81,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {

// shouldn't make progress until poll is invoked
Thread.sleep(10)
assertEquals(0, commitCallback.count)
assertEquals(0, commitCallback.successCount)
awaitCommitCallback(this.consumers(0), commitCallback)
}

Expand Down Expand Up @@ -330,18 +330,26 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
records
}

protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], commitCallback: CountConsumerCommitCallback): Unit = {
val startCount = commitCallback.count
protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
commitCallback: CountConsumerCommitCallback,
count: Int = 1): Unit = {
val startCount = commitCallback.successCount
val started = System.currentTimeMillis()
while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
while (commitCallback.successCount < startCount + count && System.currentTimeMillis() - started < 10000)
consumer.poll(50)
assertEquals(startCount + 1, commitCallback.count)
assertEquals(startCount + count, commitCallback.successCount)
}

protected class CountConsumerCommitCallback extends OffsetCommitCallback {
var count = 0

override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = count += 1
var successCount = 0
var failCount = 0

override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
if (exception == null)
successCount += 1
else
failCount += 1
}
}

protected class ConsumerAssignmentPoller(consumer: Consumer[Array[Byte], Array[Byte]],
Expand Down
Expand Up @@ -232,6 +232,21 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(nullMetadata, this.consumers(0).committed(tp))
}

@Test
def testAsyncCommit() {
  // Manually assign the test partition and poll once with a zero timeout
  // before issuing any commits.
  val asyncConsumer = this.consumers(0)
  asyncConsumer.assign(List(tp).asJava)
  asyncConsumer.poll(0)

  // Issue several asynchronous commits back-to-back, with no poll in between,
  // all reporting to the same counting callback. Offsets run from 1 to numCommits.
  val commitCallback = new CountConsumerCommitCallback
  val numCommits = 5
  (1 to numCommits).foreach { offset =>
    asyncConsumer.commitAsync(Map(tp -> new OffsetAndMetadata(offset)).asJava, commitCallback)
  }

  // Every commit must complete successfully, and the last offset sent must be
  // the one reported as committed.
  awaitCommitCallback(asyncConsumer, commitCallback, count = numCommits)
  assertEquals(new OffsetAndMetadata(numCommits), asyncConsumer.committed(tp))
}

@Test
def testExpandingTopicSubscriptions() {
val otherTopic = "other"
Expand Down

0 comments on commit 8d8e3aa

Please sign in to comment.