From e4ba315ecbf0878706de1e1b620911896a994bed Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Sun, 2 Aug 2020 04:28:33 +0200 Subject: [PATCH] KAFKA-10337: await async commits in commitSync even if no offsets given The contract for commitSync() guarantees that the callbacks for all prior async commits will be invoked before it (successfully?) returns. Prior to this change the contract could be violated if an empty offsets map were passed in to commitSync(). Co-authored-by: Erik van Oosten Co-authored-by: Philip Nee --- .../internals/ConsumerCoordinator.java | 39 +++++++++++++++-- .../internals/ConsumerCoordinatorTest.java | 42 +++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 76b37d380e46..bcd4b377881e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -105,6 +105,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final boolean autoCommitEnabled; private final int autoCommitIntervalMs; private final ConsumerInterceptors interceptors; + // track number of async commits for which callback must be called + // package private for testing + final AtomicInteger inFlightAsyncCommits; + // track the number of pending async commits waiting on the coordinator lookup to complete private final AtomicInteger pendingAsyncCommits; // this collection must be thread-safe because it is modified from the response handler @@ -186,6 +190,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig, this.completedOffsetCommits = new ConcurrentLinkedQueue<>(); this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix); this.interceptors = interceptors; + this.inFlightAsyncCommits = new AtomicInteger(); this.pendingAsyncCommits = new AtomicInteger(); this.asyncCommitFenced = new AtomicBoolean(false); this.groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, @@ -1125,10 +1130,13 @@ public void onFailure(RuntimeException e) { private RequestFuture doCommitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) { RequestFuture future = sendOffsetCommitRequest(offsets); + inFlightAsyncCommits.incrementAndGet(); final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener() { @Override public void onSuccess(Void value) { + inFlightAsyncCommits.decrementAndGet(); + if (interceptors != null) interceptors.onCommit(offsets); completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null)); @@ -1136,6 +1144,8 @@ public void onSuccess(Void value) { @Override public void onFailure(RuntimeException e) { + inFlightAsyncCommits.decrementAndGet(); + Exception commitException = e; if (e instanceof RetriableException) { @@ -1164,8 +1174,11 @@ public void onFailure(RuntimeException e) { public boolean commitOffsetsSync(Map offsets, Timer timer) { invokeCompletedOffsetCommitCallbacks(); - if (offsets.isEmpty()) - return true; + if (offsets.isEmpty()) { + // We guarantee that the callbacks for all commitAsync() will be invoked when + // commitSync() completes, even if the user tries to commit empty offsets. + return invokePendingAsyncCommits(timer); + } do { if (coordinatorUnknownAndUnreadySync(timer)) { @@ -1223,6 +1236,26 @@ public void maybeAutoCommitOffsetsAsync(long now) { } } + private boolean invokePendingAsyncCommits(Timer timer) { + if (inFlightAsyncCommits.get() == 0) { + return true; + } + + do { + ensureCoordinatorReady(timer); + client.poll(timer); + invokeCompletedOffsetCommitCallbacks(); + + if (inFlightAsyncCommits.get() == 0) { + return true; + } + + timer.sleep(rebalanceConfig.retryBackoffMs); + } while (timer.notExpired()); + + return false; + } + private RequestFuture autoCommitOffsetsAsync() { Map allConsumedOffsets = subscriptions.allConsumed(); log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets); @@ -1245,7 +1278,7 @@ private RequestFuture autoCommitOffsetsAsync() { private RequestFuture maybeAutoCommitOffsetsAsync() { if (autoCommitEnabled) return autoCommitOffsetsAsync(); - return null; + return null; } private class DefaultOffsetCommitCallback implements OffsetCommitCallback { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index f3c8026dff34..d3a23b1a0fa2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -570,6 +570,7 @@ public void testCommitAsyncWithUserAssignedType() { coordinator.poll(time.timer(0)); assertTrue(coordinator.coordinatorUnknown()); assertTrue(client.hasInFlightRequests()); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); client.respond(groupCoordinatorResponse(node, Errors.NONE)); coordinator.poll(time.timer(0)); @@ -577,6 +578,7 @@ public void testCommitAsyncWithUserAssignedType() { // after we've discovered the coordinator we should send // out the commit request immediately assertTrue(client.hasInFlightRequests()); + assertEquals(coordinator.inFlightAsyncCommits.get(), 1); } @Test @@ -600,6 +602,30 @@ public void testCoordinatorNotAvailable() { assertTrue(coordinator.coordinatorUnknown()); } + @Test + public void testEnsureCompletingAsyncCommitsWhenSyncCommitWithoutOffsets() { + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + TopicPartition tp = new TopicPartition("foo", 0); + Map offsets = singletonMap(tp, new OffsetAndMetadata(123)); + + final AtomicBoolean committed = new AtomicBoolean(); + coordinator.commitOffsetsAsync(offsets, (committedOffsets, exception) -> { + committed.set(true); + }); + + assertFalse(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(100L)), "expected sync commit to fail"); + assertFalse(committed.get()); + assertEquals(coordinator.inFlightAsyncCommits.get(), 1); + + prepareOffsetCommitRequest(singletonMap(tp, 123L), Errors.NONE); + + assertTrue(coordinator.commitOffsetsSync(Collections.emptyMap(), time.timer(Long.MAX_VALUE)), "expected sync commit to succeed"); + assertTrue(committed.get(), "expected commit callback to be invoked"); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); + } + @Test public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); @@ -618,11 +644,13 @@ public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() { "Unexpected exception cause type: " + (cause == null ? null : cause.getClass())); }); } + assertEquals(coordinator.inFlightAsyncCommits.get(), numRequests); coordinator.markCoordinatorUnknown("test cause"); consumerClient.pollNoWakeup(); coordinator.invokeCompletedOffsetCommitCallbacks(); assertEquals(numRequests, responses.get()); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); } @Test @@ -668,6 +696,7 @@ public void onFailure(RuntimeException e, RequestFuture future) { coordinator.markCoordinatorUnknown("test cause"); consumerClient.pollNoWakeup(); assertTrue(asyncCallbackInvoked.get()); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); } @Test @@ -2323,6 +2352,7 @@ private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) MockCommitCallback secondCommitCallback = new MockCommitCallback(); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback); + assertEquals(coordinator.inFlightAsyncCommits.get(), 2); respondToOffsetCommitRequest(singletonMap(t1p, 100L), error); consumerClient.pollNoWakeup(); @@ -2332,6 +2362,7 @@ private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) assertTrue(coordinator.coordinatorUnknown()); assertTrue(firstCommitCallback.exception instanceof RetriableCommitFailedException); assertTrue(secondCommitCallback.exception instanceof RetriableCommitFailedException); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); } @Test @@ -2494,6 +2525,7 @@ public void testCommitOffsetMetadata() { coordinator.commitOffsetsAsync(offsets, callback(offsets, success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); } @Test @@ -2503,6 +2535,7 @@ public void testCommitOffsetAsyncWithDefaultCallback() { coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); coordinator.invokeCompletedOffsetCommitCallbacks(); assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked); assertNull(mockOffsetCommitCallback.exception); @@ -2533,6 +2566,7 @@ public void testCommitAfterLeaveGroup() { coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success)); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(success.get()); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); } @Test @@ -2542,6 +2576,7 @@ public void testCommitOffsetAsyncFailedWithDefaultCallback() { coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); coordinator.invokeCompletedOffsetCommitCallbacks(); assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked); assertTrue(mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException); @@ -2556,6 +2591,7 @@ public void testCommitOffsetAsyncCoordinatorNotAvailable() { MockCommitCallback cb = new MockCommitCallback(); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(coordinator.coordinatorUnknown()); @@ -2572,6 +2608,7 @@ public void testCommitOffsetAsyncNotCoordinator() { MockCommitCallback cb = new MockCommitCallback(); prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(coordinator.coordinatorUnknown()); @@ -2588,6 +2625,7 @@ public void testCommitOffsetAsyncDisconnected() { MockCommitCallback cb = new MockCommitCallback(); prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L)); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); coordinator.invokeCompletedOffsetCommitCallbacks(); assertTrue(coordinator.coordinatorUnknown()); @@ -2656,6 +2694,7 @@ public void run() { } }; + assertEquals(coordinator.inFlightAsyncCommits.get(), 1); thread.start(); client.waitForRequests(2, 5000); @@ -2663,6 +2702,7 @@ public void run() { respondToOffsetCommitRequest(singletonMap(t1p, secondOffset.offset()), Errors.NONE); thread.join(); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets); } @@ -3415,6 +3455,7 @@ public void testCommitOffsetRequestAsyncAlwaysReceiveFencedException() { assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException); assertThrows(FencedInstanceIdException.class, () -> coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback())); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); assertThrows(FencedInstanceIdException.class, () -> coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE))); } @@ -3680,6 +3721,7 @@ private void receiveFencedInstanceIdException() { prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID); coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), new MockCommitCallback()); + assertEquals(coordinator.inFlightAsyncCommits.get(), 0); coordinator.invokeCompletedOffsetCommitCallbacks(); }