Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10337: await async commits in commitSync even if no offsets given #13678

Merged
merged 1 commit into from Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -105,6 +105,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final boolean autoCommitEnabled;
private final int autoCommitIntervalMs;
private final ConsumerInterceptors<?, ?> interceptors;
// track number of async commits for which callback must be called
// package private for testing
final AtomicInteger inFlightAsyncCommits;
// track the number of pending async commits waiting on the coordinator lookup to complete
private final AtomicInteger pendingAsyncCommits;

// this collection must be thread-safe because it is modified from the response handler
Expand Down Expand Up @@ -186,6 +190,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
this.inFlightAsyncCommits = new AtomicInteger();
this.pendingAsyncCommits = new AtomicInteger();
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
this.asyncCommitFenced = new AtomicBoolean(false);
this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId,
Expand Down Expand Up @@ -1125,17 +1130,22 @@ public void onFailure(RuntimeException e) {

private RequestFuture<Void> doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
inFlightAsyncCommits.incrementAndGet();
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
inFlightAsyncCommits.decrementAndGet();

if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}

@Override
public void onFailure(RuntimeException e) {
inFlightAsyncCommits.decrementAndGet();

Exception commitException = e;

if (e instanceof RetriableException) {
Expand Down Expand Up @@ -1164,8 +1174,11 @@ public void onFailure(RuntimeException e) {
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
invokeCompletedOffsetCommitCallbacks();

if (offsets.isEmpty())
return true;
if (offsets.isEmpty()) {
// We guarantee that the callbacks for all commitAsync() will be invoked when
// commitSync() completes, even if the user tries to commit empty offsets.
return invokePendingAsyncCommits(timer);
}

do {
if (coordinatorUnknownAndUnreadySync(timer)) {
Expand Down Expand Up @@ -1223,6 +1236,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
}
}

private boolean invokePendingAsyncCommits(Timer timer) {
if (inFlightAsyncCommits.get() == 0) {
return true;
}

do {
ensureCoordinatorReady(timer);
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
client.poll(timer);
invokeCompletedOffsetCommitCallbacks();

if (inFlightAsyncCommits.get() == 0) {
return true;
}

timer.sleep(rebalanceConfig.retryBackoffMs);
} while (timer.notExpired());

return false;
}

private RequestFuture<Void> autoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
Expand All @@ -1245,7 +1278,7 @@ private RequestFuture<Void> autoCommitOffsetsAsync() {
private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
if (autoCommitEnabled)
return autoCommitOffsetsAsync();
return null;
return null;
}

private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
Expand Down
Expand Up @@ -570,13 +570,15 @@ public void testCommitAsyncWithUserAssignedType() {
coordinator.poll(time.timer(0));
assertTrue(coordinator.coordinatorUnknown());
assertTrue(client.hasInFlightRequests());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

client.respond(groupCoordinatorResponse(node, Errors.NONE));
coordinator.poll(time.timer(0));
assertFalse(coordinator.coordinatorUnknown());
// after we've discovered the coordinator we should send
// out the commit request immediately
assertTrue(client.hasInFlightRequests());
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
}

@Test
Expand All @@ -600,6 +602,30 @@ public void testCoordinatorNotAvailable() {
assertTrue(coordinator.coordinatorUnknown());
}

@Test
public void testEnsureCompletingAsyncCommitsWhenSyncCommitWithoutOffsets() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

TopicPartition tp = new TopicPartition("foo", 0);
Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(123));

final AtomicBoolean committed = new AtomicBoolean();
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
coordinator.commitOffsetsAsync(offsets, (committedOffsets, exception) -> {
committed.set(true);
});

assertFalse(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(100L)), "expected sync commit to fail");
assertFalse(committed.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 1);

prepareOffsetCommitRequest(singletonMap(tp, 123L), Errors.NONE);

assertTrue(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(Long.MAX_VALUE)), "expected sync commit to succeed");
assertTrue(committed.get(), "expected commit callback to be invoked");
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
Expand All @@ -618,11 +644,13 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {
"Unexpected exception cause type: " + (cause == null ? null : cause.getClass()));
});
}
assertEquals(coordinator.inFlightAsyncCommits.get(), numRequests);

coordinator.markCoordinatorUnknown("test cause");
consumerClient.pollNoWakeup();
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(numRequests, responses.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand Down Expand Up @@ -668,6 +696,7 @@ public void onFailure(RuntimeException e, RequestFuture<Object> future) {
coordinator.markCoordinatorUnknown("test cause");
consumerClient.pollNoWakeup();
assertTrue(asyncCallbackInvoked.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
}

@Test
Expand Down Expand Up @@ -2323,6 +2352,7 @@ private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error)
MockCommitCallback secondCommitCallback = new MockCommitCallback();
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 2);

respondToOffsetCommitRequest(singletonMap(t1p, 100L), error);
consumerClient.pollNoWakeup();
Expand All @@ -2332,6 +2362,7 @@ private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error)
assertTrue(coordinator.coordinatorUnknown());
assertTrue(firstCommitCallback.exception instanceof RetriableCommitFailedException);
assertTrue(secondCommitCallback.exception instanceof RetriableCommitFailedException);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand Down Expand Up @@ -2494,6 +2525,7 @@ public void testCommitOffsetMetadata() {
coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(success.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand All @@ -2503,6 +2535,7 @@ public void testCommitOffsetAsyncWithDefaultCallback() {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
assertNull(mockOffsetCommitCallback.exception);
Expand Down Expand Up @@ -2533,6 +2566,7 @@ public void testCommitAfterLeaveGroup() {
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
coordinator.invokeCompletedOffsetCommitCallbacks();
assertTrue(success.get());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
}

@Test
Expand All @@ -2542,6 +2576,7 @@ public void testCommitOffsetAsyncFailedWithDefaultCallback() {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
assertTrue(mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
Expand All @@ -2556,6 +2591,7 @@ public void testCommitOffsetAsyncCoordinatorNotAvailable() {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();

assertTrue(coordinator.coordinatorUnknown());
Expand All @@ -2572,6 +2608,7 @@ public void testCommitOffsetAsyncNotCoordinator() {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();

assertTrue(coordinator.coordinatorUnknown());
Expand All @@ -2588,6 +2625,7 @@ public void testCommitOffsetAsyncDisconnected() {
MockCommitCallback cb = new MockCommitCallback();
prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L));
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();

assertTrue(coordinator.coordinatorUnknown());
Expand Down Expand Up @@ -2656,13 +2694,15 @@ public void run() {
}
};

assertEquals(coordinator.inFlightAsyncCommits.get(), 1);
thread.start();

client.waitForRequests(2, 5000);
respondToOffsetCommitRequest(singletonMap(t1p, firstOffset.offset()), Errors.NONE);
respondToOffsetCommitRequest(singletonMap(t1p, secondOffset.offset()), Errors.NONE);

thread.join();
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
}
Expand Down Expand Up @@ -3415,6 +3455,7 @@ public void testCommitOffsetRequestAsyncAlwaysReceiveFencedException() {
assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException);
assertThrows(FencedInstanceIdException.class, () ->
coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback()));
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
assertThrows(FencedInstanceIdException.class, () ->
coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
}
Expand Down Expand Up @@ -3680,6 +3721,7 @@ private void receiveFencedInstanceIdException() {
prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);

coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback());
assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
coordinator.invokeCompletedOffsetCommitCallbacks();
}

Expand Down