Skip to content

Commit

Permalink
KAFKA-16277: AbstractStickyAssignor - Sort owned TopicPartitions by p…
Browse files Browse the repository at this point in the history
…artition when reassigning (#15416)

Treats KAFKA-16277 - CooperativeStickyAssignor does not spread topics evenly among consumer group

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
  • Loading branch information
credpath-seek committed Feb 26, 2024
1 parent 9bc9fae commit 027fad4
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ private void assignOwnedPartitions() {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue().stream()
.filter(tp -> !rackInfo.racksMismatch(consumer, tp))
.sorted(Comparator.comparing(TopicPartition::partition).thenComparing(TopicPartition::topic))
.collect(Collectors.toList());

List<TopicPartition> consumerAssignment = assignment.get(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,59 @@ public void testAddRemoveConsumerOneTopic(RackConfig rackConfig) {
assertTrue(isFullyBalanced(assignment));
}

@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
@EnumSource(RackConfig.class)
public void testTopicBalanceAfterReassignment(RackConfig rackConfig) {
initializeRacks(rackConfig);
List<String> allTopics = topics(topic1, topic2);
Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic1, partitionInfos(topic1, 12));
partitionsPerTopic.put(topic2, partitionInfos(topic2, 12));
subscriptions.put(consumer1, subscription(allTopics, 0));
Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assignment.forEach((consumer, tps) -> assertEquals(12, tps.stream().filter(tp -> tp.topic().equals(topic1)).count()));
assignment.forEach((consumer, tps) -> assertEquals(12, tps.stream().filter(tp -> tp.topic().equals(topic2)).count()));
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
assertTrue(isFullyBalanced(assignment));

// Add another consumer
subscriptions.put(consumer1, buildSubscriptionV2Above(allTopics, assignment.get(consumer1), generationId, 0));
subscriptions.put(consumer2, buildSubscriptionV2Above(allTopics, Collections.emptyList(), generationId, 1));
assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assignment.forEach((consumer, tps) -> assertEquals(6, tps.stream().filter(tp -> tp.topic().equals(topic1)).count()));
assignment.forEach((consumer, tps) -> assertEquals(6, tps.stream().filter(tp -> tp.topic().equals(topic2)).count()));
assertTrue(isFullyBalanced(assignment));

// Add two more consumers
subscriptions.put(consumer1, buildSubscriptionV2Above(allTopics, assignment.get(consumer1), generationId, 0));
subscriptions.put(consumer2, buildSubscriptionV2Above(allTopics, assignment.get(consumer2), generationId, 1));
subscriptions.put(consumer3, buildSubscriptionV2Above(allTopics, Collections.emptyList(), generationId, 2));
subscriptions.put(consumer4, buildSubscriptionV2Above(allTopics, Collections.emptyList(), generationId, 3));
assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assignment.forEach((consumer, tps) -> assertEquals(3, tps.stream().filter(tp -> tp.topic().equals(topic1)).count()));
assignment.forEach((consumer, tps) -> assertEquals(3, tps.stream().filter(tp -> tp.topic().equals(topic2)).count()));
assertTrue(isFullyBalanced(assignment));

// remove 2 consumers
subscriptions.remove(consumer1);
subscriptions.remove(consumer2);
subscriptions.put(consumer3, buildSubscriptionV2Above(allTopics, assignment.get(consumer3), generationId, 2));
subscriptions.put(consumer4, buildSubscriptionV2Above(allTopics, assignment.get(consumer4), generationId, 3));
assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assignment.forEach((consumer, tps) -> assertEquals(6, tps.stream().filter(tp -> tp.topic().equals(topic1)).count()));
assignment.forEach((consumer, tps) -> assertEquals(6, tps.stream().filter(tp -> tp.topic().equals(topic2)).count()));
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());
assertTrue(isFullyBalanced(assignment));
}

@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
@EnumSource(RackConfig.class)
public void testAddRemoveTwoConsumersTwoTopics(RackConfig rackConfig) {
Expand Down Expand Up @@ -442,15 +495,15 @@ public void testAddRemoveTwoConsumersTwoTopics(RackConfig rackConfig) {
assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);

Map<TopicPartition, String> expectedPartitionsTransferringOwnership = new HashMap<>();
expectedPartitionsTransferringOwnership.put(tp(topic2, 1), consumer3);
expectedPartitionsTransferringOwnership.put(tp(topic1, 2), consumer3);
expectedPartitionsTransferringOwnership.put(tp(topic2, 3), consumer3);
expectedPartitionsTransferringOwnership.put(tp(topic2, 2), consumer4);
assertEquals(expectedPartitionsTransferringOwnership, assignor.partitionsTransferringOwnership);

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assertEquals(partitions(tp(topic1, 0), tp(topic1, 2)), assignment.get(consumer1));
assertEquals(partitions(tp(topic1, 1), tp(topic2, 0)), assignment.get(consumer2));
assertEquals(partitions(tp(topic2, 1), tp(topic2, 3)), assignment.get(consumer3));
assertEquals(partitions(tp(topic1, 0), tp(topic2, 1)), assignment.get(consumer1));
assertEquals(partitions(tp(topic2, 0), tp(topic1, 1)), assignment.get(consumer2));
assertEquals(partitions(tp(topic1, 2), tp(topic2, 3)), assignment.get(consumer3));
assertEquals(partitions(tp(topic2, 2)), assignment.get(consumer4));
assertTrue(isFullyBalanced(assignment));

Expand All @@ -460,8 +513,8 @@ public void testAddRemoveTwoConsumersTwoTopics(RackConfig rackConfig) {
subscriptions.put(consumer3, buildSubscriptionV2Above(allTopics, assignment.get(consumer3), generationId, 2));
subscriptions.put(consumer4, buildSubscriptionV2Above(allTopics, assignment.get(consumer4), generationId, 3));
assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
assertEquals(partitions(tp(topic2, 1), tp(topic2, 3), tp(topic1, 0), tp(topic2, 0)), assignment.get(consumer3));
assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic1, 2)), assignment.get(consumer4));
assertEquals(partitions(tp(topic1, 2), tp(topic2, 3), tp(topic1, 0), tp(topic2, 1)), assignment.get(consumer3));
assertEquals(partitions(tp(topic2, 2), tp(topic1, 1), tp(topic2, 0)), assignment.get(consumer4));

assertTrue(assignor.partitionsTransferringOwnership.isEmpty());

Expand Down

0 comments on commit 027fad4

Please sign in to comment.