Skip to content

Commit

Permalink
Merge pull request #60111 from ClickHouse/buffer_bg_flush_parallel
Browse files Browse the repository at this point in the history
Flush StorageBuffer into multiple threads if num_layers > 1
  • Loading branch information
alesapin committed Feb 19, 2024
2 parents 491a4cd + 6565423 commit 31bc327
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/Common/CurrentMetrics.cpp
Expand Up @@ -262,6 +262,9 @@
M(ActiveTimersInQueryProfiler, "Number of Active thread local timers in QueryProfiler") \
M(RefreshableViews, "Number materialized views with periodic refreshing (REFRESH)") \
M(RefreshingViews, "Number of materialized views currently executing a refresh") \
M(StorageBufferFlushThreads, "Number of threads for background flushes in StorageBuffer") \
M(StorageBufferFlushThreadsActive, "Number of threads for background flushes in StorageBuffer running a task") \
M(StorageBufferFlushThreadsScheduled, "Number of queued or active threads for background flushes in StorageBuffer")

#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)
Expand Down
27 changes: 26 additions & 1 deletion src/Storages/StorageBuffer.cpp
Expand Up @@ -5,6 +5,7 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/addMissingDefaults.h>
#include <Interpreters/getColumnFromBlock.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Storages/StorageBuffer.h>
#include <Storages/StorageFactory.h>
#include <Storages/AlterCommands.h>
Expand Down Expand Up @@ -56,6 +57,9 @@ namespace CurrentMetrics
{
extern const Metric StorageBufferRows;
extern const Metric StorageBufferBytes;
extern const Metric StorageBufferFlushThreads;
extern const Metric StorageBufferFlushThreadsActive;
extern const Metric StorageBufferFlushThreadsScheduled;
}


Expand Down Expand Up @@ -153,6 +157,12 @@ StorageBuffer::StorageBuffer(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);

if (num_shards > 1)
{
flush_pool = std::make_unique<ThreadPool>(
CurrentMetrics::StorageBufferFlushThreads, CurrentMetrics::StorageBufferFlushThreadsActive, CurrentMetrics::StorageBufferFlushThreadsScheduled,
num_shards, 0, num_shards);
}
flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ backgroundFlush(); });
}

Expand Down Expand Up @@ -802,7 +812,22 @@ bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes,
void StorageBuffer::flushAllBuffers(bool check_thresholds)
{
for (auto & buf : buffers)
flushBuffer(buf, check_thresholds, false);
{
if (flush_pool)
{
scheduleFromThreadPool<void>([&] ()
{
flushBuffer(buf, check_thresholds, false);
}, *flush_pool, "BufferFlush");
}
else
{
flushBuffer(buf, check_thresholds, false);
}
}

if (flush_pool)
flush_pool->wait();
}


Expand Down
2 changes: 2 additions & 0 deletions src/Storages/StorageBuffer.h
Expand Up @@ -3,6 +3,7 @@
#include <Core/BackgroundSchedulePool.h>
#include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h>
#include <Common/ThreadPool.h>

#include <Poco/Event.h>

Expand Down Expand Up @@ -149,6 +150,7 @@ friend class BufferSink;

/// There are `num_shards` of independent buffers.
const size_t num_shards;
std::unique_ptr<ThreadPool> flush_pool;
std::vector<Buffer> buffers;

const Thresholds min_thresholds;
Expand Down

0 comments on commit 31bc327

Please sign in to comment.