Skip to content

Commit

Permalink
Attempt to implemnt global thread pool ClickHouse#4018
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-milovidov committed Jan 14, 2019
1 parent 35c35f1 commit f6b9b06
Show file tree
Hide file tree
Showing 32 changed files with 241 additions and 106 deletions.
3 changes: 2 additions & 1 deletion dbms/programs/server/MetricsTransmitter.h
Expand Up @@ -6,6 +6,7 @@
#include <thread>
#include <vector>
#include <Common/ProfileEvents.h>
#include <Common/ThreadPool.h>


namespace DB
Expand Down Expand Up @@ -46,7 +47,7 @@ class MetricsTransmitter
bool quit = false;
std::mutex mutex;
std::condition_variable cond;
std::thread thread{&MetricsTransmitter::run, this};
ThreadFromGlobalPool thread{&MetricsTransmitter::run, this};

static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents.";
static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics.";
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/Config/ConfigReloader.cpp
Expand Up @@ -33,7 +33,7 @@ ConfigReloader::ConfigReloader(

void ConfigReloader::start()
{
thread = std::thread(&ConfigReloader::run, this);
thread = ThreadFromGlobalPool(&ConfigReloader::run, this);
}


Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Common/Config/ConfigReloader.h
@@ -1,6 +1,7 @@
#pragma once

#include "ConfigProcessor.h"
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/Common.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <time.h>
Expand Down Expand Up @@ -81,7 +82,7 @@ class ConfigReloader
Updater updater;

std::atomic<bool> quit{false};
std::thread thread;
ThreadFromGlobalPool thread;

/// Locked inside reloadIfNewer.
std::mutex reload_mutex;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/CurrentThread.cpp
Expand Up @@ -34,7 +34,7 @@ ThreadStatus & CurrentThread::get()

ProfileEvents::Counters & CurrentThread::getProfileEvents()
{
return get().performance_counters;
return current_thread ? get().performance_counters : ProfileEvents::global_counters;
}

MemoryTracker & CurrentThread::getMemoryTracker()
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Expand Up @@ -408,6 +408,7 @@ namespace ErrorCodes
extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE = 431;
extern const int UNKNOWN_CODEC = 432;
extern const int ILLEGAL_CODEC_PARAMETER = 433;
extern const int CANNOT_SCHEDULE_TASK = 434;

extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Common/MemoryTracker.cpp
Expand Up @@ -190,17 +190,20 @@ namespace CurrentMemoryTracker
{
void alloc(Int64 size)
{
DB::CurrentThread::getMemoryTracker().alloc(size);
if (DB::current_thread)
DB::CurrentThread::getMemoryTracker().alloc(size);
}

void realloc(Int64 old_size, Int64 new_size)
{
DB::CurrentThread::getMemoryTracker().alloc(new_size - old_size);
if (DB::current_thread)
DB::CurrentThread::getMemoryTracker().alloc(new_size - old_size);
}

void free(Int64 size)
{
DB::CurrentThread::getMemoryTracker().free(size);
if (DB::current_thread)
DB::CurrentThread::getMemoryTracker().free(size);
}
}

Expand Down
118 changes: 92 additions & 26 deletions dbms/src/Common/ThreadPool.cpp
@@ -1,44 +1,103 @@
#include <common/ThreadPool.h>
#include <Common/ThreadPool.h>
#include <Common/Exception.h>

#include <iostream>
#include <type_traits>


namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_SCHEDULE_TASK;
}
}


template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads)
: ThreadPoolImpl(num_threads, num_threads)
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads)
: ThreadPoolImpl(max_threads, max_threads, max_threads)
{
}

template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads, size_t queue_size)
: num_threads(num_threads), queue_size(queue_size)
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size)
: max_threads(max_threads), max_free_threads(max_free_threads), queue_size(queue_size)
{
threads.reserve(num_threads);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
{
auto on_error = []
{
if constexpr (std::is_same_v<ReturnType, void>)
throw DB::Exception("Cannot schedule a task", DB::ErrorCodes::CANNOT_SCHEDULE_TASK);
else
return false;
};

{
std::unique_lock<std::mutex> lock(mutex);
job_finished.wait(lock, [this] { return !queue_size || active_jobs < queue_size || shutdown; });
std::unique_lock lock(mutex);

auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

if (wait_microseconds)
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error();
}
else
job_finished.wait(lock, pred);

if (shutdown)
return;
return on_error();

jobs.emplace(std::move(job), priority);
++active_jobs;
++scheduled_jobs;

if (threads.size() < std::min(num_threads, active_jobs))
threads.emplace_back([this] { worker(); });
if (threads.size() < std::min(max_threads, scheduled_jobs))
{
threads.emplace_front();
try
{
threads.front() = Thread([this, it = threads.begin()] { worker(it); });
}
catch (...)
{
threads.pop_front();
}
}
}
new_job_or_shutdown.notify_one();
return ReturnType(true);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
{
scheduleImpl<void>(std::move(job), priority, std::nullopt);
}

template <typename Thread>
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds)
{
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds)
{
scheduleImpl<void>(std::move(job), priority, wait_microseconds);
}

template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
{
std::unique_lock<std::mutex> lock(mutex);
job_finished.wait(lock, [this] { return active_jobs == 0; });
std::unique_lock lock(mutex);
job_finished.wait(lock, [this] { return scheduled_jobs == 0; });

if (first_exception)
{
Expand All @@ -59,7 +118,7 @@ template <typename Thread>
void ThreadPoolImpl<Thread>::finalize()
{
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock lock(mutex);
shutdown = true;
}

Expand All @@ -74,20 +133,20 @@ void ThreadPoolImpl<Thread>::finalize()
template <typename Thread>
size_t ThreadPoolImpl<Thread>::active() const
{
std::unique_lock<std::mutex> lock(mutex);
return active_jobs;
std::unique_lock lock(mutex);
return scheduled_jobs;
}

template <typename Thread>
void ThreadPoolImpl<Thread>::worker()
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
{
while (true)
{
Job job;
bool need_shutdown = false;

{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock lock(mutex);
new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
need_shutdown = shutdown;

Expand All @@ -111,11 +170,11 @@ void ThreadPoolImpl<Thread>::worker()
catch (...)
{
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock lock(mutex);
if (!first_exception)
first_exception = std::current_exception();
shutdown = true;
--active_jobs;
--scheduled_jobs;
}
job_finished.notify_all();
new_job_or_shutdown.notify_all();
Expand All @@ -124,8 +183,15 @@ void ThreadPoolImpl<Thread>::worker()
}

{
std::unique_lock<std::mutex> lock(mutex);
--active_jobs;
std::unique_lock lock(mutex);
--scheduled_jobs;

if (threads.size() > scheduled_jobs + max_free_threads)
{
threads.erase(thread_it);
job_finished.notify_all();
return;
}
}

job_finished.notify_all();
Expand All @@ -139,14 +205,14 @@ template class ThreadPoolImpl<ThreadFromGlobalPool>;

void ExceptionHandler::setException(std::exception_ptr && exception)
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock lock(mutex);
if (!first_exception)
first_exception = std::move(exception);
}

void ExceptionHandler::throwIfException()
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock lock(mutex);
if (first_exception)
std::rethrow_exception(first_exception);
}
Expand Down

0 comments on commit f6b9b06

Please sign in to comment.