Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(processor): Create strategy lazily on first message #317

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

untitaker
Copy link
Member

When a series of rebalance events occurs, this avoids recreating the
strategy from within callbacks entirely (making on_revoke in particular
faster). Instead the strategy is created whenever the first message is
being processed.

There were ideas to create the strategy eagerly before even subscribing
to a topic. However, currently create_with_partitions requires knowledge
of the currently assigned partitions. We would have to deprecate this
argument if we wanted to make this happen.

When a series of rebalance events occurs, this avoids recreating the
strategy from within callbacks entirely (making on_revoke in particular
faster). Instead the strategy is created whenever the first message is
being processed.

There were ideas to create the strategy eagerly before even subscribing
to a topic. However, currently create_with_partitions requires knowledge
of the currently assigned partitions. We would have to deprecate this
argument if we wanted to make this happen.
@untitaker untitaker requested a review from a team as a code owner December 20, 2023 12:44
@@ -243,7 +222,6 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
"Partition assignment while processing strategy active"
)
_close_strategy()
_create_strategy(partitions)
Copy link
Member

@lynnagara lynnagara Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assert that self.__processing_strategy is None here, or make sure we close if it is not None? What would happen if we get an assign without a prior revocation? I know that doesn't actually happen today but the code right now is designed to work for incremental assignment as well, which might not be the case after this change.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure i follow, we do close it on the line before exactly when the strategy is not None

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw this part of rebalancing was actually broken with incremental assign, as we would create the strategy with the subset of partitions from this callback instead of the full assignment. with this pr we're now actually only calling _create_strategy with consumer.tell() which should always contain the full assignment

Comment on lines -259 to -276
# Recreate the strategy if the consumer still has other partitions
# assigned and is not closed or errored
try:
current_partitions = self.__consumer.tell()
if len(current_partitions.keys() - set(partitions)):
active_partitions = {
partition: offset
for partition, offset in current_partitions.items()
if partition not in partitions
}
logger.info(
"Recreating strategy since there are still active partitions: %r",
active_partitions,
)
_create_strategy(active_partitions)
except RuntimeError:
pass

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note that we don't get a subset of partitions revoked today as we are using eager rebalancing. But that may change in the future. This code was written this way so it also works with incremental rebalancing, even though we never actually recreate the strategy on partitions revoked.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes i think we're now still handling that case correctly. this code effectively moved from the callback into _run_once. both times we recreate the strategy with the return value from consumer.tell(). we're still closing and flushing the strategy if any partition had been revoked.

just the condition under which we create the strategy changed. before, we'd recreate the strategy if we still had partitions assigned, now we're creating the strategy if we have none and consumer.poll() returned a message.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants