
Poll in batches #57

Closed
JHK opened this issue Nov 14, 2018 · 8 comments

Comments

@JHK

JHK commented Nov 14, 2018

Handling the offset right is cumbersome, especially when I want to consume a few messages, do a batch insert into the database, and don't want the offset to change before the batch insert has succeeded. To do so I've disabled auto.commit and commit manually after the batch has been processed successfully (see zendesk/racecar@d00dbc7).

This could probably be implemented using the rd_kafka_consume_batch binding, which allows auto-committing after a callback has been processed successfully.
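A rough sketch of that "commit only after the whole batch succeeded" pattern. `FakeConsumer` is a hypothetical stand-in so the snippet runs without a broker; it is not the rdkafka-ruby API. With the real consumer you would set `"enable.auto.commit" => false` and call `consumer.commit` yourself after the batch insert.

```ruby
# FakeConsumer mimics a consumer that yields messages one by one and only
# moves its committed offset when we explicitly commit.
class FakeConsumer
  include Enumerable
  attr_reader :committed_offset

  def initialize(messages)
    @messages = messages
    @committed_offset = nil
  end

  def each(&block)
    @messages.each(&block)
  end

  # Explicit commit: remember the last offset we fully processed.
  def commit(offset)
    @committed_offset = offset
  end
end

consumer = FakeConsumer.new((0..9).map { |i| { offset: i, payload: "msg-#{i}" } })

# Enumerable provides each_slice for free, so we can consume in batches.
consumer.each_slice(4) do |batch|
  # ... batch-insert `batch` into the database here ...
  consumer.commit(batch.last[:offset]) # commit only after the insert succeeded
end

puts consumer.committed_offset # prints 9
```

Because the commit happens after the batch insert, a crash mid-batch leaves the committed offset at the previous batch boundary and the whole batch is replayed.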

@mensfeld
Member

I'm looking into switching to rdkafka as well once some of these things are available. What I'm also missing are the batch metadata details (see https://github.com/karafka/karafka/blob/1.3-wip/lib/karafka/connection/batch_delegator.rb#L26 and https://github.com/karafka/karafka/blob/1.3-wip/lib/karafka/params/builders/metadata.rb).

@mensfeld
Member

Just to clarify the purpose of that: it makes it easier to implement rejection logic based on, for example, the offset lag (so old messages won't be processed at all when the lag keeps growing in favour of new ones).

@thijsc
Collaborator

thijsc commented Mar 20, 2019

@Paxa asked a question related to this issue:

What non-legacy API can we use to implement poll in batches?

I just looked into the docs and there is no poll-in-batches in the new consumer API. I think that's because there is no technical need for it: the high-level consumer already batches under the hood. You get messages delivered one by one, but they're being popped off internal batches.

I do think there is a use case we have to think about though, people seem to use the batches to make handling commits easier. @JHK said:

Handling the offset right is cumbersome, especially when I want to consume a few messages, do a batch insert into the database, and don't want the offset to change before the batch insert has succeeded. To do so I've disabled auto.commit and commit manually after the batch has been processed successfully (see zendesk/racecar@d00dbc7).

I would implement this using store_offset. You can disable enable.auto.offset.store and use this method to control the exact offset that will be stored on the next commit.

@JHK Does that fix this use case for you?
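A sketch of that store_offset approach: processing stores the offset, and the periodic auto-commit later commits whatever was stored. `FakeConsumer` here is a hypothetical stand-in, not the rdkafka-ruby class; with the real consumer you would set `"enable.auto.offset.store" => false` and call `consumer.store_offset(message)` after processing each message.

```ruby
# FakeConsumer separates the two offsets involved: the one you *store* after
# processing, and the one the background auto-commit eventually *commits*.
class FakeConsumer
  attr_reader :stored_offset, :committed_offset

  def initialize
    @stored_offset = nil
    @committed_offset = nil
  end

  # In rdkafka-ruby this role is played by consumer.store_offset(message).
  def store_offset(message)
    @stored_offset = message[:offset]
  end

  # Simulates one tick of the background auto-commit picking up the store.
  def auto_commit_tick
    @committed_offset = @stored_offset
  end
end

consumer = FakeConsumer.new
messages = (0..4).map { |i| { offset: i, payload: "m#{i}" } }

messages.each do |msg|
  # ... insert msg into the database ...
  consumer.store_offset(msg) # store only after processing succeeded
end
consumer.auto_commit_tick

puts consumer.committed_offset # prints 4
```

The upside over a synchronous commit per batch is that storing an offset is just a local variable update; the actual commit happens asynchronously in the background.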

@thijsc
Collaborator

thijsc commented Mar 28, 2019

@JHK Do you think using store offset would support your use case?

@JHK
Author

JHK commented Mar 29, 2019

As far as I understand, rdkafka-ruby is meant to be a thin wrapper around librdkafka. If there is no non-legacy API to implement this, calling store_offset explicitly would solve the problem: I would no longer need to commit and wait for an ack after every batch, but could update the offset and let the next auto-commit happen asynchronously.

However, I think an abstraction is then missing in librdkafka. Batching is a common way to increase throughput. Providing an API that auto-commits per single message is fine, but when multiple messages have to be read before updating the offset, one needs to tune some parameters very carefully. This would best be done in the native library, as otherwise it has to be reimplemented in every language. Should we raise awareness there?

I'd be fine with store_offset for this ticket then. Thank you for your effort on this!

@thijsc
Collaborator

thijsc commented Apr 1, 2019

Good to hear!

@thijsc thijsc closed this as completed Apr 1, 2019
@thijsc
Collaborator

thijsc commented Apr 1, 2019

However, I think an abstraction is then missing in librdkafka. Batching is a common way to increase throughput. Providing an API that auto-commits per single message is fine, but when multiple messages have to be read before updating the offset, one needs to tune some parameters very carefully. This would best be done in the native library, as otherwise it has to be reimplemented in every language. Should we raise awareness there?

I think the library actually gives you this control already. You can disable both the automatic offset store and auto-commit, which gives you exact control over what is committed and when. I think this is the preferred approach with the high-level consumer.

I used this approach in a demo project here: https://github.com/appsignal/kafka-talk-demo/blob/master/lib/tasks/processor.rake#L105
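For reference, that configuration would look roughly like this with rdkafka-ruby (the broker address and group id are placeholders; the two `enable.*` keys are standard librdkafka configuration properties):

```ruby
require "rdkafka"

config = Rdkafka::Config.new(
  "bootstrap.servers"        => "localhost:9092", # placeholder
  "group.id"                 => "batch-demo",     # placeholder
  "enable.auto.commit"       => false, # no background commits
  "enable.auto.offset.store" => false  # no automatic offset store either
)
consumer = config.consumer
```

With both disabled, nothing moves the committed offset until your code explicitly stores or commits one.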

@JHK
Author

JHK commented Apr 1, 2019

I once read that the offset to be committed is updated on the next rd_kafka_poll call. I can't currently find documentation of the exact behavior of the offset on a poll, but I assume it is updated on a call to rd_kafka_poll to match the offset of the previously read message.

This behavior gives you at-least-once semantics with auto.commit and enable.auto.offset.store enabled, i.e. your code is ensured to run to full completion before the offset is updated. I'd just like to have this behavior for n messages.

I like the option of setting those manually and having control over the exact behavior of the library, but the default should imply reasonable behavior (i.e. at-least-once delivery). Having this in the base library makes working with Kafka less error-prone across many languages.
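The "at-least-once for n messages" behavior discussed here can be sketched with a stand-in consumer (again hypothetical, not the rdkafka-ruby API): offsets are stored only on batch boundaries, so a failure mid-batch leaves the stored offset at the last completed batch and the whole failed batch is replayed after a restart.

```ruby
# Minimal model: store_offset is only called once per *completed* batch.
class FakeConsumer
  attr_reader :stored_offset

  def initialize
    @stored_offset = -1
  end

  def store_offset(offset)
    @stored_offset = offset
  end
end

consumer = FakeConsumer.new

begin
  (0..9).each_slice(4) do |batch|
    batch.each { |offset| raise "db error" if offset == 6 } # fails mid-batch
    consumer.store_offset(batch.last) # only reached for completed batches
  end
rescue RuntimeError
  # The next auto-commit would commit offset 3; messages 4..9 get replayed
  # after a restart, which is at-least-once delivery per batch.
end

puts consumer.stored_offset # prints 3
```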

breunigs added a commit to JHK/racecar that referenced this issue Apr 2, 2019
The advantage is that setting the offset is fast, since only a single
value is updated. It is committed asynchronously in the background.
This was suggested both from rdkafka-ruby [1] and librdkafka [2].

This might also fix the issue where offsets "disappear". It's suspected
this has to do with not receiving new messages for longer than
offsets.retention.minutes and thus not committing. Even though the
consumer stays connected, the offsets get cleaned up. With the patch
there will be auto commits on an interval, which might stop the
offsets from being cleaned up.

[1] karafka/rdkafka-ruby#57 (comment)
[2] within max.poll.interval.ms: https://github.com/edenhill/librdkafka/blob/d0195a0fdd35aca12e4d4277ca3ae11d6165da27/CONFIGURATION.md