Skip to content

Commit

Permalink
MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure c…
Browse files Browse the repository at this point in the history
…ompatibility with classic protocol members (#16145)

During online migration, there could be ConsumerGroup that has members that uses the classic protocol. In the current implementation, `STALE_MEMBER_EPOCH` could be thrown in ConsumerGroup offset fetch/commit validation but it's not supported by the classic protocol. Thus this patch changed `ConsumerGroup#validateOffsetCommit` and `ConsumerGroup#validateOffsetFetch` to ensure compatibility.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io>
  • Loading branch information
dongnuo123 committed Jun 5, 2024
1 parent 252c1ac commit 7ddfa64
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class OffsetCommitRequestTest(cluster: ClusterInstance) extends GroupCoordinator
topic = "foo",
partition = 0,
offset = 100L,
expectedError = Errors.NONE,
expectedError = if (useNewProtocol && version < 9) Errors.UNSUPPORTED_VERSION else Errors.NONE,
version = version.toShort
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,15 @@ public static GroupType parse(String name) {
* @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch
* for consumer groups.
* @param isTransactional Whether the offset commit is transactional or not.
* @param apiVersion The api version.
*/
void validateOffsetCommit(
String memberId,
String groupInstanceId,
int generationIdOrMemberEpoch,
boolean isTransactional
boolean isTransactional,
short apiVersion

) throws KafkaException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,34 +325,25 @@ private Group validateOffsetCommit(
}
}

try {
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false
);
} catch (StaleMemberEpochException ex) {
// The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When
// it is, the member should be using the OffsetCommit API version >= 9. As we don't
// support upgrading from the old to the new protocol yet, we return UNSUPPORTED_VERSION
// error if an older version is used. We will revise this when the upgrade path is implemented.
if (context.header.apiVersion() >= 9) {
throw ex;
} else {
throw Errors.UNSUPPORTED_VERSION.exception();
}
}
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false,
context.apiVersion()
);

return group;
}

/**
* Validates an TxnOffsetCommit request.
*
* @param context The request context.
* @param request The actual request.
*/
private Group validateTransactionalOffsetCommit(
RequestContext context,
TxnOffsetCommitRequestData request
) throws ApiException {
Group group;
Expand All @@ -375,7 +366,8 @@ private Group validateTransactionalOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationId(),
true
true,
context.apiVersion()
);
} catch (StaleMemberEpochException ex) {
throw Errors.ILLEGAL_GENERATION.exception();
Expand Down Expand Up @@ -530,7 +522,7 @@ public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitT
RequestContext context,
TxnOffsetCommitRequestData request
) throws ApiException {
validateTransactionalOffsetCommit(request);
validateTransactionalOffsetCommit(context, request);

final TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData();
final List<CoordinatorRecord> records = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,13 +824,15 @@ public void validateMember(
* @param groupInstanceId The group instance id.
* @param generationId The generation id.
* @param isTransactional Whether the offset commit is transactional or not.
* @param apiVersion The api version.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int generationId,
boolean isTransactional
boolean isTransactional,
short apiVersion
) throws CoordinatorNotAvailableException, UnknownMemberIdException, IllegalGenerationException, FencedInstanceIdException {
if (isInState(DEAD)) {
throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -792,21 +794,36 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
* @param memberEpoch The member epoch.
* @param isTransactional Whether the offset commit is transactional or not. It has no
* impact when a consumer group is used.
* @param apiVersion The api version.
* @throws UnknownMemberIdException If the member is not found.
* @throws StaleMemberEpochException If the member uses the consumer protocol and the provided
* member epoch doesn't match the actual member epoch.
* @throws IllegalGenerationException If the member uses the classic protocol and the provided
* generation id is not equal to the member epoch.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch,
boolean isTransactional
) throws UnknownMemberIdException, StaleMemberEpochException {
boolean isTransactional,
short apiVersion
) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
// When the member epoch is -1, the request comes from either the admin client
// or a consumer which does not use the group management facility. In this case,
// the request can commit offsets if the group is empty.
if (memberEpoch < 0 && members().isEmpty()) return;

final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
validateMemberEpoch(memberEpoch, member.memberEpoch());

// If the commit is not transactional and the member uses the new consumer protocol (KIP-848),
// the member should be using the OffsetCommit API version >= 9.
if (!isTransactional && !member.useClassicProtocol() && apiVersion < 9) {
throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
"by members using the consumer group protocol");
}

validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
}

