Skip to content

Commit

Permalink
KAFKA-16261: updateSubscription fails if already empty subscription (#…
Browse files Browse the repository at this point in the history
…15440)

The internal SubscriptionState object keeps track of whether the assignment is user-assigned, or auto-assigned. If there are no assigned partitions, the assignment resets to NONE. If you call SubscriptionState.assignFromSubscribed in this state it fails.

This change makes sure to check SubscriptionState.hasAutoAssignedPartitions() so that assignFromSubscribed is going to be permitted.

Also, a minor refactoring to make clearing the subscription a bit easier to follow in MembershipManagerImpl.

Testing via new unit test.

Reviewers: Bruno Cadonna <cadonna@apache.org>, Andrew Schofield <aschofield@confluent.io>
  • Loading branch information
lucasbru committed Mar 1, 2024
1 parent c8843f0 commit 8e1516f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public void transitionToFenced() {
log.error("onPartitionsLost callback invocation failed while releasing assignment" +
" after member got fenced. Member will rejoin the group anyways.", error);
}
updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
clearSubscription();
if (state == MemberState.FENCED) {
transitionToJoining();
} else {
Expand Down Expand Up @@ -583,7 +583,7 @@ public void transitionToFatal() {
log.error("onPartitionsLost callback invocation failed while releasing assignment" +
"after member failed with fatal error.", error);
}
updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
clearSubscription();
});
}

Expand All @@ -597,16 +597,14 @@ public void onSubscriptionUpdated() {
}

/**
* Update a new assignment by setting the assigned partitions in the member subscription.
*
* @param assignedPartitions Topic partitions to take as the new subscription assignment
* @param clearAssignments True if the pending assignments and metadata cache should be cleared
* Clear the assigned partitions in the member subscription, pending assignments and metadata cache.
*/
private void updateSubscription(SortedSet<TopicIdPartition> assignedPartitions,
boolean clearAssignments) {
Collection<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
subscriptions.assignFromSubscribed(assignedTopicPartitions);
updateAssignmentLocally(assignedPartitions, clearAssignments);
private void clearSubscription() {
if (subscriptions.hasAutoAssignedPartitions()) {
subscriptions.assignFromSubscribed(Collections.emptySet());
}
updateCurrentAssignment(Collections.emptySet());
clearPendingAssignmentsAndLocalNamesCache();
}

/**
Expand All @@ -621,18 +619,7 @@ private void updateSubscriptionAwaitingCallback(SortedSet<TopicIdPartition> assi
SortedSet<TopicPartition> addedPartitions) {
Collection<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions, addedPartitions);
updateAssignmentLocally(assignedPartitions, false);
}

/**
* Make assignment effective on the group manager.
*/
private void updateAssignmentLocally(SortedSet<TopicIdPartition> assignedPartitions,
boolean clearAssignments) {
updateCurrentAssignment(assignedPartitions);
if (clearAssignments) {
clearPendingAssignmentsAndLocalNamesCache();
}
}

/**
Expand Down Expand Up @@ -660,7 +647,7 @@ public void transitionToJoining() {
public CompletableFuture<Void> leaveGroup() {
if (isNotInGroup()) {
if (state == MemberState.FENCED) {
updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
clearSubscription();
transitionTo(MemberState.UNSUBSCRIBED);
}
return CompletableFuture.completedFuture(null);
Expand All @@ -679,7 +666,7 @@ public CompletableFuture<Void> leaveGroup() {
CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
callbackResult.whenComplete((result, error) -> {
// Clear the subscription, no matter if the callback execution failed or succeeded.
updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
clearSubscription();

// Transition to ensure that a heartbeat request is sent out to effectively leave the
// group (even in the case where the member had no assignment to release or when the
Expand Down Expand Up @@ -880,7 +867,7 @@ private void transitionToStale() {
log.error("onPartitionsLost callback invocation failed while releasing assignment" +
" after member left group due to expired poll timer.", error);
}
updateSubscription(new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR), true);
clearSubscription();
log.debug("Member {} sent leave group heartbeat and released its assignment. It will remain " +
"in {} state until the poll timer is reset, and it will then rejoin the group",
memberId, MemberState.STALE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,21 @@ public void testLeaveGroupWhenMemberOwnsAssignment() {
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
}

@Test
public void testFencedWhenAssignmentEmpty() {
MembershipManager membershipManager = createMemberInStableState();

// Clear the assignment
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(false);

membershipManager.transitionToFenced();

// Make sure to never call `assignFromSubscribed` again
verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet());
}


@Test
public void testLeaveGroupWhenMemberAlreadyLeaving() {
MembershipManager membershipManager = createMemberInStableState();
Expand Down Expand Up @@ -1678,6 +1693,7 @@ private void assertLeaveGroupDueToExpiredPollAndTransitionToStale(MembershipMana
public void testTransitionToLeavingWhileReconcilingDueToStaleMember() {
MembershipManagerImpl membershipManager = memberJoinWithAssignment();
clearInvocations(subscriptionState);
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
Expand All @@ -1686,6 +1702,7 @@ public void testTransitionToLeavingWhileReconcilingDueToStaleMember() {
public void testTransitionToLeavingWhileJoiningDueToStaleMember() {
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
doNothing().when(subscriptionState).assignFromSubscribed(any());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
assertEquals(MemberState.JOINING, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
Expand All @@ -1695,6 +1712,7 @@ public void testTransitionToLeavingWhileStableDueToStaleMember() {
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null);
membershipManager.onHeartbeatSuccess(heartbeatResponse.data());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
doNothing().when(subscriptionState).assignFromSubscribed(any());
assertEquals(MemberState.STABLE, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
Expand All @@ -1705,6 +1723,7 @@ public void testTransitionToLeavingWhileAcknowledgingDueToStaleMember() {
MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(true);
doNothing().when(subscriptionState).assignFromSubscribed(any());
clearInvocations(subscriptionState);
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
Expand Down Expand Up @@ -1775,6 +1794,7 @@ public void testStaleMemberWaitsForCallbackToRejoinWhenTimerReset() {
private MembershipManagerImpl mockStaleMember() {
MembershipManagerImpl membershipManager = createMemberInStableState();
doNothing().when(subscriptionState).assignFromSubscribed(any());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.transitionToSendingLeaveGroup(true);
membershipManager.onHeartbeatRequestSent();
return membershipManager;
Expand Down

0 comments on commit 8e1516f

Please sign in to comment.