Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ThreadPool in PipelineExecutor #48146

Merged
merged 2 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@
M(SystemReplicasThreadsActive, "Number of threads in the system.replicas thread pool running a task.") \
M(RestartReplicaThreads, "Number of threads in the RESTART REPLICA thread pool.") \
M(RestartReplicaThreadsActive, "Number of threads in the RESTART REPLICA thread pool running a task.") \
M(QueryPipelineExecutorThreads, "Number of threads in the PipelineExecutor thread pool.") \
M(QueryPipelineExecutorThreadsActive, "Number of threads in the PipelineExecutor thread pool running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \
Expand Down
1 change: 1 addition & 0 deletions src/Core/ExternalTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
#include <base/find_symbols.h>
#include <base/scope_guard.h>


namespace DB
Expand Down
41 changes: 15 additions & 26 deletions src/Processors/Executors/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <IO/WriteBufferFromString.h>
#include <Common/ThreadPool.h>
#include <Common/CurrentThread.h>
#include <Common/CurrentMetrics.h>
#include <Common/setThreadName.h>
#include <Common/MemoryTracker.h>
#include <Processors/Executors/PipelineExecutor.h>
Expand All @@ -19,6 +21,12 @@
#endif


namespace CurrentMetrics
{
extern const Metric QueryPipelineExecutorThreads;
extern const Metric QueryPipelineExecutorThreadsActive;
}

namespace DB
{

Expand Down Expand Up @@ -304,26 +312,23 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
tasks.init(num_threads, use_threads, profile_processors, trace_processors, read_progress_callback.get());
tasks.fill(queue);

std::unique_lock lock{threads_mutex};
threads.reserve(num_threads);
if (num_threads > 1)
pool = std::make_unique<ThreadPool>(CurrentMetrics::QueryPipelineExecutorThreads, CurrentMetrics::QueryPipelineExecutorThreadsActive, num_threads);
}

void PipelineExecutor::spawnThreads()
{
while (auto slot = slots->tryAcquire())
{
std::unique_lock lock{threads_mutex};
size_t thread_num = threads.size();
size_t thread_num = threads++;

/// Count of threads in use should be updated for proper finish() condition.
/// NOTE: this will not decrease `use_threads` below initially granted count
tasks.upscale(thread_num + 1);

/// Start new thread
threads.emplace_back([this, thread_num, thread_group = CurrentThread::getGroup(), slot = std::move(slot)]
pool->scheduleOrThrowOnError([this, thread_num, thread_group = CurrentThread::getGroup(), slot = std::move(slot)]
{
/// ThreadStatus thread_status;

SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachFromGroupIfNotDetached();
Expand All @@ -347,23 +352,6 @@ void PipelineExecutor::spawnThreads()
}
}

void PipelineExecutor::joinThreads()
{
for (size_t thread_num = 0; ; thread_num++)
{
std::unique_lock lock{threads_mutex};
if (thread_num >= threads.size())
break;
if (threads[thread_num].joinable())
{
auto & thread = threads[thread_num];
lock.unlock(); // to avoid deadlock if thread we are going to join starts spawning threads
thread.join();
}
}
// NOTE: No races: all concurrent spawnThreads() calls are done from `threads`, but they're already joined.
}
azat marked this conversation as resolved.
Show resolved Hide resolved

void PipelineExecutor::executeImpl(size_t num_threads)
{
initializeExecution(num_threads);
Expand All @@ -374,15 +362,16 @@ void PipelineExecutor::executeImpl(size_t num_threads)
if (!finished_flag)
{
finish();
joinThreads();
if (pool)
pool->wait();
}
);

if (num_threads > 1)
{
spawnThreads(); // start at least one thread
tasks.processAsyncTasks();
joinThreads();
pool->wait();
}
else
{
Expand Down
8 changes: 4 additions & 4 deletions src/Processors/Executors/PipelineExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
#include <Processors/IProcessor.h>
#include <Processors/Executors/ExecutorTasks.h>
#include <Common/EventCounter.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ConcurrencyControl.h>

#include <queue>
#include <mutex>
#include <memory>


namespace DB
Expand Down Expand Up @@ -69,8 +70,8 @@ class PipelineExecutor
// Concurrency control related
ConcurrencyControl::AllocationPtr slots;
ConcurrencyControl::SlotPtr single_thread_slot; // slot for single-thread mode to work using executeStep()
std::mutex threads_mutex;
std::vector<ThreadFromGlobalPool> threads;
serxa marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<ThreadPool> pool;
std::atomic_size_t threads = 0;

/// Flag that checks that initializeExecution was called.
bool is_execution_initialized = false;
Expand All @@ -94,7 +95,6 @@ class PipelineExecutor
void initializeExecution(size_t num_threads); /// Initialize executor contexts and task_queue.
void finalizeExecution(); /// Check all processors are finished.
void spawnThreads();
void joinThreads();

/// Methods connected to execution.
void executeImpl(size_t num_threads);
Expand Down
1 change: 1 addition & 0 deletions src/Server/GRPCServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Common/SettingsChanges.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <DataTypes/DataTypeFactory.h>
#include <QueryPipeline/ProfileInfo.h>
#include <Interpreters/Context.h>
Expand Down