Skip to content

Commit

Permalink
add test and add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
aiquestion committed May 14, 2022
1 parent bc1d809 commit 8d425b0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
Expand Up @@ -807,6 +807,8 @@ public void handle(SyncGroupResponse syncResponse,
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
"Sent generation was {}", sentGeneration);
// consumer didn't get assignment in this generation, so we need to reset generation
// to avoid joinGroup with out-of-data ownedPartitions in cooperative rebalance
resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
Expand Down
Expand Up @@ -67,6 +67,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -488,6 +489,54 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() {
ensureActiveGroup(rejoinedGeneration, memberId);
}

@Test
public void testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws InterruptedException, ExecutionException {
setupCoordinator();

String memberId = "memberId";
int generation = 5;

// Rebalance once to initialize the generation and memberId
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
expectJoinGroup("", generation, memberId);
expectSyncGroup(generation, memberId);
ensureActiveGroup(generation, memberId);

// Force a rebalance
coordinator.requestRejoin("Manual test trigger");
assertTrue(coordinator.rejoinNeededOrPending());

ExecutorService executor = Executors.newFixedThreadPool(1);
try {
// Return RebalanceInProgress in syncGroup
int rejoinedGeneration = 10;
expectJoinGroup(memberId, rejoinedGeneration, memberId);
expectRebalanceInProgressForSyncGroup(rejoinedGeneration, memberId);
Future<Boolean> secondJoin = executor.submit(() ->
coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE)));

TestUtils.waitForCondition(() -> {
AbstractCoordinator.Generation currentGeneration = coordinator.generation();
return currentGeneration.generationId == AbstractCoordinator.Generation.NO_GENERATION.generationId &&
currentGeneration.memberId.equals(memberId);
}, 2000, "Generation should be reset");

rejoinedGeneration = 20;
expectSyncGroup(rejoinedGeneration, memberId);
mockClient.respond(joinGroupFollowerResponse(
rejoinedGeneration,
memberId,
"leaderId",
Errors.NONE,
PROTOCOL_TYPE
));
assertTrue(secondJoin.get());
} finally {
executor.shutdownNow();
executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
}

@Test
public void testRejoinReason() {
setupCoordinator();
Expand Down Expand Up @@ -566,6 +615,22 @@ private void expectDisconnectInSyncGroup(
}, null, true);
}

private void expectRebalanceInProgressForSyncGroup(
int expectedGeneration,
String expectedMemberId
) {
mockClient.prepareResponse(body -> {
if (!(body instanceof SyncGroupRequest)) {
return false;
}
SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) body).data();
return syncGroupRequest.generationId() == expectedGeneration
&& syncGroupRequest.memberId().equals(expectedMemberId)
&& syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
&& syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
}, syncGroupResponse(Errors.REBALANCE_IN_PROGRESS, PROTOCOL_TYPE, PROTOCOL_NAME));
}

private void expectDisconnectInJoinGroup(
String expectedMemberId
) {
Expand Down

0 comments on commit 8d425b0

Please sign in to comment.