Consumer timeout issue? #2634

Closed
5 of 7 tasks
keith-chew opened this issue Nov 22, 2019 · 10 comments

Comments

@keith-chew

keith-chew commented Nov 22, 2019

Description

In our node-rdkafka test app, we have a simple 1 second loop doing:

  • consume msgs (max 1200)
  • print elapsed time for consume above
  • commit

We use setDefaultConsumeTimeout(1000).
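For readers without the test app, the loop described above might look roughly like the sketch below. It is an approximation under stated assumptions, not the actual test code: `timedConsume` and `runLoop` are hypothetical helper names, `consumer` is assumed to be an already connected and subscribed node-rdkafka KafkaConsumer in non-flowing mode, and scheduling the next iteration with `setTimeout` is a guess at what "1 second loop" means.

```javascript
// Values taken from the description above.
const MAX_MESSAGES = 1200;
const CONSUME_TIMEOUT_MS = 1000;
const LOOP_INTERVAL_MS = 1000; // assumption: pause between iterations

// Time a single consume(maxMessages, callback) call.
// Calls done(err, { elapsedMs, count }) once the batch callback fires.
function timedConsume(consumer, maxMessages, done) {
  const startedAt = Date.now();
  consumer.consume(maxMessages, (err, messages) => {
    const elapsedMs = Date.now() - startedAt;
    if (err) return done(err, { elapsedMs, count: 0 });
    done(null, { elapsedMs, count: messages.length });
  });
}

// Consume, print the elapsed time, commit, then schedule the next iteration.
function runLoop(consumer, log = console.log) {
  consumer.setDefaultConsumeTimeout(CONSUME_TIMEOUT_MS);
  const tick = () => {
    timedConsume(consumer, MAX_MESSAGES, (err, { elapsedMs }) => {
      log(`consume() [${elapsedMs}]ms`);
      if (!err) consumer.commit(); // manual commit, enable.auto.commit is false
      setTimeout(tick, LOOP_INTERVAL_MS);
    });
  };
  tick();
}
```

Measuring around the callback rather than around the `consume()` call itself matters here, because the call returns immediately and the batch is delivered later.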

We can see the minimum consume time is 1000ms, which is expected, but quite often the time is around 3-5s, and sometimes up to 8s.

How to reproduce

We use the simple test app described above, which prints the elapsed time for consume() in each loop iteration.

Checklist

  • librdkafka version (release number or git tag): 1.2.2
  • Apache Kafka version: 2.1.3
  • librdkafka client configuration: "enable.auto.commit": false
  • Operating system: rhel7
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue

Below are the logs for an expected result of 1000ms:

{"severity":7,"fac":"CGRPOP","message":"[thrd:main]: Group \"my-group\" received op OFFSET_COMMIT (v0) in state up (join state started, v8 vs 0)"}] 
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [6]: stored offset 131978961, committed offset 131978941: setting stored offset 131978961 for commit"}] 
{"severity":7,"fac":"COMMIT","message":"[thrd:main]: GroupCoordinator/2: OffsetCommit for 1 partition(s): manual: returned: Success"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978961 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978961 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978961 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 1 message(s) (990 bytes, 1 ops) on my-topic [6] fetch queue (qlen 1, v2, last_offset 131978961, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
consume() [1004]ms

Below are the logs for a consume() which took 4.7s:

{"severity":7,"fac":"CGRPOP","message":"[thrd:main]: Group \"my-group\" received op OFFSET_COMMIT (v0) in state up (join state started, v8 vs 0)"}] 
{"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [6]: stored offset 131978883, committed offset 131978869: setting stored offset 131978883 for commit"}] 
{"severity":7,"fac":"COMMIT","message":"[thrd:main]: GroupCoordinator/2: OffsetCommit for 1 partition(s): manual: returned: Success"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 4 message(s) (3675 bytes, 4 ops) on my-topic [6] fetch queue (qlen 4, v2, last_offset 131978886, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978887 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 5 message(s) (4499 bytes, 5 ops) on my-topic [6] fetch queue (qlen 5, v2, last_offset 131978891, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 2 message(s) (1817 bytes, 2 ops) on my-topic [6] fetch queue (qlen 2, v2, last_offset 131978893, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978894 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 4 message(s) (3760 bytes, 4 ops) on my-topic [6] fetch queue (qlen 4, v2, last_offset 131978897, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 18 message(s) (16788 bytes, 18 ops) on my-topic [6] fetch queue (qlen 18, v2, last_offset 131978915, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 5 message(s) (4499 bytes, 5 ops) on my-topic [6] fetch queue (qlen 5, v2, last_offset 131978920, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 2 message(s) (1817 bytes, 2 ops) on my-topic [6] fetch queue (qlen 2, v2, last_offset 131978922, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978923 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 3 message(s) (2829 bytes, 3 ops) on my-topic [6] fetch queue (qlen 3, v2, last_offset 131978925, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 2 message(s) (1849 bytes, 2 ops) on my-topic [6] fetch queue (qlen 2, v2, last_offset 131978927, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978928 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978928 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978928 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978928 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 3 message(s) (2821 bytes, 3 ops) on my-topic [6] fetch queue (qlen 3, v2, last_offset 131978930, 0 ctrl msgs, uncompressed)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
{"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
consume() [4759]ms

You can see there are 10 CONSUME log entries in this run, which brings the total time to 4.7s.
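The CONSUME count above can be checked mechanically instead of by eye. The sketch below is a minimal example; `summarizeEnqueues` is a hypothetical helper, and it assumes the debug output has the one-entry-per-line shape shown in the excerpts. It uses a regular expression rather than `JSON.parse` because each pasted line ends with a stray `]`.

```javascript
// Matches librdkafka CONSUME debug entries such as:
//   ... "fac":"CONSUME", ... Enqueue 4 message(s) (3675 bytes, 4 ops) ...
const ENQUEUE_RE = /"fac":"CONSUME".*?Enqueue (\d+) message\(s\) \((\d+) bytes/;

// Summarize the CONSUME entries found in a block of debug log text.
function summarizeEnqueues(logText) {
  const summary = { entries: 0, messages: 0, bytes: 0 };
  for (const line of logText.split('\n')) {
    const match = ENQUEUE_RE.exec(line);
    if (!match) continue; // FETCH, COMMIT, and other entries are ignored
    summary.entries += 1;
    summary.messages += Number(match[1]);
    summary.bytes += Number(match[2]);
  }
  return summary;
}
```

Fed the 4.7s excerpt, this should report 10 entries and 48 messages, which lines up with the stored offset 131978883 advancing to last_offset 131978930.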

If we use setDefaultConsumeTimeout(100), we can see the consume() time is always between 100ms and 200ms. The variation is much smaller than in the case above.

Is there a bug in the consume call that stops it from timing out properly?

@edenhill
Contributor

It would be very useful to have timestamps in the debug logs, since this is a timing-related issue.
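One possible way to get timestamps finer than whole seconds is to stamp each log event as it arrives. This is only a sketch: `formatLogLine` is a hypothetical helper, and it assumes the log lines are produced by stringifying the objects delivered on node-rdkafka's `event.log` event, which the thread does not confirm.

```javascript
// Format one librdkafka log event with a millisecond-resolution UTC timestamp.
// `log` is assumed to have the { severity, fac, message } shape seen in the
// excerpts; `now` is injectable so the output can be reproduced.
function formatLogLine(log, now = new Date()) {
  return `${now.toISOString()} ${JSON.stringify(log)}`;
}

// Hypothetical wiring (the `debug` config property must be set for
// event.log to fire):
// consumer.on('event.log', (log) => console.log(formatLogLine(log)));
```

Millisecond stamps make it possible to see how long each gap between FETCH and CONSUME entries lasts, which whole-second stamps hide.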

@keith-chew
Author

My apologies, I accidentally stripped out the timestamps from the logs earlier. Logs with timestamps are below:

Nov 23 00:28:33 {"severity":7,"fac":"CGRPOP","message":"[thrd:main]: Group \"my-group\" received op OFFSET_COMMIT (v0) in state up (join state started, v8 vs 0)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [6]: stored offset 131978961, committed offset 131978941: setting stored offset 131978961 for commit"}] 
Nov 23 00:28:33 {"severity":7,"fac":"COMMIT","message":"[thrd:main]: GroupCoordinator/2: OffsetCommit for 1 partition(s): manual: returned: Success"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978961 (v2)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978961 (v2)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978961 (v2)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 1 message(s) (990 bytes, 1 ops) on my-topic [6] fetch queue (qlen 1, v2, last_offset 131978961, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:33 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:34 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:35 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:35 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:35 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978962 (v2)"}] 
Nov 23 00:28:35 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:35 consume() [1004]ms

And the second one:

Nov 23 00:28:18 {"severity":7,"fac":"CGRPOP","message":"[thrd:main]: Group \"my-group\" received op OFFSET_COMMIT (v0) in state up (join state started, v8 vs 0)"}] 
Nov 23 00:28:18 {"severity":7,"fac":"OFFSET","message":"[thrd:main]: Topic my-topic [6]: stored offset 131978883, committed offset 131978869: setting stored offset 131978883 for commit"}] 
Nov 23 00:28:18 {"severity":7,"fac":"COMMIT","message":"[thrd:main]: GroupCoordinator/2: OffsetCommit for 1 partition(s): manual: returned: Success"}] 
Nov 23 00:28:18 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
Nov 23 00:28:18 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:18 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
Nov 23 00:28:18 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:18 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
Nov 23 00:28:18 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978883 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 4 message(s) (3675 bytes, 4 ops) on my-topic [6] fetch queue (qlen 4, v2, last_offset 131978886, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978887 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 5 message(s) (4499 bytes, 5 ops) on my-topic [6] fetch queue (qlen 5, v2, last_offset 131978891, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:19 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978892 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 2 message(s) (1817 bytes, 2 ops) on my-topic [6] fetch queue (qlen 2, v2, last_offset 131978893, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978894 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 4 message(s) (3760 bytes, 4 ops) on my-topic [6] fetch queue (qlen 4, v2, last_offset 131978897, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978898 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 18 message(s) (16788 bytes, 18 ops) on my-topic [6] fetch queue (qlen 18, v2, last_offset 131978915, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
Nov 23 00:28:20 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978916 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 5 message(s) (4499 bytes, 5 ops) on my-topic [6] fetch queue (qlen 5, v2, last_offset 131978920, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
Nov 23 00:28:21 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978921 (v2)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 2 message(s) (1817 bytes, 2 ops) on my-topic [6] fetch queue (qlen 2, v2, last_offset 131978922, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978923 (v2)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 3 message(s) (2829 bytes, 3 ops) on my-topic [6] fetch queue (qlen 3, v2, last_offset 131978925, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
Nov 23 00:28:22 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978926 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 2 message(s) (1849 bytes, 2 ops) on my-topic [6] fetch queue (qlen 2, v2, last_offset 131978927, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978928 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978928 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978928 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978928 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 3 message(s) (2821 bytes, 3 ops) on my-topic [6] fetch queue (qlen 3, v2, last_offset 131978930, 0 ctrl msgs, uncompressed)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:23 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:24 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:24 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:24 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:24 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:24 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:24 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:24 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch topic my-topic [6] at offset 131978931 (v2)"}] 
Nov 23 00:28:24 {"severity":7,"fac":"FETCH","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Fetch 1/1/4 toppar(s)"}] 
Nov 23 00:28:24 consume() [4759]ms

@edenhill
Contributor

As soon as you see a line like this:
Nov 23 00:28:19 {"severity":7,"fac":"CONSUME","message":"[thrd:kafka01:9092/bootstrap]: kafka01:9092/1: Enqueue 4 message(s) (3675 bytes, 4 ops) on my-topic [6] fetch queue (qlen 4, v2, last_offset 131978886, 0 ctrl msgs, uncompressed)"}]

... the messages should be available for the application to consume, and any blocking consume/poll call should return a message.

The two reasons I can think of that would cause the long delay are:

  • You are using the batch consume interface and the message count has not yet been reached
  • Your system is gravely underpowered and your application thread is not getting any CPU time (sounds unlikely, but could be possible on a single-core '90s vacuum cleaner)

@keith-chew
Author

Hmm, the first case cannot be true, as we use 1200 as the max message count:

    const messages: any = await new Promise((resolve, reject) => {
        this.consumer.consume(1200, (err: any, messages: any) => {
            if (err) {
                return reject(err);
            }
            resolve(messages);
        });
    });

When the consumer is consuming 1200 msgs/s, the time is very fast, under 50ms. The above only happens when the traffic is medium to low (in the two cases above, fewer than 30 messages were returned). And because it is not high traffic, I can also confirm the CPU is under 30% utilised.

But your statement about the msgs should be available after CONSUME is very helpful, I will track this further.

@keith-chew
Author

Since this is in a NodeJS environment, out of curiosity, what is the minimum number of threads required by librdkafka? As I have seen discussions about UV_THREADPOOL_SIZE in node-rdkafka.

@edenhill
Contributor

I don't know about node, but librdkafka will want at least 3 CPU (v)cores to be reasonably performant

@keith-chew
Author

OK, I have tracked down where the issue is; below is the code in node-rdkafka. Basically, it uses the 1s as the timeout for each poll to librdkafka, not as the total timeout. So, if a small number of messages trickle in, say 2 in 500ms, 5 in 800ms, 4 in 300ms, etc., the method will happily keep consuming until it hits 0 messages in 1s. This also explains why a 100ms timeout has a smaller return time variation: at low traffic, the chance of a poll timing out is greater with 100ms than with 1s. It also explains why at high traffic we see the method return in 50ms, presumably because the 1200-message limit is reached before any poll times out. This is not too good for us because the method is not flexible enough to cater for both high and low throughput conditions.

void KafkaConsumerConsumeNum::Execute() {
  std::size_t max = static_cast<std::size_t>(m_num_messages);
  bool looping = true;
  int timeout_ms = m_timeout_ms;

  while (m_messages.size() < max && looping) {
    // Get a message
    Baton b = m_consumer->Consume(timeout_ms);
    switch (b.err()) {
      case RdKafka::ERR__PARTITION_EOF:
        // If partition EOF and have consumed messages, retry with timeout 1
        // This allows getting ready messages, while not waiting for new ones
        if (m_messages.size() > 0) {
          timeout_ms = 1;
        }
        break;
      case RdKafka::ERR__TIMED_OUT:
      case RdKafka::ERR__TIMED_OUT_QUEUE:
        // Break of the loop if we timed out
        looping = false;
        break;
      case RdKafka::ERR_NO_ERROR:
        m_messages.push_back(b.data<RdKafka::Message*>());
        break;
      default:
        // Set the error for any other errors and break
        if (m_messages.size() == 0) {
          SetErrorBaton(b);
        }
        looping = false;
        break;
    }
  }
}
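
The effect can be reproduced without a broker. The following is a minimal sketch, not node-rdkafka code: `simulate_per_poll_timeout` is a hypothetical helper that replays a list of message arrival times on a virtual clock and mimics the loop above, where every poll is given the full timeout. The arrival times used with it are made up for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of node-rdkafka or librdkafka).
// Replays message arrival times (ms since the call started, ascending) on a
// virtual clock and returns how long a loop that grants EACH poll the full
// timeout would block before returning.
long simulate_per_poll_timeout(const std::vector<long>& arrivals_ms,
                               std::size_t max_messages,
                               long timeout_ms) {
  long now = 0;              // virtual clock in ms
  std::size_t received = 0;  // messages accumulated so far
  std::size_t next = 0;      // index of the next arrival to deliver

  while (received < max_messages) {
    if (next < arrivals_ms.size() && arrivals_ms[next] - now <= timeout_ms) {
      // A message shows up within this poll's timeout: the poll returns at
      // the arrival time (or immediately if the message was already queued).
      if (arrivals_ms[next] > now) {
        now = arrivals_ms[next];
      }
      ++next;
      ++received;
    } else {
      // Nothing arrived for one full timeout: the poll times out and the
      // loop ends, as in the ERR__TIMED_OUT branch above.
      now += timeout_ms;
      break;
    }
  }
  return now;
}
```

With arrivals at 500, 1300, 1600, 2400 and 3100 ms and a 1000 ms timeout, `simulate_per_poll_timeout({500, 1300, 1600, 2400, 3100}, 1200, 1000)` returns 4100, since every gap is shorter than the timeout and only the final, empty poll ends the loop. With a 100 ms timeout the same call returns 100, because the very first poll times out.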

@keith-chew
Author

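Just to confirm that this method is causing the issue, I tried this modification locally:

  // Inside the while loop of KafkaConsumerConsumeNum::Execute().
  // The definitions of `start` and `end` are not shown here; presumably
  // `start` is a millisecond timestamp taken once before the loop and
  // `end` is taken after each Consume() call returns.
  Baton b = m_consumer->Consume(timeout_ms);
  double elapsed = end - start;
  if (elapsed > timeout_ms) {
    looping = false;
  }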

After this change, consume() always returns in less than 2 x timeout_ms, which is what we expect. Thank you so much for helping me track this down, much appreciated!

@keith-chew
Author

After doing more testing and re-reading your original 2 points, I just wanted to say that you were absolutely spot on. Your first point is correct, and helped me track down the issue. I have now implemented a total timeout option that consume() must return by, and it works well for low/medium traffic. Your second point is also correct: after doing a load test, I can confirm that the return time will be longer due to CPU starvation, but this can be controlled by throttling the consumption rate.

Thank you once again for your time and help.

@edenhill
Contributor

Great analysis and fix!
It is a common mistake not to deduct the time already spent from the timeout in an accumulation loop; I've done it myself multiple times.
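
One common way to avoid this is to fix a deadline once and give each poll only the time that remains. Below is a minimal sketch under stated assumptions, not the fix that was applied in this thread: `accumulate_until_deadline`, `PollFn` and `NowFn` are hypothetical names, the poll callback stands in for a blocking call such as `Consume(timeout_ms)`, and the clock is injected so the loop can be exercised without a broker. It needs C++17 for `std::optional`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical stand-ins: PollFn blocks for at most `timeout_ms` and returns
// a message if one arrived in time; NowFn returns the current time in ms.
using PollFn = std::function<std::optional<int>(long timeout_ms)>;
using NowFn = std::function<long()>;

std::vector<int> accumulate_until_deadline(const PollFn& poll,
                                           const NowFn& now,
                                           std::size_t max_messages,
                                           long total_timeout_ms) {
  std::vector<int> messages;
  const long deadline = now() + total_timeout_ms;  // fixed once, up front

  while (messages.size() < max_messages) {
    const long remaining = deadline - now();
    if (remaining <= 0) {
      break;  // the whole time budget has been spent
    }
    // Each poll gets only what is left of the budget, not the full timeout.
    std::optional<int> msg = poll(remaining);
    if (!msg) {
      break;  // the poll timed out, so the budget is exhausted
    }
    messages.push_back(*msg);
  }
  return messages;
}
```

With messages arriving at 500, 1300 and 1600 ms and a 1000 ms budget, this loop returns one message at the 1000 ms mark. A real implementation would also have to handle error codes such as partition EOF, which the sketch leaves out.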
