Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members #16145

Original file line number Diff line number Diff line change
Expand Up @@ -325,24 +325,12 @@ private Group validateOffsetCommit(
}
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
}

try {
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false
);
} catch (StaleMemberEpochException ex) {
// The STALE_MEMBER_EPOCH error is only returned for new consumer group (KIP-848). When
// it is, the member should be using the OffsetCommit API version >= 9. As we don't
// support upgrading from the old to the new protocol yet, we return UNSUPPORTED_VERSION
// error if an older version is used. We will revise this when the upgrade path is implemented.
if (context.header.apiVersion() >= 9) {
throw ex;
} else {
throw Errors.UNSUPPORTED_VERSION.exception();
}
}
group.validateOffsetCommit(
request.memberId(),
request.groupInstanceId(),
request.generationIdOrMemberEpoch(),
false
);

return group;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
Expand Down Expand Up @@ -792,21 +793,26 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
* @param memberEpoch The member epoch.
* @param isTransactional Whether the offset commit is transactional or not. It has no
* impact when a consumer group is used.
* @throws UnknownMemberIdException If the member is not found.
* @throws StaleMemberEpochException If the member uses the consumer protocol and the provided
* member epoch doesn't match the actual member epoch.
* @throws IllegalGenerationException If the member uses the classic protocol and the provided
* generation id is not equal to the member epoch.
*/
@Override
public void validateOffsetCommit(
String memberId,
String groupInstanceId,
int memberEpoch,
boolean isTransactional
) throws UnknownMemberIdException, StaleMemberEpochException {
) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
// When the member epoch is -1, the request comes from either the admin client
// or a consumer which does not use the group management facility. In this case,
// the request can commit offsets if the group is empty.
if (memberEpoch < 0 && members().isEmpty()) return;

final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
validateMemberEpoch(memberEpoch, member.memberEpoch());
validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand All @@ -815,13 +821,18 @@ public void validateOffsetCommit(
* @param memberId The member id for consumer groups.
* @param memberEpoch The member epoch for consumer groups.
* @param lastCommittedOffset The last committed offsets in the timeline.
* @throws UnknownMemberIdException If the member is not found.
* @throws StaleMemberEpochException If the member uses the consumer protocol and the provided
* member epoch doesn't match the actual member epoch.
* @throws IllegalGenerationException If the member uses the classic protocol and the provided
* generation id is not equal to the member epoch.
*/
@Override
public void validateOffsetFetch(
String memberId,
int memberEpoch,
long lastCommittedOffset
) throws UnknownMemberIdException, StaleMemberEpochException {
) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
// When the member id is null and the member epoch is -1, the request either comes
// from the admin client or from a client which does not provide them. In this case,
// the fetch request is accepted.
Expand All @@ -832,7 +843,7 @@ public void validateOffsetFetch(
throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
memberId, groupId));
}
validateMemberEpoch(memberEpoch, member.memberEpoch());
validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
}

/**
Expand Down Expand Up @@ -896,16 +907,27 @@ public boolean isInStates(Set<String> statesFilter, long committedOffset) {
}

/**
* Throws a StaleMemberEpochException if the received member epoch does not match
* the expected member epoch.
* Throws an exception if the received member epoch does not match the expected member epoch.
*
* @param receivedMemberEpoch The received member epoch or generation id.
* @param expectedMemberEpoch The expected member epoch.
* @param useClassicProtocol The boolean indicating whether the checked member uses the classic protocol.
* @throws StaleMemberEpochException if the member with unmatched member epoch uses the consumer protocol.
* @throws IllegalGenerationException if the member with unmatched generation id uses the classic protocol.
*/
private void validateMemberEpoch(
int receivedMemberEpoch,
int expectedMemberEpoch
) throws StaleMemberEpochException {
int expectedMemberEpoch,
boolean useClassicProtocol
) throws StaleMemberEpochException, IllegalGenerationException {
if (receivedMemberEpoch != expectedMemberEpoch) {
throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
if (useClassicProtocol) {
throw new IllegalGenerationException(String.format("The received generation id %d does not match " +
"the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
} else {
throw new StaleMemberEpochException(String.format("The received member epoch %d does not match "
+ "the expected member epoch %d.", receivedMemberEpoch, expectedMemberEpoch));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
Expand All @@ -51,6 +50,7 @@
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
Expand Down Expand Up @@ -1142,14 +1142,8 @@ public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() {
assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request));
}

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short version) {
// All the newer versions are fine.
if (version >= 9) return;
// Version 0 does not support MemberId and GenerationIdOrMemberEpoch fields.
if (version == 0) return;

@Test
public void testConsumerGroupOffsetCommitWithIllegalGenerationId() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

// Create an empty group.
Expand All @@ -1162,27 +1156,30 @@ public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short ve
group.updateMember(new ConsumerGroupMember.Builder("member")
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
.build()
);

// Verify that the request is rejected with the correct exception.
assertThrows(UnsupportedVersionException.class, () -> context.commitOffset(
version,
new OffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
.setGenerationIdOrMemberEpoch(9)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
.setPartitions(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L)
))
))
)
);
OffsetCommitRequestData request = new OffsetCommitRequestData()
.setGroupId("foo")
.setMemberId("member")
.setGenerationIdOrMemberEpoch(9)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("bar")
.setPartitions(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestPartition()
.setPartitionIndex(0)
.setCommittedOffset(100L)
))
));

// Verify that a smaller epoch is rejected.
assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));

// Verify that a larger epoch is rejected.
request.setGenerationIdOrMemberEpoch(11);
assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));
}

@Test
Expand Down Expand Up @@ -2294,6 +2291,31 @@ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
() -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
}

@Test
public void testConsumerGroupOffsetFetchWithIllegalGenerationId() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
group.updateMember(new ConsumerGroupMember.Builder("member")
.setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
.build()
);

// Fetch offsets case.
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = Collections.singletonList(
new OffsetFetchRequestData.OffsetFetchRequestTopics()
.setName("foo")
.setPartitionIndexes(Collections.singletonList(0))
);

// Fetch offsets case.
assertThrows(IllegalGenerationException.class,
() -> context.fetchOffsets("group", "member", 10, topics, Long.MAX_VALUE));

// Fetch all offsets case.
assertThrows(IllegalGenerationException.class,
() -> context.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE));
}

@Test
public void testGenericGroupOffsetDelete() {
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
Expand Down