KAFKA-9987: optimize sticky assignment algorithm for same-subscription case #8668
Conversation
@@ -303,79 +469,17 @@ private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
                                                         Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
        List<TopicPartition> sortedPartitions = new ArrayList<>();

        if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) {
We can remove all this since we checked for identical subscriptions at the start, so we know here that they are not identical.
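For context, a minimal sketch of what that up-front identical-subscription check can look like; the method name comes from the diff above, while the body and the hasIdenticalListElements helper are illustrative assumptions, not the PR's exact code:

// Sketch (illustrative): true iff all consumers are subscribed to the same topics,
// i.e. every consumer could receive every partition.
private boolean areSubscriptionsIdentical(Map<TopicPartition, List<String>> partition2AllPotentialConsumers,
                                          Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
    // Every partition must be wanted by the same set of consumers...
    if (!hasIdenticalListElements(partition2AllPotentialConsumers.values()))
        return false;
    // ...and every consumer must want the same set of partitions.
    return hasIdenticalListElements(consumer2AllPotentialPartitions.values());
}

// Hypothetical helper: all lists in the collection contain the same elements.
private static <T> boolean hasIdenticalListElements(Collection<List<T>> collections) {
    Set<T> first = null;
    for (List<T> list : collections) {
        Set<T> next = new HashSet<>(list);
        if (first == null)
            first = next;
        else if (!first.equals(next))
            return false;
    }
    return true;
}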
Force-pushed from 635f8f0 to afbed93
@@ -169,10 +169,10 @@ public void testAssignmentWithConflictingPreviousGenerations() {
        TopicPartition tp5 = new TopicPartition(topic, 5);

        List<TopicPartition> c1partitions0 = partitions(tp0, tp1, tp4);
-       List<TopicPartition> c2partitions0 = partitions(tp0, tp2, tp3);
+       List<TopicPartition> c2partitions0 = partitions(tp0, tp1, tp2);
This test was testing an illegal state to begin with: you should never have two consumers in the same generation claim to own the same partition. That fact is the entire reason the generation field was added to the StickyAssignor's subscription userdata.
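To spell out the invariant: when ownership claims conflict, the claim from the strictly higher generation wins, and two claims from the same generation are impossible by construction. A hedged sketch of that resolution rule (all names here are illustrative, not the assignor's actual code):

// Illustrative: keep, per partition, only the claim with the highest generation.
Map<TopicPartition, Integer> claimedGeneration = new HashMap<>();
Map<TopicPartition, String> claimedOwner = new HashMap<>();

void claim(String consumer, TopicPartition partition, int generation) {
    Integer current = claimedGeneration.get(partition);
    if (current == null || generation > current) {
        claimedOwner.put(partition, consumer);
        claimedGeneration.put(partition, generation);
    }
    // generation == current would mean two owners within one generation,
    // which the group protocol rules out.
}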
@@ -582,35 +578,6 @@ public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() {
        assertTrue(assignment.get(consumerId).isEmpty());
    }

-   @Test
-   public void testConflictingPreviousAssignments() {
See comment above: this test was starting from an illegal state. Also, it doesn't make sense to place this in the AbstractStickyAssignorTest as the cooperative assignor can't have conflicting previous assignments. If a member thinks it still owns a partition that now belongs to another member, it will have to invoke onPartitionsLost before rejoining the group.
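For reference, onPartitionsLost is the ConsumerRebalanceListener callback that covers exactly this case; a minimal usage sketch (the listener class and its logging are illustrative):

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Minimal sketch: a listener that reacts when previously owned partitions were
// lost (e.g. reassigned to another member) rather than cleanly revoked.
public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned: " + partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Clean handoff: commit offsets, flush local state, etc.
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // These partitions may already be owned by another member; do NOT
        // commit offsets for them. Clean up local state only.
        System.out.println("Lost: " + partitions);
    }
}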
@@ -425,8 +422,36 @@ public void testSameSubscriptions() {
        assertTrue(assignor.isSticky());
    }

+   @Test(timeout = 30 * 1000)
+   public void testLargeAssignmentAndGroupWithUniformSubscription() {
+       int topicCount = 200;
On trunk, this test fails (hits the 30s timeout) even when you reduce the number of topics to just 1
Call for review: any of @guozhangwang @hachikuji @vvcephei
test this please
LGTM. Just one nit comment.
// other generations are, consider it as having lost its owned partition
if (!memberData.generation.isPresent() && maxGeneration > 0
        || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
    consumerToOwnedPartitions.put(consumer, new ArrayList<>());
nit: to be consistent, we can just add consumer to membersWithOldGeneration and then let it be cleared at the end.
Hm, it seems odd to clear it at the end since it's definitely already empty. Note, we're not overwriting the current partitions with an empty array, we're just initializing the assignment for this consumer. I'll add a comment though
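To make the surrounding logic concrete, here is a hedged, condensed sketch of the generation-validation pass being discussed (types and control flow simplified from the diff; MemberData stands in for the deserialized subscription userdata):

// Sketch (simplified): members on an older generation, or with no generation
// while others have one, are treated as owning nothing.
int maxGeneration = 0;
List<String> membersOfCurrentHighestGeneration = new ArrayList<>();
List<String> membersWithOldGeneration = new ArrayList<>();
Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();

for (Map.Entry<String, MemberData> entry : allMemberData.entrySet()) {
    String consumer = entry.getKey();
    MemberData data = entry.getValue();

    if (!data.generation.isPresent()) {
        // No generation in the userdata: honor owned partitions only while
        // no other member has reported a generation.
        if (maxGeneration > 0) {
            consumerToOwnedPartitions.put(consumer, new ArrayList<>());
        } else {
            membersOfCurrentHighestGeneration.add(consumer);
            consumerToOwnedPartitions.put(consumer, data.partitions);
        }
        continue;
    }
    int generation = data.generation.get();
    if (generation > maxGeneration) {
        // A strictly newer generation invalidates every claim seen so far.
        membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
        membersOfCurrentHighestGeneration.clear();
        maxGeneration = generation;
    }
    if (generation == maxGeneration) {
        membersOfCurrentHighestGeneration.add(consumer);
        consumerToOwnedPartitions.put(consumer, data.partitions);
    } else {
        // Older generation: initialize with an empty assignment.
        consumerToOwnedPartitions.put(consumer, new ArrayList<>());
    }
}
// Members later found to be on an old generation lose their claims here.
for (String consumer : membersWithOldGeneration)
    consumerToOwnedPartitions.put(consumer, new ArrayList<>());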
for (String consumer : unfilledMembers) {
    List<TopicPartition> consumerAssignment = assignment.get(consumer);
    int remainingCapacity = minQuota - consumerAssignment.size();
    while (remainingCapacity > 0) {
Is it possible that this unfilled consumer has N+1 remaining capacity, while there are only N max consumers?
NVM, I realized it should never happen.
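For anyone following along, the arithmetic behind that conclusion, as a hedged sketch (variable names follow the diff; totalPartitionCount is an assumed name):

// Each of the n consumers is owed at least minQuota = floor(p / n) partitions
// and receives at most maxQuota = ceil(p / n).
int numberOfConsumers = consumerToOwnedPartitions.size();
int minQuota = totalPartitionCount / numberOfConsumers;
int maxQuota = (totalPartitionCount + numberOfConsumers - 1) / numberOfConsumers;

// Since n * minQuota <= p, the pool of unassigned partitions always covers the
// combined remaining capacity of the unfilled members, so the while loop above
// can never run out of partitions to hand out.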
// Keep track of the partitions being migrated from one consumer to another during assignment
// so the cooperative assignor can adjust the assignment
protected Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<>();
This is just an optimization for the cooperative case: I found that the assignment time for the eager and cooperative assignor began to diverge once you reached partition counts in the millions. At 10 million partitions for example, the eager assignor hovered around 30s but the cooperative assignor was upwards of 5-6 minutes.
The discrepancy was entirely due to the adjustAssignment method needing to compute the set of partitions transferring ownership in the completed assignment. But we can build up this map during assignment much more efficiently, by taking advantage of the additional context we have at various steps in the algorithm. Tracking and exposing this set to the cooperative assignor cut the assignment time for large partition numbers pretty drastically, putting the cooperative assignor on par with the eager assignor.
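A hedged sketch of the idea: record a transfer at the moment a partition is handed to a consumer other than its previous owner, instead of diffing the old and new assignments afterwards (the assignPartition helper and its surroundings are illustrative, not the PR's exact code):

// Illustrative: during assignment we already know each partition's previous
// owner, so transfers can be recorded in O(1) per partition as we go.
void assignPartition(String consumer, TopicPartition partition, String previousOwner) {
    assignment.get(consumer).add(partition);
    if (previousOwner != null && !previousOwner.equals(consumer))
        // Record the move so the cooperative assignor can withhold this
        // partition until the previous owner has revoked it.
        partitionsTransferringOwnership.put(partition, consumer);
}
// Computing the same map after the fact requires a full pass over both the old
// and the new assignment, which is what dominated at millions of partitions.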
} else {
    log.debug("Detected that not all consumers were subscribed to same set of topics, falling back to the "
        + "general case assignment algorithm");
    partitionsTransferringOwnership = null;
I didn't bother to include this optimization for the general case. We know that the assignment algorithm itself becomes a bottleneck at only 2,000 partitions, so there's no point optimizing something that only becomes a bottleneck in the millions of partitions
partitionsPerTopic.put(topic, 3);
partitionsPerTopic.put(otherTopic, 3);
subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
This test was also starting with an illegal state -- partitionsPerTopic only contains metadata for topics included in the subscription. I noticed that we don't seem to be testing the actual valid case, where some consumers have ownedPartitions which are no longer in the subscription, so I just adapted this test for the related purpose.
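A hedged sketch of what such a test could look like; the buildSubscription helper, the test name, and the exact assertions are assumptions following the conventions visible in the diffs above, not the PR's actual test body:

@Test
public void testOwnedPartitionsNoLongerInSubscription() {
    // The consumer used to own a partition of otherTopic but is now subscribed
    // only to topic, so that stale owned partition must be dropped.
    partitionsPerTopic.put(topic, 3);
    subscriptions = Collections.singletonMap(consumerId,
        buildSubscription(topics(topic), partitions(new TopicPartition(otherTopic, 0))));

    Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, subscriptions);
    List<TopicPartition> assigned = assignment.get(consumerId);

    assertEquals(3, assigned.size());
    assertFalse(assigned.contains(new TopicPartition(otherTopic, 0)));
}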
@guozhangwang made a few more changes, ready for another review
From what I can tell, this looks good to me. This loses one mostly insignificant "optimization" that does not really affect anything in reality: prior, if an old-generation member was rejoining, the code would try to re-sticky partitions to those old members for any partitions that are now on overloaded members or are unassigned. This is a pretty minor optimization though, and deleting this logic entirely from my own balancer breaks no tests. This algorithm primarily differs from mine by doing a bunch of up-front checking work, and then doing a "single" pass that performs all assignments. Mine does a bunch of assigning while doing checks, and then does a small balancing pass. Both of these options are great, though! Pretty nifty observation about building partitionsTransferringOwnership during the assignment itself.
LGTM on the new optimization.
@twmb yeah, I should point out in the ticket that this approach drops the optimization giving preference to older-generation owners of a partition. I actually don't think it would be particularly difficult to incorporate into this new algorithm, but my take was that it still adds more complexity than any benefit it provides. It was nice to be able to build up the partitionsTransferringOwnership map as part of the assignment itself.
KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (#8668)
Cherry-picked to 2.6/2.5/2.4.
Did we check the build before merging this? It seems to have broken it.
@guozhangwang Looks like 2.6, 2.5 and 2.4 are broken too. You should generally also build locally when cherry-picking.
Sorry @ijuma, I think I only ever ran the local tests + checkstyle, not the full suite. My mistake |
// If the current member's generation is higher, all the previous owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
    membersOfCurrentHighestGeneration.clear();
Just FYI, I introduced this bug right before merging. Luckily the tests caught it -- fix is #8777
@ableegoldman No worries, I do the same. We just need to check the PR result before merging. Additionally, committers should run checkstyle and spotBugs when cherry-picking to older branches.
* apache-github/2.6: (32 commits)
  KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (apache#8786)
  KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used (apache#8737)
  KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (apache#8695)
  KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (apache#8777)
  MINOR: Remove unused variable to fix spotBugs failure (apache#8779)
  MINOR: ChangelogReader should poll for duration 0 for standby restore (apache#8773)
  KAFKA-10030: Allow fetching a key from a single partition (apache#8706)
  Kafka-10064 Add documentation for KIP-571 (apache#8760)
  MINOR: Code cleanup and assertion message fixes in Connect integration tests (apache#8750)
  KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (apache#8668)
  KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing (apache#7956)
  KAFKA-10074: Improve performance of `matchingAcls` (apache#8769)
  KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723)
  KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739)
  KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705)
  KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749)
  KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238)
  KAFKA-9501: convert between active and standby without closing stores (apache#8248)
  MINOR: Relax Percentiles test (apache#8748)
  MINOR: regression test for task assignor config (apache#8743)
  ...
Motivation and pseudo code algorithm in the ticket.
Added a scale test with a large number of topic partitions and consumers and a 30s timeout.
With these changes, assignment with 2,000 consumers and 200 topics with 2,000 partitions each completes within a few seconds.
Porting the same test to trunk, it took 2 minutes even with a 100x reduction in the number of topics (i.e., 2 minutes for 2,000 consumers and 2 topics with 2,000 partitions each).
Should be cherry-picked to 2.6, 2.5, and 2.4
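A hedged sketch of the shape of that scale test; the constants come from the description above, but the wiring (topic naming, assignor field, helper setup) is illustrative rather than the PR's exact code:

@Test(timeout = 30 * 1000)
public void testLargeAssignmentAndGroupWithUniformSubscription() {
    int topicCount = 200;
    int partitionCount = 2_000;
    int consumerCount = 2_000;

    // Uniform partition counts across all topics.
    Map<String, Integer> partitionsPerTopic = new HashMap<>();
    List<String> allTopics = new ArrayList<>();
    for (int i = 0; i < topicCount; i++) {
        String topic = "topic-" + i;
        allTopics.add(topic);
        partitionsPerTopic.put(topic, partitionCount);
    }

    // Every consumer subscribes to every topic: the same-subscription case.
    Map<String, Subscription> subscriptions = new HashMap<>();
    for (int i = 0; i < consumerCount; i++)
        subscriptions.put("consumer-" + i, new Subscription(allTopics));

    // The @Test timeout is what enforces that the optimized path stays fast.
    assignor.assign(partitionsPerTopic, subscriptions);
}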