
Confluent-kafka-python, Tornado and GIL #100

Open
ghost opened this issue Dec 10, 2016 · 17 comments


@ghost

commented Dec 10, 2016

Let's talk about using the confluent-kafka-python Producer together with the Tornado web server.

We know that Tornado is a non-blocking server with an event loop for processing socket events.
We also know that librdkafka has several threads and its own event loop, and confluent-kafka-python is built on top of librdkafka.

When we call Producer.produce() from inside Tornado, it executes in the main thread, right? Then librdkafka passes the data (with a callback, probably) to another thread (its event loop thread), and when the message is successfully received by Kafka, the callback is executed.

What happens when librdkafka calls the callback function from its event loop? Does it lock the Python main thread via the GIL?

@edenhill

Member

commented Dec 10, 2016

Callbacks are called in the main thread, triggered by producer.poll().
The GIL is released while the underlying rd_kafka_poll() C function runs (since it may block if timeout > 0). If a callback is triggered, the GIL is re-acquired before any registered Python callbacks (e.g., on_delivery) are invoked.

Illustration:

  • Python app: p.poll(10)
  • Module unlocks GIL
  • Module calls C rd_kafka_poll(10*1000)
  • C-callbacks are triggered
  • C-callback re-acquires GIL
  • C-callback executes registered Python callback (on_delivery, error_cb, ..)
  • Python callback executes and returns
  • C-callback unlocks GIL
  • rd_kafka_poll() returns
  • Module reacquires GIL
  • p.poll() returns
  • Python app resumes execution

This all happens in the same thread, main application thread.

@ghost

Author

commented Dec 11, 2016

Thank you for your answer!
I still have some questions, so let me try to explain.

Let's imagine a situation where we use confluent-kafka-python in a single-threaded Python app.

  • p.poll(10) is called from the main thread
  • The module unlocks the GIL. Yes, it's unlocked now. But we still have only one thread, so the interpreter doesn't switch execution to another thread.
  • The module calls the C rd_kafka_poll(10*1000). Here the interpreter will be blocked until rd_kafka_poll returns.

Am I right?

@edenhill

Member

commented Dec 11, 2016

Yes, Python execution of the thread calling poll() will halt until the function returns, but other Python threads will continue to run during this time.
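This behavior can be demonstrated with the standard library alone: time.sleep() releases the GIL while it blocks, just as the C rd_kafka_poll() call does, so a second Python thread keeps making progress while the "polling" thread is blocked. A minimal sketch (sleep() is only a stand-in for the blocking C call):

```python
# Demonstration: a blocking call that releases the GIL (time.sleep here,
# rd_kafka_poll in the real client) does not stop other Python threads.
import threading
import time

ticks = []

def background_worker():
    # Runs concurrently while the main thread is blocked in sleep().
    for _ in range(5):
        ticks.append(time.monotonic())
        time.sleep(0.05)

t = threading.Thread(target=background_worker)
t.start()

start = time.monotonic()
time.sleep(0.3)          # stand-in for p.poll(0.3): blocks, GIL released
blocked_for = time.monotonic() - start
t.join()

# Count worker ticks that happened while the main thread was blocked.
progressed = sum(1 for ts in ticks if ts < start + blocked_for)
print(f"worker ticks during blocking call: {progressed}")
```

The worker thread records several ticks during the main thread's blocking call, confirming that only the calling thread halts.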

@ghost

Author

commented Dec 12, 2016

This is exactly what I was searching for. This means that we shouldn't use confluent-kafka-python with Tornado, because Tornado has only one execution thread and calling poll() will block it. So we will lose performance.

@ghost ghost closed this Dec 12, 2016

@mtrienis


commented May 4, 2017

Hi @johnsm87 and @edenhill ,

I'm also doing an evaluation of confluent-kafka-python with applications built on the Tornado framework and I'm hoping to understand this conversation a bit better.

This means that we shouldn't use confluent-kafka-python with Tornado, because Tornado has only one execution thread and calling poll() will block it. So we will lose performance.

Did you find a way to do non-blocking calls using Tornado async interfaces? Any way to extend confluent-kafka-python to support tornado coroutines?

As far as I understand the poll(1000) function creates a busy loop and doesn't allow Tornado to execute other work (or code) while waiting for a response from confluent-kafka-python.

I'm looking for a possible workaround or patch, anything that could help here :-)

@ewencp

Member

commented May 5, 2017

I'm not certain about Tornado, but with asyncio I'm pretty sure you'd want to integrate with run_in_executor for any blocking calls. I'd imagine Tornado has something similar since pretty much any async library needs a way to integrate with synchronous libraries by pushing them out to a thread/threadpool.
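A minimal asyncio sketch of that approach, where flush_blocking() is a hypothetical stand-in for a blocking client call such as producer.flush(): the blocking work is pushed to the default thread pool via run_in_executor, so the event loop keeps servicing other coroutines.

```python
# Sketch: push a blocking client call out to a thread pool via
# run_in_executor so the event loop stays responsive.
# flush_blocking() is a hypothetical stand-in for e.g. producer.flush().
import asyncio
import time

def flush_blocking():
    time.sleep(0.2)      # simulates a blocking librdkafka call
    return "flushed"

async def main():
    loop = asyncio.get_running_loop()
    # Runs flush_blocking() in the default ThreadPoolExecutor; the event
    # loop keeps running other coroutines meanwhile.
    blocking = loop.run_in_executor(None, flush_blocking)
    other = asyncio.sleep(0.05, result="other work done")
    return await asyncio.gather(blocking, other)

print(asyncio.run(main()))   # ['flushed', 'other work done']
```

Tornado's equivalent is IOLoop.run_in_executor, so the same pattern applies there.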

I think the major issue here is with callbacks, since they come from different C threads. Any callbacks that are triggered by C code while the GIL is unlocked would be invoked as soon as they can acquire the lock, which would introduce arbitrary ordering with respect to the event thread. This might be ok if those callbacks simply appended the events to the event loop's queue. I'm not sure off the top of my head what kinds of problems could occur if the user callback is executed immediately.

@edenhill

Member

commented May 5, 2017

Side note: pretty much all librdkafka callbacks are triggered from .poll() (or similar) in the application thread.

@davidblewett


commented May 5, 2017

I used this library along with Tornado here: https://github.com/CrowdStrike/cs.eyrie/blob/master/cs/eyrie/transistor.py#L470 (in this code, self.collector is an instance of Consumer). If you call poll(0), it will return immediately. It might return None if there is not a message ready, but it won't block. The above code handles that by waiting (using gen.sleep) progressively longer before polling again.
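An asyncio rendition of that pattern (Tornado's gen.sleep is the analogue of asyncio.sleep): call poll(0), which never blocks, and back off progressively while it keeps returning None. StubConsumer below is a hypothetical stand-in for confluent_kafka.Consumer so the sketch is self-contained.

```python
# Non-blocking consume loop with progressive backoff, as described above.
import asyncio

class StubConsumer:
    """Returns None a few times, then a message -- mimics poll(0)."""
    def __init__(self, misses, message):
        self._misses = misses
        self._message = message

    def poll(self, timeout):
        if self._misses > 0:
            self._misses -= 1
            return None
        return self._message

async def poll_with_backoff(consumer, initial=0.001, maximum=0.1):
    delay = initial
    while True:
        msg = consumer.poll(0)          # non-blocking call
        if msg is not None:
            return msg
        await asyncio.sleep(delay)      # yield to the event loop
        delay = min(delay * 2, maximum) # wait progressively longer

consumer = StubConsumer(misses=3, message="hello")
print(asyncio.run(poll_with_backoff(consumer)))  # hello
```

The backoff keeps the loop from spinning hot when the topic is idle, at the cost of some added latency for the first message after a quiet period.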

@sergiimk


commented May 5, 2017

Dedicating a thread to poll() calls definitely works, but hinders scalability.

With one thread per broker, librdkafka is already very liberal in its use of threads. This may be tolerable for most languages, where you run one application process per host and maximize CPU utilization by running more worker threads. With Python and its GIL, however, you achieve better utilization by running more worker processes.

We typically run over a hundred worker processes per host. In the environment with 10 Kafka brokers we are looking at having a process go from one thread to 13 (main IOLoop thread, main librdkafka thread, 10 broker threads, polling and callback dispatch thread).

Napkin math:

  • 100 processes * 12 threads = 1,200 more threads (context switching overhead)
  • 1,200 threads * 2MB default stack size = 2.4 GB more RAM

Because of this cascading effect we will also have to keep in mind that whenever we change Kafka broker topology we need to either decrease the number of workers we run (not to run out of memory), or to increase it (to get better utilization).

My understanding is that broker threads mostly sit idle blocked on IO. If that's the case - having one thread manage all broker sockets via poll or select should not negatively affect the throughput.

Ideally with asyncio and Tornado you would not need extra threads at all. Sockets can be put in non-blocking mode and registered with IOLoop.

Sure, threading model is not something you can change easily, but it would be a great future improvement for Python clients.

@mtrienis


commented May 5, 2017

Thanks everyone for your responses, it was very helpful. We're going to look at adding a dedicated thread for polling and will let you know how that goes.

@davidblewett


commented May 6, 2017

@mikhtonyuk: the code I linked to is designed to write to a zmq socket, so that there's one process reading from Kafka and many processes connected to the zmq PULL socket. Messages are then distributed in round robin fashion.

@mtrienis


commented Jul 24, 2017

Just to update this issue on what I did when integrating Tornado and confluent-kafka-python:

  • I created a dedicated thread that calls Producer().poll in an infinite loop.
  • I attached a callback function to Producer().produce that schedules another callback on the Tornado event loop. See IOLoop.add_callback.
    1. The second callback sets the result of the future once the message is delivered to Kafka (i.e. poll triggers the delivery callback):

            if err is None:
                future.set_result(True)
            else:
                future.set_result(False)

       and the produce wrapper returns the future to the caller.

It all works fine, however, it becomes complicated if you try to send two non-blocking messages in different coroutines on the main thread. There is no guarantee that the order in which you send the messages will be the same order that the callback function is triggered.

This means that I need to wait until each message is sent successfully before sending another. That flow results in an average latency of around 2 ms per message (with queue_buffering_max_ms=0) to a local Kafka cluster. That's roughly 500 messages / second, compared to 20,000 messages / second using the produce method without the coroutine.
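The flow described above can be sketched with asyncio standing in for Tornado (loop.call_soon_threadsafe is the analogue of IOLoop.add_callback) and a hypothetical StubProducer standing in for confluent_kafka.Producer: a dedicated thread dispatches delivery callbacks, which hop back onto the event loop thread to resolve a future.

```python
# Dedicated poll thread + event-loop callback resolving a future.
import asyncio
import threading
import queue

class StubProducer:
    """Delivery reports are dispatched from a background 'poll' thread."""
    def __init__(self):
        self._pending = queue.Queue()
        threading.Thread(target=self._poll_loop, daemon=True).start()

    def produce(self, value, on_delivery):
        self._pending.put((value, on_delivery))

    def _poll_loop(self):
        while True:
            value, cb = self._pending.get()
            cb(None, value)        # err=None: delivered successfully

async def produce_one(producer, value):
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_delivery(err, msg):      # runs on the poll thread
        # Hop back to the event loop thread before touching the future.
        loop.call_soon_threadsafe(future.set_result, err is None)

    producer.produce(value, on_delivery)
    return await future

print(asyncio.run(produce_one(StubProducer(), "payload")))  # True
```

Note that asyncio futures are not thread-safe, which is exactly why the delivery callback must go through call_soon_threadsafe rather than setting the result directly.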

I guess my next step would be to wait until this issue is resolved and set max.inflight=1:

Question: what is the best way to support two non-blocking requests coming from the same main thread given that the responses may be out-of-order?

Thanks!

@ewencp

Member

commented Jul 31, 2017

@mtrienis One way to accomplish this ordering guarantee would be to do a bit of your own buffering/blocking. In your callback that is setting the future, maintain some state that tracks the next expected message offset (or offsets across multiple topic partitions). I think aside from the initial setup, you shouldn't have to worry about locking, because these callbacks all execute on the main thread. When you see the expected offset, fire it as well as any messages that are queued up behind it. The vast majority of the time you'd expect this buffer to be empty, but in the case of some reordering you do a bit of buffering until the earliest request's response comes through.
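A pure-Python sketch of that buffering idea, for a single partition: delivery reports may arrive out of order, but their completions are fired strictly in offset order. (OrderedCompleter and its names are illustrative, not part of any client API.)

```python
# Buffer out-of-order delivery reports; complete them in offset order.
class OrderedCompleter:
    def __init__(self, first_offset=0):
        self.next_offset = first_offset
        self.buffered = {}      # offset -> result, held until its turn
        self.fired = []         # completion order (for illustration)

    def on_delivery(self, offset, result):
        self.buffered[offset] = result
        # Fire the expected offset plus anything queued behind it.
        while self.next_offset in self.buffered:
            self.fired.append((self.next_offset,
                               self.buffered.pop(self.next_offset)))
            self.next_offset += 1

c = OrderedCompleter()
for offset in (2, 0, 1, 3):             # reports arrive out of order
    c.on_delivery(offset, f"msg-{offset}")
print([o for o, _ in c.fired])          # [0, 1, 2, 3]
```

In a real integration, firing an offset would mean resolving that message's future; as noted above, this simple version does not handle offset jumps in compacted topics.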

Now, there are a couple of issues here -- you need to know the partitions the messages are going to (or have a global order on the produce requests), the quick design I sketched out won't handle the offset jumps in compacted topics, etc. But those could be addressed if you need this before the underlying issue can be fixed in librdkafka. (Falling back to synchronous production is basically never a good solution, for exactly the reason you point out, even if you need to manage a bit more state yourself.)

@mrocklin

Contributor

commented Oct 3, 2017

The producer side is fairly easy due to the non-blocking/callback mechanism already present here.

Building a tornado/asyncio framework around confluent-kafka-python would be much easier if we had the ability to register a Python callback that was triggered every time the internal buffer of messages went from empty to non-empty. After this callback was triggered we would be guaranteed that the next call to Consumer.poll(0) would return a message, and not None.

Is adding such a callback feasible?
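To illustrate what such a callback would enable, here is a self-contained sketch: a hypothetical "buffer became non-empty" callback wakes an asyncio.Event, so a coroutine can await the event instead of busy-polling, then call poll(0) knowing a message is ready. StubConsumer simulates a consumer whose background thread fills the buffer and fires the callback; no such hook exists in the client today.

```python
# Sketch of the proposed empty -> non-empty notification callback.
import asyncio
import threading
import collections

class StubConsumer:
    def __init__(self, on_nonempty):
        self._buffer = collections.deque()
        self._on_nonempty = on_nonempty   # the proposed callback

    def feed_from_background(self, value, delay):
        def fill():
            was_empty = not self._buffer
            self._buffer.append(value)
            if was_empty:
                self._on_nonempty()       # empty -> non-empty transition
        threading.Timer(delay, fill).start()

    def poll(self, timeout):
        return self._buffer.popleft() if self._buffer else None

async def main():
    loop = asyncio.get_running_loop()
    ready = asyncio.Event()
    # The callback may fire on a foreign thread; hop onto the loop.
    consumer = StubConsumer(
        on_nonempty=lambda: loop.call_soon_threadsafe(ready.set))
    consumer.feed_from_background("msg", delay=0.05)
    await ready.wait()                    # no polling until data exists
    return consumer.poll(0)               # non-None: buffer was filled
```

With this shape, the event loop does no work at all while the buffer is empty, which is the scalability win over sleep-and-retry polling.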

@mrocklin

Contributor

commented Oct 4, 2017

This notebook gives a basic non-blocking implementation of Consumer.poll and shows basic benchmarking information.

https://gist.github.com/3feba6adcf9b33ffb261c896b5e6c343

Would it be reasonable to reopen this issue to discuss using confluent_kafka with Python's async frameworks?

@edenhill edenhill reopened this Oct 9, 2017

@edenhill

Member

commented Jan 16, 2018

@mrocklin There is indeed community demand for getting this client to work flawlessly with asyncio, see issue #185.

We would be very happy to see community contributed work in this area.

@mrocklin

Contributor

commented Jan 16, 2018

FWIW, it is no longer likely that I will implement anything like this near-term, so I wouldn't plan on progress from me.
