Consumer hangs if closing together with deleting topic #4362

Open · ilejn opened this issue Jul 21, 2023 · 5 comments

ilejn commented Jul 21, 2023

Description

Consumer hangs if closing together with deleting topic

How to reproduce

Hello,
in ClickHouse we have an issue with an integration test if librdkafka master is more recent than 8e20e1e, IOW if librdkafka contains PR 4117.
Test scenario:
1. Six consumers consume messages from a topic with six partitions.
2. Delete the topic (not via librdkafka)
3. Close the consumers one by one
One of the consumers more or less reproducibly hangs during closing, and virtually any perturbation hides the problem: adding a sleep() between (2) and (3), or even using a ClickHouse build with a sanitizer, is enough to make it pass.
I tried to create a minimal reproduction without ClickHouse, but did not succeed.

The scenario may seem a bit insane, but it is crucial for us and effectively prevents us from using recent librdkafka.
How ClickHouse closes a consumer (a rough sketch in plain C is shown after this list):
• unsubscribe
• drain the queue
• free callbacks
• call rd_kafka_consumer_close

ClickHouse maintains a rebalance callback (actually cppkafka does).
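For illustration, the same sequence expressed with the plain librdkafka C API would look roughly like this (only a sketch: the real code goes through cppkafka, and the close_consumer wrapper below is hypothetical):

```c
#include <librdkafka/rdkafka.h>

/* Rough sketch of the close sequence described above, assuming `rk` is an
 * already-subscribed consumer handle. Not the actual ClickHouse/cppkafka code. */
static void close_consumer(rd_kafka_t *rk) {
        rd_kafka_message_t *msg;

        /* 1. unsubscribe */
        rd_kafka_unsubscribe(rk);

        /* 2. drain the queue: poll until nothing more is returned */
        while ((msg = rd_kafka_consumer_poll(rk, 100 /* ms */)) != NULL)
                rd_kafka_message_destroy(msg);

        /* 3. callbacks are freed on the application side at this point */

        /* 4. close the consumer; this is the call that hangs in this report */
        rd_kafka_consumer_close(rk);

        rd_kafka_destroy(rk);
}
```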

My investigation:
The problematic part of PR 4117 is rd_kafka_toppar_keep(rktp), specifically where it is called from rd_kafka_toppar_pause_resume to perform the resume.
In rd_kafka_broker_thread_main we then wait forever in while (!rd_kafka_broker_terminating(rkb)), which is effectively rd_refcnt_get(&(rkb)->rkb_refcnt) <= 1.
REFCNT DEBUG output: https://pastila.nl/?00659d03/ee47523355fd8a694171a23c8b2a48c6
Some ClickHouse logs: https://pastila.nl/?002bbb54/247b8ebbb941432451f7ae5ce10f319b
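To make the hang mechanism concrete, here is a deliberately simplified, hypothetical model of that refcount-gated shutdown; none of this is librdkafka's actual code, it only mirrors the condition quoted above:

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical simplification: a worker thread that keeps serving as long as
 * anyone besides the thread itself holds a reference to it. */
typedef struct broker {
        atomic_int refcnt; /* 1 == only the thread's own reference remains */
} broker_t;

bool broker_terminating(broker_t *rkb) {
        /* Analogue of rd_kafka_broker_terminating(): terminate once the
         * refcount drops to the thread's own reference. */
        return atomic_load(&rkb->refcnt) <= 1;
}

void broker_thread_main(broker_t *rkb) {
        while (!broker_terminating(rkb)) {
                /* ... serve ops, fetch, etc. ... */
        }
        /* If an op parked on a queue that nobody services keeps a partition
         * reference, and the partition keeps a broker reference, refcnt never
         * drops to 1, this loop never exits, and rd_kafka_destroy() hangs. */
}
```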

Am I right in thinking that the problem is that there is no suitable counterpart left to read RD_KAFKA_OP_BARRIER from the fetch queue? Is it possible to resolve this problem on the application side?



kwdubuc commented Jul 29, 2023

I recently encountered the same issue.

I traced it down to the fact that:

  1. rd_kafka_toppar_fetch_stop calls rd_kafka_q_fwd_set(rktp->rktp_fetchq, NULL);, which, at least in our application, disconnects the rktp_fetchq from anything that will service it.

  2. An immediately following rd_kafka_toppar_pause_resume call (from a previously queued RESUME op) calls rd_kafka_toppar_op_version_bump, which puts a BARRIER op on that rktp_fetchq. That op contains a reference to the rktp object, so if the rktp_fetchq is never serviced, the rktp object is never freed. The rktp object in turn contains a reference to its former rktp_broker object, so the rktp_broker's reference count never drops to 1, which in turn means that the broker's thread never terminates.

  3. An eventual application call to rd_kafka_destroy waits for the broker thread to terminate, which never happens; the application hangs.

I patched this with the following change in rd_kafka_toppar_broker_leave_for_remove:

diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c
index 46d2fb3e..735c2bf9 100644
--- a/src/rdkafka_partition.c
+++ b/src/rdkafka_partition.c
@@ -1086,6 +1086,13 @@ void rd_kafka_toppar_broker_leave_for_remove(rd_kafka_toppar_t *rktp) {
                 rd_kafka_toppar_set_fetch_state(
                     rktp, RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY);
 
+        /* Purge any stale operations on the fetchq; nothing will serve them
+         * at this point. */
+        if (!rd_kafka_q_is_fwded(rktp->rktp_fetchq)) {
+                rd_kafka_q_disable(rktp->rktp_fetchq);
+                rd_kafka_q_purge(rktp->rktp_fetchq);
+        }
+
         rko           = rd_kafka_op_new(RD_KAFKA_OP_PARTITION_LEAVE);
         rko->rko_rktp = rd_kafka_toppar_keep(rktp);

But it's my first foray into the deeper guts of rdkafka, so I have no particular confidence that this is the best patch.

emasab (Contributor) commented Jul 31, 2023

Hello, does it happen with versions >= 2.1.0? There's this fix in that version:

A reference count issue was blocking the consumer from closing.
The problem would happen when a partition is lost, because forcibly
unassigned from the consumer or if the corresponding topic is deleted.
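As a quick way to confirm which librdkafka is actually linked at runtime, something like the following can help (rd_kafka_version()/rd_kafka_version_str() are the standard public API; the little program around them is just an illustration):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        /* Reports the runtime-linked librdkafka version, which is what matters
         * for whether the 2.1.0 reference-count fix is present. */
        printf("librdkafka %s (0x%08x)\n",
               rd_kafka_version_str(), (unsigned int)rd_kafka_version());
        return 0;
}
```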


kwdubuc commented Jul 31, 2023

Yes, we're working off a fork of v2.1.0, which has #4187.


kwdubuc commented Aug 1, 2023

I've attached debug logs illustrating the issue in our reproduction.
confluentinc-librdkafka-4362.txt


wbarnha commented Jan 3, 2024

Bumping for attention, still seeing the same issue on v1.9.2 and v2.3.0 on Python 3.11 using confluent-kafka-python. Strangely enough, I didn't have this issue with v1.9.2 on Python 3.8.

I don't have data to quantify this, but I do think that the aforementioned patch did successfully reduce the incidence of consumers failing to close. I'll need to dig deeper to see what's going on.
