Create consumers for Kafka tables on fly with TTL (resubmit) #58310
Conversation
….size()) Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit 123d63e)
Fixes: ClickHouse#42777 Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit 51d4f58)
…e last used) The pool of consumers created a problem for librdkafka's internal statistics: the statistics queue must always be read, while in ClickHouse consumers are created regardless of whether there are any readers (attached materialized views or direct SELECTs). Otherwise these statistics messages get queued and never released, which: - creates a live memory leak - makes destroy very slow, due to librdkafka internals (it moves entries from this queue into another linked list, with sorting, which is incredibly slow for linked lists) So the idea is simple: let's create a pool of consumers only when they are required, and destroy them after some timeout (right now it is 60 seconds) if nobody uses them; that way this problem should be gone. This should also reduce the number of internal librdkafka threads when nobody reads from Kafka tables. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit e7592c1)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit db74549)
This will make system.kafka_consumers more useful: prior to this patch, the consumer object was removed after the TTL, but now all its information is preserved. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit 2ff0bfb)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit b19b70b)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit a7453f7)
Since the pool may exceed the number of threads, while this thread needs to run always to avoid leaking memory. And this should not be a problem, since librdkafka spawns multiple threads for each consumer (5!) anyway. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit 06a9e9a)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit 1f03a21)
Actually, now we can create the consumer object in the ctor; there is no need to do this in startup(), since the consumer no longer connects to Kafka there. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit 0321820)
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com> (cherry picked from commit ebad1bf)
CI found [1]:

Exception: Sanitizer assert found for instance
=================================================================
==1==ERROR: AddressSanitizer: heap-use-after-free on address 0x5250006a4100 at pc 0x55d4ed46d2e2 bp 0x7f7e33b40190 sp 0x7f7e33b3f950
WRITE of size 5390 at 0x5250006a4100 thread T2 (TCPHandler)
    8 0x55d50eba9497 in DB::KafkaConsumer::setRDKafkaStat(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&) build_docker/./src/Storages/Kafka/KafkaConsumer.h:117:22
    12 0x55d51e0eebfe in cppkafka::stats_callback_proxy(rd_kafka_s*, char*, unsigned long, void*) build_docker/./contrib/cppkafka/src/configuration.cpp:92:5
    13 0x55d51e151e3d in rd_kafka_poll_cb build_docker/./contrib/librdkafka/src/rdkafka.c:3790:7
    14 0x55d51e15531b in rd_kafka_consumer_close build_docker/./contrib/librdkafka/src/rdkafka.c:3200:31
    15 0x55d51e0f3241 in cppkafka::Consumer::close() build_docker/./contrib/cppkafka/src/consumer.cpp:293:33
    16 0x55d51e0f3241 in cppkafka::Consumer::~Consumer() build_docker/./contrib/cppkafka/src/consumer.cpp:82:9
    20 0x55d50eb8d12e in DB::KafkaConsumer::~KafkaConsumer() build_docker/./src/Storages/Kafka/KafkaConsumer.cpp:179:1

0x5250006a4100 is located 0 bytes inside of 8736-byte region [0x5250006a4100,0x5250006a6320)
freed by thread T2 (TCPHandler) here:
    0 0x55d4ed4a26b2 in operator delete(void*, unsigned long) (/usr/bin/clickhouse+0xa94b6b2) (BuildId: 74ec4a14a5109c41de109e82d56d8d863845144d)
    1 0x55d50eb8ca55 in void std::__1::__libcpp_operator_delete[abi:v15000]<void*, unsigned long>(void*, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/new:256:3
    2 0x55d50eb8ca55 in void std::__1::__do_deallocate_handle_size[abi:v15000]<>(void*, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/new:282:10
    3 0x55d50eb8ca55 in std::__1::__libcpp_deallocate[abi:v15000](void*, unsigned long, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/new:296:14
    4 0x55d50eb8ca55 in std::__1::allocator<char>::deallocate[abi:v15000](char*, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/__memory/allocator.h:128:13
    5 0x55d50eb8ca55 in std::__1::allocator_traits<std::__1::allocator<char>>::deallocate[abi:v15000](std::__1::allocator<char>&, char*, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/__memory/allocator_traits.h:282:13
    6 0x55d50eb8ca55 in std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>>::~basic_string() build_docker/./contrib/llvm-project/libcxx/include/string:2334:9
    7 0x55d50eb8ca55 in DB::KafkaConsumer::~KafkaConsumer() build_docker/./src/Storages/Kafka/KafkaConsumer.cpp:179:1

[1]: https://s3.amazonaws.com/clickhouse-test-reports/0/745d9bb47f3425e28e5660ed7c730038ffece4ee/integration_tests__asan__analyzer__%5B6_6%5D/integration_run_parallel4_0.log

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
We've discussed this with @filimonov, and he pointed out that everything else (except for rdkafka_stat/rdkafka_stat_mutex) is done via member order, so let's do it in the same style. Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
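For context, "done via member order" relies on the C++ rule that non-static data members are destroyed in reverse declaration order: if the field written by the stats callback is declared before the consumer, it is guaranteed to outlive the consumer's destructor (and the callbacks fired from it). A minimal, self-contained sketch of that rule (names `Tracked`, `ConsumerLike`, `destruction_order` are illustrative, not ClickHouse's code):

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Records the order in which members are destroyed.
std::vector<std::string> destroyed;

struct Tracked
{
    std::string name;
    explicit Tracked(std::string n) : name(std::move(n)) {}
    ~Tracked() { destroyed.push_back(name); }
};

// Stand-in for KafkaConsumer: the stat field is declared *before*
// the consumer field, so it is destroyed *after* it - i.e. it is
// still alive while the consumer's destructor runs its callbacks.
struct ConsumerLike
{
    Tracked stat{"stat"};         // declared first  -> destroyed last
    Tracked consumer{"consumer"}; // declared last   -> destroyed first
};

// Construct and immediately destroy a ConsumerLike, returning the
// observed member destruction order.
std::vector<std::string> destruction_order()
{
    destroyed.clear();
    {
        ConsumerLike c;
        (void)c;
    }
    return destroyed;
}
```

This is why moving `rdkafka_stat` next to a mutex (instead of relying on order alone) was the odd one out that the review comment refers to.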
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
The call chain of the Kafka consumer is very tricky, so for the sake of common sense let's just clear the messages when moving the consumer out (and in the dtor, but that is only to keep the two code paths in sync). (Also reported by @filimonov) Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Create consumers for Kafka tables on the fly (but keep them for some period, kafka_consumers_pool_ttl_ms, since last used). This should fix the problem with statistics for system.kafka_consumers (which are not consumed when nobody reads from the Kafka table, leading to a live memory leak and slow table detach), and it also re-enables stats for system.kafka_consumers by default.

The pool of consumers created a problem for librdkafka's internal statistics: the statistics queue must always be read, while in ClickHouse consumers are created regardless of whether there are any readers (attached materialized views or direct SELECTs). Otherwise these statistics messages get queued and never released, which:
- creates a live memory leak
- makes destroy very slow, due to librdkafka internals (it moves entries from this queue into another linked list, with sorting, which is incredibly slow for linked lists)
So the idea is simple: create consumers only when they are required, and destroy them after some timeout (right now it is 60 seconds) if nobody uses them; that way this problem should be gone. This should also reduce the number of internal librdkafka threads when nobody reads from Kafka tables.
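The lifecycle described above — create on demand, stamp on release, reap after the TTL — can be sketched roughly as follows. This is a simplified illustration, not ClickHouse's actual implementation; the names (`ConsumerPool`, `acquire`, `release`, `cleanup`) and the plain `Consumer` placeholder are assumptions for the example:

```cpp
#include <cassert>
#include <chrono>
#include <list>
#include <memory>
#include <mutex>

using Clock = std::chrono::steady_clock;

// Placeholder: in the real code this wraps a librdkafka consumer,
// whose creation spawns several internal threads.
struct Consumer {};

class ConsumerPool
{
    struct IdleConsumer
    {
        std::unique_ptr<Consumer> consumer;
        Clock::time_point last_used;
    };

    std::mutex mutex;
    std::list<IdleConsumer> idle;
    std::chrono::milliseconds ttl; // e.g. kafka_consumers_pool_ttl_ms

public:
    explicit ConsumerPool(std::chrono::milliseconds ttl_) : ttl(ttl_) {}

    // Create a consumer only when a reader (materialized view or
    // direct SELECT) actually needs one; reuse an idle one if present.
    std::unique_ptr<Consumer> acquire()
    {
        std::lock_guard lock(mutex);
        if (!idle.empty())
        {
            auto consumer = std::move(idle.front().consumer);
            idle.pop_front();
            return consumer;
        }
        return std::make_unique<Consumer>();
    }

    // Return the consumer to the pool, stamped with its last-used time.
    void release(std::unique_ptr<Consumer> consumer)
    {
        std::lock_guard lock(mutex);
        idle.push_back({std::move(consumer), Clock::now()});
    }

    // Invoked periodically (by a background task in the real code):
    // destroy consumers that have been idle longer than the TTL.
    // Returns how many were destroyed.
    size_t cleanup(Clock::time_point now = Clock::now())
    {
        std::lock_guard lock(mutex);
        size_t erased = 0;
        for (auto it = idle.begin(); it != idle.end();)
        {
            if (now - it->last_used >= ttl)
            {
                it = idle.erase(it);
                ++erased;
            }
            else
                ++it;
        }
        return erased;
    }

    size_t idle_count()
    {
        std::lock_guard lock(mutex);
        return idle.size();
    }
};
```

Destroying idle consumers both drains the librdkafka statistics problem (no consumer, no unread statistics queue) and stops the per-consumer librdkafka threads while the table is unused.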
Fixes: #50999 (cc @ilejn)
Resubmits: #57829 (the last patch from the PR fixes the use-after-free reported here #57829 (comment))