New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Synchronous partition assignment handler (internal) #761
Conversation
This looks really promising @ennru . When I was considering an API I was thinking about providing strategies that users could opt in with (i.e. "commit on revoked partitions"), and then provide a super user interface which just exposed the consumer itself, but I like the idea behind |
1bcbe5d
to
244e8fa
Compare
Looking good. I like how TransactionalSource was able to use this without any code changes. |
2632f94
to
7f7db12
Compare
8730228
to
352c50a
Compare
352c50a
to
349a67b
Compare
606825d
to
6ca20b6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
6ca20b6
to
13d015b
Compare
Purpose
For now this PR just introduces the synchronous partition assignment handler internally without a user-facing API as the use-cases for this need to be understood better.
Introduce a new callback for changes in partitions assigned to an Alpakka Kafka consumer. This allows executing code before a rebalance continues, and querying the underlying Kafka consumer.
These callbacks are much more powerful (and dangerous) than the current rebalance listener actor. The callbacks are executed on the same thread that called
poll()
(the actor thread) so they are allowed to call into the Kafka consumer eg. to query current offsets, commit offsets or to seek to an offset position. The callbacks add to the timepoll()
is executed and block the internal Kafka consumer actor from doing any other work. As this can easily make influence other Kafka consumer actors running on the same dispatcher, the time the callbacks take is checked and warnings are issued to the logs if they pass a configurable threshold.This allows for new ways to handle offsets which might show more efficient for certain use-cases. The tests show an example for external offset storage and one example which commits on rebalance only.
Background Context
It has been discussed in #539 that the lack of blocking user-defined call-backs to react on Kafka's partition rebalancing makes certain use cases impossible with Alpakka Kafka.
In particular, it is impossible to combine Kafka's partition assignment with external offset storage.
References
References #539
Follow up #841