
The on_assign callback for subscribe provides an empty partitions list #133

Closed
aaronhays opened this issue Feb 21, 2017 · 5 comments

@aaronhays

We do custom partition/offset tracking, so we run our consumers with enable.auto.commit=False and group.id=None and we call Consumer.subscribe with a callback set for on_assign. Usually, our initial call to subscribe results in an empty partitions parameter to the on_assign callback. We added retry logic around our subscribe calls, and we generally get a valid partitions list in on_assign within a minute or so.

From this, it seems like initially the Consumer isn't really ready to field subscribe calls with on_assign but eventually becomes ready (perhaps related to initial metadata requests running in the background?). Is there some way for users to determine this readiness and delay calls to subscribe? Our retries are a bit hacky and I'd like to remove them.

Or, is our usage horribly wrong, requiring a new approach to our offset management?

@edenhill
Contributor

Setting group.id to None does not actually null it, it is converted to the string "None" - thus a valid consumer group. This is a bug in the Python wrapper.
However, the underlying librdkafka library will not allow you to run subscribe() on a consumer without a valid group.id set since the group.id is a required property for balanced consumer groups.
You can only use assign() when no group.id is set.

You are probably seeing an empty assignment initially because of the above: you might have multiple clients using this "None" group and not enough partitions for all the joined consumers. Sounds plausible?

@aaronhays
Author

Yes, that is completely plausible - it would also explain why I've been unable to reproduce it in a dev setting (because I've only ever used a single client in dev).

But, the restriction on subscribe will lead me to another problem - the subscribe with on_assign allows me to discover partitions but manage offsets myself. For assign I need to know what all of the partitions are up front, which is a problem when partitions change on the cluster. Is there another way to discover the current partitions for a topic?

@edenhill
Contributor

The easiest approach is to disable auto commits and use a unique group.id for each client; this gives you a short-lived consumer group with only one consumer.
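A minimal sketch of that setup, assuming the config-dict style used throughout this thread. `make_consumer_conf`, the `client_name` prefix scheme, and the broker address are hypothetical placeholders, not part of the library:

```python
import uuid


def make_consumer_conf(bootstrap_servers, client_name):
    """Build a consumer config with a per-client group.id and no auto commits.

    Hypothetical helper: the random suffix gives each client its own
    short-lived consumer group, so it is the only member of that group
    and is assigned every partition of the topics it subscribes to.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        # Unique per call, so no two clients ever share a group.
        "group.id": "%s-%s" % (client_name, uuid.uuid4()),
        # Offsets are tracked by the application, not committed to Kafka.
        "enable.auto.commit": False,
    }


conf_a = make_consumer_conf("localhost:9092", "tracker")
conf_b = make_consumer_conf("localhost:9092", "tracker")
```

Each dict would then be passed to its own `Consumer(...)` before calling `subscribe()` with an `on_assign` callback, as in the original setup.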

@aaronhays
Author

Actually, a couple of follow-up questions: Eventually, all of my clients get an on_assign callback with a non-empty partitions list (within a minute or so). Would you expect this to happen in the multiple-clients case you describe?

Also, you mentioned that assign can only be used when no group.id is set; however, all of my consumer configs use group.id=None and some of them call assign (and it works). Is this expected? Will this eventually stop working?

@edenhill
Contributor

Hard to say what is going on there since I don't have a clear picture of your setup, but if you want to see what's going on behind the scenes I suggest you enable "debug":"cgrp" in the config dict.
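For example, a config dict with consumer-group debugging turned on might look like the sketch below. The broker address and group name are placeholders; only the `"debug": "cgrp"` entry comes from the suggestion above:

```python
conf = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "my-debug-group",           # placeholder group name
    "enable.auto.commit": False,
    # Enable librdkafka's consumer-group debug context, which logs group
    # activity such as joins, rebalances, and assignments.
    "debug": "cgrp",
}
```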

subscribe() starts the high-level balanced consumer, joins the configured group.id, and waits for a partition assignment based on its subscribed topics. When an assignment is received it either calls assign() automatically or lets you do it in the on_assign callback.
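As a sketch of the second path, an on_assign callback can move each newly assigned partition to an application-tracked offset and then call assign() itself. It assumes the `on_assign(consumer, partitions)` callback signature, with partition objects exposing `topic`, `partition`, and a writable `offset`; `make_on_assign` and the dict-based `offset_store` are hypothetical stand-ins for the custom offset tracking described in the issue:

```python
def make_on_assign(offset_store):
    """Return an on_assign callback that resumes from stored offsets.

    offset_store maps (topic, partition) -> next offset to read.
    Partitions without a stored offset are left unchanged, so the
    client's usual starting-position logic applies to them.
    """
    def on_assign(consumer, partitions):
        for tp in partitions:
            stored = offset_store.get((tp.topic, tp.partition))
            if stored is not None:
                tp.offset = stored
        # Calling assign() here, instead of letting the client do it
        # automatically, makes the modified offsets take effect.
        consumer.assign(partitions)

    return on_assign
```

Usage would be along the lines of `consumer.subscribe(["my_topic"], on_assign=make_on_assign(store))`, where `my_topic` and `store` are placeholders.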

assign() is the lower-level consumer that actually starts consuming the given set of partitions (after stopping any previous consumption). The group.id has no relevance for this call, since all it operates on is a definitive set of partitions that comes from somewhere (a high-level consumer assignment, a manual user assignment, etc.). You are free to call assign() at any time, more or less, to replace the current set of partitions being consumed.

Having said that, a group.id must still be configured for assign() to function, but this is due to internal implementation details. Unless you join the group (subscribe()) or commit offsets the group.id is not actually used.

edenhill added this to the v0.9.5 milestone Mar 2, 2017
edenhill added a commit that referenced this issue Apr 28, 2017
None conf values are now converted to NULL (closes #133)