diff --git a/dbms/programs/server/MetricsTransmitter.h b/dbms/programs/server/MetricsTransmitter.h index e85113ad1414..fd3853a7a9ec 100644 --- a/dbms/programs/server/MetricsTransmitter.h +++ b/dbms/programs/server/MetricsTransmitter.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -46,7 +47,7 @@ class MetricsTransmitter bool quit = false; std::mutex mutex; std::condition_variable cond; - std::thread thread{&MetricsTransmitter::run, this}; + ThreadFromGlobalPool thread{&MetricsTransmitter::run, this}; static constexpr auto profile_events_path_prefix = "ClickHouse.ProfileEvents."; static constexpr auto current_metrics_path_prefix = "ClickHouse.Metrics."; diff --git a/dbms/src/Common/Config/ConfigReloader.cpp b/dbms/src/Common/Config/ConfigReloader.cpp index ed6fad4d42c1..063fbec8e5b5 100644 --- a/dbms/src/Common/Config/ConfigReloader.cpp +++ b/dbms/src/Common/Config/ConfigReloader.cpp @@ -33,7 +33,7 @@ ConfigReloader::ConfigReloader( void ConfigReloader::start() { - thread = std::thread(&ConfigReloader::run, this); + thread = ThreadFromGlobalPool(&ConfigReloader::run, this); } diff --git a/dbms/src/Common/Config/ConfigReloader.h b/dbms/src/Common/Config/ConfigReloader.h index ca4c97c5aeea..c0904422b39c 100644 --- a/dbms/src/Common/Config/ConfigReloader.h +++ b/dbms/src/Common/Config/ConfigReloader.h @@ -1,6 +1,7 @@ #pragma once #include "ConfigProcessor.h" +#include #include #include #include @@ -81,7 +82,7 @@ class ConfigReloader Updater updater; std::atomic quit{false}; - std::thread thread; + ThreadFromGlobalPool thread; /// Locked inside reloadIfNewer. std::mutex reload_mutex; diff --git a/dbms/src/Common/CurrentThread.cpp b/dbms/src/Common/CurrentThread.cpp index 8c05c91bac3c..c3e0cae9571a 100644 --- a/dbms/src/Common/CurrentThread.cpp +++ b/dbms/src/Common/CurrentThread.cpp @@ -34,7 +34,7 @@ ThreadStatus & CurrentThread::get() ProfileEvents::Counters & CurrentThread::getProfileEvents() { - return get().performance_counters; + return current_thread ? get().performance_counters : ProfileEvents::global_counters; } MemoryTracker & CurrentThread::getMemoryTracker() diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 13ea9e4744ae..eb52b6ff7e36 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -408,6 +408,7 @@ namespace ErrorCodes extern const int ILLEGAL_SYNTAX_FOR_CODEC_TYPE = 431; extern const int UNKNOWN_CODEC = 432; extern const int ILLEGAL_CODEC_PARAMETER = 433; + extern const int CANNOT_SCHEDULE_TASK = 434; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index f7c2eb0ef786..6a997e3b19a4 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -190,17 +190,20 @@ namespace CurrentMemoryTracker { void alloc(Int64 size) { - DB::CurrentThread::getMemoryTracker().alloc(size); + if (DB::current_thread) + DB::CurrentThread::getMemoryTracker().alloc(size); } void realloc(Int64 old_size, Int64 new_size) { - DB::CurrentThread::getMemoryTracker().alloc(new_size - old_size); + if (DB::current_thread) + DB::CurrentThread::getMemoryTracker().alloc(new_size - old_size); } void free(Int64 size) { - DB::CurrentThread::getMemoryTracker().free(size); + if (DB::current_thread) + DB::CurrentThread::getMemoryTracker().free(size); } } diff --git a/dbms/src/Common/ThreadPool.cpp b/dbms/src/Common/ThreadPool.cpp index e3f03e18a464..487bd6fd66d6 100644 --- a/dbms/src/Common/ThreadPool.cpp +++ b/dbms/src/Common/ThreadPool.cpp @@ -1,44 +1,103 @@ -#include +#include +#include + #include +#include + + +namespace DB +{ + namespace ErrorCodes + { + extern const int CANNOT_SCHEDULE_TASK; + } +} template -ThreadPoolImpl::ThreadPoolImpl(size_t num_threads) - : ThreadPoolImpl(num_threads, num_threads) +ThreadPoolImpl::ThreadPoolImpl(size_t max_threads) + : ThreadPoolImpl(max_threads, max_threads, max_threads) { } template -ThreadPoolImpl::ThreadPoolImpl(size_t num_threads, size_t queue_size) - : num_threads(num_threads), queue_size(queue_size) +ThreadPoolImpl::ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size) + : max_threads(max_threads), max_free_threads(max_free_threads), queue_size(queue_size) { - threads.reserve(num_threads); } template -void ThreadPoolImpl::schedule(Job job, int priority) +template +ReturnType ThreadPoolImpl::scheduleImpl(Job job, int priority, std::optional wait_microseconds) { + auto on_error = [] + { + if constexpr (std::is_same_v) + throw DB::Exception("Cannot schedule a task", DB::ErrorCodes::CANNOT_SCHEDULE_TASK); + else + return false; + }; + { - std::unique_lock lock(mutex); - job_finished.wait(lock, [this] { return !queue_size || active_jobs < queue_size || shutdown; }); + std::unique_lock lock(mutex); + + auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; }; + + if (wait_microseconds) + { + if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred)) + return on_error(); + } + else + job_finished.wait(lock, pred); + if (shutdown) - return; + return on_error(); jobs.emplace(std::move(job), priority); - ++active_jobs; + ++scheduled_jobs; - if (threads.size() < std::min(num_threads, active_jobs)) - threads.emplace_back([this] { worker(); }); + if (threads.size() < std::min(max_threads, scheduled_jobs)) + { + threads.emplace_front(); + try + { + threads.front() = Thread([this, it = threads.begin()] { worker(it); }); + } + catch (...) + { + threads.pop_front(); + } + } } new_job_or_shutdown.notify_one(); + return ReturnType(true); +} + +template +void ThreadPoolImpl::schedule(Job job, int priority) +{ + scheduleImpl(std::move(job), priority, std::nullopt); +} + +template +bool ThreadPoolImpl::trySchedule(Job job, int priority, uint64_t wait_microseconds) +{ + return scheduleImpl(std::move(job), priority, wait_microseconds); +} + +template +void ThreadPoolImpl::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds) +{ + scheduleImpl(std::move(job), priority, wait_microseconds); } template void ThreadPoolImpl::wait() { { - std::unique_lock lock(mutex); - job_finished.wait(lock, [this] { return active_jobs == 0; }); + std::unique_lock lock(mutex); + job_finished.wait(lock, [this] { return scheduled_jobs == 0; }); if (first_exception) { @@ -59,7 +118,7 @@ template void ThreadPoolImpl::finalize() { { - std::unique_lock lock(mutex); + std::unique_lock lock(mutex); shutdown = true; } @@ -74,12 +133,12 @@ void ThreadPoolImpl::finalize() template size_t ThreadPoolImpl::active() const { - std::unique_lock lock(mutex); - return active_jobs; + std::unique_lock lock(mutex); + return scheduled_jobs; } template -void ThreadPoolImpl::worker() +void ThreadPoolImpl::worker(typename std::list::iterator thread_it) { while (true) { @@ -87,7 +146,7 @@ void ThreadPoolImpl::worker() bool need_shutdown = false; { - std::unique_lock lock(mutex); + std::unique_lock lock(mutex); new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); }); need_shutdown = shutdown; @@ -111,11 +170,11 @@ void ThreadPoolImpl::worker() catch (...) { { - std::unique_lock lock(mutex); + std::unique_lock lock(mutex); if (!first_exception) first_exception = std::current_exception(); shutdown = true; - --active_jobs; + --scheduled_jobs; } job_finished.notify_all(); new_job_or_shutdown.notify_all(); @@ -124,8 +183,15 @@ void ThreadPoolImpl::worker() } { - std::unique_lock lock(mutex); - --active_jobs; + std::unique_lock lock(mutex); + --scheduled_jobs; + + if (threads.size() > scheduled_jobs + max_free_threads) + { + threads.erase(thread_it); + job_finished.notify_all(); + return; + } } job_finished.notify_all(); @@ -139,14 +205,14 @@ template class ThreadPoolImpl; void ExceptionHandler::setException(std::exception_ptr && exception) { - std::unique_lock lock(mutex); + std::unique_lock lock(mutex); if (!first_exception) first_exception = std::move(exception); } void ExceptionHandler::throwIfException() { - std::unique_lock lock(mutex); + std::unique_lock lock(mutex); if (first_exception) std::rethrow_exception(first_exception); } diff --git a/dbms/src/Common/ThreadPool.h b/dbms/src/Common/ThreadPool.h index a8cf84dd7b53..c0ab07fdcf51 100644 --- a/dbms/src/Common/ThreadPool.h +++ b/dbms/src/Common/ThreadPool.h @@ -6,15 +6,23 @@ #include #include #include -#include +#include +#include #include +#include + /** Very simple thread pool similar to boost::threadpool. * Advantages: * - catches exceptions and rethrows on wait. + * + * This thread pool can be used as a task queue. + * For example, you can create a thread pool with 10 threads (and queue of size 10) and schedule 1000 tasks + * - in this case you will be blocked to keep 10 tasks in fly. + * + * Thread: std::thread or something with identical interface. */ - template class ThreadPoolImpl { @@ -22,16 +30,22 @@ class ThreadPoolImpl using Job = std::function; /// Size is constant. Up to num_threads are created on demand and then run until shutdown. - explicit ThreadPoolImpl(size_t num_threads); + explicit ThreadPoolImpl(size_t max_threads); - /// queue_size - maximum number of running plus scheduled jobs. It can be greater than num_threads. Zero means unlimited. - ThreadPoolImpl(size_t num_threads, size_t queue_size); + /// queue_size - maximum number of running plus scheduled jobs. It can be greater than max_threads. Zero means unlimited. + ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size); - /// Add new job. Locks until number of active jobs is less than maximum or exception in one of threads was thrown. + /// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown. /// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function. /// Priority: greater is higher. void schedule(Job job, int priority = 0); + /// Wait for specified amount of time and schedule a job or return false. + bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0); + + /// Wait for specified amount of time and schedule a job or throw an exception. + void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0); + /// Wait for all currently active jobs to be done. /// You may call schedule and wait many times in arbitary order. /// If any thread was throw an exception, first exception will be rethrown from this method, @@ -42,8 +56,6 @@ class ThreadPoolImpl /// You should not destroy object while calling schedule or wait methods from another threads. ~ThreadPoolImpl(); - size_t size() const { return num_threads; } - /// Returns number of running and scheduled jobs. size_t active() const; @@ -52,10 +64,11 @@ class ThreadPoolImpl std::condition_variable job_finished; std::condition_variable new_job_or_shutdown; - const size_t num_threads; + const size_t max_threads; + const size_t max_free_threads; const size_t queue_size; - size_t active_jobs = 0; + size_t scheduled_jobs = 0; bool shutdown = false; struct JobWithPriority @@ -73,34 +86,65 @@ class ThreadPoolImpl }; std::priority_queue jobs; - std::vector threads; + std::list threads; std::exception_ptr first_exception; - void worker(); + template + ReturnType scheduleImpl(Job job, int priority, std::optional wait_microseconds); + + void worker(typename std::list::iterator thread_it); void finalize(); }; +/// ThreadPool with std::thread for threads. using FreeThreadPool = ThreadPoolImpl; + +/** Global ThreadPool that can be used as a singleton. + * Why it is needed? + * + * Linux can create and destroy about 100 000 threads per second (quite good). + * With simple ThreadPool (based on mutex and condvar) you can assign about 200 000 tasks per second + * - not much difference comparing to not using a thread pool at all. + * + * But if you reuse OS threads instead of creating and destroying them, several benefits exist: + * - allocator performance will usually be better due to reuse of thread local caches, especially for jemalloc: + * https://github.com/jemalloc/jemalloc/issues/1347 + * - address sanitizer and thread sanitizer will not fail due to global limit on number of created threads. + * - program will work faster in gdb; + */ class GlobalThreadPool : public FreeThreadPool, public ext::singleton { public: - GlobalThreadPool() : FreeThreadPool(10000) {} /// TODO: global blocking limit may lead to deadlocks. + GlobalThreadPool() : FreeThreadPool(10000, 1000, 10000) {} }; + +/** Looks like std::thread but allocates threads in GlobalThreadPool. + * Also holds ThreadStatus for ClickHouse. + */ class ThreadFromGlobalPool { public: ThreadFromGlobalPool() {} - ThreadFromGlobalPool(std::function func) + template + explicit ThreadFromGlobalPool(Function && func, Args &&... args) { mutex = std::make_unique(); + /// The function object must be copyable, so we wrap lock_guard in shared_ptr. - GlobalThreadPool::instance().schedule([lock = std::make_shared>(*mutex), func = std::move(func)] { func(); }); + GlobalThreadPool::instance().scheduleOrThrow([ + lock = std::make_shared>(*mutex), + func = std::forward(func), + args = std::make_tuple(std::forward(args)...)] + { + DB::ThreadStatus thread_status; + std::apply(func, args); + }); } ThreadFromGlobalPool(ThreadFromGlobalPool && rhs) @@ -129,10 +173,18 @@ class ThreadFromGlobalPool } mutex.reset(); } + + bool joinable() const + { + return static_cast(mutex); + } + private: std::unique_ptr mutex; /// Object must be moveable. }; + +/// Recommended thread pool for the case when multiple thread pools are created and destroyed. using ThreadPool = ThreadPoolImpl; diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp index 9626a54aa202..ac049bcb8e5c 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -853,8 +853,8 @@ ZooKeeper::ZooKeeper( if (!auth_scheme.empty()) sendAuth(auth_scheme, auth_data); - send_thread = std::thread([this] { sendThread(); }); - receive_thread = std::thread([this] { receiveThread(); }); + send_thread = ThreadFromGlobalPool([this] { sendThread(); }); + receive_thread = ThreadFromGlobalPool([this] { receiveThread(); }); ProfileEvents::increment(ProfileEvents::ZooKeeperInit); } diff --git a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h index c93f13b93516..e5da9ea48fee 100644 --- a/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/dbms/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -209,8 +210,8 @@ class ZooKeeper : public IKeeper Watches watches; std::mutex watches_mutex; - std::thread send_thread; - std::thread receive_thread; + ThreadFromGlobalPool send_thread; + ThreadFromGlobalPool receive_thread; void connect( const Addresses & addresses, diff --git a/dbms/src/Common/tests/multi_version.cpp b/dbms/src/Common/tests/multi_version.cpp index ee90a79801b5..0937e597e2da 100644 --- a/dbms/src/Common/tests/multi_version.cpp +++ b/dbms/src/Common/tests/multi_version.cpp @@ -23,7 +23,7 @@ void thread2(MV & x, const char * result) } -int main(int argc, char ** argv) +int main(int, char **) { try { diff --git a/dbms/src/Core/BackgroundSchedulePool.cpp b/dbms/src/Core/BackgroundSchedulePool.cpp index 5da499e5ae9d..0493e13b2b9e 100644 --- a/dbms/src/Core/BackgroundSchedulePool.cpp +++ b/dbms/src/Core/BackgroundSchedulePool.cpp @@ -161,9 +161,9 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size) threads.resize(size); for (auto & thread : threads) - thread = std::thread([this] { threadFunction(); }); + thread = ThreadFromGlobalPool([this] { threadFunction(); }); - delayed_thread = std::thread([this] { delayExecutionThreadFunction(); }); + delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); }); } @@ -181,7 +181,7 @@ BackgroundSchedulePool::~BackgroundSchedulePool() delayed_thread.join(); LOG_TRACE(&Logger::get("BackgroundSchedulePool"), "Waiting for threads to finish."); - for (std::thread & thread : threads) + for (auto & thread : threads) thread.join(); } catch (...) diff --git a/dbms/src/Core/BackgroundSchedulePool.h b/dbms/src/Core/BackgroundSchedulePool.h index ba23d93733f1..7b75d9459ba4 100644 --- a/dbms/src/Core/BackgroundSchedulePool.h +++ b/dbms/src/Core/BackgroundSchedulePool.h @@ -13,6 +13,8 @@ #include #include #include +#include + namespace DB { @@ -119,7 +121,7 @@ class BackgroundSchedulePool ~BackgroundSchedulePool(); private: - using Threads = std::vector; + using Threads = std::vector; void threadFunction(); void delayExecutionThreadFunction(); @@ -141,7 +143,7 @@ class BackgroundSchedulePool std::condition_variable wakeup_cond; std::mutex delayed_tasks_mutex; /// Thread waiting for next delayed task. - std::thread delayed_thread; + ThreadFromGlobalPool delayed_thread; /// Tasks ordered by scheduled time. DelayedTasks delayed_tasks; diff --git a/dbms/src/DataStreams/ParallelInputsProcessor.h b/dbms/src/DataStreams/ParallelInputsProcessor.h index ba086b98939f..a83c2ca1e566 100644 --- a/dbms/src/DataStreams/ParallelInputsProcessor.h +++ b/dbms/src/DataStreams/ParallelInputsProcessor.h @@ -13,6 +13,7 @@ #include #include #include +#include /** Allows to process multiple block input streams (sources) in parallel, using specified number of threads. @@ -306,8 +307,8 @@ class ParallelInputsProcessor Handler & handler; - /// Streams. - using ThreadsData = std::vector; + /// Threads. + using ThreadsData = std::vector; ThreadsData threads; /** A set of available sources that are not currently processed by any thread. diff --git a/dbms/src/Dictionaries/ExecutableDictionarySource.cpp b/dbms/src/Dictionaries/ExecutableDictionarySource.cpp index 376153bd0e9e..028e0452fff7 100644 --- a/dbms/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/dbms/src/Dictionaries/ExecutableDictionarySource.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "DictionarySourceFactory.h" #include "DictionarySourceHelpers.h" @@ -165,7 +166,7 @@ namespace BlockInputStreamPtr stream; std::unique_ptr command; std::packaged_task task; - std::thread thread; + ThreadFromGlobalPool thread; bool wait_called = false; }; diff --git a/dbms/src/IO/AIOContextPool.h b/dbms/src/IO/AIOContextPool.h index 64d01a0f45b1..ca92e14b6ed0 100644 --- a/dbms/src/IO/AIOContextPool.h +++ b/dbms/src/IO/AIOContextPool.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace DB @@ -32,7 +33,7 @@ class AIOContextPool : public ext::singleton std::map> promises; std::atomic cancelled{false}; - std::thread io_completion_monitor{&AIOContextPool::doMonitor, this}; + ThreadFromGlobalPool io_completion_monitor{&AIOContextPool::doMonitor, this}; ~AIOContextPool(); diff --git a/dbms/src/Interpreters/AsynchronousMetrics.h b/dbms/src/Interpreters/AsynchronousMetrics.h index ceafc2af5868..8ccefb9e9303 100644 --- a/dbms/src/Interpreters/AsynchronousMetrics.h +++ b/dbms/src/Interpreters/AsynchronousMetrics.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -43,7 +44,7 @@ class AsynchronousMetrics Container container; mutable std::mutex container_mutex; - std::thread thread; + ThreadFromGlobalPool thread; void run(); void update(); diff --git a/dbms/src/Interpreters/Compiler.cpp b/dbms/src/Interpreters/Compiler.cpp index 8a60b24a24b3..9b0a8371f09c 100644 --- a/dbms/src/Interpreters/Compiler.cpp +++ b/dbms/src/Interpreters/Compiler.cpp @@ -142,40 +142,37 @@ SharedLibraryPtr Compiler::getOrCount( { /// The min_count_to_compile value of zero indicates the need for synchronous compilation. - /// Are there any free threads? - if (min_count_to_compile == 0 || pool.active() < pool.size()) - { - /// Indicates that the library is in the process of compiling. - libraries[hashed_key] = nullptr; + /// Indicates that the library is in the process of compiling. + libraries[hashed_key] = nullptr; + + LOG_INFO(log, "Compiling code " << file_name << ", key: " << key); - LOG_INFO(log, "Compiling code " << file_name << ", key: " << key); + if (min_count_to_compile == 0) + { + { + ext::unlock_guard unlock(mutex); + compile(hashed_key, file_name, additional_compiler_flags, get_code, on_ready); + } - if (min_count_to_compile == 0) + return libraries[hashed_key]; + } + else + { + bool res = pool.trySchedule([=] { + try { - ext::unlock_guard unlock(mutex); compile(hashed_key, file_name, additional_compiler_flags, get_code, on_ready); } - - return libraries[hashed_key]; - } - else - { - pool.schedule([=] + catch (...) { - try - { - compile(hashed_key, file_name, additional_compiler_flags, get_code, on_ready); - } - catch (...) - { - tryLogCurrentException("Compiler"); - } - }); - } + tryLogCurrentException("Compiler"); + } + }); + + if (!res) + LOG_INFO(log, "All threads are busy."); } - else - LOG_INFO(log, "All threads are busy."); } return nullptr; diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index ca90073436a2..749c2ae40d5b 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -521,7 +522,7 @@ class SessionCleaner std::mutex mutex; std::condition_variable cond; std::atomic quit{false}; - std::thread thread{&SessionCleaner::run, this}; + ThreadFromGlobalPool thread{&SessionCleaner::run, this}; }; } diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 54fcffbea2a9..730e37d9bd30 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -241,7 +241,7 @@ DDLWorker::DDLWorker(const std::string & zk_root_dir, Context & context_, const event_queue_updated = std::make_shared(); - thread = std::thread(&DDLWorker::run, this); + thread = ThreadFromGlobalPool(&DDLWorker::run, this); } diff --git a/dbms/src/Interpreters/DDLWorker.h b/dbms/src/Interpreters/DDLWorker.h index d3872b8ac95b..18714720d2da 100644 --- a/dbms/src/Interpreters/DDLWorker.h +++ b/dbms/src/Interpreters/DDLWorker.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -90,7 +91,7 @@ class DDLWorker std::shared_ptr event_queue_updated; std::atomic stop_flag{false}; - std::thread thread; + ThreadFromGlobalPool thread; Int64 last_cleanup_time_seconds = 0; diff --git a/dbms/src/Interpreters/EmbeddedDictionaries.cpp b/dbms/src/Interpreters/EmbeddedDictionaries.cpp index 10f5692f6e61..60524d63cee8 100644 --- a/dbms/src/Interpreters/EmbeddedDictionaries.cpp +++ b/dbms/src/Interpreters/EmbeddedDictionaries.cpp @@ -150,7 +150,7 @@ EmbeddedDictionaries::EmbeddedDictionaries( , reload_period(context_.getConfigRef().getInt("builtin_dictionaries_reload_interval", 3600)) { reloadImpl(throw_on_error); - reloading_thread = std::thread([this] { reloadPeriodically(); }); + reloading_thread = ThreadFromGlobalPool([this] { reloadPeriodically(); }); } diff --git a/dbms/src/Interpreters/EmbeddedDictionaries.h b/dbms/src/Interpreters/EmbeddedDictionaries.h index ad2dd404b3e3..caa7c1cc62da 100644 --- a/dbms/src/Interpreters/EmbeddedDictionaries.h +++ b/dbms/src/Interpreters/EmbeddedDictionaries.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -41,7 +42,7 @@ class EmbeddedDictionaries mutable std::mutex mutex; - std::thread reloading_thread; + ThreadFromGlobalPool reloading_thread; Poco::Event destroy; diff --git a/dbms/src/Interpreters/ExternalLoader.cpp b/dbms/src/Interpreters/ExternalLoader.cpp index e4ccd9962c6b..814fc5ecec29 100644 --- a/dbms/src/Interpreters/ExternalLoader.cpp +++ b/dbms/src/Interpreters/ExternalLoader.cpp @@ -72,7 +72,7 @@ void ExternalLoader::init(bool throw_on_error) reloadAndUpdate(throw_on_error); } - reloading_thread = std::thread{&ExternalLoader::reloadPeriodically, this}; + reloading_thread = ThreadFromGlobalPool{&ExternalLoader::reloadPeriodically, this}; } diff --git a/dbms/src/Interpreters/ExternalLoader.h b/dbms/src/Interpreters/ExternalLoader.h index ac672f925e3c..c2ce161f0e11 100644 --- a/dbms/src/Interpreters/ExternalLoader.h +++ b/dbms/src/Interpreters/ExternalLoader.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB @@ -160,7 +161,7 @@ class ExternalLoader std::unique_ptr config_repository; - std::thread reloading_thread; + ThreadFromGlobalPool reloading_thread; Poco::Event destroy; Logger * log; diff --git a/dbms/src/Interpreters/SystemLog.h b/dbms/src/Interpreters/SystemLog.h index 1a4283fae8e1..4cb2bb76b4fa 100644 --- a/dbms/src/Interpreters/SystemLog.h +++ b/dbms/src/Interpreters/SystemLog.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -135,7 +136,7 @@ class SystemLog : private boost::noncopyable /** In this thread, data is pulled from 'queue' and stored in 'data', and then written into table. */ - std::thread saving_thread; + ThreadFromGlobalPool saving_thread; void threadFunction(); @@ -161,7 +162,7 @@ SystemLog::SystemLog(Context & context_, log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")"); data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE); - saving_thread = std::thread([this] { threadFunction(); }); + saving_thread = ThreadFromGlobalPool([this] { threadFunction(); }); } diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.h b/dbms/src/Storages/Distributed/DirectoryMonitor.h index 484b0ac3f233..d7858d3af40d 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.h +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -55,7 +56,7 @@ class StorageDistributedDirectoryMonitor std::mutex mutex; std::condition_variable cond; Logger * log; - std::thread thread {&StorageDistributedDirectoryMonitor::run, this}; + ThreadFromGlobalPool thread{&StorageDistributedDirectoryMonitor::run, this}; }; } diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index c0911ac4d5e4..b60d860ec6ce 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -67,7 +67,7 @@ BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_) threads.resize(size); for (auto & thread : threads) - thread = std::thread([this] { threadFunction(); }); + thread = ThreadFromGlobalPool([this] { threadFunction(); }); } @@ -110,7 +110,7 @@ BackgroundProcessingPool::~BackgroundProcessingPool() { shutdown = true; wake_event.notify_all(); - for (std::thread & thread : threads) + for (auto & thread : threads) thread.join(); } catch (...) diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h index 4eb5d4cce56c..fdf5251cb8a1 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h @@ -13,6 +13,8 @@ #include #include #include +#include + namespace DB @@ -60,7 +62,7 @@ class BackgroundProcessingPool friend class BackgroundProcessingPoolTaskInfo; using Tasks = std::multimap; /// key is desired next time to execute (priority). - using Threads = std::vector; + using Threads = std::vector; const size_t size; diff --git a/dbms/src/Storages/StorageBuffer.cpp b/dbms/src/Storages/StorageBuffer.cpp index 5d76279c95fc..511364bc11f7 100644 --- a/dbms/src/Storages/StorageBuffer.cpp +++ b/dbms/src/Storages/StorageBuffer.cpp @@ -420,7 +420,7 @@ void StorageBuffer::startup() << " Set apropriate system_profile to fix this."); } - flush_thread = std::thread(&StorageBuffer::flushThread, this); + flush_thread = ThreadFromGlobalPool(&StorageBuffer::flushThread, this); } diff --git a/dbms/src/Storages/StorageBuffer.h b/dbms/src/Storages/StorageBuffer.h index 9992d1b49bd8..85ea3f086b59 100644 --- a/dbms/src/Storages/StorageBuffer.h +++ b/dbms/src/Storages/StorageBuffer.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -110,7 +111,7 @@ friend class BufferBlockOutputStream; Poco::Event shutdown_event; /// Resets data by timeout. - std::thread flush_thread; + ThreadFromGlobalPool flush_thread; void flushAllBuffers(bool check_thresholds = true); /// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded. diff --git a/libs/libdaemon/src/BaseDaemon.cpp b/libs/libdaemon/src/BaseDaemon.cpp index bad38c78529b..e62ff31172f5 100644 --- a/libs/libdaemon/src/BaseDaemon.cpp +++ b/libs/libdaemon/src/BaseDaemon.cpp @@ -998,8 +998,6 @@ void BaseDaemon::initialize(Application & self) } initializeTerminationAndSignalProcessing(); - - DB::CurrentThread::get(); /// TODO Why do we need this? logRevision(); for (const auto & key : DB::getMultipleKeysFromConfig(config(), "", "graphite"))