/**
Expand All @@ -815,13 +832,18 @@ public void validateOffsetCommit(
* @param memberId The member id for consumer groups.
* @param memberEpoch The member epoch for consumer groups.
* @param lastCommittedOffset The last committed offsets in the timeline.
* @throws UnknownMemberIdException If the member is not found.
* @throws StaleMemberEpochException If the member uses the consumer protocol and the provided
* member epoch doesn't match the actual member epoch.
* @throws IllegalGenerationException If the member uses the classic protocol and the provided
* generation id is not equal to the member epoch.
*/
@Override
public void validateOffsetFetch(
String memberId,
int memberEpoch,
long lastCommittedOffset
) throws UnknownMemberIdException, StaleMemberEpochException {
) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
// When the member id is null and the member epoch is -1, the request either comes
// from the admin client or from a client which does not provide them. In this case,
// the fetch request is accepted.
Expand All @@ -832,7 +854,7 @@ public void validateOffsetFetch(
throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
memberId, groupId));
}
validateMemberEpoch(memberEpoch, member.memberEpoch());
validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
}

/**
Expand Down Expand Up @@ -896,16 +918,27 @@ public boolean isInStates(Set<String> statesFilter, long committedOffset) {
}

/**
* Throws a StaleMemberEpochException if the received member epoch does not match
* the expected member epoch.
* Throws an exception if the received member epoch does not match the expected member epoch.
*
* @param receivedMemberEpoch The received member epoch or generation id.
* @param expectedMemberEpoch The expected member epoch.
* @param useClassicProtocol The boolean indicating whether the checked member uses the classic protocol.
* @throws StaleMemberEpochException if the member with unmatched member epoch uses the consumer protocol.
* @throws IllegalGenerationException if the member with unmatched generation id uses the classic protocol.
*/
private void validateMemberEpoch(
int receivedMemberEpoch,
int expectedMemberEpoch
) throws StaleMemberEpochException {
int expectedMemberEpoch,
boolean useClassicProtocol
) throws StaleMemberEpochException, IllegalGenerationException {
if (receivedMemberEpoch != expectedMemberEpoch) {
throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
if (useClassicProtocol) {
throw new IllegalGenerationException(String.format("The received generation id %d does not match " +
"the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
} else {
throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
Expand All @@ -51,6 +50,7 @@
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
Expand Down Expand Up @@ -1142,14 +1142,8 @@ public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() {
assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request));
}

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short version) {
// All the newer versions are fine.
if (version >= 9) return;
// Version 0 does not support MemberId and GenerationIdOrMemberEpoch fields.
if (version == 0) return;

@Test
public void testConsumerGroupOffsetCommitWithIllegalGenerationId() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

// Create an empty group.
Expand All @@ -1162,27 +1156,30 @@ public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short ve
group.updateMember(new ConsumerGroupMember.Builder("member")
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
.build()
);

// Verify that the request is rejected with the correct exception.
assertThrows(UnsupportedVersionException.class, () -> context.commitOffset(
version,
new OffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
.setGenerationIdOrMemberEpoch(9)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
.setPartitions(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L)
))
))
)
);
OffsetCommitRequestData request = new OffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
.setGenerationIdOrMemberEpoch(9)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
.setPartitions(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L)
))
));

// Verify that a smaller epoch is rejected.
assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));

// Verify that a larger epoch is rejected.
request.setGenerationIdOrMemberEpoch(11);
assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));
}

@Test
Expand Down Expand Up @@ -2294,6 +2291,30 @@ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
}

@Test
public void testConsumerGroupOffsetFetchWithIllegalGenerationId() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
group.updateMember(new ConsumerGroupMember.Builder("member")
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
.build()
);

List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))
);

// Fetch offsets case.
assertThrows(IllegalGenerationException.class,
() -> context.fetchOffsets("group", "member", 10, topics, Long.MAX_VALUE));

// Fetch all offsets case.
assertThrows(IllegalGenerationException.class,
() -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
}

@Test
public void testGenericGroupOffsetDelete() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
Expand Down
Loading

0 comments on commit 7ddfa64

Please sign in to comment.