diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 54f34d5c6aeb..7c2954798707 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ApiException;
@@ -777,11 +778,87 @@ public ClassicGroup classicGroup(
         }
     }
 
+    /**
+     * Validates the online downgrade if a consumer member is fenced from the consumer group.
+     *
+     * @param consumerGroup The ConsumerGroup.
+     * @param memberId      The fenced member id.
+     * @return A boolean indicating whether it's valid to online downgrade the consumer group.
+     */
+    private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.debug("Skip downgrading the consumer group {} to classic group because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a ClassicGroup corresponding to the given ConsumerGroup.
+     *
+     * @param consumerGroup   The ConsumerGroup to convert.
+     * @param leavingMemberId The leaving member that triggers the downgrade validation.
+     * @param records         The list of Records.
+     * @return The append future of the conversion.
+     */
+    private CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = ClassicGroup.fromConsumerGroup(
+                consumerGroup,
+                leavingMemberId,
+                logContext,
+                time,
+                metrics,
+                consumerGroupSessionTimeoutMs,
+                metadataImage
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": failed to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot downgrade the consumer group %s: %s.",
+                consumerGroup.groupId(), e.getMessage()));
+        }
+        classicGroup.createClassicGroupRecords(metadataImage.features().metadataVersion(), records);
+
+        // Directly update the states instead of replaying the records because
+        // the classicGroup reference is needed for triggering the rebalance.
+        // Set the appendFuture to prevent the records from being replayed.
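+        // The groups map is snapshotted, so a failed write rolls it back to the consumer
+        // group, while the appendFuture callback below undoes the metric transition by hand.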
+        removeGroup(consumerGroup.groupId());
+        groups.put(consumerGroup.groupId(), classicGroup);
+        metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+        classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+        prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
+
+        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+        appendFuture.exceptionally(__ -> {
+            metrics.onClassicGroupStateTransition(classicGroup.currentState(), null);
+            return null;
+        });
+        return appendFuture;
+    }
+
     /**
      * Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
      *
      * @param classicGroup A ClassicGroup.
-     * @return The boolean indicating whether it's valid to online upgrade the classic group.
+     * @return A boolean indicating whether it's valid to online upgrade the classic group.
      */
     private boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
         if (!consumerGroupMigrationPolicy.isUpgradeEnabled()) {
@@ -1421,11 +1498,12 @@ private CoordinatorResult consumerGr
         int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = consumerGroup(groupId);
-        List<Record> records;
+        List<Record> records = new ArrayList<>();
+        CompletableFuture<Void> appendFuture = null;
         if (instanceId == null) {
             ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
             log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId);
-            records = consumerGroupFenceMember(group, member);
+            appendFuture = consumerGroupFenceMember(group, member, records);
         } else {
             ConsumerGroupMember member = group.staticMember(instanceId);
             throwIfStaticMemberIsUnknown(member, instanceId);
@@ -1437,12 +1515,17 @@ private CoordinatorResult consumerGr
             } else {
                 log.info("[GroupId {}] Static Member {} with instance id {} left the consumer group.",
                     group.groupId(), memberId, instanceId);
-                records = consumerGroupFenceMember(group, member);
+                appendFuture = consumerGroupFenceMember(group, member, records);
             }
         }
-        return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
-            .setMemberId(memberId)
-            .setMemberEpoch(memberEpoch));
+
+        return new CoordinatorResult<>(
+            records,
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(memberEpoch),
+            appendFuture
+        );
     }
 
     /**
@@ -1470,42 +1553,45 @@ private List<Record> consumerGroupStaticMemberGroupLeave(
     }
 
     /**
-     * Fences a member from a consumer group.
-     *
-     * @param group The group.
-     * @param member The member.
+     * Fences a member from a consumer group and maybe downgrades the consumer group to a classic group.
      *
-     * @return A list of records to be applied to the state.
+     * @param group   The group.
+     * @param member  The member.
+     * @param records The list of records to be applied to the state.
+     * @return The append future to be applied.
      */
-    private List<Record> consumerGroupFenceMember(
+    private CompletableFuture<Void> consumerGroupFenceMember(
         ConsumerGroup group,
-        ConsumerGroupMember member
+        ConsumerGroupMember member,
+        List<Record> records
     ) {
-        List<Record> records = new ArrayList<>();
-
-        removeMember(records, group.groupId(), member.memberId());
+        if (validateOnlineDowngrade(group, member.memberId())) {
+            return convertToClassicGroup(group, member.memberId(), records);
+        } else {
+            removeMember(records, group.groupId(), member.memberId());
 
-        // We update the subscription metadata without the leaving member.
-        Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
-            member,
-            null,
-            metadataImage.topics(),
-            metadataImage.cluster()
-        );
+            // We update the subscription metadata without the leaving member.
+            Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                null,
+                metadataImage.topics(),
+                metadataImage.cluster()
+            );
 
-        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-            log.info("[GroupId {}] Computed new subscription metadata: {}.",
-                group.groupId(), subscriptionMetadata);
-            records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
-        }
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId {}] Computed new subscription metadata: {}.",
+                    group.groupId(), subscriptionMetadata);
+                records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
+            }
 
-        // We bump the group epoch.
-        int groupEpoch = group.groupEpoch() + 1;
-        records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
+            // We bump the group epoch.
+            int groupEpoch = group.groupEpoch() + 1;
+            records.add(newGroupEpochRecord(group.groupId(), groupEpoch));
 
-        cancelTimers(group.groupId(), member.memberId());
+            cancelTimers(group.groupId(), member.memberId());
 
-        return records;
+            return null;
+        }
     }
 
     /**
@@ -1549,7 +1635,10 @@ private void scheduleConsumerGroupSessionTimeout(
             ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
             log.info("[GroupId {}] Member {} fenced from the group because its session expired.",
                 groupId, memberId);
-            return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
+
+            List<Record> records = new ArrayList<>();
+            CompletableFuture<Void> appendFuture = consumerGroupFenceMember(group, member, records);
+            return new CoordinatorResult<>(records, appendFuture);
         } catch (GroupIdNotFoundException ex) {
             log.debug("[GroupId {}] Could not fence {} because the group does not exist.",
                 groupId, memberId);
@@ -1599,7 +1688,10 @@ private void scheduleConsumerGroupRebalanceTimeout(
                 log.info("[GroupId {}] Member {} fenced from the group because " +
                     "it failed to transition from epoch {} within {}ms.",
                     groupId, memberId, memberEpoch, rebalanceTimeoutMs);
-                return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
+
+                List<Record> records = new ArrayList<>();
+                CompletableFuture<Void> appendFuture = consumerGroupFenceMember(group, member, records);
+                return new CoordinatorResult<>(records, appendFuture);
             } else {
                 log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " +
                     "left the epoch {}.", groupId, memberId, memberEpoch);
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 6baaed32be98..0b354d70553f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -16,7 +16,10 @@
  */
 package org.apache.kafka.coordinator.group.classic;
 
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
@@ -32,15 +35,22 @@
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Record;
 import org.apache.kafka.coordinator.group.RecordHelpers;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
 import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -1347,6 +1357,116 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Converts the given ConsumerGroup to a corresponding ClassicGroup.
+     * The member with leavingMemberId is not carried over to the new ClassicGroup, as it is
+     * the last member using the consumer protocol and its departure triggered the downgrade.
+     *
+     * @param consumerGroup                 The ConsumerGroup to convert.
+     * @param leavingMemberId               The member that will not be converted in the ClassicGroup.
+     * @param logContext                    The logContext to create the ClassicGroup.
+     * @param time                          The time to create the ClassicGroup.
+     * @param metrics                       The GroupCoordinatorMetricsShard to create the ClassicGroup.
+     * @param consumerGroupSessionTimeoutMs The consumerGroupSessionTimeoutMs.
+     * @param metadataImage                 The MetadataImage.
+     * @return The created ClassicGroup.
+     */
+    public static ClassicGroup fromConsumerGroup(
+        ConsumerGroup consumerGroup,
+        String leavingMemberId,
+        LogContext logContext,
+        Time time,
+        GroupCoordinatorMetricsShard metrics,
+        int consumerGroupSessionTimeoutMs,
+        MetadataImage metadataImage
+    ) {
+        ClassicGroup classicGroup = new ClassicGroup(
+            logContext,
+            consumerGroup.groupId(),
+            ClassicGroupState.STABLE,
+            time,
+            metrics,
+            consumerGroup.groupEpoch(),
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.of(time.milliseconds())
+        );
+
+        consumerGroup.members().forEach((memberId, member) -> {
+            if (!memberId.equals(leavingMemberId)) {
+                classicGroup.add(
+                    new ClassicGroupMember(
+                        memberId,
+                        Optional.ofNullable(member.instanceId()),
+                        member.clientId(),
+                        member.clientHost(),
+                        member.rebalanceTimeoutMs(),
+                        consumerGroupSessionTimeoutMs,
+                        ConsumerProtocol.PROTOCOL_TYPE,
+                        member.supportedJoinGroupRequestProtocols(),
+                        null
+                    )
+                );
+            }
+        });
+
+        classicGroup.setProtocolName(Optional.of(classicGroup.selectProtocol()));
+        classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            // Set the assignment by serializing the ConsumerGroup's targetAssignment.
+            // The serialization version should align with that of the member's JoinGroupRequestProtocol.
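+            // ConsumerProtocol.deserializeVersion reads the version embedded in the member's
+            // subscription metadata, so the assignment below is serialized at a version that
+            // the classic client will be able to parse from its SyncGroup response.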
+            byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(
+                new ConsumerPartitionAssignor.Assignment(ClassicGroup.topicPartitionListFromMap(
+                    consumerGroup.targetAssignment().get(classicGroupMember.memberId()).partitions(),
+                    metadataImage.topics()
+                )),
+                ConsumerProtocol.deserializeVersion(
+                    ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().orElse("")))
+                )
+            ));
+
+            classicGroupMember.setAssignment(assignment);
+        });
+
+        return classicGroup;
+    }
+
+    /**
+     * Populates the record list with the records needed to create this classic group.
+     *
+     * @param metadataVersion The MetadataVersion.
+     * @param records         The list to which the new records are added.
+     */
+    public void createClassicGroupRecords(
+        MetadataVersion metadataVersion,
+        List<Record> records
+    ) {
+        Map<String, byte[]> assignments = new HashMap<>();
+        allMembers().forEach(classicGroupMember ->
+            assignments.put(classicGroupMember.memberId(), classicGroupMember.assignment())
+        );
+
+        records.add(RecordHelpers.newGroupMetadataRecord(this, assignments, metadataVersion));
+    }
+
+    /**
+     * @return The list of TopicPartition converted from the map of topic id and partition set.
+     */
+    private static List<TopicPartition> topicPartitionListFromMap(
+        Map<Uuid, Set<Integer>> topicPartitions,
+        TopicsImage topicsImage
+    ) {
+        List<TopicPartition> topicPartitionList = new ArrayList<>();
+        topicPartitions.forEach((topicId, partitionSet) -> {
+            TopicImage topicImage = topicsImage.getTopic(topicId);
+            if (topicImage != null) {
+                partitionSet.forEach(partition -> topicPartitionList.add(new TopicPartition(topicImage.name(), partition)));
+            }
+        });
+        return topicPartitionList;
+    }
+
     /**
      * Checks whether the transition to the target state is valid.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 7fabc657245d..952dd43f273b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -771,7 +771,19 @@ public void validateDeleteGroup() throws ApiException {
      */
     @Override
     public void createGroupTombstoneRecords(List<Record> records) {
+        members().forEach((memberId, member) ->
+            records.add(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId(), memberId))
+        );
+
+        members().forEach((memberId, member) ->
+            records.add(RecordHelpers.newTargetAssignmentTombstoneRecord(groupId(), memberId))
+        );
         records.add(RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId()));
+
+        members().forEach((memberId, member) ->
+            records.add(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId(), memberId))
+        );
+
         records.add(RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId()));
         records.add(RecordHelpers.newGroupEpochTombstoneRecord(groupId()));
     }
@@ -1212,9 +1224,13 @@ public boolean supportsClassicProtocols(String memberProtocolType, Set m
     }
 
     /**
+     * Checks whether all the members use the classic protocol except the given member.
+     *
+     * @param memberId The member to exclude from the check.
      * @return A boolean indicating whether all the members use the classic protocol.
      */
-    public boolean allMembersUseClassicProtocol() {
-        return numClassicProtocolMembers() == members().size();
+    public boolean allMembersUseClassicProtocolExcept(String memberId) {
+        return numClassicProtocolMembers() == members().size() - 1 &&
+            !getOrMaybeCreateMember(memberId, false).useClassicProtocol();
     }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
index bb1fab0006f3..3f0ed31f7dde 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java
@@ -479,6 +479,22 @@ public Map<Uuid, Set<Integer>> partitionsPendingRevocation() {
         return partitionsPendingRevocation;
     }
 
+    /**
+     * @return The supported classic protocols converted to a JoinGroupRequestProtocolCollection.
+     */
+    public JoinGroupRequestData.JoinGroupRequestProtocolCollection supportedJoinGroupRequestProtocols() {
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
+            new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
+        supportedClassicProtocols().ifPresent(classicProtocols -> classicProtocols.forEach(protocol ->
+            protocols.add(
+                new JoinGroupRequestData.JoinGroupRequestProtocol()
+                    .setName(protocol.name())
+                    .setMetadata(protocol.metadata())
+            )
+        ));
+        return protocols;
+    }
+
     /**
      * @return The classicMemberMetadata if the consumer uses the classic protocol.
      */
@@ -554,7 +570,7 @@ private static String lookupTopicNameById(
     }
 
     /**
-     * @return The boolean indicating whether the member uses the classic protocol.
+     * @return A boolean indicating whether the member uses the classic protocol.
      */
     public boolean useClassicProtocol() {
         return classicMemberMetadata != null;
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index cf859d8a1a14..cc365fbbe37a 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -16,13 +16,22 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
 import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.opentest4j.AssertionFailedError;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -198,6 +207,43 @@ private static void assertApiMessageAndVersionEquals(
                     }
                 }
             }
+        } else if (actual.message() instanceof GroupMetadataValue) {
+            GroupMetadataValue expectedValue = (GroupMetadataValue) expected.message().duplicate();
+            GroupMetadataValue actualValue = (GroupMetadataValue) actual.message().duplicate();
+
+            expectedValue.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
+            actualValue.members().sort(Comparator.comparing(GroupMetadataValue.MemberMetadata::memberId));
+            try {
+                Arrays.asList(expectedValue, actualValue).forEach(value ->
+                    value.members().forEach(memberMetadata -> {
+                        // Sort topics and ownedPartitions in Subscription.
+                        ConsumerPartitionAssignor.Subscription subscription =
+                            ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberMetadata.subscription()));
+                        subscription.topics().sort(String::compareTo);
+                        subscription.ownedPartitions().sort(
+                            Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+                        );
+                        memberMetadata.setSubscription(Utils.toArray(ConsumerProtocol.serializeSubscription(
+                            subscription,
+                            ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.subscription()))
+                        )));
+
+                        // Sort partitions in Assignment.
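+                        // As with the subscription above, deserialize, sort, and re-serialize
+                        // at the original version so that logically equal assignments also
+                        // compare equal byte-for-byte.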
+                        ConsumerPartitionAssignor.Assignment assignment =
+                            ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(memberMetadata.assignment()));
+                        assignment.partitions().sort(
+                            Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition)
+                        );
+                        memberMetadata.setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(
+                            assignment,
+                            ConsumerProtocol.deserializeVersion(ByteBuffer.wrap(memberMetadata.assignment()))
+                        )));
+                    })
+                );
+            } catch (SchemaException ex) {
+                fail("Failed deserialization: " + ex.getMessage());
+            }
+            assertEquals(expectedValue, actualValue);
         } else {
             assertEquals(expected.message(), actual.message());
         }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 21fb536a43f5..83460c4e0d7a 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -68,6 +68,7 @@
 import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.consumer.MemberState;
 import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import org.apache.kafka.coordinator.group.classic.ClassicGroup;
 import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
@@ -110,6 +111,7 @@
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
+import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupJoinKey;
 import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupRebalanceTimeoutKey;
 import static org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupSessionTimeoutKey;
 import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
@@ -10331,6 +10333,594 @@ public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
                 .setErrorCode(NOT_COORDINATOR.code()),
             pendingMemberSyncResult.syncFuture.get());
     }
 
+    @Test
+    public void testLastConsumerProtocolMemberLeavingConsumerGroup() {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(protocols))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+            {
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
+            }
+        }));
+
+        context.commit();
+        ConsumerGroup consumerGroup = context.groupMetadataManager.consumerGroup(groupId);
+
+        // Member 2 leaves the consumer group, triggering the downgrade.
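+        // A heartbeat with memberEpoch set to LEAVE_GROUP_MEMBER_EPOCH (-1) is how a member
+        // using the consumer protocol signals that it is leaving the group.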
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 0),
+            new TopicPartition(fooTopicName, 1),
+            new TopicPartition(fooTopicName, 2),
+            new TopicPartition(barTopicName, 0),
+            new TopicPartition(barTopicName, 1)
+        ))));
+        Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
+            {
+                put(memberId1, assignment);
+            }
+        };
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            new LogContext(),
+            groupId,
+            STABLE,
+            context.time,
+            context.metrics,
+            10,
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.ofNullable("range"),
+            Optional.ofNullable(memberId1),
+            Optional.of(context.time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.ofNullable(member1.instanceId()),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                45000,
+                ConsumerProtocol.PROTOCOL_TYPE,
+                member1.supportedJoinGroupRequestProtocols(),
+                assignment
+            )
+        );
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
+
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(groupId),
+
+            RecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting())
+        );
+
+        assertUnorderedListEquals(expectedRecords.subList(0, 2), result.records().subList(0, 2));
+        assertUnorderedListEquals(expectedRecords.subList(2, 4), result.records().subList(2, 4));
+        assertRecordEquals(expectedRecords.get(4), result.records().get(4));
+        assertUnorderedListEquals(expectedRecords.subList(5, 7), result.records().subList(5, 7));
+        assertRecordsEquals(expectedRecords.subList(7, 10), result.records().subList(7, 10));
+
+        verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
+        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, STABLE);
+
+        // The new classic member 1 has a heartbeat timeout.
+        ScheduledTimeout<Void, Record> heartbeatTimeout = context.timer.timeout(
+            classicGroupHeartbeatKey(groupId, memberId1)
+        );
+        assertNotNull(heartbeatTimeout);
+        // The new rebalance has a groupJoin timeout.
+        ScheduledTimeout<Void, Record> groupJoinTimeout = context.timer.timeout(
+            classicGroupJoinKey(groupId)
+        );
+        assertNotNull(groupJoinTimeout);
+
+        // A new rebalance is triggered.
+        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+
+        // Simulate a failed write to the log.
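+        // Completing the append future exceptionally mimics a failed replication of the
+        // downgrade records; the exceptionally() callback undoes the classic group metric
+        // transition and rollback() restores the snapshotted consumer group state.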
+        result.appendFuture().completeExceptionally(new NotLeaderOrFollowerException());
+        context.rollback();
+
+        // The group is reverted to the consumer group.
+        assertEquals(consumerGroup, context.groupMetadataManager.consumerGroup(groupId));
+        verify(context.metrics, times(1)).onClassicGroupStateTransition(PREPARING_REBALANCE, null);
+    }
+
+    @Test
+    public void testLastConsumerProtocolMemberSessionTimeoutInConsumerGroup() {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(protocols))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
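+        // Same two-member setup as the previous test, but here the consumer-protocol member
+        // is removed by session expiration rather than by an explicit leave heartbeat.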
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+            {
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
+            }
+        }));
+
+        context.commit();
+
+        // Session timer is scheduled on the heartbeat.
+        context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // Verify that there is a session timeout.
+        context.assertSessionTimeout(groupId, memberId2, 45000);
+
+        // Advance time past the session timeout.
+        // Member 2 should be fenced from the group, thus triggering the downgrade.
+        MockCoordinatorTimer.ExpiredTimeout<Void, Record> timeout = context.sleep(45000 + 1).get(0);
+        assertEquals(consumerGroupSessionTimeoutKey(groupId, memberId2), timeout.key);
+
+        byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 0),
+            new TopicPartition(fooTopicName, 1),
+            new TopicPartition(fooTopicName, 2),
+            new TopicPartition(barTopicName, 0),
+            new TopicPartition(barTopicName, 1)
+        ))));
+        Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
+            {
+                put(memberId1, assignment);
+            }
+        };
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            new LogContext(),
+            groupId,
+            STABLE,
+            context.time,
+            context.metrics,
+            10,
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.ofNullable("range"),
+            Optional.ofNullable(memberId1),
+            Optional.of(context.time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.ofNullable(member1.instanceId()),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                45000,
+                ConsumerProtocol.PROTOCOL_TYPE,
+                member1.supportedJoinGroupRequestProtocols(),
+                assignment
+            )
+        );
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
+
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(groupId),
+
+            RecordHelpers.newGroupMetadataRecord(expectedClassicGroup,
+                assignments, MetadataVersion.latestTesting())
+        );
+
+        assertUnorderedListEquals(expectedRecords.subList(0, 2), timeout.result.records().subList(0, 2));
+        assertUnorderedListEquals(expectedRecords.subList(2, 4), timeout.result.records().subList(2, 4));
+        assertRecordEquals(expectedRecords.get(4), timeout.result.records().get(4));
+        assertUnorderedListEquals(expectedRecords.subList(5, 7), timeout.result.records().subList(5, 7));
+        assertRecordsEquals(expectedRecords.subList(7, 10), timeout.result.records().subList(7, 10));
+
+        verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.STABLE, null);
+        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, STABLE);
+
+        // The new classic member 1 has a heartbeat timeout.
+        ScheduledTimeout<Void, Record> heartbeatTimeout = context.timer.timeout(
+            classicGroupHeartbeatKey(groupId, memberId1)
+        );
+        assertNotNull(heartbeatTimeout);
+        // The new rebalance has a groupJoin timeout.
+        ScheduledTimeout<Void, Record> groupJoinTimeout = context.timer.timeout(
+            classicGroupJoinKey(groupId)
+        );
+        assertNotNull(groupJoinTimeout);
+
+        // A new rebalance is triggered.
+        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+    }
+
+    @Test
+    public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(30000)
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(protocols))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(30000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
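+        // ConsumerGroupMigrationPolicy.DOWNGRADE enables the consumer-to-classic conversion
+        // exercised by this test; with the policy disabled, validateOnlineDowngrade would
+        // reject the conversion and the member would simply be fenced.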
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        context.replay(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+            {
+                put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
+                put(zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1, mkMapOfPartitionRacks(1)));
+            }
+        }));
+
+        context.commit();
+
+        // Prepare the new assignment.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 heartbeats with a different subscribedTopicNames. The assignor computes a new assignment
+        // where member 2 will need to revoke topic partition bar-2 and thus transitions to the REVOKING state.
+        context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Arrays.asList(
+                    new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                        .setTopicId(fooTopicId)
+                        .setPartitions(Arrays.asList(3, 4, 5)),
+                    new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                        .setTopicId(barTopicId)
+                        .setPartitions(Arrays.asList(2))
+                ))
+        );
+
+        // Verify that there is a rebalance timeout.
+        context.assertRebalanceTimeout(groupId, memberId2, 30000);
+
+        // Advance time past the rebalance timeout.
+        // Member 2 should be fenced from the group, thus triggering the downgrade.
+        MockCoordinatorTimer.ExpiredTimeout<Void, Record> timeout = context.sleep(30000 + 1).get(0);
+        assertEquals(consumerGroupRebalanceTimeoutKey(groupId, memberId2), timeout.key);
+
+        byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 0),
+            new TopicPartition(fooTopicName, 1),
+            new TopicPartition(fooTopicName, 2),
+            new TopicPartition(barTopicName, 0),
+            new TopicPartition(barTopicName, 1)
+        ))));
+        Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
+            {
+                put(memberId1, assignment);
+            }
+        };
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            new LogContext(),
+            groupId,
+            STABLE,
+            context.time,
+            context.metrics,
+            11,
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.ofNullable("range"),
+            Optional.ofNullable(memberId1),
+            Optional.of(context.time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.ofNullable(member1.instanceId()),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                45000,
+                ConsumerProtocol.PROTOCOL_TYPE,
+                member1.supportedJoinGroupRequestProtocols(),
+                assignment
+            )
+        );
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
+
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId1),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(groupId),
+
+            RecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting())
+        );
+
+        assertUnorderedListEquals(expectedRecords.subList(0, 2), timeout.result.records().subList(0, 2));
+        assertUnorderedListEquals(expectedRecords.subList(2, 4), timeout.result.records().subList(2, 4));
+        assertRecordEquals(expectedRecords.get(4), timeout.result.records().get(4));
+        assertUnorderedListEquals(expectedRecords.subList(5, 7), timeout.result.records().subList(5, 7));
+        assertRecordsEquals(expectedRecords.subList(7, 10), timeout.result.records().subList(7, 10));
+
+        verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.RECONCILING, null);
+        verify(context.metrics, times(1)).onClassicGroupStateTransition(null, STABLE);
+
+        // The new classic member 1 has a heartbeat timeout.
+        ScheduledTimeout<Void, Record> heartbeatTimeout = context.timer.timeout(
+            classicGroupHeartbeatKey(groupId, memberId1)
+        );
+        assertNotNull(heartbeatTimeout);
+        // The new rebalance has a groupJoin timeout.
+        ScheduledTimeout<Void, Record> groupJoinTimeout = context.timer.timeout(
+            classicGroupJoinKey(groupId)
+        );
+        assertNotNull(groupJoinTimeout);
+
+        // A new rebalance is triggered.
+        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index d60ec65ae86c..55596fa46b11 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -528,7 +528,9 @@ public CoordinatorResult consumerGro
             request
         );
 
-        result.records().forEach(this::replay);
+        if (result.appendFuture() == null) {
+            result.records().forEach(this::replay);
+        }
         return result;
     }
 
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 8a25a752ccf5..e265d1541416 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -1111,7 +1111,7 @@ public void testClassicMembersSupportedProtocols() {
     }
 
     @Test
-    public void testAllMembersUseClassicProtocol() {
+    public void testNumClassicProtocolMembers() {
         ConsumerGroup consumerGroup = createConsumerGroup("foo");
         List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = new ArrayList<>();
         protocols.add(new ConsumerGroupMemberMetadataValue.ClassicProtocol()
@@ -1125,27 +1125,30 @@
             .build();
         consumerGroup.updateMember(member1);
         assertEquals(1, consumerGroup.numClassicProtocolMembers());
-        assertTrue(consumerGroup.allMembersUseClassicProtocol());
 
         // The group has member 1 (using the classic protocol) and member 2 (using the consumer protocol).
         ConsumerGroupMember member2 = new ConsumerGroupMember.Builder("member-2")
             .build();
         consumerGroup.updateMember(member2);
         assertEquals(1, consumerGroup.numClassicProtocolMembers());
-        assertFalse(consumerGroup.allMembersUseClassicProtocol());
+        assertFalse(consumerGroup.allMembersUseClassicProtocolExcept("member-1"));
+        assertTrue(consumerGroup.allMembersUseClassicProtocolExcept("member-2"));
 
-        // The group has member 2 (using the consumer protocol).
+        // The group has member 2 (using the consumer protocol) and member 3 (using the consumer protocol).
         consumerGroup.removeMember(member1.memberId());
+        ConsumerGroupMember member3 = new ConsumerGroupMember.Builder("member-3")
+            .build();
+        consumerGroup.updateMember(member3);
         assertEquals(0, consumerGroup.numClassicProtocolMembers());
-        assertFalse(consumerGroup.allMembersUseClassicProtocol());
+        assertFalse(consumerGroup.allMembersUseClassicProtocolExcept("member-2"));
 
         // The group has member 2 (using the classic protocol).
+        consumerGroup.removeMember(member2.memberId());
         member2 = new ConsumerGroupMember.Builder("member-2")
            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
                 .setSupportedProtocols(protocols))
            .build();
         consumerGroup.updateMember(member2);
         assertEquals(1, consumerGroup.numClassicProtocolMembers());
-        assertTrue(consumerGroup.allMembersUseClassicProtocol());
     }
 }
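
As context for the consumerGroupMigrationPolicy.isUpgradeEnabled() / isDowngradeEnabled() checks used throughout this patch, the sketch below shows the assumed shape of the policy enum. The name and values mirror ConsumerGroupMigrationPolicy in this module, but the exact member names and parsing logic of the real class may differ; treat this as an illustrative sketch rather than the committed implementation.

    // Sketch (assumed shape) of the policy that gates online group conversions.
    public enum ConsumerGroupMigrationPolicy {
        DISABLED,      // no automatic conversion in either direction
        UPGRADE,       // classic -> consumer conversions only
        DOWNGRADE,     // consumer -> classic conversions only
        BIDIRECTIONAL; // conversions allowed in both directions

        public boolean isUpgradeEnabled() {
            return this == UPGRADE || this == BIDIRECTIONAL;
        }

        public boolean isDowngradeEnabled() {
            return this == DOWNGRADE || this == BIDIRECTIONAL;
        }
    }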