Skip to content

Commit

Permalink
comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasbru committed Feb 29, 2024
1 parent d1f9cbf commit 651e244
Showing 1 changed file with 7 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
Expand All @@ -40,7 +39,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -767,37 +765,14 @@ public void testLeaveGroupWhenMemberOwnsAssignment() {
}

@Test
public void testLeaveGroupWhenAssignmentEmpty() {
String topicName = "topic1";
TopicPartition ownedPartition = new TopicPartition(topicName, 0);
public void testFencedWhenAssignmentEmpty() {
MembershipManager membershipManager = createMemberInStableState();

// We own a partition and rebalance listener is registered
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(mock(ConsumerRebalanceListener.class)));
doNothing().when(subscriptionState).markPendingRevocation(anySet());

// Trigger leave group
CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();

// Rebalance listener not yet triggered
final ArgumentCaptor<ConsumerRebalanceListenerCallbackNeededEvent> consumerRebalanceListener =
ArgumentCaptor.forClass(ConsumerRebalanceListenerCallbackNeededEvent.class);
verify(backgroundEventHandler).add(consumerRebalanceListener.capture());
final ConsumerRebalanceListenerCallbackNeededEvent callbackEvent = consumerRebalanceListener.getValue();
assertFalse(leaveResult1.isDone());
assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());

// Clear the assignment
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(false);

// Complete the callback
callbackEvent.future().complete(null);

// Completed the listener
assertFalse(leaveResult1.isDone());
membershipManager.transitionToFenced();

// Make sure to never call `assignFromSubscribed` again
verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet());
Expand Down Expand Up @@ -1709,6 +1684,7 @@ private void assertLeaveGroupDueToExpiredPollAndTransitionToStale(MembershipMana
public void testTransitionToLeavingWhileReconcilingDueToStaleMember() {
MembershipManagerImpl membershipManager = memberJoinWithAssignment();
clearInvocations(subscriptionState);
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
Expand All @@ -1717,6 +1693,7 @@ public void testTransitionToLeavingWhileReconcilingDueToStaleMember() {
public void testTransitionToLeavingWhileJoiningDueToStaleMember() {
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
doNothing().when(subscriptionState).assignFromSubscribed(any());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
assertEquals(MemberState.JOINING, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
Expand All @@ -1726,6 +1703,7 @@ public void testTransitionToLeavingWhileStableDueToStaleMember() {
MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null);
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
doNothing().when(subscriptionState).assignFromSubscribed(any());
assertEquals(MemberState.STABLE, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
Expand All @@ -1736,6 +1714,7 @@ public void testTransitionToLeavingWhileAcknowledgingDueToStaleMember() {
MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(true);
doNothing().when(subscriptionState).assignFromSubscribed(any());
clearInvocations(subscriptionState);
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
assertLeaveGroupDueToExpiredPollAndTransitionToStale(membershipManager);
}
Expand Down Expand Up @@ -1806,6 +1785,7 @@ public void testStaleMemberWaitsForCallbackToRejoinWhenTimerReset() {
private MembershipManagerImpl mockStaleMember() {
MembershipManagerImpl membershipManager = createMemberInStableState();
doNothing().when(subscriptionState).assignFromSubscribed(any());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.transitionToSendingLeaveGroup(true);
membershipManager.onHeartbeatRequestSent();
return membershipManager;
Expand Down

0 comments on commit 651e244

Please sign in to comment.