Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -3307,13 +3307,16 @@ private UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
// We also trigger a refresh of the regexes in order to resolve it.
throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex());
requireRefresh = true;

} else {
// If the new regex is already resolved, we trigger a rebalance
// by bumping the group epoch.
if (group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent()) {
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
}
}
} else if (isNotEmpty(oldSubscribedTopicRegex)) {
updateRegularExpressionsResult = UpdateRegularExpressionsResult.REGEX_UPDATED_AND_RESOLVED;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21009,6 +21009,90 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
tasks
);
}

@Test
public void testConsumerGroupMemberJoinsWithNonEmptyRegexToEmptyRegex() {
String groupId = "fooup";
String memberId1 = Uuid.randomUuid().toString();

Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";

CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.buildCoordinatorMetadataImage(12345L);

MockPartitionAssignor assignor = new MockPartitionAssignor("range");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
.withMetadataImage(metadataImage)
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.build())
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
.withAssignmentEpoch(10))
.build();

// Member 1 updates its new regular expression.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("")
.setServerAssignor("range")
.setTopicPartitions(List.of()));

assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of())
),
result.response()
);

ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("")
.setServerAssignorName("range")
.build();

List<CoordinatorRecord> expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
// previous expression is deleted
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, Map.of()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1)
);

assertRecordsEquals(expectedRecords, result.records());
}

@Test
public void testConsumerGroupMemberJoinsWithUpdatedRegex() {
Expand Down