Skip to content

Commit

Permalink
Remove StorageKafka::num_created_consumers (in favor of all_consumers.size())

Browse files Browse the repository at this point in the history

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
  • Loading branch information
azat committed Dec 27, 2023
1 parent a0fccb0 commit 123d63e
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
13 changes: 6 additions & 7 deletions src/Storages/Kafka/StorageKafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ Pipe StorageKafka::read(
size_t /* max_block_size */,
size_t /* num_streams */)
{
if (num_created_consumers == 0)
if (all_consumers.empty())
return {};

if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
Expand All @@ -357,12 +357,12 @@ Pipe StorageKafka::read(

/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
Pipes pipes;
pipes.reserve(num_created_consumers);
pipes.reserve(all_consumers.size());
auto modified_context = Context::createCopy(local_context);
modified_context->applySettingsChanges(settings_adjustments);

// Claim as many consumers as requested, but don't block
for (size_t i = 0; i < num_created_consumers; ++i)
for (size_t i = 0; i < all_consumers.size(); ++i)
{
/// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
/// TODO: probably that leads to awful performance.
Expand Down Expand Up @@ -419,7 +419,6 @@ void StorageKafka::startup()
auto consumer = createConsumer(i);
pushConsumer(consumer);
all_consumers.push_back(consumer);
++num_created_consumers;
}
catch (const cppkafka::Exception &)
{
Expand Down Expand Up @@ -447,7 +446,7 @@ void StorageKafka::shutdown(bool)
}

LOG_TRACE(log, "Closing consumers");
for (size_t i = 0; i < num_created_consumers; ++i)
for (size_t i = 0; i < all_consumers.size(); ++i)
auto consumer = popConsumer();
LOG_TRACE(log, "Consumers closed");

Expand Down Expand Up @@ -756,7 +755,7 @@ void StorageKafka::threadFunc(size_t idx)
mv_attached.store(true);

// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled && num_created_consumers > 0)
while (!task->stream_cancelled && !all_consumers.empty())
{
if (!checkDependencies(table_id))
break;
Expand Down Expand Up @@ -844,7 +843,7 @@ bool StorageKafka::streamToViews()
std::vector<std::shared_ptr<KafkaSource>> sources;
Pipes pipes;

auto stream_count = thread_per_consumer ? 1 : num_created_consumers;
auto stream_count = thread_per_consumer ? 1 : all_consumers.size();
sources.reserve(stream_count);
pipes.reserve(stream_count);
for (size_t i = 0; i < stream_count; ++i)
Expand Down
4 changes: 0 additions & 4 deletions src/Storages/Kafka/StorageKafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ class StorageKafka final : public IStorage, WithContext

std::atomic<bool> mv_attached = false;

/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
size_t num_created_consumers = 0; /// number of actually created consumers.

std::vector<KafkaConsumerPtr> consumers; /// available consumers
std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers

Expand Down

0 comments on commit 123d63e

Please sign in to comment.