
advice for the erlang binding #3300

Closed
silviucpp opened this issue Mar 16, 2021 · 6 comments

Comments

@silviucpp

Hello,

In https://github.com/silviucpp/erlkaf we use a design where each topic has a dedicated Erlang process that calls rd_kafka_queue_poll on the partition queue at regular intervals to fetch new events.

The problem we faced recently:

The messages had to be sent to a system which was down for a very long time. Our consumer does something like this:

  1. call rd_kafka_queue_poll(..) to get the events
  2. process the events until all are ok; if one fails, we do not go further until it is completed
  3. call rd_kafka_offset_store to update the offset

As you can see, in step 2 we can be stuck for an indefinite amount of time during which we do not call any API on that queue.
This seems to cause issues: for some reason the consumer did not reconnect, and the offsets were lost because the retention on the server was set to 24 hours.

My question is: which API can we call to tell librdkafka that we are still alive?

Or, if we call rd_kafka_queue_poll(..) and get messages A, B, and C, then process A and B completely and call rd_kafka_offset_store with B's offset: will the next rd_kafka_queue_poll call return messages starting from C? (I mean, C was already returned on the previous call, but we could not process it.)

Silviu

@edenhill
Contributor

edenhill commented Apr 6, 2021

Perhaps in step 2 you exceeded max.poll.interval.ms, causing the consumer to halt itself until you call poll() again?

The contract is for the application to call poll() at least every max poll interval so that it can detect rebalances and stop processing messages for partitions that have been revoked and assigned to other consumers.

What you can do is separate the consumer queue (where rebalance events, errors, etc. are emitted) from the partition fetch queues (where messages are emitted), and then just make sure to poll the consumer queue while doing processing so that you can detect rebalances.
The partition queues are by default forwarded to the consumer queue on assign().
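
As a minimal C sketch of this pattern (not erlkaf's actual code; the function names, loop structure, and the 100 ms poll timeout are all illustrative):

#include <librdkafka/rdkafka.h>

/* After assign(): detach one partition's fetch queue so its messages no
 * longer flow through the consumer queue, then hand it to a worker. */
static rd_kafka_queue_t* detach_partition_queue(rd_kafka_t* rk,
                                                const char* topic,
                                                int32_t partition) {
    rd_kafka_queue_t* pq = rd_kafka_queue_get_partition(rk, topic, partition);
    rd_kafka_queue_forward(pq, NULL); /* stop forwarding to the consumer queue */
    return pq;
}

/* Worker loop: fetch events come from the partition queue, while
 * rd_kafka_consumer_poll() is still called between batches so rebalances
 * and errors on the consumer queue are served in time. */
static void worker_loop(rd_kafka_t* rk, rd_kafka_queue_t* pq) {
    for (;;) {
        rd_kafka_event_t* ev = rd_kafka_queue_poll(pq, 100);
        if (ev) {
            if (rd_kafka_event_type(ev) == RD_KAFKA_EVENT_FETCH) {
                const rd_kafka_message_t* msg;
                while ((msg = rd_kafka_event_message_next(ev))) {
                    /* process msg, then store its offset */
                }
            }
            rd_kafka_event_destroy(ev);
        }
        /* Serve the consumer queue: rebalances, errors, stats. */
        rd_kafka_message_t* m = rd_kafka_consumer_poll(rk, 0);
        if (m)
            rd_kafka_message_destroy(m);
    }
}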

@silviucpp silviucpp reopened this Jul 4, 2021
@silviucpp
Author

silviucpp commented Jul 4, 2021

Yes, you are right! In step 2 I exceeded max.poll.interval.ms ...

Right now my implementation does the following:

In the rebalance_cb, for RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, I get the partition queue and remove the forwarding to the main queue:

rd_kafka_queue_t* partition_queue = rd_kafka_queue_get_partition(rk, topic.c_str(), partition);
rd_kafka_queue_forward(partition_queue, NULL); // detach: stop forwarding to the consumer queue

Inside my Erlang code, each partition is handled by an Erlang process that:

  1. calls rd_kafka_queue_poll(..) to get the events
  2. processes the events until all are ok; if one fails, it does not go further until it is completed
  3. calls rd_kafka_offset_store to update the offset

Can you please let me know how I can also forward the consumer queue to another queue, to avoid the problems
when step 2 takes longer than max.poll.interval.ms?

@edenhill
Contributor

edenhill commented Jul 5, 2021

With all partition queues re-routed you should be able to just call consumer_poll() regularly from a separate thread to serve max.poll, rebalances, etc.
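
For example, a minimal keepalive-thread sketch (assuming pthreads; the flag name and the 1000 ms timeout are illustrative):

#include <stdbool.h>
#include <stdatomic.h>
#include <pthread.h>
#include <librdkafka/rdkafka.h>

static atomic_bool keep_running = true;

/* With all partition queues detached, this thread never sees fetched
 * messages; it only serves rebalances, errors, and the
 * max.poll.interval.ms liveness check. */
static void* keepalive_thread(void* arg) {
    rd_kafka_t* rk = arg;
    while (atomic_load(&keep_running)) {
        rd_kafka_message_t* m = rd_kafka_consumer_poll(rk, 1000);
        if (m)
            rd_kafka_message_destroy(m); /* e.g. an error event */
    }
    return NULL;
}

/* Usage: pthread_create(&tid, NULL, keepalive_thread, rk); */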

@silviucpp
Author

Hmm, from what I see, my logic with the main queue is as follows:

rd_kafka_queue_t* queue = is_consumer ? rd_kafka_queue_get_consumer(instance) : rd_kafka_queue_get_main(instance);
rd_kafka_queue_cb_event_enable(queue, consumers_event_callback, this); // wake us up when new events arrive
rd_kafka_queue_destroy(queue); // drops our reference; the callback stays registered

And when consumers_event_callback is triggered, I call rd_kafka_consumer_poll (for a consumer) and rd_kafka_poll (for a producer).

I picked this implementation to optimize the calls to the *_poll functions. But it seems this does not cover max.poll.interval.ms.

Instead of doing this, I think the easy fix will be to make sure the gaps between rd_kafka_consumer_poll calls are never longer than max.poll.interval.ms, right?

@edenhill
Contributor

edenhill commented Jul 5, 2021

Yes, that sounds like a good plan.
max.poll.interval.ms is really the maximum poll interval, and it also dictates how long it will take all consumer group members to enter rebalancing. So, to avoid lengthy idle times, I recommend you keep cb_event_enable to trigger immediate callbacks, and also use a somewhat-below-max-poll timer that calls consumer_poll to stay alive.
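
A sketch of deriving such a timer period (the 80% factor is just an illustrative safety margin, not a librdkafka constant, and this assumes the conf object has not yet been consumed by rd_kafka_new()):

#include <stdlib.h>
#include <librdkafka/rdkafka.h>

/* Derive a keepalive period somewhat below max.poll.interval.ms. */
static int keepalive_period_ms(const rd_kafka_conf_t* conf) {
    char val[32];
    size_t val_size = sizeof(val);
    if (rd_kafka_conf_get(conf, "max.poll.interval.ms",
                          val, &val_size) == RD_KAFKA_CONF_OK)
        return (int)(atoi(val) * 0.8);
    return 240000; /* fallback: 80% of the 300000 ms default */
}

/* Arm a repeating timer with this period that calls
 * rd_kafka_consumer_poll(rk, 0), alongside the existing
 * rd_kafka_queue_cb_event_enable() wake-ups. */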

@silviucpp
Author

@edenhill thanks a lot! I will make the changes and see if the problem improves!

Silviu
