Skip to content

Commit

Permalink
Avoid problem with configuration queued.min.messages automatically fo…
Browse files Browse the repository at this point in the history
…r Kafka storage
  • Loading branch information
r3b-fish committed Oct 30, 2023
1 parent 3631e47 commit 5a92201
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/Storages/Kafka/StorageKafka.cpp
Expand Up @@ -513,8 +513,9 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
// that allows to prevent fast draining of the librdkafka queue
// during building of single insert block. Improves performance
// significantly, but may lead to bigger memory consumption.
size_t default_queued_min_messages = 100000; // we don't want to decrease the default
conf.set("queued.min.messages", std::max(getMaxBlockSize(),default_queued_min_messages));
size_t default_queued_min_messages = 100000; // must be greater than or equal to default
size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value
conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages));

/// a reference to the consumer is needed in statistic callback
/// although the consumer does not exist when callback is being registered
Expand Down

0 comments on commit 5a92201

Please sign in to comment.