Skip to content

Commit

Permalink
Clean cached messages on destroy kafka consumer
Browse files Browse the repository at this point in the history
The callchain of the kafka consumer is very tricky, so for the sake of
common sense let's just clean the messages on moving out consumer (and
in dtor, but this is just to keep that two code path in sync).

(Also reported by @filimonov)

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
  • Loading branch information
azat committed Dec 29, 2023
1 parent b3d6caf commit 853fdfe
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Storages/Kafka/KafkaConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config)

ConsumerPtr && KafkaConsumer::moveConsumer()
{
cleanUnprocessed();
if (!consumer->get_subscription().empty())
{
try
Expand All @@ -173,6 +174,7 @@ KafkaConsumer::~KafkaConsumer()
if (!consumer)
return;

cleanUnprocessed();
try
{
if (!consumer->get_subscription().empty())
Expand Down

0 comments on commit 853fdfe

Please sign in to comment.