Skip to content

Commit

Permalink
MINOR: Transform new coordinator error before returning to client (ap…
Browse files Browse the repository at this point in the history
…ache#15001)

This was missing from https://issues.apache.org/jira/browse/KAFKA-14500. The existing coordinator transforms the log append error before returning to client. Apply the same transformation.

Reviewers: David Jacot <djacot@confluent.io>
  • Loading branch information
jeffkbkim authored and gaurav-narula committed Jan 24, 2024
1 parent cc5e161 commit 407ad22
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 5 deletions.
Expand Up @@ -2348,7 +2348,8 @@ private CoordinatorResult<Void, Record> completeGenericGroupJoin(
// We failed to write the empty group metadata. If the broker fails before another rebalance,
// the previous generation written to the log will become active again (and most likely timeout).
// This should be safe since there are no active members in an empty generation, so we just warn.
log.warn("Failed to write empty metadata for group {}: {}", group.groupId(), t.getMessage());
Errors error = appendGroupMetadataErrorToResponseError(Errors.forException(t));
log.warn("Failed to write empty metadata for group {}: {}", group.groupId(), error.message());
}
});

Expand Down Expand Up @@ -3096,7 +3097,7 @@ public CoordinatorResult<Void, Record> genericGroupSync(
// when it gets invoked. if we have transitioned to another state, then do nothing
if (group.isInState(COMPLETING_REBALANCE) && request.generationId() == group.generationId()) {
if (t != null) {
Errors error = Errors.forException(t);
Errors error = appendGroupMetadataErrorToResponseError(Errors.forException(t));
resetAndPropagateAssignmentWithError(group, error);
maybePrepareRebalanceOrCompleteJoin(group, "Error " + error + " when storing group assignment" +
"during SyncGroup (member: " + memberId + ").");
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
Expand Down Expand Up @@ -4706,14 +4707,35 @@ public void testGenerateRecordsOnNewGenericGroup() throws Exception {
assertTrue(joinResult.joinFuture.isDone());
assertEquals(Errors.MEMBER_ID_REQUIRED.code(), joinResult.joinFuture.get().errorCode());

GenericGroup group = context.createGenericGroup("group-id");
GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);

assertEquals(
Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latest())),
joinResult.records
);
}

@Test
public void testGenerateRecordsOnNewGenericGroupFailureTransformsError() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();

JoinGroupRequestData request = new JoinGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(UNKNOWN_MEMBER_ID)
.withGroupInstanceId("group-instance-id")
.withDefaultProtocolTypeAndProtocols()
.build();

JoinResult joinResult = context.sendGenericGroupJoin(request, true);
assertFalse(joinResult.joinFuture.isDone());

// Simulate a failed write to the log.
joinResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException());
assertTrue(joinResult.joinFuture.isDone());
assertEquals(Errors.NOT_COORDINATOR.code(), joinResult.joinFuture.get().errorCode());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testReplayGroupMetadataRecords(boolean useDefaultRebalanceTimeout) {
Expand Down Expand Up @@ -6355,7 +6377,6 @@ public void testReplaceStaticMemberInStableStateWithUpdatedProtocolTriggersRebal
group.transitionTo(STABLE);

// Static member rejoins with UNKNOWN_MEMBER_ID. The selected protocol changes and triggers a rebalance.

JoinResult joinResult = context.sendGenericGroupJoin(
request.setProtocols(toProtocols("roundrobin"))
);
Expand Down Expand Up @@ -7435,7 +7456,7 @@ public void testStaticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSel
assertTrue(group.isInState(STABLE));
assertEquals(Errors.NONE.code(), leaderSyncResult.syncFuture.get().errorCode());

// Sync with old member id will also not fail because the member id is not updated because of persistence failure
// Sync with old member id will also not fail as the member id is not updated due to persistence failure
SyncResult oldMemberSyncResult = context.sendGenericGroupSync(
syncRequest
.setGroupInstanceId("follower-instance-id")
Expand Down Expand Up @@ -8371,6 +8392,38 @@ public void testSyncGroupFromIllegalGeneration() throws Exception {
assertEquals(Errors.ILLEGAL_GENERATION.code(), syncResult.syncFuture.get().errorCode());
}

@Test
public void testSyncGroupAsLeaderAppendFailureTransformsError() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.createGenericGroup("group-id");

JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
.withGroupId("group-id")
.withGroupInstanceId("leader-instance-id")
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build();

JoinGroupResponseData joinResponse = context.joinGenericGroupAndCompleteJoin(joinRequest, true, true);

// Send the sync group with an invalid generation
SyncResult syncResult = context.sendGenericGroupSync(
new SyncGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(joinResponse.memberId())
.withGenerationId(1)
.build()
);

assertFalse(syncResult.syncFuture.isDone());

// Simulate a failed write to the log.
syncResult.appendFuture.completeExceptionally(new NotLeaderOrFollowerException());
assertTrue(syncResult.syncFuture.isDone());
assertEquals(Errors.NOT_COORDINATOR.code(), syncResult.syncFuture.get().errorCode());
}

@Test
public void testJoinGroupFromUnchangedFollowerDoesNotRebalance() throws Exception {
// To get a group of two members:
Expand Down

0 comments on commit 407ad22

Please sign in to comment.