From d8cceb378ebdd88962ac0dac50678a85bc947697 Mon Sep 17 00:00:00 2001 From: Sergei Trifonov Date: Sat, 5 Oct 2024 20:29:58 +0000 Subject: [PATCH] Merge pull request #68694 from filimonov/thread_pool_thread_creation_out_of_lock Thread pool: move thread creation out of lock --- src/Common/AsyncLoader.cpp | 12 +- src/Common/ThreadPool.cpp | 388 ++++++++++++++++++++++++++++++------- src/Common/ThreadPool.h | 90 +++++++-- 3 files changed, 397 insertions(+), 93 deletions(-) diff --git a/src/Common/AsyncLoader.cpp b/src/Common/AsyncLoader.cpp index d40e320e741c..0e9e0dbb7ac4 100644 --- a/src/Common/AsyncLoader.cpp +++ b/src/Common/AsyncLoader.cpp @@ -51,12 +51,12 @@ AsyncLoader::Pool::Pool(const AsyncLoader::PoolInitializer & init) , priority(init.priority) , max_threads(init.max_threads > 0 ? init.max_threads : getNumberOfPhysicalCPUCores()) , thread_pool(std::make_unique( - init.metric_threads, - init.metric_active_threads, - init.metric_scheduled_threads, - /* max_threads = */ std::numeric_limits::max(), // Unlimited number of threads, we do worker management ourselves - /* max_free_threads = */ 0, // We do not require free threads - /* queue_size = */0)) // Unlimited queue to avoid blocking during worker spawning + init.metric_threads, + init.metric_active_threads, + init.metric_scheduled_threads, + /* max_threads = */ ThreadPool::MAX_THEORETICAL_THREAD_COUNT, // Unlimited number of threads, we do worker management ourselves + /* max_free_threads = */ 0, // We do not require free threads + /* queue_size = */ 0)) // Unlimited queue to avoid blocking during worker spawning {} AsyncLoader::Pool::Pool(Pool&& o) noexcept diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index 8685533e2d10..5dfefcbfccf1 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -47,6 +47,47 @@ namespace ProfileEvents } +namespace +{ + struct ScopedDecrement + { + std::optional>> atomic_var; + + // Deleted copy constructor and copy assignment operator + ScopedDecrement(const ScopedDecrement&) = delete; + ScopedDecrement& operator=(const ScopedDecrement&) = delete; + + // Move constructor + ScopedDecrement(ScopedDecrement&& other) noexcept + : atomic_var(std::move(other.atomic_var)) + { + other.atomic_var.reset(); + } + + // Move assignment operator + ScopedDecrement& operator=(ScopedDecrement&& other) noexcept + { + if (this != &other) + { + atomic_var.swap(other.atomic_var); + } + return *this; + } + + explicit ScopedDecrement(std::atomic& var) + : atomic_var(var) + { + atomic_var->get().fetch_sub(1, std::memory_order_relaxed); + } + + ~ScopedDecrement() + { + if (atomic_var) + atomic_var->get().fetch_add(1, std::memory_order_relaxed); + } + }; +} + class JobWithPriority { public: @@ -55,6 +96,8 @@ class JobWithPriority Job job; Priority priority; CurrentMetrics::Increment metric_increment; + ScopedDecrement available_threads_decrement; + DB::OpenTelemetry::TracingContextOnThread thread_trace_context; /// Call stacks of all jobs' schedulings leading to this one @@ -62,11 +105,20 @@ class JobWithPriority bool enable_job_stack_trace = false; Stopwatch job_create_time; + // Deleted copy constructor and copy assignment operator + JobWithPriority(const JobWithPriority&) = delete; + JobWithPriority& operator=(const JobWithPriority&) = delete; + + // Move constructor and move assignment operator + JobWithPriority(JobWithPriority&&) noexcept = default; + JobWithPriority& operator=(JobWithPriority&&) noexcept = default; + JobWithPriority( Job job_, Priority priority_, CurrentMetrics::Metric metric, const DB::OpenTelemetry::TracingContextOnThread & thread_trace_context_, - bool capture_frame_pointers) + bool capture_frame_pointers, ScopedDecrement available_threads_decrement_) : job(job_), priority(priority_), metric_increment(metric), + available_threads_decrement(std::move(available_threads_decrement_)), thread_trace_context(thread_trace_context_), enable_job_stack_trace(capture_frame_pointers) { if (!capture_frame_pointers) @@ -85,8 +137,6 @@ class JobWithPriority { return job_create_time.elapsedMicroseconds(); } - - }; static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool"; @@ -125,12 +175,19 @@ ThreadPoolImpl::ThreadPoolImpl( , queue_size(queue_size_ ? std::max(queue_size_, max_threads) : 0 /* zero means the queue is unlimited */) , shutdown_on_exception(shutdown_on_exception_) { + max_threads = std::min(max_threads, static_cast(MAX_THEORETICAL_THREAD_COUNT)); + max_free_threads = std::min(max_free_threads, static_cast(MAX_THEORETICAL_THREAD_COUNT)); + remaining_pool_capacity.store(max_threads, std::memory_order_relaxed); + available_threads.store(0, std::memory_order_relaxed); } template void ThreadPoolImpl::setMaxThreads(size_t value) { + value = std::min(value, static_cast(MAX_THEORETICAL_THREAD_COUNT)); std::lock_guard lock(mutex); + remaining_pool_capacity.fetch_add(value - max_threads, std::memory_order_relaxed); + bool need_start_threads = (value > max_threads); bool need_finish_free_threads = (value < max_free_threads); @@ -163,6 +220,7 @@ size_t ThreadPoolImpl::getMaxThreads() const template void ThreadPoolImpl::setMaxFreeThreads(size_t value) { + value = std::min(value, static_cast(MAX_THEORETICAL_THREAD_COUNT)); std::lock_guard lock(mutex); bool need_finish_free_threads = (value < max_free_threads); @@ -184,7 +242,6 @@ void ThreadPoolImpl::setQueueSize(size_t value) jobs.reserve(queue_size); } - template template ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std::optional wait_microseconds, bool propagate_opentelemetry_tracing_context) @@ -207,6 +264,38 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: return false; }; + // Decrement available_threads, scoped to the job lifecycle. + // This ensures that available_threads decreases when a new job starts + // and automatically increments when the job completes or goes out of scope. + ScopedDecrement available_threads_decrement(available_threads); + + std::unique_ptr new_thread; + + // Load the current capacity + int64_t capacity = remaining_pool_capacity.load(std::memory_order_relaxed); + int64_t currently_available_threads = available_threads.load(std::memory_order_relaxed); + + while (currently_available_threads <= 0 && capacity > 0) + { + if (remaining_pool_capacity.compare_exchange_weak(capacity, capacity - 1, std::memory_order_relaxed)) + { + try + { + new_thread = std::make_unique(*this); + break; // Exit the loop once a thread is successfully created. + } + catch (...) + { + // Failed to create the thread, restore capacity + remaining_pool_capacity.fetch_add(1, std::memory_order_relaxed); + std::lock_guard lock(mutex); // needed to change first_exception. + return on_error("failed to start the thread"); + } + } + // capacity gets reloaded by (unsuccessful) compare_exchange_weak + currently_available_threads = available_threads.load(std::memory_order_relaxed); + } + { Stopwatch watch; std::unique_lock lock(mutex); @@ -219,6 +308,7 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; }; + /// Wait for available threads or timeout if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero. { if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred)) @@ -230,48 +320,90 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: if (shutdown) return on_error("shutdown"); - /// We must not to allocate any memory after we emplaced a job in a queue. - /// Because if an exception would be thrown, we won't notify a thread about job occurrence. - - /// Check if there are enough threads to process job. - if (threads.size() < std::min(max_threads, scheduled_jobs + 1)) + /// We must not allocate memory or perform operations that could throw exceptions after adding a job to the queue, + /// because if an exception occurs, it may leave the job in the queue without notifying any threads. + typename ThreadFromThreadPool::ThreadList::iterator thread_slot; + + /// The decision to start a new thread is made outside the locked section. + /// However, thread load and demand can change dynamically, and decisions based on + /// atomic variables outside the critical section might become outdated by the time we acquire the lock. + /// This can lead to two possible scenarios: + /// + /// 1) Relatively common: A new thread was started outside the lock, but by the time we acquire the lock, + /// demand for threads has decreased (e.g., other threads have finished their jobs and are now idle). + /// In this case, even though there are now enough threads, we still attempt to add the new thread + /// to the pool, provided it does not exceed the `max_threads` or `max_free_threads` limits. Keeping + /// an extra thread in the pool may help accommodate a sudden increase in demand without the need + /// to wait for thread creation. + /// + /// 2) Very unlikely (but possible): Outside the lock, it appeared there were enough threads + /// to handle the workload. However, after acquiring the lock, it turns out the new thread + /// is needed (possibly because one of the existing threads was removed or became unavailable). + /// In this case, we create the thread inside the critical section, even though this may introduce + /// a small delay. + + /// Check if we can add the thread created outside the critical section to the pool. + bool adding_new_thread = new_thread && threads.size() < std::min(max_threads, 1 /* current job */ + scheduled_jobs + max_free_threads); + + // If we didn't create a new thread initially but realize we actually need one (unlikely scenario). + if (unlikely(!adding_new_thread && threads.size() < std::min(max_threads, scheduled_jobs + 1))) { try { - threads.emplace_front(); + remaining_pool_capacity.fetch_sub(1, std::memory_order_relaxed); + new_thread = std::make_unique(*this); } catch (...) { - /// Most likely this is a std::bad_alloc exception - return on_error("cannot allocate thread slot"); + // If thread creation fails, restore the pool capacity and return an error. + remaining_pool_capacity.fetch_add(1, std::memory_order_relaxed); + return on_error("failed to start the thread"); } + adding_new_thread = true; + } + if (adding_new_thread) + { try { - Stopwatch watch2; - threads.front() = Thread([this, it = threads.begin()] { worker(it); }); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds, - watch2.elapsedMicroseconds()); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions); + threads.emplace_front(std::move(new_thread)); + thread_slot = threads.begin(); } catch (...) { - threads.pop_front(); - return on_error("cannot allocate thread"); + /// Most likely this is a std::bad_alloc exception + return on_error("cannot emplace the thread in the pool"); } } + else // we have a thread but there is no space for that in the pool. + { + new_thread.reset(); + } - jobs.emplace(std::move(job), - priority, - metric_scheduled_jobs, - /// Tracing context on this thread is used as parent context for the sub-thread that runs the job - propagate_opentelemetry_tracing_context ? DB::OpenTelemetry::CurrentContext() : DB::OpenTelemetry::TracingContextOnThread(), - /// capture_frame_pointers - DB::Exception::enable_job_stack_trace); + try + { + jobs.emplace(std::move(job), + priority, + metric_scheduled_jobs, + /// Tracing context on this thread is used as parent context for the sub-thread that runs the job + propagate_opentelemetry_tracing_context ? DB::OpenTelemetry::CurrentContext() : DB::OpenTelemetry::TracingContextOnThread(), + /// capture_frame_pointers + DB::Exception::enable_job_stack_trace, + std::move(available_threads_decrement)); + + ++scheduled_jobs; + + if (adding_new_thread) + (*thread_slot)->start(thread_slot); + + } + catch (...) + { + if (adding_new_thread) + threads.pop_front(); - ++scheduled_jobs; + return on_error("cannot start the job or thread"); + } } /// Wake up a free thread to run the new job. @@ -291,30 +423,51 @@ void ThreadPoolImpl::startNewThreadsNoLock() /// Start new threads while there are more scheduled jobs in the queue and the limit `max_threads` is not reached. while (threads.size() < std::min(scheduled_jobs, max_threads)) { + std::unique_ptr new_thread; + + int64_t capacity = remaining_pool_capacity.load(std::memory_order_relaxed); + + while (capacity > 0) + { + if (remaining_pool_capacity.compare_exchange_weak(capacity, capacity - 1, std::memory_order_relaxed)) + { + try + { + // Successfully decremented, attempt to create a new thread + new_thread = std::make_unique(*this); + } + catch (...) + { + // Failed to create the thread, restore capacity + remaining_pool_capacity.fetch_add(1, std::memory_order_relaxed); + } + break; // Exit loop whether thread creation succeeded or not + } + } + + if (!new_thread) + break; /// failed to start more threads + + typename ThreadFromThreadPool::ThreadList::iterator thread_slot; + try { - threads.emplace_front(); + threads.emplace_front(std::move(new_thread)); + thread_slot = threads.begin(); } catch (...) { - break; /// failed to start more threads + break; } try { - Stopwatch watch; - threads.front() = Thread([this, it = threads.begin()] { worker(it); }); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds, - watch.elapsedMicroseconds()); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions); - + (*thread_slot)->start(thread_slot); } catch (...) { threads.pop_front(); - break; /// failed to start more threads + break; } } } @@ -376,21 +529,29 @@ void ThreadPoolImpl::finalize() { std::lock_guard lock(mutex); shutdown = true; - /// We don't want threads to remove themselves from `threads` anymore, otherwise `thread.join()` will go wrong below in this function. + + /// scheduleImpl doesn't check for shutdown outside the critical section, + /// so we set remaining_pool_capacity to a large negative value + /// (e.g., -MAX_THEORETICAL_THREAD_COUNT) to signal that no new threads are needed. + /// This effectively prevents any new threads from being started during shutdown. + remaining_pool_capacity.store(-MAX_THEORETICAL_THREAD_COUNT, std::memory_order_relaxed); + + /// Disable thread self-removal from `threads`. Otherwise, if threads remove themselves, + /// the thread.join() operation will fail later in this function. threads_remove_themselves = false; } - /// Wake up threads so they can finish themselves. + /// Notify all threads to wake them up, so they can complete their work and exit gracefully. new_job_or_shutdown.notify_all(); - /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does). - for (auto & thread : threads) + /// Join all threads before clearing the list + for (auto& thread_ptr : threads) { - thread.join(); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); + if (thread_ptr) + thread_ptr->join(); } + // now it's safe to clear the threads threads.clear(); } @@ -426,11 +587,88 @@ bool ThreadPoolImpl::finished() const return shutdown; } + +template +ThreadPoolImpl::ThreadFromThreadPool::ThreadFromThreadPool(ThreadPoolImpl& parent_pool_) + : parent_pool(parent_pool_) + , thread_state(ThreadState::Preparing) // Initial state is Preparing +{ + Stopwatch watch2; + + thread = Thread(&ThreadFromThreadPool::worker, this); + + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds, + watch2.elapsedMicroseconds()); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions); + + parent_pool.available_threads.fetch_add(1, std::memory_order_relaxed); +} + + +template +void ThreadPoolImpl::ThreadFromThreadPool::start(typename ThreadList::iterator & it) +{ + /// the thread which created ThreadFromThreadPool should start it after adding it to the pool, or destroy it. + /// no parallelism is expected here. So the only valid transition for the start method is Preparing to Running. + chassert(thread_state.load(std::memory_order_relaxed) == ThreadState::Preparing); + thread_it = it; + thread_state.store(ThreadState::Running, std::memory_order_relaxed); /// now worker can start executing the main loop +} + template -void ThreadPoolImpl::worker(typename std::list::iterator thread_it) +void ThreadPoolImpl::ThreadFromThreadPool::join() +{ + // Ensure the thread is joined before destruction if still joinable + if (thread.joinable()) + thread.join(); +} + +template +void ThreadPoolImpl::ThreadFromThreadPool::removeSelfFromPoolNoPoolLock() +{ + if (thread.joinable()) + thread.detach(); + + parent_pool.threads.erase(thread_it); +} + +template +ThreadPoolImpl::ThreadFromThreadPool::~ThreadFromThreadPool() +{ + parent_pool.available_threads.fetch_sub(1, std::memory_order_relaxed); + + // The thread is being destructed, so the remaining pool capacity increases + parent_pool.remaining_pool_capacity.fetch_add(1, std::memory_order_relaxed); + + // If the worker was still waiting in the loop for thread initialization, + // signal it to terminate and be destroyed now. + thread_state.store(ThreadState::Destructing, std::memory_order_relaxed); + + join(); + + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); +} + + +template +void ThreadPoolImpl::ThreadFromThreadPool::worker() { DENY_ALLOCATIONS_IN_SCOPE; - CurrentMetrics::Increment metric_pool_threads(metric_threads); + + // wait until the thread will be started + while (thread_state.load(std::memory_order_relaxed) == ThreadState::Preparing) + { + std::this_thread::yield(); // let's try to yield to avoid consuming too much CPU in the busy-loop + } + + // If the thread transitions to Destructing, exit + if (thread_state.load(std::memory_order_relaxed) == ThreadState::Destructing) + return; + + CurrentMetrics::Increment metric_pool_threads(parent_pool.metric_threads); bool job_is_done = false; std::exception_ptr exception_from_job; @@ -447,7 +685,7 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ { Stopwatch watch; - std::unique_lock lock(mutex); + std::unique_lock lock(parent_pool.mutex); ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds, watch.elapsedMicroseconds()); @@ -458,48 +696,55 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ job_is_done = false; if (exception_from_job) { - if (!first_exception) - first_exception = exception_from_job; - if (shutdown_on_exception) - shutdown = true; + if (!parent_pool.first_exception) + parent_pool.first_exception = exception_from_job; + if (parent_pool.shutdown_on_exception) + { + parent_pool.shutdown = true; + + // Prevent new thread creation, as explained in finalize. + parent_pool.remaining_pool_capacity.store(-MAX_THEORETICAL_THREAD_COUNT, std::memory_order_relaxed); + } exception_from_job = {}; } - --scheduled_jobs; + --parent_pool.scheduled_jobs; - job_finished.notify_all(); - if (shutdown) - new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves. + parent_pool.job_finished.notify_all(); + if (parent_pool.shutdown) + parent_pool.new_job_or_shutdown.notify_all(); /// `shutdown` was set, wake up other threads so they can finish themselves. } - new_job_or_shutdown.wait(lock, [&] { return !jobs.empty() || shutdown || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads); }); + parent_pool.new_job_or_shutdown.wait(lock, [this] { + return !parent_pool.jobs.empty() + || parent_pool.shutdown + || parent_pool.threads.size() > std::min(parent_pool.max_threads, parent_pool.scheduled_jobs + parent_pool.max_free_threads); + }); - if (jobs.empty() || threads.size() > std::min(max_threads, scheduled_jobs + max_free_threads)) + + if (parent_pool.jobs.empty() || parent_pool.threads.size() > std::min(parent_pool.max_threads, parent_pool.scheduled_jobs + parent_pool.max_free_threads)) { // We enter here if: // - either this thread is not needed anymore due to max_free_threads excess; // - or shutdown happened AND all jobs are already handled. - if (threads_remove_themselves) - { - thread_it->detach(); - threads.erase(thread_it); - ProfileEvents::increment( - std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); - } + + if (parent_pool.threads_remove_themselves) + removeSelfFromPoolNoPoolLock(); // Detach and remove itself from the pool + return; } /// boost::priority_queue does not provide interface for getting non-const reference to an element /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority. - job_data = std::move(const_cast(jobs.top())); - jobs.pop(); + job_data = std::move(const_cast(parent_pool.jobs.top())); + parent_pool.jobs.pop(); ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolJobWaitTimeMicroseconds : ProfileEvents::LocalThreadPoolJobWaitTimeMicroseconds, job_data->elapsedMicroseconds()); /// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them. - if (shutdown) + if (parent_pool.shutdown) { { ALLOW_ALLOCATIONS_IN_SCOPE; @@ -522,7 +767,7 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ if (DB::Exception::enable_job_stack_trace) DB::Exception::setThreadFramePointers(std::move(job_data->frame_pointers)); - CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads); + CurrentMetrics::Increment metric_active_pool_threads(parent_pool.metric_active_threads); if constexpr (!std::is_same_v) { @@ -575,7 +820,6 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ } } - template class ThreadPoolImpl; template class ThreadPoolImpl>; template class ThreadPoolImpl>; diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index fd9149bda045..7e497245acc3 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -32,7 +32,7 @@ class JobWithPriority; * * This thread pool can be used as a task queue. * For example, you can create a thread pool with 10 threads (and queue of size 10) and schedule 1000 tasks - * - in this case you will be blocked to keep 10 tasks in fly. + * - in this case you will be blocked to keep 10 tasks in flight. * * Thread: std::thread or something with identical interface. */ @@ -40,9 +40,57 @@ template class ThreadPoolImpl { public: + // used as 'unlimited' thread pool size + // on linux you can not have more threads even if the RAM is unlimited + // see https://docs.kernel.org/admin-guide/sysctl/kernel.html#threads-max + static constexpr int MAX_THEORETICAL_THREAD_COUNT = 0x3fffffff; // ~1 billion + using Job = std::function; using Metric = CurrentMetrics::Metric; + // Subclass that encapsulates the thread and has the ability to remove itself from the pool. + class ThreadFromThreadPool + { + public: + using ThreadList = std::list>; + + /// Constructor to initialize and start the thread (but not associate it with the pool) + explicit ThreadFromThreadPool(ThreadPoolImpl& parent_pool); + + // Shift the thread state from Preparing to Running to allow the worker to start. + void start(ThreadList::iterator& it); + + void join(); + + // Destructor to join the thread if needed (shift the state to Destructing if it was not running) + ~ThreadFromThreadPool(); + + private: + ThreadPoolImpl& parent_pool; + Thread thread; + + enum class ThreadState + { + Preparing, + Running, + Destructing + }; + + // Atomic state to track the thread's state + std::atomic thread_state; + + // Stores the position of the thread in the parent thread pool list + typename std::list>::iterator thread_it; + + // Remove itself from the parent pool + void removeSelfFromPoolNoPoolLock(); + + // Worker does a busy loop (with yield) while the state is Preparing. + // After that, immediately returns if the state changed to Destructing, + // or starts the main working loop if the state is Running. + void worker(); + }; + /// Maximum number of threads is based on the number of physical cores. ThreadPoolImpl(Metric metric_threads_, Metric metric_active_threads_, Metric metric_scheduled_jobs_); @@ -63,14 +111,14 @@ class ThreadPoolImpl size_t queue_size_, bool shutdown_on_exception_ = true); - /// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown. - /// If any thread was throw an exception, first exception will be rethrown from this method, - /// and exception will be cleared. + /// Add new job. Locks until the number of scheduled jobs is less than the maximum or an exception in one of the threads was thrown. + /// If any thread has thrown an exception, the first exception will be rethrown from this method, + /// and the exception will be cleared. /// Also throws an exception if cannot create thread. /// Priority: lower is higher. - /// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects, - /// located on stack of current thread, the stack must not be unwinded until all jobs finished. However, - /// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor. + /// NOTE: Probably you should call wait() if an exception was thrown. If some previously scheduled jobs are using some objects, + /// located on the stack of the current thread, the stack must not be unwound until all jobs are finished. However, + /// if ThreadPool is a local object, it will wait for all scheduled jobs in its own destructor. void scheduleOrThrowOnError(Job job, Priority priority = {}); /// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false. @@ -81,12 +129,12 @@ class ThreadPoolImpl /// Wait for all currently active jobs to be done. /// You may call schedule and wait many times in arbitrary order. - /// If any thread was throw an exception, first exception will be rethrown from this method, - /// and exception will be cleared. + /// If any thread has thrown an exception, the first exception will be rethrown from this method, + /// and the exception will be cleared. void wait(); /// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions). - /// You should not destroy object while calling schedule or wait methods from another threads. + /// You should not destroy the object while calling schedule or wait methods from other threads. ~ThreadPoolImpl(); /// Returns number of running and scheduled jobs. @@ -127,28 +175,40 @@ class ThreadPoolImpl size_t queue_size; size_t scheduled_jobs = 0; + + // Originally equals to max_threads, but changes dynamically. + // Decrements with every new thread started, increments when it finishes. + // If positive, then more threads can be started. + // When it comes to zero, it means that max_threads threads have already been started. + // it can be below zero when the threadpool is shutting down + std::atomic remaining_pool_capacity; + + // Increments every time a new thread joins the thread pool or a job finishes. + // Decrements every time a task is scheduled. + // If positive, it means that there are more threads than jobs (and some are idle). + // If zero, it means that every thread has a job. + // If negative, it means that we have more jobs than threads. + std::atomic available_threads; + bool shutdown = false; bool threads_remove_themselves = true; const bool shutdown_on_exception = true; boost::heap::priority_queue> jobs; - std::list threads; + ThreadFromThreadPool::ThreadList threads; std::exception_ptr first_exception; std::stack on_destroy_callbacks; template ReturnType scheduleImpl(Job job, Priority priority, std::optional wait_microseconds, bool propagate_opentelemetry_tracing_context = true); - void worker(typename std::list::iterator thread_it); - - /// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with `mutex` locked. + /// Tries to start new threads if there are scheduled jobs and the limit `max_threads` is not reached. Must be called with the mutex locked. void startNewThreadsNoLock(); void finalize(); void onDestroy(); }; - /// ThreadPool with std::thread for threads. using FreeThreadPool = ThreadPoolImpl;