Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

librdkafka and kevent/epoll. #617

Closed
dp-farm opened this issue Apr 18, 2016 · 6 comments
Closed

librdkafka and kevent/epoll. #617

dp-farm opened this issue Apr 18, 2016 · 6 comments

Comments

@dp-farm
Copy link

dp-farm commented Apr 18, 2016

I am working on a single threaded but performance oriented kevent based code. Process runs in a while loop calling kevent and relying on kevent to provide event (timer or file descriptor) which will then be passed to appropriate event handler.

Now librdkafka has its own mechanism to handle events and we need to call rd_kafka_consume() with a timeout. So practically I can call rd_kafka_consume in a timer. But as messages must be consumed within few mili seconds, keeping a timer of mili-second will cause high cpu.

Is there any way I can integrate librdkafka events to kevents based on some fd?

@ah-
Copy link
Contributor

ah- commented Apr 18, 2016

Yes, this sounds really interesting.
Also, would it be possible/useful to just run librdkafka on top of an existing event loop like libuv?

@edenhill
Copy link
Contributor

You could modify rd_kafka_q_t to trigger a kevent/epoll event as it calls cnd_signal() to wake up any listening threads.

As a proof of concept try something like this:

  • Add a field to rd_kafka_q_t that represents your event dispatcher object.
  • Set up a rd_kafka_queue_t from your application and set the dispatcher object on the underlying rd_kafka_q_t.
  • In rd_kafka_q_enq() and rd_kafka_q_concat0() where it calls cnd_signal() add a check for the dispatcher object and trigger an event for it if exists.
  • Only call rd_kafka_consume_queue() when the event is fired in your main thread (but possibly multiple times until queue is drained, with timeout=RD_POLL_NOWAIT).

@edenhill
Copy link
Contributor

@ah- Since librdkafka is multi-threaded internally it dont think there is any real use of plugging it into an existing dispatcher.

I do have plans on rewriting some of the internal IO and thread handling though to improve producer latency.

@edenhill
Copy link
Contributor

You could use the new rd_kafka_queue_io_event_enable() API, see here:
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L1655

@akhi3030
Copy link
Contributor

Hi, thank you very much for this feature. I don't see a way to use it from within the C++ API. Am I missing something, is this intentional, or is it not yet implemented?

@edenhill
Copy link
Contributor

It isn't implemented but it is trivial to implement io_event_enable() on the Queue class.
PR welcome :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants