Skip to content

Commit

Permalink
KAFKA-10337: await async commits in commitSync even if no offsets given
Browse files Browse the repository at this point in the history
The contract for commitSync() guarantees that the callbacks for all prior async commits will be invoked before it (successfully?) returns.
Prior to this change the contract could be violated if an empty offsets map were passed in to commitSync().

Co-authored-by: Erik van Oosten <e.vanoosten@grons.nl>
Co-authored-by: Philip Nee <philipnee@gmail.com>
  • Loading branch information
3 people committed Jun 7, 2023
1 parent 6f19730 commit e4ba315
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 3 deletions.
Expand Up @@ -105,6 +105,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final boolean autoCommitEnabled;
private final int autoCommitIntervalMs;
private final ConsumerInterceptors<?, ?> interceptors;
// track number of async commits for which callback must be called
// package private for testing
final AtomicInteger inFlightAsyncCommits;
// track the number of pending async commits waiting on the coordinator lookup to complete
private final AtomicInteger pendingAsyncCommits;

// this collection must be thread-safe because it is modified from the response handler
Expand Down Expand Up @@ -186,6 +190,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
this.inFlightAsyncCommits = new AtomicInteger();
this.pendingAsyncCommits = new AtomicInteger();
this.asyncCommitFenced = new AtomicBoolean(false);
this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
Expand Down Expand Up @@ -1125,17 +1130,22 @@ public void onFailure(RuntimeException e) {

private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
inFlightAsyncCommits.incrementAndGet();
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
inFlightAsyncCommits.decrementAndGet();

if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}

@Override
public void onFailure(RuntimeException e) {
inFlightAsyncCommits.decrementAndGet();

Exception commitException = e;

if (e instanceof RetriableException) {
Expand Down Expand Up @@ -1164,8 +1174,11 @@ public void onFailure(RuntimeException e) {
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
invokeCompletedOffsetCommitCallbacks();

if (offsets.isEmpty())
return true;
if (offsets.isEmpty()) {
// We guarantee that the callbacks for all commitAsync() will be invoked when
// commitSync() completes, even if the user tries to commit empty offsets.
return invokePendingAsyncCommits(timer);
}

do {
if (coordinatorUnknownAndUnreadySync(timer)) {
Expand Down Expand Up @@ -1223,6 +1236,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
}
}

private boolean invokePendingAsyncCommits(Timer timer) {
if (inFlightAsyncCommits.get() == 0) {
return true;
}

do {
ensureCoordinatorReady(timer);
client.poll(timer);
invokeCompletedOffsetCommitCallbacks();

if (inFlightAsyncCommits.get() == 0) {
return true;
}

timer.sleep(rebalanceConfig.retryBackoffMs);
} while (timer.notExpired());

return false;
}

private RequestFuture<Void> autoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
Expand All @@ -1245,7 +1278,7 @@ private RequestFuture<Void> autoCommitOffsetsAsync() {
private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
if (autoCommitEnabled)
return autoCommitOffsetsAsync();
return null;
return null;
}

private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
Expand Down
Expand Up @@ -570,13 +570,15 @@ public void testCommitAsyncWithUserAssignedType() {
coordinator.poll(time.timer(0));
assertTrue(coordinator.coordinatorUnknown());
assertTrue(client.hasInFlightRequests());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

client.respond(groupCoordinatorResponse(node, Errors.NONE));
coordinator.poll(time.timer(0));
assertFalse(coordinator.coordinatorUnknown());
// after we've discovered the coordinator we should send
// out the commit request immediately
assertTrue(client.hasInFlightRequests());
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
}

@Test
Expand All @@ -600,6 +602,30 @@ public void testCoordinatorNotAvailable() {
assertTrue(coordinator.coordinatorUnknown());
}

@Test
public void testEnsureCompletingAsyncCommitsWhenSyncCommitWithoutOffsets() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

TopicPartition tp = new TopicPartition("foo", 0);
Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(123));

final AtomicBoolean committed = new AtomicBoolean();
coordinator.commitOffsetsAsync(offsets, (committedOffsets, exception) -> {
committed.set(true);
});

assertFalse(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(100L)), "expected sync commit to fail");
assertFalse(committed.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);

prepareOffsetCommitRequest(singletonMap(tp, 123L), Errors.NONE);

assertTrue(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(Long.MAX_VALUE)), "expected sync commit to succeed");
assertTrue(committed.get(), "expected commit callback to be invoked");
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
Expand All @@ -618,11 +644,13 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {
"Unexpected exception cause type: " + (cause == null ? null : cause.getClass()));
});
}
assertEquals(coordinator.inFlightAsyncCommits.get(), numRequests);

coordinator.markCoordinatorUnknown("test cause");
consumerClient.pollNoWakeup();
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(numRequests, responses.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand Down Expand Up @@ -668,6 +696,7 @@ public void onFailure(RuntimeException e, RequestFuture<Object> future) {
coordinator.markCoordinatorUnknown("test cause");
consumerClient.pollNoWakeup();
assertTrue(asyncCallbackInvoked.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand Down Expand Up @@ -2323,6 +2352,7 @@ private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error)
MockCommitCallback secondCommitCallback = new MockCommitCallback();
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 2);

respondToOffsetCommitRequest(singletonMap(t1p, 100L), error);
consumerClient.pollNoWakeup();
Expand All @@ -2332,6 +2362,7 @@ private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error)
assertTrue(coordinator.coordinatorUnknown());
assertTrue(firstCommitCallback.exception instanceof RetriableCommitFailedException);
assertTrue(secondCommitCallback.exception instanceof RetriableCommitFailedException);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand Down Expand Up @@ -2494,6 +2525,7 @@ public void testCommitOffsetMetadata() {
coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(success.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand All @@ -2503,6 +2535,7 @@ public void testCommitOffsetAsyncWithDefaultCallback() {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
assertNull(mockOffsetCommitCallback.exception);
Expand Down Expand Up @@ -2533,6 +2566,7 @@ public void testCommitAfterLeaveGroup() {
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(success.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand All @@ -2542,6 +2576,7 @@ public void testCommitOffsetAsyncFailedWithDefaultCallback() {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
assertTrue(mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
Expand All @@ -2556,6 +2591,7 @@ public void testCommitOffsetAsyncCoordinatorNotAvailable() {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();

assertTrue(coordinator.coordinatorUnknown());
Expand All @@ -2572,6 +2608,7 @@ public void testCommitOffsetAsyncNotCoordinator() {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();

assertTrue(coordinator.coordinatorUnknown());
Expand All @@ -2588,6 +2625,7 @@ public void testCommitOffsetAsyncDisconnected() {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L));
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();

assertTrue(coordinator.coordinatorUnknown());
Expand Down Expand Up @@ -2656,13 +2694,15 @@ public void run() {
}
};

assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
thread.start();

client.waitForRequests(2, 5000);
respondToOffsetCommitRequest(singletonMap(t1p, firstOffset.offset()), Errors.NONE);
respondToOffsetCommitRequest(singletonMap(t1p, secondOffset.offset()), Errors.NONE);

thread.join();
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
}
Expand Down Expand Up @@ -3415,6 +3455,7 @@ public void testCommitOffsetRequestAsyncAlwaysReceiveFencedException() {
assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException);
assertThrows(FencedInstanceIdException.class, () ->
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback()));
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
assertThrows(FencedInstanceIdException.class, () ->
coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
}
Expand Down Expand Up @@ -3680,6 +3721,7 @@ private void receiveFencedInstanceIdException() {
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);

coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
}

Expand Down

0 comments on commit e4ba315

Please sign in to comment.