diff --git a/include/thread_pool/rouser.hpp b/include/thread_pool/rouser.hpp index 326894ce..351fa114 100644 --- a/include/thread_pool/rouser.hpp +++ b/include/thread_pool/rouser.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -52,17 +53,13 @@ class Rouser final /** * @brief Move ctor implementation. - * @note Be very careful when invoking this while the thread pool is - * active, or in an otherwise undefined state. */ - Rouser(Rouser&& rhs) noexcept; + Rouser(Rouser&& rhs) = delete; /** * @brief Move assignment implementaion. - * @note Be very careful when invoking this while the thread pool is - * active, or in an otherwise undefined state. */ - Rouser& operator=(Rouser&& rhs) noexcept; + Rouser& operator=(Rouser&& rhs) = delete; /** * @brief Destructor implementation. @@ -71,13 +68,11 @@ class Rouser final /** * @brief start Create the executing thread and start tasks execution. - * @param workers A reference to the vector containing sibling workers for performing round robin work stealing. - * @param idle_workers A reference to the slotted bag containing all idle workers. - * @param num_busy_waiters A reference to the atomic busy waiter counter. + * @param state A pointer to thread pool's shared state. * @note The parameters passed into this function generally relate to the global thread pool state. */ template class Queue> - void start(std::vector>>& workers, SlottedBag& idle_workers, std::atomic& num_busy_waiters); + void start(std::shared_ptr> state); /** * @brief stop Stop all worker's thread and stealing activity. @@ -91,12 +86,10 @@ class Rouser final /** * @brief threadFunc Executing thread function. - * @param workers A reference to the vector containing sibling workers for performing round robin work stealing. - * @param idle_workers A reference to the slotted bag containing all idle workers. - * @param num_busy_waiters A reference to the atomic busy waiter counter. + * @param state A pointer to thread pool's shared state. 
*/ template class Queue> - void threadFunc(std::vector>>& workers, SlottedBag& idle_workers, std::atomic& num_busy_waiters); + void threadFunc(std::shared_ptr> shared_state); std::atomic m_state; std::thread m_thread; @@ -109,36 +102,19 @@ inline Rouser::Rouser(std::chrono::microseconds rouse_period) { } -inline Rouser::Rouser(Rouser&& rhs) noexcept -{ - *this = std::move(rhs); -} - -inline Rouser& Rouser::operator=(Rouser&& rhs) noexcept -{ - if (this != &rhs) - { - m_state = rhs.m_state.load(); - m_thread = std::move(rhs.m_thread); - m_rouse_period = std::move(rhs.m_rouse_period); - } - - return *this; -} - inline Rouser::~Rouser() { stop(); } template class Queue> -inline void Rouser::start(std::vector>>& workers, SlottedBag& idle_workers, std::atomic& num_busy_waiters) +inline void Rouser::start(std::shared_ptr> state) { auto expectedState = State::Initialized; if (!m_state.compare_exchange_strong(expectedState, State::Running, std::memory_order_acq_rel)) throw std::runtime_error("Cannot start Rouser: it has previously been started or stopped."); - m_thread = std::thread(&Rouser::threadFunc, this, std::ref(workers), std::ref(idle_workers), std::ref(num_busy_waiters)); + m_thread = std::thread(&Rouser::threadFunc, this, state); } inline void Rouser::stop() @@ -151,16 +127,16 @@ inline void Rouser::stop() } template class Queue> -inline void Rouser::threadFunc(std::vector>>& workers, SlottedBag& idle_workers, std::atomic& num_busy_waiters) +inline void Rouser::threadFunc(std::shared_ptr> shared_state) { while (m_state.load(std::memory_order_acquire) == State::Running) { // Try to wake up a thread if there are no current busy waiters. 
- if (num_busy_waiters.load(std::memory_order_acquire) == 0) + if (shared_state->numBusyWaiters().load(std::memory_order_acquire) == 0) { - auto result = idle_workers.tryEmptyAny(); + auto result = shared_state->idleWorkers().tryEmptyAny(); if (result.first) - workers[result.second]->wake(); + shared_state->workers()[result.second]->wake(); } // Sleep. diff --git a/include/thread_pool/thread_pool.hpp b/include/thread_pool/thread_pool.hpp index 0f1c335a..f3a39899 100644 --- a/include/thread_pool/thread_pool.hpp +++ b/include/thread_pool/thread_pool.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -53,15 +54,11 @@ class GenericThreadPool final /** * @brief Move ctor implementation. - * @note Be very careful when invoking this while the thread pool is - * active, or in an otherwise undefined state. */ GenericThreadPool(GenericThreadPool&& rhs) noexcept; /** * @brief Move assignment implementaion.v - * @note Be very careful when invoking this while the thread pool is - * active, or in an otherwise undefined state. */ GenericThreadPool& operator=(GenericThreadPool&& rhs) noexcept; @@ -109,12 +106,10 @@ class GenericThreadPool final */ size_t getWorkerId(); - SlottedBag m_idle_workers; - WorkerVector m_workers; - Rouser m_rouser; size_t m_failed_wakeup_retry_cap; std::atomic m_next_worker; - std::atomic m_num_busy_waiters; + std::shared_ptr m_rouser; + std::shared_ptr> m_state; }; @@ -122,22 +117,20 @@ class GenericThreadPool final template class Queue> inline GenericThreadPool::GenericThreadPool(ThreadPoolOptions options) - : m_idle_workers(options.threadCount()) - , m_workers(options.threadCount()) - , m_rouser(options.rousePeriod()) - , m_failed_wakeup_retry_cap(options.failedWakeupRetryCap()) + : m_failed_wakeup_retry_cap(options.failedWakeupRetryCap()) , m_next_worker(0) - , m_num_busy_waiters(0) + , m_rouser(std::make_shared(options.rousePeriod())) + , m_state(ThreadPoolState::create(options)) { // Instatiate all workers. 
- for (auto it = m_workers.begin(); it != m_workers.end(); ++it) + for (auto it = m_state->workers().begin(); it != m_state->workers().end(); ++it) it->reset(new Worker(options.busyWaitOptions(), options.queueSize())); // Initialize all worker threads. - for (size_t i = 0; i < m_workers.size(); ++i) - m_workers[i]->start(i, m_workers, m_idle_workers, m_num_busy_waiters); + for (size_t i = 0; i < m_state->workers().size(); ++i) + m_state->workers()[i]->start(i, m_state); - m_rouser.start(m_workers, m_idle_workers, m_num_busy_waiters); + m_rouser->start(m_state); } template class Queue> @@ -151,12 +144,10 @@ inline GenericThreadPool& GenericThreadPool::operator= { if (this != &rhs) { - m_idle_workers = std::move(rhs.m_idle_workers); - m_workers = std::move(rhs.m_workers); - m_rouser = std::move(rhs.m_rouser); m_failed_wakeup_retry_cap = rhs.m_failed_wakeup_retry_cap; m_next_worker = rhs.m_next_worker.load(); - m_num_busy_waiters = rhs.m_num_busy_waiters.load(); + m_rouser = std::move(rhs.m_rouser); + m_state = std::move(rhs.m_state); } return *this; @@ -165,9 +156,12 @@ inline GenericThreadPool& GenericThreadPool::operator= template class Queue> inline GenericThreadPool::~GenericThreadPool() { - m_rouser.stop(); + if (!m_state || !m_rouser) + return; - for (auto& worker_ptr : m_workers) + m_rouser->stop(); + + for (auto& worker_ptr : m_state->workers()) worker_ptr->stop(); } @@ -175,6 +169,9 @@ template class Queue> template inline bool GenericThreadPool::tryPost(Handler&& handler) { + if (!m_state || !m_rouser) + throw std::runtime_error("Attempting to invoke post on a moved object."); + return tryPostImpl(std::forward(handler), m_failed_wakeup_retry_cap); } @@ -195,13 +192,13 @@ inline bool GenericThreadPool::tryPostImpl(Handler&& handler, size_ // is fully utilized (num active workers = argmin(num tasks, num total workers)). // If there aren't busy waiters, let's see if we have any idling threads. // These incur higher overhead to wake up than the busy waiters. 
- if (m_num_busy_waiters.load(std::memory_order_acquire) == 0) + if (m_state->numBusyWaiters().load(std::memory_order_acquire) == 0) { - auto result = m_idle_workers.tryEmptyAny(); + auto result = m_state->idleWorkers().tryEmptyAny(); if (result.first) { - auto success = m_workers[result.second]->tryPost(std::forward(handler)); - m_workers[result.second]->wake(); + auto success = m_state->workers()[result.second]->tryPost(std::forward(handler)); + m_state->workers()[result.second]->wake(); // The above post will only fail if the idle worker's queue is full, which is an extremely // low probability scenario. In that case, we wake the worker and let it get to work on @@ -219,24 +216,24 @@ inline bool GenericThreadPool::tryPostImpl(Handler&& handler, size_ auto initialWorkerId = id; do { - if (m_workers[id]->tryPost(std::forward(handler))) + if (m_state->workers()[id]->tryPost(std::forward(handler))) { // The following section increases the probability that tasks will not be dropped. // This is a soft constraint, the strict task dropping bound is covered by the Rouser // thread's functionality. This code experimentally lowers task response time under // low thread pool utilization without incurring significant performance penalties at // high thread pool utilization. - if (m_num_busy_waiters.load(std::memory_order_acquire) == 0) + if (m_state->numBusyWaiters().load(std::memory_order_acquire) == 0) { - auto result = m_idle_workers.tryEmptyAny(); + auto result = m_state->idleWorkers().tryEmptyAny(); if (result.first) - m_workers[result.second]->wake(); + m_state->workers()[result.second]->wake(); } return true; } - ++id %= m_workers.size(); + ++id %= m_state->workers().size(); } while (id != initialWorkerId); // All Queues in our thread pool are full during one whole iteration. 
@@ -249,8 +246,8 @@ inline size_t GenericThreadPool::getWorkerId() { auto id = Worker::getWorkerIdForCurrentThread(); - if (id > m_workers.size()) - id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_workers.size(); + if (id > m_state->workers().size()) + id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_state->workers().size(); return id; } diff --git a/include/thread_pool/thread_pool_state.hpp b/include/thread_pool/thread_pool_state.hpp new file mode 100644 index 00000000..91b590bd --- /dev/null +++ b/include/thread_pool/thread_pool_state.hpp @@ -0,0 +1,127 @@ +#pragma once + +#include +#include + +#include +#include +#include + +namespace tp +{ + +template class Queue> +class Worker; + +template class Queue> +class ThreadPoolState +{ + using WorkerVector = std::vector>>; + +public: + + /** + * @brief create Construct a shared instance of a thread pool state object. + * @param options Creation options. + */ + static std::shared_ptr> create(ThreadPoolOptions const& options); + + /** + * @brief idleWorkers Obtain the idle worker queue. + */ + SlottedBag& idleWorkers(); + + /** + * @brief workers Obtain the worker list. + */ + WorkerVector& workers(); + + /** + * @brief numBusyWaiters Obtain the busy waiter count. + */ + std::atomic& numBusyWaiters(); + +protected: + + /** + * @brief ThreadPoolState Construct a thread pool state. + * @param options Creation options. + */ + explicit ThreadPoolState(ThreadPoolOptions const& options); + + /** + * @brief Copy ctor implementation. + */ + ThreadPoolState(ThreadPoolState const&) = delete; + + /** + * @brief Copy assignment implementation. + */ + ThreadPoolState& operator=(ThreadPoolState const& rhs) = delete; + + /** + * @brief Move ctor implementation. + */ + ThreadPoolState(ThreadPoolState&& rhs) = delete; + + /** + * @brief Move assignment implementation. + */ + ThreadPoolState& operator=(ThreadPoolState&& rhs) = delete; + + /** + * @brief ~ThreadPoolState destructor. 
+ */ + virtual ~ThreadPoolState() = default; + +private: + + SlottedBag m_idle_workers; + WorkerVector m_workers; + std::atomic m_num_busy_waiters; +}; + + +/// Implementation + +template class Queue> +inline std::shared_ptr> ThreadPoolState::create(ThreadPoolOptions const& options) +{ + struct Creator : public ThreadPoolState + { + Creator(ThreadPoolOptions options) + : ThreadPoolState(options) + { + } + }; + + return std::make_shared(options); +} + +template class Queue> +inline SlottedBag& ThreadPoolState::idleWorkers() +{ + return m_idle_workers; +} + +template class Queue> +inline typename ThreadPoolState::WorkerVector& ThreadPoolState::workers() +{ + return m_workers; +} + +template class Queue> +inline std::atomic& ThreadPoolState::numBusyWaiters() +{ + return m_num_busy_waiters; +} + +template class Queue> +inline ThreadPoolState::ThreadPoolState(ThreadPoolOptions const& options) + : m_idle_workers(options.threadCount()) + , m_workers(options.threadCount()) + , m_num_busy_waiters(0) +{ +} + +} diff --git a/include/thread_pool/worker.hpp b/include/thread_pool/worker.hpp index 5af584fa..1b7d8373 100644 --- a/include/thread_pool/worker.hpp +++ b/include/thread_pool/worker.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -70,17 +71,13 @@ class Worker final /** * @brief Move ctor implementation. - * @note Be very careful when invoking this while the thread pool is - * active, or in an otherwise undefined state. */ - Worker(Worker&& rhs) noexcept; + Worker(Worker&& rhs) = delete; /** * @brief Move assignment implementaion. - * @note Be very careful when invoking this while the thread pool is - * active, or in an otherwise undefined state. */ - Worker& operator=(Worker&& rhs) noexcept; + Worker& operator=(Worker&& rhs) = delete; /** * @brief Destructor implementation. @@ -90,12 +87,10 @@ class Worker final /** * @brief start Create the executing thread and start tasks execution. * @param id Worker ID. 
- * @param workers A pointer to the vector containing sibling workers for performing round robin work stealing. - * @param idle_workers A pointer to the slotted bag containing all idle workers. - * @param num_busy_waiters A pointer to the atomic busy waiter counter. + * @param state A pointer to thread pool's shared state. * @note The parameters passed into this function generally relate to the global thread pool state. */ - void start(const size_t id, WorkerVector& workers, SlottedBag& idle_workers, std::atomic& num_busy_waiters); + void start(const size_t id, std::shared_ptr> state); /** * @brief stop Stop all worker's thread and stealing activity. @@ -161,12 +156,10 @@ class Worker final /** * @brief threadFunc Executing thread function. * @param id Worker ID. - * @param workers A pointer to the vector containing sibling workers for performing round robin work stealing. - * @param idle_workers A pointer to the slotted bag containing all idle workers. - * @param num_busy_waiters A pointer to the atomic busy waiter counter. + * @param state A pointer to thread pool's shared state. * @note The parameters passed into this function generally relate to the global thread pool state. 
*/ - void threadFunc(const size_t id, WorkerVector& workers, SlottedBag& idle_workers, std::atomic& num_busy_waiters); + void threadFunc(const size_t id, std::shared_ptr> state); Queue m_queue; std::atomic m_state; @@ -205,33 +198,6 @@ inline Worker::Worker(ThreadPoolOptions::BusyWaitOptions const& bus { } -template class Queue> -inline Worker::Worker(Worker&& rhs) noexcept -{ - *this = std::move(rhs); -} - -template class Queue> -inline Worker& Worker::operator=(Worker&& rhs) noexcept -{ - if (this != &rhs) - { - m_queue = std::move(rhs.m_queue); - m_state = rhs.m_state.load(); - m_thread = std::move(rhs.m_thread); - m_next_donor = rhs.m_next_donor; - m_busy_wait_options = std::move(rhs.m_busy_wait_options); - - m_idle_mutex = std::move(rhs.m_idle_mutex); - m_idle_cv = std::move(rhs.m_idle_cv); - - m_is_idle = rhs.m_is_idle; - m_abort_idle = rhs.m_abort_idle; - } - - return *this; -} - template class Queue> inline Worker::~Worker() { @@ -252,13 +218,13 @@ inline void Worker::stop() } template class Queue> -inline void Worker::start(const size_t id, WorkerVector& workers, SlottedBag& idle_workers, std::atomic& num_busy_waiters) +inline void Worker::start(const size_t id, std::shared_ptr> state) { auto expectedState = State::Initialized; if (!m_state.compare_exchange_strong(expectedState, State::Running, std::memory_order_acq_rel)) throw std::runtime_error("Cannot start Worker: it has previously been started or stopped."); - m_thread = std::thread(&Worker::threadFunc, this, id, std::ref(workers), std::ref(idle_workers), std::ref(num_busy_waiters)); + m_thread = std::thread(&Worker::threadFunc, this, id, state); } template class Queue> @@ -343,10 +309,10 @@ bool Worker::tryGetTask(Task& task, WorkerVector& workers) } template class Queue> -inline void Worker::threadFunc(const size_t id, WorkerVector& workers, SlottedBag& idle_workers, std::atomic& num_busy_waiters) +inline void Worker::threadFunc(const size_t id, std::shared_ptr> state) { detail::thread_id() = id; - 
m_next_donor = (id + 1) % workers.size(); + m_next_donor = (id + 1) % state->workers().size(); Task handler; bool task_found = false; @@ -356,7 +322,7 @@ inline void Worker::threadFunc(const size_t id, WorkerVector& worke { // By default, this loop operates in the active state. // We poll for items from our local task queue and try to steal from others. - if (tryGetTask(handler, workers)) + if (tryGetTask(handler, state->workers())) { handleTask(handler); continue; @@ -365,18 +331,18 @@ inline void Worker::threadFunc(const size_t id, WorkerVector& worke // We were unable to obtain a task. // We now transition into the busy wait state. task_found = false; - num_busy_waiters.fetch_add(1, std::memory_order_acq_rel); + state->numBusyWaiters().fetch_add(1, std::memory_order_acq_rel); for (auto i = 0u; i < m_busy_wait_options.numIterations() && !task_found; i++) { std::this_thread::sleep_for(m_busy_wait_options.iterationFunction()(i)); - task_found = tryGetTask(handler, workers); + task_found = tryGetTask(handler, state->workers()); } // If we found a task during our busy wait sequence, we abort it and transition back into the active loop. if (task_found) { - num_busy_waiters.fetch_sub(1, std::memory_order_acq_rel); + state->numBusyWaiters().fetch_sub(1, std::memory_order_acq_rel); handleTask(handler); // Handle the task body only once we decrement the busy waiting loop. continue; } @@ -390,24 +356,24 @@ inline void Worker::threadFunc(const size_t id, WorkerVector& worke } // We put this worker up for grabs as a recipient to new posts in the thread pool. - idle_workers.fill(id); + state->idleWorkers().fill(id); // We need to transition out of the busy wait state after we have submitted ourselves to the idle // worker queue in order to avoid a race. 
- num_busy_waiters.fetch_sub(1, std::memory_order_acq_rel); + state->numBusyWaiters().fetch_sub(1, std::memory_order_acq_rel); // While we were adding this worker to the idle worker bag, a job may have been posted into this // worker's queue. We need to check for work again before initiating the deep sleep sequence, otherwise // the given task may be lost. // Any further posts will flip the m_abort_idle flag to true, and we will catch them later. - if (tryGetTask(handler, workers)) + if (tryGetTask(handler, state->workers())) { handleTask(handler); // A task was indeed posted in the time it took this worker to enter the bag. // We remove the worker from the bag. If the internal state of the bag was not changed, // this means a different thread has already removed this worker from the idle queue, // and this case will be caught below. Looping early in this case will cause a loss of synchronization. - if (idle_workers.empty(id)) + if (state->idleWorkers().empty(id)) continue; }