Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-54: Implement sticky partition assignment strategy #2057

Merged
merged 8 commits into from
Sep 30, 2020

Conversation

aynroot
Copy link
Contributor

@aynroot aynroot commented May 26, 2020

#1122

Implementation of a sticky partition assignment strategy for kafka consumers.

General motivation behind sticky assignment:

In certain circumstances the round robin assignor, [...], fails to produce an optimal and balanced assignment of topic partitions to consumers. [...] In addition, when a reassignment occurs, none of the existing strategies consider what topic partition assignments were before reassignment, as if they are about to perform a fresh assignment. Preserving the existing assignments could reduce some of the overheads of a reassignment. For example, Kafka consumers retain pre-fetched messages for partitions assigned to them before a reassignment. Therefore, preserving the partition assignment could save on the number of messages delivered to consumers after a reassignment. Another advantage would be reducing the need to cleanup local partition state between rebalances.

The assignment algorithm is described here: KIP-54
An implementation of the strategy in Java: AbstractStickyAssignor.java + StickyAssignor.java. And its tests: AbstractStickyAssignorTest.java + StickyAssignorTest.java


This change is Reviewable

@aynroot aynroot force-pushed the KIP-54-sticky-assignor branch 2 times, most recently from 5611809 to fc9d90f Compare May 27, 2020 12:45
@aynroot aynroot marked this pull request as ready for review May 27, 2020 13:24
@aynroot
Copy link
Contributor Author

aynroot commented May 28, 2020

CI checks are failing for Python 3.7 due to a pylint issue, looks like it is being addressed already here #2058

@tvoinarovskyi
Copy link
Collaborator

@aynroot Yea, you can ignore that CI error for now, I need to debug it properly

@aynroot
Copy link
Contributor Author

aynroot commented May 28, 2020

@tvoinarovskyi alright, thanks for the info
then this PR is ready for review

@@ -44,13 +44,14 @@ def metadata(self, topics):
pass

@abc.abstractmethod
def on_assignment(self, assignment):
def on_assignment(self, assignment, generation):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change for anyone who may have implemented their own custom assignor, as they will now need to add a generation parameter. Is there a way to structure this so that it doesn't require changes to existing assignors?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked deeply into KIP-429, but if it also leverages generations (and I suspect it might given that it includes incremental shifts), then we may want to consider keeping this and bumping the kafka-python version to 3.x for the breaking change... I have a feeling it may make our lives easier down the road. But if they don't need it there, then agree it would be nicer not to add it just for the sticky assignor case...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about KIP-429, but generations here are crucial for KIP-341 bug fix.

We can allow backwards compatibility with something like this in consumer.py

assignor.on_assignment(assignment)
if assignor.name == 'sticky':
    assignor.on_generation_assignment(generation)

I'll make necessary changes.

@@ -6,6 +6,7 @@
import logging
import time

from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import order

@@ -31,7 +32,7 @@ class ConsumerCoordinator(BaseCoordinator):
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': None,
'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor, StickyPartitionAssignor),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would rather not add this to the default list, especially since it requires an external dependency

Copy link
Collaborator

@jeffwidman jeffwidman Sep 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assignment algorithm isn't just sticky, it's also better balanced than either of the other two... and if there's a choice between sticky vs balanced, it favors balanced... at least that's what I recall from when I helped drive that KIP forward.

Given that, it'd be nice to add to the default list, and I'd even support moving it up the list to the primary default so that users get the most optimized balancing algorithm. The only risk is if the implementation is buggy, but it looks like there's a solid test suite here.

Be good to know what the Java client does here, I'd check but my wife just said dinner is ready so I need to roll...

Copy link
Collaborator

@jeffwidman jeffwidman Sep 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked into this a little more, and based on the work in KIP-429, it's probably not a great idea to set this as the preferred option...

But I do still support including it in the list of default options. Additionally, if this is joining a mixed-language consumer group, it's more likely that it'd be cross-compatible with the Java clients etc if we include this in the default list.

It also makes it discoverable to folks that it's an option... because otherwise it's not obvious in the main consumer docs that this is an option.

setup.py Outdated
@@ -58,5 +59,8 @@ def run(cls):
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Software Development :: Libraries :: Python Modules",
],
install_requires=[
"sortedcontainers==2.1.0"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kafka-python has to-date not taken on any external dependencies, and we'd like to keep with that philosophy. Is this module necessary / can we refactor to use the standard library instead? If it is necessary, can we make it optional and disable the assignor if it is not installed?

Copy link
Collaborator

@jeffwidman jeffwidman Sep 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be really nice to see this land without an external dependency...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, thought about it as well, should not be a rocket science to implement sorted sets by hand. Will do.

@@ -233,7 +234,7 @@ def _on_join_complete(self, generation, member_id, protocol,

# give the assignor a chance to update internal state
# based on the received assignment
assignor.on_assignment(assignment)
assignor.on_assignment(assignment, generation)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will also crash if the user has configured a custom assignor that doesn't take the additional parameter

@dpkp
Copy link
Owner

dpkp commented Sep 7, 2020

Thanks -- I can see that you've put in a lot of work on this, and we'd love to get it merged. I've left a few comments re: compatibility and dependencies that I would like to resolve before adding to kafka-python.

@jeffwidman
Copy link
Collaborator

jeffwidman commented Sep 17, 2020

Does this also include the updates described in KIP-341? I'm sure Vahid also increased the test coverage to cover this case...

If you ported the latest Java code, you should be fine, but if you based your implementation off the original KIP-54 description, then we'd want to include the KIP-341 bugfix...

@aynroot
Copy link
Contributor Author

aynroot commented Sep 22, 2020

Does this also include the updates described in KIP-341? I'm sure Vahid also increased the test coverage to cover this case...

If you ported the latest Java code, you should be fine, but if you based your implementation off the original KIP-54 description, then we'd want to include the KIP-341 bugfix...

Yep, it includes KIP-341

@aynroot aynroot force-pushed the KIP-54-sticky-assignor branch 3 times, most recently from 539b6f4 to e4c05f8 Compare September 28, 2020 14:29
@aynroot
Copy link
Contributor Author

aynroot commented Sep 29, 2020

@dpkp @jeffwidman I've addressed all the comments, this PR is ready to be re-reviewed.
Please take a look when you have time.

Copy link
Owner

@dpkp dpkp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is great and a huge new feature!! Fantastic work

@dpkp dpkp merged commit c536dd2 into dpkp:master Sep 30, 2020
@sibiryakov
Copy link
Contributor

@aynroot Well done Valeria!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants