Should rd_kafka_flush always call poll? #1950

Closed
1 of 5 tasks
coldeasy opened this issue Aug 14, 2018 · 5 comments


@coldeasy

Description

rd_kafka_flush does not call rd_kafka_poll when the given timeout is zero. I think it should always call it at least once: anyone trying to use rd_kafka_flush in a non-blocking way, by calling it repeatedly with a zero timeout, will not get the desired result, since the queue length never decreases. This is easy to work around by calling rd_kafka_poll repeatedly and checking the queue length afterwards, but having rd_kafka_flush call rd_kafka_poll at least once would make for a nicer API.
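
For reference, the workaround mentioned above could look roughly like this with the Python client. This is only a minimal sketch, assuming a confluent_kafka.Producer-like object that supports poll(timeout) and len(); the helper name drain_without_blocking and its parameters are made up for illustration and are not part of any library.

```python
import time


def drain_without_blocking(producer, max_wait_s=5.0, idle_sleep_s=0.01):
    """Hypothetical helper: serve events with poll(0) until the queue is
    empty or max_wait_s has elapsed. Returns the remaining queue length."""
    deadline = time.monotonic() + max_wait_s
    while len(producer) > 0 and time.monotonic() < deadline:
        # poll(0) returns immediately after serving any ready callbacks.
        producer.poll(0)
        if len(producer) > 0:
            # Short pause so the loop does not spin at full speed.
            time.sleep(idle_sleep_s)
    return len(producer)
```

In an application with its own event loop, the single poll(0) call plus the len() check would probably be driven from that loop instead of sleeping here.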

How to reproduce

1. Produce some messages.
2. Repeatedly call rd_kafka_flush with a timeout of zero.
3. Notice that the queue length does not decrease.

Checklist

Please provide the following information:

  • librdkafka version (release number or git tag): v0.10.5
  • [x] Apache Kafka version: 0.10.2
  • librdkafka client configuration: <REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
  • [x] Operating system: CentOS 6 (x64)
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

That makes sense, will fix 👍

@coldeasy
Author

👍 Thanks!

@edenhill
Contributor

Could you try out the fix (on the fetcholdrktp branch)?

@coldeasy
Author

I've tested it with the python client and it works well. Thanks for the quick turnaround.

Before

>>> producer = confluent_kafka.Producer({'bootstrap.servers': '{}:{}'.format(kafka_host, kafka_port)})
>>> producer.produce('topic', 'data')
>>> producer.produce('topic', 'data')
>>> len(producer)
3
>>> producer.flush(0)
3L
>>> len(producer)
3

After

>>> producer = confluent_kafka.Producer({'bootstrap.servers': '{}:{}'.format(kafka_host, kafka_port)})
>>> producer.produce('topic', 'data')
>>> producer.produce('topic', 'data')
>>> len(producer)
4
>>> producer.flush(0)
0
>>> len(producer)
0

@edenhill
Contributor

Thank you!
