Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add field to rd_kafka_queue which denotes if it contains fetched msgs #4256

Merged
merged 10 commits into from
Apr 21, 2023

Conversation

milindl
Copy link
Contributor

@milindl milindl commented Apr 14, 2023

An issue in v2.1.0 was fixed in which max.poll.interval.ms was not honored,
because it was reset on any queue poll, not just consumer poll.
It was changed it so that only certain rdkafka.h functions which were polling
would reset the timer.

However, librdkafka exposes a method rd_kafka_queue_get_consumer, which returns
the consumer queue, and the application can poll this queue for events rather
than calling consume poll. There is no way to distinguish polls to this queue
and an arbitrary queue, and it won't reset the timer.

So, a new field is maintained inside the queue denoting if it might
contain fetched messages, or not. It deals with forwarding of queues, so
if a queue which receives fetched messages is forwarded multiple times,
calling poll on the forwardee will also reset the timer.

Note that there are a few points where someone might forward/unforward into a queue while polling it. (See cases with does_q_contain_consumer_msgs = rd_kafka_q_consumer_cnt(rkq, 1);) which might make the timer reset wrong on these occasions.

I don't think it matters much since that's a somewhat convoluted use case and to fix it will require taking a lock over the duration of the entire poll. But let me know what you think.

Fixes confluentinc/confluent-kafka-go#980

src/rdkafka.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we are locking on polling, please check performance once.

CHANGELOG.md Show resolved Hide resolved
src/rdkafka_cgrp.c Outdated Show resolved Hide resolved
src/rdkafka_queue.h Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.c Show resolved Hide resolved
@milindl milindl force-pushed the dev_max_poll_not_reset_if_consume_q_polled_separately branch from 68aaa51 to e921874 Compare April 19, 2023 07:24
@milindl milindl requested a review from pranavrth April 19, 2023 07:28
src/rdkafka.h Outdated Show resolved Hide resolved
src/rdkafka_queue.c Outdated Show resolved Hide resolved
src/rdkafka_queue.h Show resolved Hide resolved
tests/0089-max_poll_interval.c Outdated Show resolved Hide resolved
CHANGELOG.md Outdated Show resolved Hide resolved
CHANGELOG.md Outdated Show resolved Hide resolved
CHANGELOG.md Outdated Show resolved Hide resolved
@milindl milindl requested a review from emasab April 20, 2023 09:46
milindl and others added 8 commits April 20, 2023 15:39
An issue in v2.1.0 was fixed in which max.poll.interval.ms was not honored,
because it was reset on any queue poll, not just consumer poll.
It was changed it so that only certain rdkafka.h functions which were polling
would reset the timer.

However, librdkafka exposes a method rd_kafka_queue_get_consumer, which returns
the consumer queue, and the application can poll this queue for events rather
than calling consume poll. There is no way to distinguish polls to this queue
and an arbitrary queue, and it won't reset the timer.

So, a new field is maintained inside the queue denoting if it might
contain fetched messages, or not. It deals with forwarding of queues, so
if a queue which receives fetched messages is forwarded multiple times,
calling poll on the forwardee will also reset the timer.
Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
@milindl milindl force-pushed the dev_max_poll_not_reset_if_consume_q_polled_separately branch from a1bbc5d to abd072f Compare April 20, 2023 10:12
@milindl milindl requested a review from pranavrth April 20, 2023 10:33
Copy link
Collaborator

@emasab emasab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great Milind, it was difficult to find the correct combination of changes that solves both problems by resetting the timer only when needed and fits with the existing code.

@milindl
Copy link
Contributor Author

milindl commented Apr 21, 2023

Thanks Emanuele. Will merge after #4262 for CI

Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@milindl milindl merged commit b0b5bfe into master Apr 21, 2023
@milindl milindl deleted the dev_max_poll_not_reset_if_consume_q_polled_separately branch April 21, 2023 07:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Unable to rejoin a group when consumer poll timeout in confluent-kafka-go v2.1.0
3 participants