KAFKA-16933: New consumer unsubscribe close commit fixes #16272

Merged
merged 16 commits into from
Jun 14, 2024
@@ -1278,12 +1278,10 @@ void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstEx
autoCommitSync(timer);

applicationEventHandler.add(new CommitOnCloseEvent());
completeQuietly(
() -> {
maybeRevokePartitions();
applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer)));
},
"Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
completeQuietly(() -> maybeRevokePartitions(),
Member

So the point here seems to be: if maybeRevokePartitions fails, likely because of the rebalance listener, we still leave the group.

It looks like a good change to me. I'm just surprised to see that the legacy consumer does not seem to do this? If onPrepareLeave in AbstractCoordinator fails, we won't reach maybeLeaveGroup.

So is this a bug also in the legacy consumer?

Collaborator Author

You got it right: in this case the legacy consumer won't send a leave group either. We discussed it with @dajac and it seemed right to ensure the new consumer leaves on close (even if the callbacks fail). No changes to the legacy consumer for now, though (that would require more thought).

"Failed to execute callback to release assignment", firstException);
completeQuietly(() -> applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer))),
"Failed to send leave group heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
}
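
The effect of splitting the two steps into separate guarded calls can be illustrated with a minimal, self-contained sketch. This is not the Kafka implementation: `CloseSequenceSketch`, its `ThrowingRunnable` interface, and the simplified `completeQuietly` are hypothetical stand-ins that assume only the behavior discussed in this thread (each step runs in its own guarded call, and the first failure is remembered).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the close sequence; not Kafka code.
final class CloseSequenceSketch {

    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Runs one step and remembers only the first failure seen across all steps.
    static void completeQuietly(ThrowingRunnable step, String msg,
                                AtomicReference<Throwable> firstException) {
        try {
            step.run();
        } catch (Exception e) {
            System.err.println(msg + ": " + e);
            firstException.compareAndSet(null, e);
        }
    }

    // Returns the names of the steps that ran, in order.
    static List<String> prepareShutdown(boolean callbackFails,
                                        AtomicReference<Throwable> firstException) {
        List<String> executed = new ArrayList<>();
        completeQuietly(() -> {
            executed.add("revoke");
            if (callbackFails) {
                throw new IllegalStateException("rebalance listener failed");
            }
        }, "Failed to execute callback to release assignment", firstException);
        // Separate guarded call: this step runs even if the previous one threw.
        completeQuietly(() -> executed.add("leave"),
            "Failed to send leave group heartbeat", firstException);
        return executed;
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        System.out.println(prepareShutdown(true, firstException));
        System.out.println(firstException.get());
    }
}
```

With both steps inside a single lambda, as in the removed lines, an exception from the first step would skip the second one entirely.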

// Visible for testing
@@ -1324,6 +1322,7 @@ void completeQuietly(final Utils.ThrowingRunnable function,
} catch (TimeoutException e) {
log.debug("Timeout expired before the {} operation could complete.", msg);
Member

Should we update firstException here?

Collaborator Author

Totally, good catch.

} catch (Exception e) {
log.error(msg, e);
firstException.compareAndSet(null, e);
}
}
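
The review suggestion for this hunk (also recording a timeout in `firstException`) can be sketched as follows. This is an illustrative variant, not the merged code: `FirstExceptionSketch` is a hypothetical class, and `java.util.concurrent.TimeoutException` is used as a stand-in for Kafka's own `TimeoutException`.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; java.util.concurrent.TimeoutException stands in
// for Kafka's own TimeoutException.
final class FirstExceptionSketch {

    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    static void completeQuietly(ThrowingRunnable step, String msg,
                                AtomicReference<Throwable> firstException) {
        try {
            step.run();
        } catch (TimeoutException e) {
            System.out.println("Timeout expired before the " + msg + " operation could complete.");
            // Record the timeout too, so the caller can surface it after close.
            firstException.compareAndSet(null, e);
        } catch (Exception e) {
            System.err.println(msg + ": " + e);
            firstException.compareAndSet(null, e);
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        completeQuietly(() -> { throw new TimeoutException("commit timed out"); },
            "commit", firstException);
        completeQuietly(() -> { throw new IllegalStateException("later failure"); },
            "leave group", firstException);
        // compareAndSet(null, e) only succeeds while the reference is still null,
        // so the earliest failure (the timeout) is the one that is kept.
        System.out.println(firstException.get());
    }
}
```

Because `compareAndSet(null, e)` is a no-op once a value is stored, the order of the guarded steps determines which exception the caller eventually sees.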
@@ -1502,7 +1501,8 @@ public void unsubscribe() {
Timer timer = time.timer(Long.MAX_VALUE);
UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));
applicationEventHandler.add(unsubscribeEvent);
log.info("Unsubscribing all topics or patterns and assigned partitions");
log.info("Unsubscribing all topics or patterns and assigned partitions {}",
subscriptions.assignedPartitions());

try {
processBackgroundEvents(unsubscribeEvent.future(), timer);
@@ -1512,7 +1512,9 @@ public void unsubscribe() {
}
resetGroupMetadata();
}
subscriptions.unsubscribe();
} catch (Exception e) {
log.error("Unsubscribe failed", e);
throw e;
} finally {
release();
}
@@ -87,6 +87,12 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
final PendingRequests pendingRequests;
private boolean closing = false;

/**
* Last member epoch sent in a commit request. Empty if no epoch was included in the last
* request. Used for logging.
*/
private Optional<Integer> lastEpochSentOnCommit;

/**
* Latest member ID and epoch received via the {@link #onMemberEpochUpdated(Optional, Optional)},
* to be included in the OffsetFetch and OffsetCommit requests if present. This will have
@@ -156,6 +162,7 @@ public CommitRequestManager(
this.memberInfo = new MemberInfo();
this.metricsManager = new OffsetCommitMetricsManager(metrics);
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
this.lastEpochSentOnCommit = Optional.empty();
}

/**
@@ -330,7 +337,8 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
log.debug("Auto-commit sync before revocation failed because topic or partition were deleted");
result.completeExceptionally(error);
} else {
// Make sure the auto-commit is retries with the latest offsets
// Make sure the auto-commit is retried with the latest offsets
log.debug("Retrying auto-commit of latest offsets after receiving retriable error {}", error.getMessage());
requestAttempt.offsets = subscriptions.allConsumed();
requestAttempt.resetFuture();
autoCommitSyncBeforeRevocationWithRetries(requestAttempt, result);
@@ -567,6 +575,10 @@ private void handleCoordinatorDisconnect(Throwable exception, long currentTimeMs
*/
@Override
public void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId) {
if (!memberEpoch.isPresent() && memberInfo.memberEpoch.isPresent()) {
log.debug("Member {} has left the group so it won't include member epoch on the " +
"offset commit/fetch requests anymore.", memberInfo.memberId.orElse("unknown"));
}
memberInfo.memberId = memberId;
memberInfo.memberEpoch = memberEpoch;
}
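
The epoch bookkeeping added to `CommitRequestManager` in this file (detecting that the member left, and remembering the last epoch attached to a commit) can be sketched in isolation. `EpochTrackerSketch` and its method names are hypothetical. As an assumption that differs from the diff, this sketch updates the last-sent epoch on every request, so the field is empty when the last request carried no epoch, as the field's Javadoc describes.

```java
import java.util.Optional;

// Hypothetical sketch of epoch tracking for logging; not the CommitRequestManager code.
final class EpochTrackerSketch {
    private Optional<Integer> memberEpoch = Optional.empty();
    private Optional<Integer> lastEpochSentOnCommit = Optional.empty();

    // Returns true when the update means the member just left the group:
    // an epoch was known before and none is provided now.
    boolean onMemberEpochUpdated(Optional<Integer> newEpoch) {
        boolean leftGroup = !newEpoch.isPresent() && memberEpoch.isPresent();
        memberEpoch = newEpoch;
        return leftGroup;
    }

    // Called when a commit request is built: remember which epoch (if any) it carried.
    // Assumption: updated on every request, including requests without an epoch.
    void onCommitRequestBuilt() {
        lastEpochSentOnCommit = memberEpoch;
    }

    // Value to print when a commit fails with a stale member epoch error.
    String describeLastEpochSent() {
        return lastEpochSentOnCommit.map(Object::toString).orElse("None");
    }

    public static void main(String[] args) {
        EpochTrackerSketch tracker = new EpochTrackerSketch();
        tracker.onMemberEpochUpdated(Optional.of(5));
        tracker.onCommitRequestBuilt();
        System.out.println("Last epoch sent: " + tracker.describeLastEpochSent());
        boolean left = tracker.onMemberEpochUpdated(Optional.empty());
        System.out.println("Member left the group: " + left);
    }
}
```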
@@ -679,6 +691,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
}
if (memberInfo.memberEpoch.isPresent()) {
data = data.setGenerationIdOrMemberEpoch(memberInfo.memberEpoch.get());
lastEpochSentOnCommit = Optional.of(memberInfo.memberEpoch.get());
}

OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
@@ -740,6 +753,8 @@ public void onResponse(final ClientResponse response) {
"failed with unknown member ID. " + error.message()));
return;
} else if (error == Errors.STALE_MEMBER_EPOCH) {
log.error("OffsetCommit failed with stale member epoch error. Last epoch sent: {}",
lastEpochSentOnCommit.isPresent() ? lastEpochSentOnCommit.get() : "None");
future.completeExceptionally(error.exception());
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
@@ -263,7 +263,8 @@ public void resetPollTimer(final long pollMs) {
pollTimer.update(pollMs);
if (pollTimer.isExpired()) {
logger.warn("Time between subsequent calls to poll() was longer than the configured " +
"max.poll.interval.ms, exceeded approximately by {} ms.", pollTimer.isExpiredBy());
"max.poll.interval.ms, exceeded approximately by {} ms. Member {} will rejoin the group now.",
pollTimer.isExpiredBy(), membershipManager.memberId());
membershipManager.maybeRejoinStaleMember();
}
pollTimer.reset(maxPollIntervalMs);
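
The poll-timer check in this hunk follows a simple pattern: advance the timer to the current time, see whether the deadline was missed and by how much, then restart the countdown. A minimal sketch of that pattern is shown here. `PollTimerSketch` is hypothetical and much simpler than Kafka's `Timer`; in particular it treats a poll arriving exactly at the deadline as on time.

```java
// Hypothetical, simplified poll timer; Kafka's Timer class has a richer API.
final class PollTimerSketch {
    private final long maxPollIntervalMs;
    private long deadlineMs;

    PollTimerSketch(long nowMs, long maxPollIntervalMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.deadlineMs = nowMs + maxPollIntervalMs;
    }

    // Called on every poll(). Returns by how many ms the allowed interval was
    // exceeded (0 if the poll arrived in time), then restarts the countdown.
    long resetPollTimer(long pollMs) {
        long expiredByMs = Math.max(0L, pollMs - deadlineMs);
        if (expiredByMs > 0) {
            System.out.println("max.poll.interval.ms exceeded by approximately "
                + expiredByMs + " ms; the member would rejoin the group now.");
        }
        deadlineMs = pollMs + maxPollIntervalMs;
        return expiredByMs;
    }

    public static void main(String[] args) {
        PollTimerSketch timer = new PollTimerSketch(0L, 300L);
        timer.resetPollTimer(100L); // in time, new deadline is 400
        timer.resetPollTimer(450L); // late by 50 ms, new deadline is 750
    }
}
```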
@@ -520,12 +520,15 @@ private void replaceTargetAssignmentWithNewAssignment(
@Override
public void transitionToFenced() {
if (state == MemberState.PREPARE_LEAVING) {
log.debug("Member {} with epoch {} got fenced but it is already preparing to leave " +
log.info("Member {} with epoch {} got fenced but it is already preparing to leave " +
"the group, so it will stop sending heartbeat and won't attempt to rejoin.",
memberId, memberEpoch);
// Transition to UNSUBSCRIBED, ensuring that the member (that is not part of the
// group anymore from the broker point of view) will stop sending heartbeats while it
// completes the ongoing leaving operation.
// Briefly transition to LEAVING to ensure all required actions are applied even
// though there is no need to send a leave group heartbeat (ex. clear epoch and
// notify epoch listeners). Then transition to UNSUBSCRIBED, ensuring that the member
// (that is not part of the group anymore from the broker point of view) will stop
// sending heartbeats while it completes the ongoing leaving operation.
transitionToSendingLeaveGroup(false);
transitionTo(MemberState.UNSUBSCRIBED);
return;
}
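
The fenced-while-leaving path above can be modelled with a tiny state machine. The sketch is hypothetical and heavily simplified: `FencedWhileLeavingSketch`, its reduced state set, and the recorded `history` list are illustrative only, and the value `-1` for the leave epoch is an assumption mirroring `ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, heavily simplified model of the member state machine.
final class FencedWhileLeavingSketch {
    enum MemberState { STABLE, PREPARE_LEAVING, LEAVING, FENCED, UNSUBSCRIBED }

    // Assumption: mirrors ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    MemberState state = MemberState.STABLE;
    int memberEpoch = 3;
    final List<MemberState> history = new ArrayList<>();
    final List<Optional<Integer>> epochNotifications = new ArrayList<>();

    private void transitionTo(MemberState next) {
        state = next;
        history.add(next);
    }

    void prepareLeaving() {
        transitionTo(MemberState.PREPARE_LEAVING);
    }

    // Applies the leave-time side effects: set the leave epoch and tell listeners
    // that no epoch should be attached to further requests.
    private void transitionToSendingLeaveGroup() {
        memberEpoch = LEAVE_GROUP_MEMBER_EPOCH;
        epochNotifications.add(Optional.empty());
        transitionTo(MemberState.LEAVING);
    }

    void transitionToFenced() {
        if (state == MemberState.PREPARE_LEAVING) {
            // Pass through LEAVING for its side effects, then stop heartbeating.
            transitionToSendingLeaveGroup();
            transitionTo(MemberState.UNSUBSCRIBED);
            return;
        }
        transitionTo(MemberState.FENCED);
    }

    public static void main(String[] args) {
        FencedWhileLeavingSketch member = new FencedWhileLeavingSketch();
        member.prepareLeaving();
        member.transitionToFenced();
        System.out.println(member.history + " epoch=" + member.memberEpoch);
    }
}
```

Jumping straight to UNSUBSCRIBED, as the removed comment described, would skip the epoch reset and the listener notification that the intermediate step performs.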
@@ -664,6 +667,7 @@ public CompletableFuture<Void> leaveGroup() {
if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) {
// Member already leaving. No-op and return existing leave group future that will
// complete when the ongoing leave operation completes.
log.debug("Leave group operation already in progress for member {}", memberId);
return leaveGroupInProgress.get();
}
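
Two properties of the leave path in `leaveGroup()` are easy to show in isolation: a repeated call returns the future of the leave already in progress, and a failed callback does not prevent the leave from proceeding. The sketch here is hypothetical (`LeaveGroupSketch`, `leaveHeartbeatRequested`, and `onLeaveHeartbeatSent` are invented for illustration) and takes the callback future as a parameter instead of invoking a real rebalance listener.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not the MembershipManager implementation.
final class LeaveGroupSketch {
    private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
    boolean leaveHeartbeatRequested = false;

    // callbackResult stands in for the future of the rebalance-listener callback.
    CompletableFuture<Void> leaveGroup(CompletableFuture<Void> callbackResult) {
        if (leaveGroupInProgress.isPresent()) {
            // Already leaving: hand back the same future instead of starting over.
            return leaveGroupInProgress.get();
        }
        CompletableFuture<Void> leaveResult = new CompletableFuture<>();
        leaveGroupInProgress = Optional.of(leaveResult);
        callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                System.err.println("Callback failed, still leaving: " + error);
            }
            // Proceed to leave whether the callback succeeded or failed.
            leaveHeartbeatRequested = true;
        });
        return leaveResult;
    }

    // Called once the leave heartbeat has been sent.
    void onLeaveHeartbeatSent() {
        leaveGroupInProgress.ifPresent(future -> future.complete(null));
        leaveGroupInProgress = Optional.empty();
    }

    public static void main(String[] args) {
        LeaveGroupSketch member = new LeaveGroupSketch();
        CompletableFuture<Void> callback = new CompletableFuture<>();
        CompletableFuture<Void> first = member.leaveGroup(callback);
        CompletableFuture<Void> second = member.leaveGroup(new CompletableFuture<>());
        System.out.println("Same future returned: " + (first == second));
        callback.completeExceptionally(new RuntimeException("listener failed"));
        member.onLeaveHeartbeatSent();
        System.out.println("Leave completed: " + first.isDone());
    }
}
```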

@@ -673,7 +677,15 @@ public CompletableFuture<Void> leaveGroup() {

CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
callbackResult.whenComplete((result, error) -> {
if (error != null) {
log.error("Member {} callback to release assignment failed. Member will proceed " +
"to send leave group heartbeat", memberId, error);
} else {
log.debug("Member {} completed callback to release assignment and will send leave " +
"group heartbeat", memberId);
}
// Clear the subscription, no matter if the callback execution failed or succeeded.
subscriptions.unsubscribe();
clearSubscription();
Member

Is the name clearSubscription misleading? It seems like it clears the assignment, not the subscription.

Collaborator Author

You're right, it's clearing assignments. Renamed.


// Transition to ensure that a heartbeat request is sent out to effectively leave the
@@ -705,6 +717,9 @@ private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignme
SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(subscriptions.assignedPartitions());

log.debug("Member {} is triggering callbacks to release assignment {} and leave group",
memberId, droppedPartitions);

CompletableFuture<Void> callbackResult;
if (droppedPartitions.isEmpty()) {
// No assignment to release.
@@ -764,7 +779,7 @@ public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
* This also includes the latest member ID in the notification. If the member fails or leaves
* the group, this will be invoked with empty epoch and member ID.
*/
private void notifyEpochChange(Optional<Integer> epoch, Optional<String> memberId) {
void notifyEpochChange(Optional<Integer> epoch, Optional<String> memberId) {
stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, memberId));
}

@@ -794,8 +809,12 @@ public void onHeartbeatRequestSent() {
}
} else if (state == MemberState.LEAVING) {
if (isPollTimerExpired) {
log.debug("Member {} sent heartbeat to leave due to expired poll timer. It will " +
"remain stale (no heartbeat) until it rejoins the group on the next consumer " +
"poll.", memberId);
transitionToStale();
} else {
log.debug("Member {} sent heartbeat to leave group.", memberId);
transitionToUnsubscribed();
}
}
@@ -939,11 +958,13 @@ void maybeReconcile() {
revokedPartitions.addAll(ownedPartitions);
revokedPartitions.removeAll(assignedTopicPartitions);

log.info("Updating assignment with local epoch {}\n" +
log.info("Reconciling assignment with local epoch {}\n" +
"\tMember: {}\n" +
"\tAssigned partitions: {}\n" +
"\tCurrent owned partitions: {}\n" +
"\tAdded partitions (assigned - owned): {}\n" +
"\tRevoked partitions (owned - assigned): {}\n",
resolvedAssignment.localEpoch,
memberId,
assignedTopicPartitions,
ownedPartitions,
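
The "added" and "revoked" sets named in this log message are plain set differences between the assigned and the currently owned partitions. A minimal sketch, using strings as hypothetical stand-ins for `TopicPartition` and an invented `ReconcileDiffSketch` helper:

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper; strings stand in for TopicPartition instances.
final class ReconcileDiffSketch {

    // Returns the elements of 'from' that are not in 'remove', leaving both inputs untouched.
    static SortedSet<String> difference(SortedSet<String> from, SortedSet<String> remove) {
        SortedSet<String> result = new TreeSet<>(from);
        result.removeAll(remove);
        return result;
    }

    public static void main(String[] args) {
        SortedSet<String> owned = new TreeSet<>(Arrays.asList("topic-0", "topic-1"));
        SortedSet<String> assigned = new TreeSet<>(Arrays.asList("topic-1", "topic-2"));

        // Added partitions (assigned - owned)
        System.out.println("Added: " + difference(assigned, owned));
        // Revoked partitions (owned - assigned)
        System.out.println("Revoked: " + difference(owned, assigned));
    }
}
```

Copying into a new set before calling `removeAll` keeps the owned and assigned sets intact, so both can still be printed alongside the differences.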
@@ -435,7 +435,7 @@ public void shouldIgnoreGroupInstanceIdForEmptyGroupId(GroupProtocol groupProtoc
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
lucasbru marked this conversation as resolved.
public void testSubscription(GroupProtocol groupProtocol) {
consumer = newConsumer(groupProtocol, groupId);

@@ -158,7 +158,12 @@ public class AsyncKafkaConsumerTest {
public void resetAll() {
backgroundEventQueue.clear();
if (consumer != null) {
consumer.close(Duration.ZERO);
try {
consumer.close(Duration.ZERO);
} catch (Exception e) {
// best effort to clean up after each test, but may throw (ex. if callbacks were
// throwing errors)
Member

I did this before but was asked to not do it, see https://github.com/apache/kafka/pull/15613/files/9fb917e4b1e60f238183c92d1ad3bc2565a7e1ea#r1559907295. That's why I added a "clean-up close" in the tests where close fails, with an expected exception (search for "clean-up" in this file).

I'd also be fine with your approach (which was also my original one): a best-effort clean-up that ignores exceptions here. But then, let's remove the "clean-up close" code in the other tests. Any consistent approach is fine with me here.

Collaborator Author

@lianetm lianetm Jun 13, 2024

Oh I see, good point. I agree with going with a consistent approach here, but could I bring it in a follow-up PR right after this one? (It wouldn't need to get into 3.8.) The trick is that close may now throw in any test that throws on the callback, so I would prefer to play it safe and make sure the close does not throw here. I'll then review all the tests that we expect could rightfully throw on close now. (The other clean-up logic also has some timeout-related checks, so it's better not to remove it blindly; it seems it could have some value.)

}
}
consumer = null;
Mockito.framework().clearInlineMocks();
@@ -832,8 +837,8 @@ public void testPartitionRevocationOnClose() {

@Test
public void testFailedPartitionRevocationOnClose() {
// If rebalance listener failed to execute during close, we will skip sending leave group and proceed with
// closing the consumer.
// If rebalance listener failed to execute during close, we still send the leave group,
// and proceed with closing the consumer.
ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class);
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
consumer = newConsumer(
@@ -848,7 +853,7 @@ public void testFailedPartitionRevocationOnClose() {
subscriptions.assignFromSubscribed(singleton(tp));
doThrow(new KafkaException()).when(listener).onPartitionsRevoked(eq(singleton(tp)));
assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
verify(applicationEventHandler, never()).addAndGet(any(LeaveOnCloseEvent.class));
verify(applicationEventHandler).addAndGet(any(LeaveOnCloseEvent.class));
verify(listener).onPartitionsRevoked(eq(singleton(tp)));
assertEquals(emptySet(), subscriptions.assignedPartitions());
}
@@ -365,6 +365,8 @@ public void testFencingWhenStateIsPrepareLeaving() {
// because member is already out of the group in the broker).
completeCallback(callbackEvent, membershipManager);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
verify(membershipManager).notifyEpochChange(Optional.empty(), Optional.empty());
assertTrue(membershipManager.shouldSkipHeartbeat());
}
