Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-13891: reset generation when syncgroup failed with REBALANCE_IN_PROGRESS #12140

Merged
merged 2 commits into from Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -807,6 +807,9 @@ public void handle(SyncGroupResponse syncResponse,
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
"Sent generation was {}", sentGeneration);
// consumer didn't get assignment in this generation, so we need to reset generation
// to avoid joinGroup with out-of-data ownedPartitions in cooperative rebalance
resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to add a comment here to explain why we need to reset generation ID here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comment & unit test. thanks~

future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for sync-group request, even if the generation has changed we would not expect the instance id
Expand Down
Expand Up @@ -67,6 +67,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -488,6 +489,54 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() {
ensureActiveGroup(rejoinedGeneration, memberId);
}

@Test
public void testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws InterruptedException, ExecutionException {
setupCoordinator();

String memberId = "memberId";
int generation = 5;

// Rebalance once to initialize the generation and memberId
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
expectJoinGroup("", generation, memberId);
expectSyncGroup(generation, memberId);
ensureActiveGroup(generation, memberId);

// Force a rebalance
coordinator.requestRejoin("Manual test trigger");
assertTrue(coordinator.rejoinNeededOrPending());

ExecutorService executor = Executors.newFixedThreadPool(1);
try {
// Return RebalanceInProgress in syncGroup
int rejoinedGeneration = 10;
expectJoinGroup(memberId, rejoinedGeneration, memberId);
expectRebalanceInProgressForSyncGroup(rejoinedGeneration, memberId);
Future<Boolean> secondJoin = executor.submit(() ->
coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE)));

TestUtils.waitForCondition(() -> {
AbstractCoordinator.Generation currentGeneration = coordinator.generation();
return currentGeneration.generationId == AbstractCoordinator.Generation.NO_GENERATION.generationId &&
currentGeneration.memberId.equals(memberId);
}, 2000, "Generation should be reset");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I saw the 2000 timeout appeared in AbstractCoordinatorTest.java in many places. Could we use a static variable to replace them? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't think of any name for this 2000 timeout. so i just changed it to rebalance timeout. does that make sense? :-p

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aiquestion , the current REBALANCE_TIMEOUT_MS is 60 seconds, which means we'll wait 60 secs for generation reset. It's not correct. It should use 2 seconds as before. I think you can ignore my previous minor comment about 2000 change, and revert the REBALANCE_TIMEOUT_MS back to 2000. Thank you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, okay. reverted. Thanks~


rejoinedGeneration = 20;
expectSyncGroup(rejoinedGeneration, memberId);
mockClient.respond(joinGroupFollowerResponse(
rejoinedGeneration,
memberId,
"leaderId",
Errors.NONE,
PROTOCOL_TYPE
));
assertTrue(secondJoin.get());
} finally {
executor.shutdownNow();
executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
}

@Test
public void testRejoinReason() {
setupCoordinator();
Expand Down Expand Up @@ -566,6 +615,22 @@ private void expectDisconnectInSyncGroup(
}, null, true);
}

private void expectRebalanceInProgressForSyncGroup(
int expectedGeneration,
String expectedMemberId
) {
mockClient.prepareResponse(body -> {
if (!(body instanceof SyncGroupRequest)) {
return false;
}
SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) body).data();
return syncGroupRequest.generationId() == expectedGeneration
&& syncGroupRequest.memberId().equals(expectedMemberId)
&& syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
&& syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
}, syncGroupResponse(Errors.REBALANCE_IN_PROGRESS, PROTOCOL_TYPE, PROTOCOL_NAME));
}

private void expectDisconnectInJoinGroup(
String expectedMemberId
) {
Expand Down