Fix to main loop timeout calculation leading to a tight loop for a max period of 1 ms

When the main thread loop was awakened less than 1 ms
before the expiration of a timeout, it was serving with a zero timeout,
leading to increased CPU usage until the timeout was reached.
Happening since 1.x
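
The rounding difference described above can be sketched in isolation. This is only an illustration, not the library's code: `ts_us_t` is a hypothetical stand-in for `rd_ts_t` (assumed here to be a 64-bit microsecond count), and both helper names are invented for the example.

```c
#include <stdint.h>

/* Hypothetical stand-in for rd_ts_t; assumed to hold microseconds. */
typedef int64_t ts_us_t;

/* Behaviour before the fix: truncating division turns any remainder
 * below 1 ms into a 0 ms timeout, so the caller polls without blocking. */
static int timeout_ms_truncating(ts_us_t sleeptime_us) {
        return (int)(sleeptime_us / 1000);
}

/* Behaviour after the fix: ceiling division rounds a partial
 * millisecond up, so the caller blocks for 1 ms instead of spinning. */
static int timeout_ms_ceiling(ts_us_t sleeptime_us) {
        return (int)((sleeptime_us + 999) / 1000);
}
```

With 500 us left until the next timer, the truncating helper returns 0 and the ceiling helper returns 1. Exact multiples of 1000 us give the same result from both, so the change only affects the final sub-millisecond window.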
emasab committed Apr 3, 2024
1 parent f405243 commit f5ffbf8
Showing 2 changed files with 10 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@ librdkafka v2.3.1 is a maintenance release:
   and updating the topic id corresponding to the partition name (#4660)
 * Fixes to metadata cache expiration, metadata refresh interruption and
   to avoid usage of stale metadata (#4660).
+* Fix to main loop timeout calculation leading to a tight loop for a
+  max period of 1 ms (#4671).


## Fixes
@@ -32,6 +34,10 @@ librdkafka v2.3.1 is a maintenance release:
   was undergoing a validation and being retried because of an error.
   Solved by doing a partition migration only with a non-stale leader epoch.
   Happening since 2.1.0 (#4660).
+* When the main thread loop was awakened less than 1 ms
+  before the expiration of a timeout, it was serving with a zero timeout,
+  leading to increased CPU usage until the timeout was reached.
+  Happening since 1.x (#4671).



5 changes: 4 additions & 1 deletion src/rdkafka.c
@@ -2127,7 +2127,10 @@ static int rd_kafka_thread_main(void *arg) {
                         RD_KAFKA_CGRP_STATE_TERM)))) {
         rd_ts_t sleeptime = rd_kafka_timers_next(
             &rk->rk_timers, 1000 * 1000 /*1s*/, 1 /*lock*/);
-        rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
+        /* Use ceiling division to avoid calling serve with a 0 ms
+         * timeout in a tight loop until 1 ms has passed. */
+        int timeout_ms = (sleeptime + 999) / 1000;
+        rd_kafka_q_serve(rk->rk_ops, timeout_ms, 0,
                          RD_KAFKA_Q_CB_CALLBACK, NULL, NULL);
         if (rk->rk_cgrp) /* FIXME: move to timer-triggered */
                 rd_kafka_cgrp_serve(rk->rk_cgrp);
