
Destroying queue references makes polling unusable #1792

Closed
accelerated opened this issue May 3, 2018 · 11 comments
@accelerated

This could be a bug, or perhaps I'm doing something wrong; please read on.

Current workflow for the auto-rebalance consumer (a minimal code sketch follows the list):

  1. Forward the main queue to the consumer queue with rd_kafka_poll_set_consumer
  2. Get references to both the consumer queue and to all assigned partition queues with rd_kafka_queue_get_consumer and rd_kafka_queue_get_partition respectively.
  3. On the partition queues, stop forwarding to the consumer queue with rd_kafka_queue_forward(..., NULL)
  4. Poll both the consumer queue and the individual partition queues for events via rd_kafka_consume_queue.
  5. When complete, forward the partition queues back to the consumer queue with rd_kafka_queue_forward.
  6. Delete all queue references by calling rd_kafka_queue_destroy.
  7. Repeat 2-6 if needed.
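
Roughly, that flow looks like this (a minimal sketch; `rk` is assumed to be an already-created and subscribed consumer handle, and the topic name, partition and timeouts are placeholders):

```c
#include <librdkafka/rdkafka.h>

/* Sketch of steps 1-6 for a single partition queue.
 * "mytopic"/partition 0 and the 100 ms timeouts are placeholders. */
static void poll_partition_queue_once(rd_kafka_t *rk) {
        /* 1. Redirect the main poll queue to the consumer queue. */
        rd_kafka_poll_set_consumer(rk);

        /* 2. Take references to the consumer queue and a partition queue. */
        rd_kafka_queue_t *cons_q = rd_kafka_queue_get_consumer(rk);
        rd_kafka_queue_t *part_q = rd_kafka_queue_get_partition(rk, "mytopic", 0);

        /* 3. Stop forwarding the partition queue to the consumer queue. */
        rd_kafka_queue_forward(part_q, NULL);

        /* 4. Poll both queues. */
        rd_kafka_message_t *msg;
        if ((msg = rd_kafka_consume_queue(cons_q, 100)))
                rd_kafka_message_destroy(msg);
        if ((msg = rd_kafka_consume_queue(part_q, 100)))
                rd_kafka_message_destroy(msg);

        /* 5. Re-instate forwarding to the consumer queue. */
        rd_kafka_queue_forward(part_q, cons_q);

        /* 6. Drop the queue references. Repeating from step 2 after this
         *    is where things stop working. */
        rd_kafka_queue_destroy(part_q);
        rd_kafka_queue_destroy(cons_q);
}
```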

Issue I'm seeing:

Steps 1-6 work well. I can get messages and poll on all queues, including the consumer queue, and all events get serviced, which is fine. The problem arises when I try to repeat steps 2-6: at that point nothing works anymore. I get new references to the queues, but the queues seem to be disabled somehow and the rdkafka background thread is not fetching anything from the broker. The call to rd_kafka_cgrp_partitions_fetch_start happens when assignments are done, but if the queue is not working, I never get these events! I tracked the problem to rd_kafka_queue_destroy, which not only decrements the reference count but also deletes the queue entirely. Normally the consumer object should hold at least one reference to the consumer queue and to each partition queue, but that does not seem to be the case, so the destroy is final.
I tried calling rd_kafka_consume_start or rd_kafka_consume_queue_start on the new queues, but this doesn't do anything. Furthermore, after calling rd_kafka_consume_start, if I poll the main consumer queue I hit an assert. I even tried calling rd_kafka_consume_stop before the destroy and then rd_kafka_consume_start again on the new references, but that doesn't work either.
Currently the only way I can actually repeat steps 2-6 successfully is to never call rd_kafka_queue_destroy. I also hit a deadlock at some point, but unfortunately I could not reproduce it (scary!).
As a side note, it would be nice to expose rd_kafka_q_purge0 in the public API. Right now, when I'm done polling an individual partition queue and re-instate forwarding to the consumer queue, all unprocessed messages in the local partition queue are copied over to the forwarded queue, which is not always desirable. I would prefer to have control and flush the local queues only if I want to.
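
Something like this is what I have in mind, with part_q and cons_q as in the sketch above (rd_kafka_queue_purge below is hypothetical; it does not exist in the public API, it's just the kind of wrapper around rd_kafka_q_purge0 I'm asking for):

```c
/* Hypothetical usage: purge the local partition queue first, then restore
 * forwarding, so unprocessed messages are NOT copied to the consumer queue.
 * rd_kafka_queue_purge() is NOT a real librdkafka function. */
rd_kafka_queue_purge(part_q);
rd_kafka_queue_forward(part_q, cons_q);
```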

Any ideas on what I'm doing wrong, or do you think there's actually a bug?

@edenhill
Contributor

edenhill commented May 3, 2018

Ha, I just found the exact same issue yesterday in code that hasn't been touched in years. What are the odds? :)

The problem is that rd_kafka_queue_destroy() always treats the application as the owner of the queue, which is correct for rd_kafka_queue_new() queues, but not for references to existing internal queues such as those from rd_kafka_queue_get_partition() et al. This causes the queue to be disabled, which leads to exactly what you are seeing: silence.
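
To illustrate the two ownership cases (the topic/partition values are placeholders):

```c
/* Application-created queue: the application owns it, so it is correct for
 * rd_kafka_queue_destroy() to disable and free it. */
rd_kafka_queue_t *own_q = rd_kafka_queue_new(rk);
rd_kafka_queue_destroy(own_q);

/* Reference to an existing internal queue: destroy() should only drop the
 * reference. Before the fix it also disabled the underlying queue, which is
 * why the partition went silent on the next iteration. */
rd_kafka_queue_t *part_q = rd_kafka_queue_get_partition(rk, "mytopic", 0);
rd_kafka_queue_destroy(part_q);
```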

I'll post a fix soon.

As for queue_purge(): I'll look into it, it makes sense.

@accelerated
Author

accelerated commented May 3, 2018

Wow, yeah, talk about odds! I guess most people don't use these more advanced APIs. One more thing I would like to understand: when a new assignment (or un-assignment) is made, are the internal partition queues purged and destroyed under the hood, with new ones created after the new assignment? Or do you just purge the messages but keep the queues around?
PS: that makes me wonder about the deadlock I ran into... I wonder if you saw that as well?

@edenhill
Contributor

edenhill commented May 3, 2018

An op (rd_kafka_op_t), which is what a queue is made up of, has an optional version barrier.
Fetched messages (RD_KAFKA_OP_FETCH) have a version barrier based on the last fetcher state update for the given partition. When the fetcher state is updated, due to a rebalance, stop, pause, or whatever, the messages in the queue are either purged directly based on this version, or purged lazily as the queue is read through an op version filter. Effectively this means that messages fetched for a previous fetcher state / version barrier will not be seen by the application.

Asynchronicity in all its glory.

https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_queue.c#L342

https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_queue.c#L286
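
Conceptually it works something like this (a simplified sketch, not the actual librdkafka internals; the names are invented for illustration):

```c
#include <stdlib.h>

/* Each op records the fetcher-state version it was produced under.
 * Ops older than the current barrier are dropped lazily as the queue is read. */
typedef struct op_s {
        struct op_s *next;
        int version;            /* version barrier at enqueue time */
        /* ... payload, e.g. a fetched message ... */
} op_t;

static op_t *queue_pop_filtered(op_t **head, int current_version) {
        while (*head) {
                op_t *op = *head;
                *head = op->next;
                if (op->version >= current_version)
                        return op;      /* still valid: deliver to the application */
                free(op);               /* outdated: purge silently */
        }
        return NULL;
}
```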

@accelerated
Author

Ok, thanks. Please let me know when you post the fix; a check-in in cppkafka is pending on it as well, which I managed to make work with the above workaround, i.e. not calling destroy. Also, on a totally unrelated thread: when you get a chance, it would be great if you could review the latest PR (on the scope of setting options) which I made a while back (based on our conversations). That one is also pending a fix in cppkafka :).

@mfontanini
Contributor

@edenhill just to be sure: would it cause issues if cppkafka used the queue handles and never destroyed them? Will that memory be released once the rdkafka handle gets destroyed?

Also @accelerated we can't really rely on this change, as older users of rdkafka would otherwise be stuck with the broken behavior. At best the code could choose to return a non-owning or an owning Queue object depending on the rdkafka version being used.

@accelerated
Author

accelerated commented May 3, 2018

@mfontanini I would wait until the fix lands in librdkafka and then I will provide "full ownership" of handles. But in terms of backwards compatibility... we can just say that this round-robin adapter only works with a certain version of librdkafka. We can take this discussion to the cppkafka thread; I'm fine either way.

@accelerated
Author

@edenhill Hi, what is the ETA for 0.11.5? Any ideas?

@edenhill
Contributor

@accelerated we're aiming for 2-3 weeks

@accelerated
Author

Thanks

@agis

agis commented Jul 19, 2018

This can probably be closed (fixed in 0.11.5)?

@edenhill
Contributor

Yes! thanks
