
KIP-62 / KAFKA-3888: Allow consumer to send heartbeats from a background thread #948

Closed
jeffwidman opened this issue Jan 18, 2017 · 27 comments

@jeffwidman
Collaborator

jeffwidman commented Jan 18, 2017

Allows consumers to take their time processing messages without being timed out from their consumer group.

max_poll_records is a decent workaround for most of the pain here, but it'd still be nice to add this for consumers that have inconsistent message processing times.
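For reference, a minimal sketch of the max_poll_records workaround (the broker address, topic, and group names are placeholders): limiting the batch size keeps the work done between poll() calls, and therefore between heartbeats, inside the session timeout.

```python
# Hypothetical example of the max_poll_records workaround; all connection
# details below are placeholders.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    max_poll_records=10,        # fewer records per poll() => shorter gap between polls
    session_timeout_ms=30000,   # still must cover the worst-case time for one batch
)

for message in consumer:
    # Placeholder for the application's (possibly slow) per-message processing.
    print(message.offset, len(message.value))
```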

Related issues: #872, #544

@tvoinarovskyi
Collaborator

Hey @jeffwidman, do you plan to do any work on this? If not, I can probably take it on over the weekend.

@jeffwidman
Collaborator Author

No, I'm afraid I won't have time to work on this anytime soon... if you have time, go for it!

My only comment--and you probably already know this--is that as far as implementation goes, the closer it mirrors the Java consumer (without the bugs 😉), the better IMHO. Among other things, it makes config tuning advice from the Kafka mailing list etc. easier to translate to kafka-python.

@jeffwidman
Collaborator Author

jeffwidman commented Feb 7, 2017

We were just discussing this internally at my day job... since Python has the GIL, how would this background thread ever issue a heartbeat if the main consumer thread is CPU-bound in its message processing?

I.e., the main thread would occupy the CPU the whole time, and the background thread wouldn't get scheduled to issue the heartbeat unless the message processing was I/O-bound.

@tvoinarovskyi
Collaborator

Hey @jeffwidman
Sorry, but the GIL will not block you from doing I/O in a separate thread, at least as long as you are not doing something unusual like handing all messages to pure C code that processes them without releasing the GIL. Python yields the GIL after some amount of time (Python 3) or a number of bytecode instructions (Python 2), so as long as you are not stuck in C code it should be OK.
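A quick illustration of that scheduling behavior (plain Python, no kafka-python involved; the numbers are arbitrary): a daemon thread keeps printing its "heartbeat" while the main thread is CPU-bound in pure Python, because the interpreter periodically switches threads.

```python
# Sketch demonstrating that a background thread still gets scheduled while the
# main thread is CPU-bound in pure Python.
import threading
import time

def fake_heartbeat(stop_event):
    while not stop_event.is_set():
        print('heartbeat', time.time())
        time.sleep(1.0)

stop = threading.Event()
thread = threading.Thread(target=fake_heartbeat, args=(stop,))
thread.daemon = True
thread.start()

# CPU-bound pure-Python work on the main thread; the interpreter still releases
# the GIL periodically, so the heartbeat keeps printing roughly once a second.
total = sum(i * i for i in range(20000000))
print('done', total)

stop.set()
thread.join()
```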

@jianbin-wei
Contributor

I agree that in most cases the GIL would not prevent the heartbeat background thread from doing its job.

@jeffwidman
Collaborator Author

jeffwidman commented Mar 7, 2017

@tvoinarovskyi are you still planning on working on this?

It's not something I can tackle in the near term, just wanted to check in to see where you're at on it as it would be a very useful addition...

@tvoinarovskyi
Collaborator

@jeffwidman Hey, sorry, but it doesn't seem possible in the near term. I did look at how the Java client does it; it's quite a hassle with locking, and I'm not sure I can do it properly in my limited free time.

@oscarbatori

@jeffwidman @Drizzt1991 - is there a pull request for this? We would quite like to leverage KIP-62 functionality in our organization.

@tvoinarovskyi
Collaborator

@oscarbatori Sorry, nothing too good to show...

@pvanderlinden

Is there any workaround for this problem? I just switched to kafka-python from a different client, only to discover this issue. The problem I have is that one of the consumers does a lot of heavy processing, taking anything from 10 seconds to 5 minutes. This means we have to set the session timeout high, which means recovering from crashed consumers will take forever.

@tvoinarovskyi
Collaborator

@pvanderlinden You could do a fairly elaborate setup using the pause() API: have a thread pool for processing and a thread for polling Kafka. That's how you did it before heartbeats ran in the background. Is your processing by any chance done in C? If so, you'll have problems. If it's Python, I can try to get you a workaround.

@pvanderlinden

pvanderlinden commented Jul 14, 2017

@tvoinarovskyi I don't see a way to just poll Kafka, though: if I call poll() it will actually fetch and return messages, which of course I don't want. I also just noticed that if I increase session_timeout_ms I need to increase request_timeout_ms as well, which means failures will take even longer to be detected on that side too. The processing is in Python, with I/O waits for an external program and some C modules for Python (scikit-learn/numpy).

@tvoinarovskyi
Collaborator

@pvanderlinden The idea is that you poll(), put the results in a Queue instance per partition, and pause the partition. That way the consuming thread always polls and sends heartbeats, but does not fetch new data, because the partition is paused.
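Roughly like this, as a sketch (placeholder topic/group/broker names, no offset commits, and nowhere near production quality; the KafkaConsumer itself is only touched from the polling thread):

```python
# Sketch of the poll/pause/queue pattern: the polling thread keeps calling
# poll() so heartbeats keep flowing, while worker threads process the records
# of paused partitions and report back when a partition can be resumed.
import threading
from queue import Queue, Empty

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    enable_auto_commit=False,
)

work = {}        # TopicPartition -> Queue of record batches
done = Queue()   # partitions whose current batch has been fully processed

def worker(tp, batches):
    while True:
        records = batches.get()
        for record in records:
            pass                 # placeholder for slow per-record processing
        done.put(tp)             # tell the polling thread it may resume tp

while True:
    # poll() keeps group membership alive; paused partitions return no data.
    for tp, records in consumer.poll(timeout_ms=1000).items():
        consumer.pause(tp)       # stop fetching until this batch is processed
        batches = work.get(tp)
        if batches is None:
            batches = work[tp] = Queue()
            threading.Thread(target=worker, args=(tp, batches), daemon=True).start()
        batches.put(records)

    # Resume (and commit offsets for) partitions whose workers have finished.
    try:
        while True:
            tp = done.get_nowait()
            consumer.resume(tp)
            # committing the processed offsets for tp would go here
    except Empty:
        pass
```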

@pvanderlinden

@tvoinarovskyi So every time you consume a message you pause all partitions, start a thread which calls poll, then after processing you unpause and stop the thread, consume, and start all over again?

@tvoinarovskyi
Collaborator

@patricklucas Let me put up a snippet of code. I wanted to put it together anyway. Will be back in an hour or so)

@pvanderlinden

Thanks, that would be very useful.

@tvoinarovskyi
Collaborator

tvoinarovskyi commented Jul 14, 2017

@pvanderlinden Ok, so for starters, it's not exactly easy code here. https://gist.github.com/tvoinarovskyi/05a5d083a0f96cae3e9b4c2af580be74

It's not production code, only something I got working and tested locally in 2 hours, so it will need some polishing. I did include all the hard parts, like:

  • Proper committing of offsets on shutdown or rebalance
  • KafkaConsumer is accessed strictly from 1 thread, as it's not thread-safe

But it has some drawbacks:

  • It's not super reactive. For example, if there's no data in Kafka, or we hit a dead loop (all partitions paused), it will wait 1 second before checking whether some of the queue workers have finished processing. We can't get around that, as KafkaConsumer is not thread-safe and there's no wakeup() function like in the Java client. Created an issue for that: Add wakeup to Consumer as in Java Client #1148
  • If you have a long C call, it may not give the consumer_thread code any time slices to execute, as C code can hold the GIL for too long. This may be an issue and would require using ProcessPools instead of Thread pools (see the sketch below). It would mean more code around WorkerQueue, but it's doable.
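As a rough illustration of that second point (not part of the gist, and heavy_c_call is just a stand-in): moving the heavy call into a process pool keeps the GIL of the consumer process free for the polling thread.

```python
# Sketch: run the GIL-holding work in child processes via ProcessPoolExecutor
# so the process that owns the KafkaConsumer stays responsive.
from concurrent.futures import ProcessPoolExecutor

def heavy_c_call(payload):
    # Stand-in for a long numpy/scikit-learn style computation that could hold
    # the GIL for a long stretch if run in-process.
    return sum(byte for byte in payload)

def process_batch(pool, values):
    # Submitting to the pool keeps this process free to poll and heartbeat.
    futures = [pool.submit(heavy_c_call, value) for value in values]
    return [future.result() for future in futures]

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        print(process_batch(pool, [b'abc', b'defg']))
```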

@tvoinarovskyi
Collaborator

By the way, @dpkp, @jeffwidman, maybe you have a link to a good implementation of a consumer that uses pause() and processes data in other threads? I did see a good implementation in Java, but couldn't find it...

@pvanderlinden

That is quite complex; maybe the background thread for heartbeats is a better way to go then.

@pvanderlinden

pvanderlinden commented Jul 17, 2017

Just ran into major problems again. Fetch size of 1, session timeout of 20 seconds, average processing time of 10 seconds, and still I see the error happening about every 15 minutes and bumping the offset back again. I might need to switch back to the other Python client, which sort of worked.

@jeffwidman
Collaborator Author

jeffwidman commented Sep 1, 2017

I would like to start working on this; we have multiple services at my day job that are impacted by it (and I just realized there may be more due to #1039).

Unfortunately, I've never really worked with Java, so will be a bit slow trying to figure this out.

@tvoinarovskyi do you have any pointers on where to look in the Java code for where to start changing things compared to how they are in kafka-python today?

@tvoinarovskyi
Collaborator

tvoinarovskyi commented Sep 3, 2017

@jeffwidman Cool, so you can use https://github.com/apache/kafka/pull/1627/files to understand the parts that need to be altered, but I prefer doing the changes straight from trunk to have a more up-to-date version (I use the PR as a reference for what we should change, and look at trunk to see how the latest version looks).

I would approach this task with something like:

  • Write a simple check script that does a long processing cycle.
  • Add support for v1 JoinGroup and SyncGroup requests. Add a max_poll_interval_ms parameter to the consumer.
  • Remove the code that schedules delayed tasks for heartbeats. Just clean up so the Consumer works, but does not send heartbeats.
  • Add a heartbeat thread that only prints "Heartbeat", and refactor ensure_active_group to properly start/pause/stop the thread. See the AbstractCoordinator.java file changes.
  • Implement the heartbeat thread's run(). At this point you will need to trace the calls this thread makes and make sure state is only changed with the lock acquired. See Java's run(). One lock in the Coordinator is enough for this, similar to how Java uses the AbstractCoordinator object's monitor, but we will probably need a Condition rather than an RLock (see the sketch at the end of this comment).
  • Make sure your check script behaves well with heartbeats.
  • Add tests for heartbeats in the background.
  • Go through the Java code a few times, polishing things up.

And hey, don't worry about breaking something, you will have another couple of eyes here =) When you start, you can create a WIP PR and I'll help out with more pointers, or take some part off your hands.
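For the thread/lock shape in the steps above, something like this minimal sketch (class and attribute names are illustrative, not kafka-python's actual coordinator code):

```python
# Sketch of a heartbeat thread sharing one Condition with the coordinator,
# similar in spirit to how the Java client uses AbstractCoordinator's monitor.
import threading
import time

class Coordinator(object):
    def __init__(self, heartbeat_interval=3.0):
        self._lock = threading.Condition()
        self._heartbeat_interval = heartbeat_interval
        self._closed = False
        self._thread = threading.Thread(target=self._run, name='heartbeat')
        self._thread.daemon = True

    def start(self):
        self._thread.start()

    def close(self):
        with self._lock:
            self._closed = True
            self._lock.notify_all()   # wake the heartbeat thread so it exits promptly
        self._thread.join()

    def _run(self):
        with self._lock:
            while not self._closed:
                # Shared state is only read or changed while holding the lock.
                print('Heartbeat')
                # wait() releases the lock, so the main thread can poll/commit.
                self._lock.wait(self._heartbeat_interval)

if __name__ == '__main__':
    coordinator = Coordinator(heartbeat_interval=1.0)
    coordinator.start()
    time.sleep(3.5)   # stand-in for the consumer's poll/processing loop
    coordinator.close()
```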

@tvoinarovskyi
Collaborator

tvoinarovskyi commented Sep 3, 2017

You can omit some hard parts on the first iteration of this:

  • Changes to coordinator/consumer.py that refactor the auto-commit routine. In the Java version this is done to remove the DelayedTasks code from the client. We can do this in the 2nd iteration.
  • Making client_async.py thread-safe. We will need it, but since the heartbeat thread will share a hard lock with the Coordinator, only the Fetcher will need to use locking in the client. We can do this in the next iteration too.

So on the first iteration you can concentrate only on the coordinator/base.py, coordinator/heartbeat.py and consumer/group.py changes.

@dpkp
Owner

dpkp commented Oct 10, 2017

I worked on this over the weekend. There's a subtle issue that needs some attention. KIP-62 relies on new JoinGroup api versions. The v1 JoinGroup api supports passing both a session_timeout_ms and a rebalance_timeout_ms (aka max_poll_interval_ms). The session timeout tells the group coordinator how long to wait between heartbeats before marking the consumer dead and forcing a rebalance. The rebalance timeout tells the group coordinator how long to keep the rebalance operation open so that the consumer can complete a poll() loop, commit offsets, and begin rejoining.

The question is how best to configure this when we're connecting to an older broker that doesn't support the new JoinGroup api version. My current thinking is that we should require session_timeout_ms == max_poll_interval_ms when using api_version < 0.10.1, and that we should only use the smaller default session_timeout_ms of 10000 if api_version >= 0.10.1. This would allow consumers to use the background thread with older brokers, though I don't think it would improve actual behavior much, because session_timeout_ms would still need to be configured for the worst-case poll() duration.
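To make that concrete, a sketch of the proposed rule (the function, constant, and error type are illustrative, not actual kafka-python code):

```python
# Sketch of the configuration rule described above.
DEFAULT_SESSION_TIMEOUT_MS_0_10_1 = 10000

def group_timeouts(api_version, session_timeout_ms, max_poll_interval_ms):
    """Return (session_timeout_ms, rebalance_timeout_ms) to send in JoinGroup."""
    if api_version >= (0, 10, 1):
        # v1 JoinGroup carries both timeouts, so the smaller default is safe.
        if session_timeout_ms is None:
            session_timeout_ms = DEFAULT_SESSION_TIMEOUT_MS_0_10_1
        return session_timeout_ms, max_poll_interval_ms
    # v0 JoinGroup only knows the session timeout, so the two must match.
    if session_timeout_ms != max_poll_interval_ms:
        raise ValueError(
            'Brokers < 0.10.1 require session_timeout_ms == max_poll_interval_ms')
    return session_timeout_ms, session_timeout_ms
```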

@tvoinarovskyi
Collaborator

@dpkp How about just preventing max_poll_interval_ms from being set for old broker versions? The behaviour for session_timeout seems fine as you described.

@pablasso

pablasso commented Mar 6, 2018

Isn't this already solved by #1266?

@dpkp
Owner

dpkp commented Mar 8, 2018

Yes!
