From ca852350b9452d8411c8ba898fb0c1829f5f917e Mon Sep 17 00:00:00 2001 From: AntonVasant Date: Fri, 28 Nov 2025 10:44:09 +0530 Subject: [PATCH 1/8] KAFKA-19891: Bump group epoch when member regex subscription transitions from non-empty to empty MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch fixes an issue in where was incorrectly set to when a member’s regex subscription transitions from non-empty to empty. Because does not trigger a group epoch bump in , the group metadata could remain stale. The fix updates the method to return in this case, ensuring the group epoch is correctly incremented. The patch also updates tests that relied on the previous behavior and were failing due to the corrected epoch bump logic. --- .../group/GroupMetadataManager.java | 4 + .../group/GroupMetadataManagerTest.java | 322 ++++++++++-------- 2 files changed, 185 insertions(+), 141 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 946fe7f0f24db..2be99d3867f25 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3307,6 +3307,10 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions( // We also trigger a refresh of the regexes in order to resolve it. throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex()); requireRefresh = true; + + if (isNotEmpty(oldSubscribedTopicRegex) && group.numSubscribedMembers(oldSubscribedTopicRegex) != 0) { + updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; + } } else { // If the new regex is already resolved, we trigger a rebalance // by bumping the group epoch. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index ef90ba0b63122..daad96fae1ef0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -1464,8 +1464,8 @@ public void testStaticMemberGetsBackAssignmentUponRejoin() { .setSubscribedTopicNames(List.of("foo", "bar")) .setServerAssignorName("range") .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 3, 4, 5), - mkTopicAssignment(barTopicId, 2))) + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) .build(); MetadataImage metadataImage = new MetadataImageBuilder() @@ -2611,17 +2611,17 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1) - .setState(MemberState.UNREVOKED_PARTITIONS) - .setMemberEpoch(10) - .setPreviousMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 0, 1), - mkTopicAssignment(barTopicId, 0))) - .setPartitionsPendingRevocation(mkAssignment( - mkTopicAssignment(fooTopicId, 2), - mkTopicAssignment(barTopicId, 1))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 2), + mkTopicAssignment(barTopicId, 1))) + .build())), result.records() ); @@ -2654,16 +2654,16 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2) - .setState(MemberState.UNREVOKED_PARTITIONS) - .setMemberEpoch(10) - .setPreviousMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 3), - mkTopicAssignment(barTopicId, 2))) - .setPartitionsPendingRevocation(mkAssignment( - mkTopicAssignment(fooTopicId, 4, 5))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3), + mkTopicAssignment(barTopicId, 2))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5))) + .build())), result.records() ); @@ -2686,11 +2686,11 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) - .setState(MemberState.UNRELEASED_PARTITIONS) - .setMemberEpoch(11) - .setPreviousMemberEpoch(11) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .build())), result.records() ); @@ -2722,14 +2722,14 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1) - .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 0, 1), - mkTopicAssignment(barTopicId, 0))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0))) + .build())), result.records() ); @@ -2775,13 +2775,13 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) - .setState(MemberState.UNRELEASED_PARTITIONS) - .setMemberEpoch(11) - .setPreviousMemberEpoch(11) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(barTopicId, 1))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(barTopicId, 1))) + .build())), result.records() ); @@ -2841,14 +2841,14 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2) - .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 2, 3), - mkTopicAssignment(barTopicId, 2))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 2))) + .build())), result.records() ); @@ -2883,14 +2883,14 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) - .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(11) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 4, 5), - mkTopicAssignment(barTopicId, 1))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 1))) + .build())), result.records() ); @@ -4862,8 +4862,8 @@ public void testStaticMembersJoinGroupWithMaxSize() { // First round of join requests. This will trigger a rebalance. List firstRoundJoinResults = groupInstanceIds.stream() - .map(instanceId -> context.sendClassicGroupJoin(request.setGroupInstanceId(instanceId))) - .toList(); + .map(instanceId -> context.sendClassicGroupJoin(request.setGroupInstanceId(instanceId))) + .toList(); assertEquals(groupMaxSize, group.numMembers()); assertEquals(groupMaxSize, group.numAwaitingJoinResponse()); @@ -4881,7 +4881,7 @@ public void testStaticMembersJoinGroupWithMaxSize() { // Members which were accepted can rejoin, others are rejected, while // completing rebalance - List secondRoundJoinResults = IntStream.range(0, groupMaxSize + 1).mapToObj(i -> context.sendClassicGroupJoin( + List secondRoundJoinResults = IntStream.range(0, groupMaxSize + 1).mapToObj(i -> context.sendClassicGroupJoin( request .setMemberId(memberIds.get(i)) .setGroupInstanceId(groupInstanceIds.get(i)) @@ -4911,9 +4911,9 @@ public void testDynamicMembersCanRejoinGroupWithMaxSizeWhileRebalancing() { .build(); // First round of join requests. Generate member ids. - List firstRoundJoinResults = IntStream.range(0, groupMaxSize + 1) - .mapToObj(__ -> context.sendClassicGroupJoin(request, requiredKnownMemberId)) - .toList(); + List firstRoundJoinResults = IntStream.range(0, groupMaxSize + 1) + .mapToObj(__ -> context.sendClassicGroupJoin(request, requiredKnownMemberId)) + .toList(); assertEquals(0, group.numMembers()); assertEquals(groupMaxSize + 1, group.numPendingJoinMembers()); @@ -15678,7 +15678,7 @@ public void testShareGroupMemberJoinsEmptyGroupWithAssignments() { .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build(); - + CoordinatorMetadataImage coordinatorMetadataImage = new KRaftCoordinatorMetadataImage(image); MetadataDelta delta = new MetadataDelta.Builder() @@ -16340,7 +16340,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, 2, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2))) - + .build(); List expectedRecords = List.of( @@ -17207,7 +17207,7 @@ barTopicName, computeTopicHash(barTopicName, oldMetadataImage) StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, newMetadataImage), barTopicName, computeTopicHash(barTopicName, newMetadataImage) - )), 0, Map.of("num.standby.replicas", "0")), + )), 0, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), @@ -20295,16 +20295,16 @@ public void testClassicGroupLeaveOnShareGroup() throws Exception { .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setSubscribedTopicNames(List.of("foo")) .build()) - .withAssignment(memberId, mkAssignment()) - .withAssignmentEpoch(1)) + .withAssignment(memberId, mkAssignment()) + .withAssignmentEpoch(1)) .build(); assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupLeave( new LeaveGroupRequestData() - .setGroupId(groupId) - .setMembers(List.of( - new MemberIdentity() - .setMemberId(memberId))))); + .setGroupId(groupId) + .setMembers(List.of( + new MemberIdentity() + .setMemberId(memberId))))); } @Test @@ -21026,6 +21026,7 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { .buildCoordinatorMetadataImage(12345L); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) .withMetadataImage(metadataImage) @@ -21061,7 +21062,7 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(10) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of()) @@ -21071,7 +21072,7 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) - .setMemberEpoch(10) + .setMemberEpoch(11) .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -21085,6 +21086,9 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), // The previous regular expression is deleted. GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), // The member assignment is updated. GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) ); @@ -21110,7 +21114,7 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { ) ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage), barTopicName, computeTopicHash(barTopicName, metadataImage) ))) @@ -21139,7 +21143,7 @@ memberId1, new MemberAssignmentImpl(mkAssignment( assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(11) + .setMemberEpoch(12) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of( @@ -21154,8 +21158,8 @@ memberId1, new MemberAssignmentImpl(mkAssignment( ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) + .setMemberEpoch(12) + .setPreviousMemberEpoch(11) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -21171,7 +21175,7 @@ memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) )), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 12), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) ); @@ -21260,15 +21264,15 @@ public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCom assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000), result.response() ); expectedMember1 = new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) - .setMemberEpoch(1) - .setPreviousMemberEpoch(0) + .setMemberEpoch(2) + .setPreviousMemberEpoch(1) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -21280,7 +21284,10 @@ public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCom // The member subscription is updated. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), // The previous regex is deleted. - GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*") + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) ); assertRecordsEquals(expectedRecords, result.records()); @@ -21303,13 +21310,13 @@ public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCom new ConsumerGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setSubscribedTopicRegex("foo*|bar*")); assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000), result.response() ); @@ -21336,7 +21343,7 @@ public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCom ) ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage), barTopicName, computeTopicHash(barTopicName, metadataImage) ))) @@ -21802,6 +21809,7 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() long barTopicHash = computeTopicHash(barTopicName, metadataImage); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); Authorizer authorizer = mock(Authorizer.class); Plugin authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -21872,7 +21880,7 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(10) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of())), @@ -21881,7 +21889,7 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) - .setMemberEpoch(10) + .setMemberEpoch(11) .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -21889,13 +21897,20 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() .setSubscribedTopicRegex("foo*|bar*") .setServerAssignorName("range") .build(); + long metadataHash = computeGroupHash(Map.of(fooTopicName, fooTopicHash)); - assertRecordsEquals( - List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) - ), + List> expectedRecord = List.of( + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, metadataHash)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Assignment.EMPTY.partitions()), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Assignment.EMPTY.partitions())), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)) + ); + + assertUnorderedRecordsEquals( + expectedRecord, result1.records() ); @@ -21915,7 +21930,7 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( fooTopicName, fooTopicHash ))) )) @@ -21953,8 +21968,8 @@ memberId2, new MemberAssignmentImpl(mkAssignment( expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) + .setMemberEpoch(13) + .setPreviousMemberEpoch(11) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -21966,20 +21981,27 @@ memberId2, new MemberAssignmentImpl(mkAssignment( assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(11) + .setMemberEpoch(13) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of())), result2.response() ); + long hash = computeGroupHash(Map.of(fooTopicName, fooTopicHash)); - assertRecordsEquals( - List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) - ), + List> records = List.of( + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*")), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 13, hash)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2))), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment(mkTopicAssignment(fooTopicId, 3, 4, 5)))), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 13)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)) + + ); + + assertUnorderedRecordsEquals( + records, result2.records() ); @@ -21996,7 +22018,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 14, computeGroupHash(Map.of( fooTopicName, fooTopicHash, barTopicName, barTopicHash ))) @@ -22022,8 +22044,9 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { .buildCoordinatorMetadataImage(12345L); long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); long barTopicHash = computeTopicHash(barTopicName, metadataImage); - + GroupAssignment groupAssignment = new GroupAssignment(Map.of()); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(groupAssignment); Authorizer authorizer = mock(Authorizer.class); Plugin authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -22095,7 +22118,7 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(10) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of())), @@ -22104,7 +22127,7 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) - .setMemberEpoch(10) + .setMemberEpoch(11) .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -22113,16 +22136,21 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { .setServerAssignorName("range") .build(); - assertRecordsEquals( - List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) - ), + long metadataHash = computeGroupHash(Map.of(fooTopicName, fooTopicHash)); + + List> list = List.of( + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, metadataHash)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Assignment.EMPTY.partitions()), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Assignment.EMPTY.partitions())), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)) + ); + assertUnorderedRecordsEquals( + list, result1.records() ); - - // Execute pending tasks. assertEquals( List.of( new MockCoordinatorExecutor.ExecutorResult<>( @@ -22138,7 +22166,7 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( fooTopicName, fooTopicHash ))) )) @@ -22176,8 +22204,8 @@ memberId2, new MemberAssignmentImpl(mkAssignment( expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) + .setMemberEpoch(12) + .setPreviousMemberEpoch(11) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -22190,7 +22218,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(11) + .setMemberEpoch(12) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of( @@ -22200,11 +22228,17 @@ memberId2, new MemberAssignmentImpl(mkAssignment( result2.response() ); - assertRecordsEquals( + List> list1 = List.of( List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2))), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment(mkTopicAssignment(fooTopicId, 3, 4, 5))) ), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 12)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)) + + ); + assertUnorderedRecordsEquals( + list1, result2.records() ); @@ -22221,7 +22255,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 13, computeGroupHash(Map.of( fooTopicName, fooTopicHash, barTopicName, barTopicHash ))) @@ -22230,6 +22264,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( ); } + @Test public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { String groupId = "fooup"; @@ -22248,6 +22283,7 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { .buildCoordinatorMetadataImage(12345L); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) .withMetadataImage(metadataImage) @@ -22304,7 +22340,7 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(10) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of()) @@ -22330,7 +22366,7 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) .setInstanceId(instanceId) - .setMemberEpoch(10) + .setMemberEpoch(11) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -22347,14 +22383,18 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { // The previous member is replaced by the new one. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedCopiedMember), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment( - mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), - mkTopicAssignment(barTopicId, 0, 1, 2) + mkTopicAssignment(barTopicId, 0, 1, 2), + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) )), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedCopiedMember), // The member subscription is updated. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), - // The previous regular expression is deleted. + // The previous regular expression is deleted. add GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + // The member assignment is updated. GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) ); @@ -22380,7 +22420,7 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { ) ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) ))) ), @@ -22399,7 +22439,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( .setGroupId(groupId) .setInstanceId(instanceId) .setMemberId(memberId2) - .setMemberEpoch(10) + .setMemberEpoch(11) .setRebalanceTimeoutMs(5000) .setSubscribedTopicRegex("foo*") .setServerAssignor("range") @@ -22408,7 +22448,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(11) + .setMemberEpoch(12) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of( @@ -22421,8 +22461,8 @@ memberId2, new MemberAssignmentImpl(mkAssignment( ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) .setInstanceId(instanceId) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) + .setMemberEpoch(12) + .setPreviousMemberEpoch(11) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -22436,7 +22476,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) )), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 12), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) ); @@ -24595,9 +24635,9 @@ private static void checkJoinGroupResponse( assertTrue(group.isInState(expectedState)); Set groupInstanceIds = actualResponse.members() - .stream() - .map(JoinGroupResponseMember::groupInstanceId) - .collect(Collectors.toSet()); + .stream() + .map(JoinGroupResponseMember::groupInstanceId) + .collect(Collectors.toSet()); assertEquals(expectedGroupInstanceIds, groupInstanceIds); } From f5e7ccdc832989f3e5535c3ac32698a5cd5e0be8 Mon Sep 17 00:00:00 2001 From: AntonVasant Date: Fri, 28 Nov 2025 10:53:31 +0530 Subject: [PATCH 2/8] Follow-up: revert unintended whitespace changes introduced in KAFKA-19891 patch --- .../group/GroupMetadataManagerTest.java | 128 +++++++++--------- 1 file changed, 64 insertions(+), 64 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index daad96fae1ef0..5a2277f6b1372 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -2611,17 +2611,17 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1) - .setState(MemberState.UNREVOKED_PARTITIONS) - .setMemberEpoch(10) - .setPreviousMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 0, 1), - mkTopicAssignment(barTopicId, 0))) - .setPartitionsPendingRevocation(mkAssignment( - mkTopicAssignment(fooTopicId, 2), - mkTopicAssignment(barTopicId, 1))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 2), + mkTopicAssignment(barTopicId, 1))) + .build())), result.records() ); @@ -2654,16 +2654,16 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2) - .setState(MemberState.UNREVOKED_PARTITIONS) - .setMemberEpoch(10) - .setPreviousMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 3), - mkTopicAssignment(barTopicId, 2))) - .setPartitionsPendingRevocation(mkAssignment( - mkTopicAssignment(fooTopicId, 4, 5))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 3), + mkTopicAssignment(barTopicId, 2))) + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5))) + .build())), result.records() ); @@ -2686,11 +2686,11 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) - .setState(MemberState.UNRELEASED_PARTITIONS) - .setMemberEpoch(11) - .setPreviousMemberEpoch(11) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .build())), result.records() ); @@ -2722,14 +2722,14 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1) - .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 0, 1), - mkTopicAssignment(barTopicId, 0))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1), + mkTopicAssignment(barTopicId, 0))) + .build())), result.records() ); @@ -2775,13 +2775,13 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) - .setState(MemberState.UNRELEASED_PARTITIONS) - .setMemberEpoch(11) - .setPreviousMemberEpoch(11) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(barTopicId, 1))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(barTopicId, 1))) + .build())), result.records() ); @@ -2841,14 +2841,14 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2) - .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(10) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 2, 3), - mkTopicAssignment(barTopicId, 2))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 2, 3), + mkTopicAssignment(barTopicId, 2))) + .build())), result.records() ); @@ -2883,14 +2883,14 @@ memberId3, new MemberAssignmentImpl(mkAssignment( ); assertRecordsEquals(List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) - .setState(MemberState.STABLE) - .setMemberEpoch(11) - .setPreviousMemberEpoch(11) - .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 4, 5), - mkTopicAssignment(barTopicId, 1))) - .build())), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, new ConsumerGroupMember.Builder(memberId3) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 4, 5), + mkTopicAssignment(barTopicId, 1))) + .build())), result.records() ); @@ -17207,7 +17207,7 @@ barTopicName, computeTopicHash(barTopicName, oldMetadataImage) StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, newMetadataImage), barTopicName, computeTopicHash(barTopicName, newMetadataImage) - )), 0, Map.of("num.standby.replicas", "0")), + )), 0, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), @@ -20295,16 +20295,16 @@ public void testClassicGroupLeaveOnShareGroup() throws Exception { .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setSubscribedTopicNames(List.of("foo")) .build()) - .withAssignment(memberId, mkAssignment()) - .withAssignmentEpoch(1)) + .withAssignment(memberId, mkAssignment()) + .withAssignmentEpoch(1)) .build(); assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupLeave( new LeaveGroupRequestData() - .setGroupId(groupId) - .setMembers(List.of( - new MemberIdentity() - .setMemberId(memberId))))); + .setGroupId(groupId) + .setMembers(List.of( + new MemberIdentity() + .setMemberId(memberId))))); } @Test From 2d8f6dd0b1cf4a3a4cc187cb322353bc270348e3 Mon Sep 17 00:00:00 2001 From: AntonVasant Date: Fri, 28 Nov 2025 19:02:49 +0530 Subject: [PATCH 3/8] KAFKA-19899: Return REGEX_UPDATED_AND_RESOLVED when newSubscribedTopicRegex is empty This patch corrects the return value in for the case where a member's becomes empty. Previously, the branch handling this condition returned , which does not trigger a group epoch bump during . As a result, the coordinator could retain stale group metadata when a member unsubscribed from all regex-based topics. The fix updates the method to return specifically when the new subscribed regex string is empty, ensuring that the coordinator clears the member's assignment and correctly bumps the group epoch. --- .../kafka/coordinator/group/GroupMetadataManager.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 2be99d3867f25..719ebb88358cf 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3308,9 +3308,6 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions( throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex()); requireRefresh = true; - if (isNotEmpty(oldSubscribedTopicRegex) && group.numSubscribedMembers(oldSubscribedTopicRegex) != 0) { - updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; - } } else { // If the new regex is already resolved, we trigger a rebalance // by bumping the group epoch. @@ -3318,7 +3315,8 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions( updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; } } - } + } else + updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; } // Conditions to trigger a refresh: From 20ed621a4d2e8377e20862009c42a688130ca076 Mon Sep 17 00:00:00 2001 From: AntonVasant Date: Fri, 28 Nov 2025 19:21:46 +0530 Subject: [PATCH 4/8] KAFKA-19899: Restore tests to original expected behavior after correcting regex empty-branch logic This commit reverts test modifications that were made under the earlier, incorrect assumption about the behavior when becomes empty. Now that the fix applies to the correct branch, the tests no longer need their previous adjustments, and their original expectations are valid again. --- .../group/GroupMetadataManagerTest.java | 190 +++++++----------- 1 file changed, 75 insertions(+), 115 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 5a2277f6b1372..8dd910dcfceed 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -1464,8 +1464,8 @@ public void testStaticMemberGetsBackAssignmentUponRejoin() { .setSubscribedTopicNames(List.of("foo", "bar")) .setServerAssignorName("range") .setAssignedPartitions(mkAssignment( - mkTopicAssignment(fooTopicId, 3, 4, 5), - mkTopicAssignment(barTopicId, 2))) + mkTopicAssignment(fooTopicId, 3, 4, 5), + mkTopicAssignment(barTopicId, 2))) .build(); MetadataImage metadataImage = new MetadataImageBuilder() @@ -4862,8 +4862,8 @@ public void testStaticMembersJoinGroupWithMaxSize() { // First round of join requests. This will trigger a rebalance. List firstRoundJoinResults = groupInstanceIds.stream() - .map(instanceId -> context.sendClassicGroupJoin(request.setGroupInstanceId(instanceId))) - .toList(); + .map(instanceId -> context.sendClassicGroupJoin(request.setGroupInstanceId(instanceId))) + .toList(); assertEquals(groupMaxSize, group.numMembers()); assertEquals(groupMaxSize, group.numAwaitingJoinResponse()); @@ -4881,7 +4881,7 @@ public void testStaticMembersJoinGroupWithMaxSize() { // Members which were accepted can rejoin, others are rejected, while // completing rebalance - List secondRoundJoinResults = IntStream.range(0, groupMaxSize + 1).mapToObj(i -> context.sendClassicGroupJoin( + List secondRoundJoinResults = IntStream.range(0, groupMaxSize + 1).mapToObj(i -> context.sendClassicGroupJoin( request .setMemberId(memberIds.get(i)) .setGroupInstanceId(groupInstanceIds.get(i)) @@ -4911,9 +4911,9 @@ public void testDynamicMembersCanRejoinGroupWithMaxSizeWhileRebalancing() { .build(); // First round of join requests. Generate member ids. - List firstRoundJoinResults = IntStream.range(0, groupMaxSize + 1) - .mapToObj(__ -> context.sendClassicGroupJoin(request, requiredKnownMemberId)) - .toList(); + List firstRoundJoinResults = IntStream.range(0, groupMaxSize + 1) + .mapToObj(__ -> context.sendClassicGroupJoin(request, requiredKnownMemberId)) + .toList(); assertEquals(0, group.numMembers()); assertEquals(groupMaxSize + 1, group.numPendingJoinMembers()); @@ -21026,7 +21026,6 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { .buildCoordinatorMetadataImage(12345L); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); - assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) .withMetadataImage(metadataImage) @@ -21062,7 +21061,7 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(11) + .setMemberEpoch(10) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of()) @@ -21072,7 +21071,7 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) - .setMemberEpoch(11) + .setMemberEpoch(10) .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -21086,9 +21085,6 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), // The previous regular expression is deleted. GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), // The member assignment is updated. GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) ); @@ -21114,7 +21110,7 @@ public void testConsumerGroupMemberJoinsWithUpdatedRegex() { ) ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage), barTopicName, computeTopicHash(barTopicName, metadataImage) ))) @@ -21143,7 +21139,7 @@ memberId1, new MemberAssignmentImpl(mkAssignment( assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(12) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of( @@ -21158,8 +21154,8 @@ memberId1, new MemberAssignmentImpl(mkAssignment( ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) - .setMemberEpoch(12) - .setPreviousMemberEpoch(11) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -21175,7 +21171,7 @@ memberId1, new MemberAssignmentImpl(mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) )), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 12), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) ); @@ -21264,15 +21260,15 @@ public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCom assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(2) + .setMemberEpoch(1) .setHeartbeatIntervalMs(5000), result.response() ); expectedMember1 = new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) - .setMemberEpoch(2) - .setPreviousMemberEpoch(1) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -21284,10 +21280,7 @@ public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCom // The member subscription is updated. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), // The previous regex is deleted. - GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2), - GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*") ); assertRecordsEquals(expectedRecords, result.records()); @@ -21310,13 +21303,13 @@ public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCom new ConsumerGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) - .setMemberEpoch(2) + .setMemberEpoch(1) .setSubscribedTopicRegex("foo*|bar*")); assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(2) + .setMemberEpoch(1) .setHeartbeatIntervalMs(5000), result.response() ); @@ -21343,7 +21336,7 @@ public void testConsumerGroupMemberJoinsWithRegexAndUpdatesItBeforeResolutionCom ) ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage), barTopicName, computeTopicHash(barTopicName, metadataImage) ))) @@ -21809,7 +21802,6 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() long barTopicHash = computeTopicHash(barTopicName, metadataImage); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); - assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); Authorizer authorizer = mock(Authorizer.class); Plugin authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -21880,7 +21872,7 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(11) + .setMemberEpoch(10) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of())), @@ -21889,7 +21881,7 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) - .setMemberEpoch(11) + .setMemberEpoch(10) .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -21897,20 +21889,13 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() .setSubscribedTopicRegex("foo*|bar*") .setServerAssignorName("range") .build(); - long metadataHash = computeGroupHash(Map.of(fooTopicName, fooTopicHash)); - - List> expectedRecord = List.of( - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, metadataHash)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Assignment.EMPTY.partitions()), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Assignment.EMPTY.partitions())), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)) - ); - assertUnorderedRecordsEquals( - expectedRecord, + assertRecordsEquals( + List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) + ), result1.records() ); @@ -21930,7 +21915,7 @@ public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, fooTopicHash ))) )) @@ -21968,8 +21953,8 @@ memberId2, new MemberAssignmentImpl(mkAssignment( expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) - .setMemberEpoch(13) - .setPreviousMemberEpoch(11) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -21981,27 +21966,20 @@ memberId2, new MemberAssignmentImpl(mkAssignment( assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(13) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of())), result2.response() ); - long hash = computeGroupHash(Map.of(fooTopicName, fooTopicHash)); - - List> records = List.of( - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*")), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 13, hash)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2))), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment(mkTopicAssignment(fooTopicId, 3, 4, 5)))), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 13)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)) - ); - - assertUnorderedRecordsEquals( - records, + assertRecordsEquals( + List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) + ), result2.records() ); @@ -22018,7 +21996,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 14, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( fooTopicName, fooTopicHash, barTopicName, barTopicHash ))) @@ -22044,9 +22022,8 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { .buildCoordinatorMetadataImage(12345L); long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); long barTopicHash = computeTopicHash(barTopicName, metadataImage); - GroupAssignment groupAssignment = new GroupAssignment(Map.of()); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); - assignor.prepareGroupAssignment(groupAssignment); Authorizer authorizer = mock(Authorizer.class); Plugin authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -22118,7 +22095,7 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(11) + .setMemberEpoch(10) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of())), @@ -22127,7 +22104,7 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) - .setMemberEpoch(11) + .setMemberEpoch(10) .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -22136,21 +22113,16 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { .setServerAssignorName("range") .build(); - long metadataHash = computeGroupHash(Map.of(fooTopicName, fooTopicHash)); - - List> list = List.of( - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, metadataHash)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Assignment.EMPTY.partitions()), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Assignment.EMPTY.partitions())), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)) - ); - assertUnorderedRecordsEquals( - list, + assertRecordsEquals( + List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) + ), result1.records() ); + + // Execute pending tasks. assertEquals( List.of( new MockCoordinatorExecutor.ExecutorResult<>( @@ -22166,7 +22138,7 @@ public void testConsumerGroupMemberJoinsRefreshTopicAuthorization() { context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, fooTopicHash ))) )) @@ -22204,8 +22176,8 @@ memberId2, new MemberAssignmentImpl(mkAssignment( expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) - .setMemberEpoch(12) - .setPreviousMemberEpoch(11) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -22218,7 +22190,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(12) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of( @@ -22228,17 +22200,11 @@ memberId2, new MemberAssignmentImpl(mkAssignment( result2.response() ); - List> list1 = List.of( + assertRecordsEquals( List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2))), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment(mkTopicAssignment(fooTopicId, 3, 4, 5))) + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) ), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 12)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)) - - ); - assertUnorderedRecordsEquals( - list1, result2.records() ); @@ -22255,7 +22221,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 13, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( fooTopicName, fooTopicHash, barTopicName, barTopicHash ))) @@ -22264,7 +22230,6 @@ memberId2, new MemberAssignmentImpl(mkAssignment( ); } - @Test public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { String groupId = "fooup"; @@ -22283,7 +22248,6 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { .buildCoordinatorMetadataImage(12345L); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); - assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) .withMetadataImage(metadataImage) @@ -22340,7 +22304,7 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(11) + .setMemberEpoch(10) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of()) @@ -22366,7 +22330,7 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) .setInstanceId(instanceId) - .setMemberEpoch(11) + .setMemberEpoch(10) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -22383,18 +22347,14 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { // The previous member is replaced by the new one. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedCopiedMember), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment( - mkTopicAssignment(barTopicId, 0, 1, 2), - mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), + mkTopicAssignment(barTopicId, 0, 1, 2) )), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedCopiedMember), // The member subscription is updated. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), - // The previous regular expression is deleted. add + // The previous regular expression is deleted. GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*|bar*"), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), - // The member assignment is updated. GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) ); @@ -22420,7 +22380,7 @@ public void testStaticConsumerGroupMemberJoinsWithUpdatedRegex() { ) ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) ))) ), @@ -22439,7 +22399,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( .setGroupId(groupId) .setInstanceId(instanceId) .setMemberId(memberId2) - .setMemberEpoch(11) + .setMemberEpoch(10) .setRebalanceTimeoutMs(5000) .setSubscribedTopicRegex("foo*") .setServerAssignor("range") @@ -22448,7 +22408,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( assertResponseEquals( new ConsumerGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(12) + .setMemberEpoch(11) .setHeartbeatIntervalMs(5000) .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List.of( @@ -22461,8 +22421,8 @@ memberId2, new MemberAssignmentImpl(mkAssignment( ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId2) .setState(MemberState.STABLE) .setInstanceId(instanceId) - .setMemberEpoch(12) - .setPreviousMemberEpoch(11) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) .setRebalanceTimeoutMs(5000) @@ -22476,7 +22436,7 @@ memberId2, new MemberAssignmentImpl(mkAssignment( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) )), - GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 12), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) ); @@ -24635,9 +24595,9 @@ private static void checkJoinGroupResponse( assertTrue(group.isInState(expectedState)); Set groupInstanceIds = actualResponse.members() - .stream() - .map(JoinGroupResponseMember::groupInstanceId) - .collect(Collectors.toSet()); + .stream() + .map(JoinGroupResponseMember::groupInstanceId) + .collect(Collectors.toSet()); assertEquals(expectedGroupInstanceIds, groupInstanceIds); } From e2abaaa853df45e7134c7e2c0d4353f1bf6b3e0f Mon Sep 17 00:00:00 2001 From: AntonVasant Date: Fri, 28 Nov 2025 20:03:23 +0530 Subject: [PATCH 5/8] KAFKA-19891: Remove trailing whitespace introduced during earlier changes --- .../kafka/coordinator/group/GroupMetadataManagerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 8dd910dcfceed..ef90ba0b63122 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -15678,7 +15678,7 @@ public void testShareGroupMemberJoinsEmptyGroupWithAssignments() { .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build(); - + CoordinatorMetadataImage coordinatorMetadataImage = new KRaftCoordinatorMetadataImage(image); MetadataDelta delta = new MetadataDelta.Builder() @@ -16340,7 +16340,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) .setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, 2, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2))) - + .build(); List expectedRecords = List.of( From 42d4276c8ac110701ce3243b7214779d902693ec Mon Sep 17 00:00:00 2001 From: AntonVasant Date: Fri, 28 Nov 2025 23:47:30 +0530 Subject: [PATCH 6/8] KAFKA-19899: Return REGEX_UPDATED_AND_RESOLVED when newSubscribedTopicRegex is empty This patch corrects the return value in for the case where a member's becomes empty. Previously, the branch handling this condition returned , which does not trigger a group epoch bump during . As a result, the coordinator could retain stale group metadata when a member unsubscribed from all regex-based topics. The fix updates the method to return REGEX_UPDATED_AND_RESOLVED. --- .../apache/kafka/coordinator/group/GroupMetadataManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 719ebb88358cf..055d027f64b37 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3315,8 +3315,9 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions( updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; } } - } else + } else if (isNotEmpty(oldSubscribedTopicRegex)) { updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED; + } } // Conditions to trigger a refresh: From 339a299a44e7f6f32e3fdaa41396d84b70d266f6 Mon Sep 17 00:00:00 2001 From: AntonVasant Date: Sat, 29 Nov 2025 15:46:17 +0530 Subject: [PATCH 7/8] Add test coverage for group epoch bump when member regex becomes empty --- .../group/GroupMetadataManagerTest.java | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index ef90ba0b63122..19439cf885b6c 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -21009,6 +21009,90 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) tasks ); } + + @Test + public void testConsumerGroupMemberJoinsWithEmptyRegex() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + Uuid barTopicId = Uuid.randomUuid(); + String barTopicName = "bar"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .buildCoordinatorMetadataImage(12345L); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("foo*") + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10)) + .build(); + + // Member 1 updates its new regular expression. + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(10) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("") + .setServerAssignor("range") + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of()) + ), + result.response() + ); + + ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicRegex("") + .setServerAssignorName("range") + .build(); + + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), + // previous expression is deleted + GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } @Test public void testConsumerGroupMemberJoinsWithUpdatedRegex() { From 105dc854c57878f8e9c388e7f19efb1e3b98861e Mon Sep 17 00:00:00 2001 From: AntonVasant Date: Sat, 29 Nov 2025 15:49:53 +0530 Subject: [PATCH 8/8] Add test coverage for group epoch bump when member regex becomes empty --- .../kafka/coordinator/group/GroupMetadataManagerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 19439cf885b6c..4acfd025f7145 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -21011,7 +21011,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) } @Test - public void testConsumerGroupMemberJoinsWithEmptyRegex() { + public void testConsumerGroupMemberJoinsWithNonEmptyRegexToEmptyRegex() { String groupId = "fooup"; String memberId1 = Uuid.randomUuid().toString();