Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support incremental assignments in stream processor #58

Merged
merged 13 commits into from
Apr 10, 2022

Conversation

lynnagara
Copy link
Member

@lynnagara lynnagara commented Apr 6, 2022

With the incremental assign / revoke strategy the assignment callback can
be triggered without a corresponding revocation beforehand. In that case
the processing strategy will still be active. When this happens, we close
the strategy before recreating it to make the behavior compatible with
the eager strategy.

With the incremental assign strategy the assignment callback can
be triggered without a corresponding revocation beforehand. In that case
the processing strategy will still be active. When this happens, we close
the strategy before recreating it to make the behavior compatible with
the eager strategy.
@lynnagara lynnagara requested a review from a team as a code owner April 6, 2022 23:08
Copy link
Contributor

@fpacifici fpacifici left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works because the only case where you can receive new assigned partitions without prior revocation is an incremental rebalancing and the partitions you receive were not assigned before and you are not resetting any offsets (when you call assign_incremental you are assigning the same offset that is the committed offset). Is that correct?

Now the question is: what if you loose partitions without gaining new ones? you will have a call to revoke and that will close the strategy, but that will not be followed by an assignment so you will remain without a strategy instantiated. How do you plan to deal with that ?

@lynnagara
Copy link
Member Author

lynnagara commented Apr 8, 2022

@fpacifici Made a few changes since you last reviewed to handle incremental revoke. Now we check on revocation if the consumer has any other assigned partitions and if so the strategy is also recreated. Could you take another look and let me know if this makes sense to you?

Copy link
Contributor

@fpacifici fpacifici left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my comment. Otherwise it makes sense.

Any chance you can make separate tests for the cases where all partitions are revoked vs only some of them ?

if self.__consumer.tell().keys() - set(partitions):
_create_strategy()
except Exception:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we crash instead? If you cannot call tell we would not be creating the strategy and we would run into an inconsistent state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this was mostly just to catch consumer closed / terminated exception. I can just catch those specific errors instead and let anything else crash.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually on second thoughts, I think I would leave this as-is. it we cannot call tell it only means that the consumer is in the process of shutting down and we should not be re-creating the strategy otherwise it'll mess with the shutdown sequence.

Copy link
Contributor

@fpacifici fpacifici Apr 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log a warning ?
What happens if you cannot call tell for a transient network error? I think we will get an assertion error when we try to use the strategy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ended up changing this line to catch RuntimeError only. So now it will crash on anything else including networking issues.

@lynnagara lynnagara merged commit 57763e2 into main Apr 10, 2022
@lynnagara lynnagara deleted the stream-processor-incremental-assign branch April 10, 2022 22:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants