Skip to content

Commit

Permalink
Revert "KAFKA-13891: reset generation when syncgroup failed with REBA…
Browse files Browse the repository at this point in the history
…LANCE_IN_PROGRESS (apache#12140)" (apache#12794)

This reverts commit c23d60d.

Reviewers: Luke Chen <showuon@gmail.com>
  • Loading branch information
aiquestion authored and guozhangwang committed Jan 25, 2023
1 parent 61f8c19 commit 988a8ed
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 68 deletions.
Expand Up @@ -825,9 +825,6 @@ public void handle(SyncGroupResponse syncResponse,
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
"Sent generation was {}", sentGeneration);
// consumer didn't get assignment in this generation, so we need to reset generation
// to avoid joinGroup with out-of-data ownedPartitions in cooperative rebalance
resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, false);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for sync-group request, even if the generation has changed we would not expect the instance id
Expand Down
Expand Up @@ -67,7 +67,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -504,54 +503,6 @@ public void testRetainMemberIdAfterSyncGroupDisconnect() {
ensureActiveGroup(rejoinedGeneration, memberId);
}

@Test
public void testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws InterruptedException, ExecutionException {
setupCoordinator();

String memberId = "memberId";
int generation = 5;

// Rebalance once to initialize the generation and memberId
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
expectJoinGroup("", generation, memberId);
expectSyncGroup(generation, memberId);
ensureActiveGroup(generation, memberId);

// Force a rebalance
coordinator.requestRejoin("Manual test trigger");
assertTrue(coordinator.rejoinNeededOrPending());

ExecutorService executor = Executors.newFixedThreadPool(1);
try {
// Return RebalanceInProgress in syncGroup
int rejoinedGeneration = 10;
expectJoinGroup(memberId, rejoinedGeneration, memberId);
expectRebalanceInProgressForSyncGroup(rejoinedGeneration, memberId);
Future<Boolean> secondJoin = executor.submit(() ->
coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE)));

TestUtils.waitForCondition(() -> {
AbstractCoordinator.Generation currentGeneration = coordinator.generation();
return currentGeneration.generationId == AbstractCoordinator.Generation.NO_GENERATION.generationId &&
currentGeneration.memberId.equals(memberId);
}, 2000, "Generation should be reset");

rejoinedGeneration = 20;
expectSyncGroup(rejoinedGeneration, memberId);
mockClient.respond(joinGroupFollowerResponse(
rejoinedGeneration,
memberId,
"leaderId",
Errors.NONE,
PROTOCOL_TYPE
));
assertTrue(secondJoin.get());
} finally {
executor.shutdownNow();
executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
}
}

@Test
public void testRejoinReason() {
setupCoordinator();
Expand Down Expand Up @@ -639,22 +590,6 @@ private void expectDisconnectInSyncGroup(
}, null, true);
}

private void expectRebalanceInProgressForSyncGroup(
int expectedGeneration,
String expectedMemberId
) {
mockClient.prepareResponse(body -> {
if (!(body instanceof SyncGroupRequest)) {
return false;
}
SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) body).data();
return syncGroupRequest.generationId() == expectedGeneration
&& syncGroupRequest.memberId().equals(expectedMemberId)
&& syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
&& syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
}, syncGroupResponse(Errors.REBALANCE_IN_PROGRESS, PROTOCOL_TYPE, PROTOCOL_NAME));
}

private void expectDisconnectInJoinGroup(
String expectedMemberId
) {
Expand Down

0 comments on commit 988a8ed

Please sign in to comment.