diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 946fe7f0f24db..055d027f64b37 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3307,6 +3307,7 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions( // We also trigger a refresh of the regexes in order to resolve it. throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex()); requireRefresh = true; + } else { // If the new regex is already resolved, we trigger a rebalance // by bumping the group epoch. @@ -3314,6 +3315,8 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions( updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; } } + } else if (isNotEmpty(oldSubscribedTopicRegex)) { + updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index ef90ba0b63122..4acfd025f7145 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -21009,6 +21009,90 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) tasks ); } + + @Test + public void testConsumerGroupMemberJoinsWithNonEmptyRegexToEmptyRegex() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .buildCoordinatorMetadataImage(12345L); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10)) + .build(); + + // Member 1 updates its new regular expression. + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("") + .setServerAssignor("range") + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of()) + ), + result.response() + ); + + ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("") + .setServerAssignorName("range") + .build(); + + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + // previous expression is deleted + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } @Test public void testConsumerGroupMemberJoinsWithUpdatedRegex() {