Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 13 additions & 37 deletions include/thread_pool/rouser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <thread_pool/slotted_bag.hpp>
#include <thread_pool/thread_pool_options.hpp>
#include <thread_pool/thread_pool_state.hpp>
#include <thread_pool/worker.hpp>

#include <atomic>
Expand Down Expand Up @@ -52,17 +53,13 @@ class Rouser final

/**
* @brief Move ctor implementation.
* @note Be very careful when invoking this while the thread pool is
* active, or in an otherwise undefined state.
*/
Rouser(Rouser&& rhs) noexcept;
Rouser(Rouser&& rhs) = delete;

/**
* @brief Move assignment implementation.
* @note Be very careful when invoking this while the thread pool is
* active, or in an otherwise undefined state.
*/
Rouser& operator=(Rouser&& rhs) noexcept;
Rouser& operator=(Rouser&& rhs) = delete;

/**
* @brief Destructor implementation.
Expand All @@ -71,13 +68,11 @@ class Rouser final

/**
* @brief start Create the executing thread and start tasks execution.
* @param workers A reference to the vector containing sibling workers for performing round robin work stealing.
* @param idle_workers A reference to the slotted bag containing all idle workers.
* @param num_busy_waiters A reference to the atomic busy waiter counter.
* @param state A pointer to thread pool's shared state.
* @note The parameters passed into this function generally relate to the global thread pool state.
*/
template <typename Task, template<typename> class Queue>
void start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);
void start(std::shared_ptr<ThreadPoolState<Task, Queue>> state);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much better now.


/**
* @brief stop Stop all workers' threads and stealing activity.
Expand All @@ -91,12 +86,10 @@ class Rouser final

/**
* @brief threadFunc Executing thread function.
* @param workers A reference to the vector containing sibling workers for performing round robin work stealing.
* @param idle_workers A reference to the slotted bag containing all idle workers.
* @param num_busy_waiters A reference to the atomic busy waiter counter.
* @param state A pointer to thread pool's shared state.
*/
template <typename Task, template<typename> class Queue>
void threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters);
void threadFunc(std::shared_ptr<ThreadPoolState<Task, Queue>> shared_state);

std::atomic<State> m_state;
std::thread m_thread;
Expand All @@ -109,36 +102,19 @@ inline Rouser::Rouser(std::chrono::microseconds rouse_period)
{
}

inline Rouser::Rouser(Rouser&& rhs) noexcept
{
*this = std::move(rhs);
}

inline Rouser& Rouser::operator=(Rouser&& rhs) noexcept
{
if (this != &rhs)
{
m_state = rhs.m_state.load();
m_thread = std::move(rhs.m_thread);
m_rouse_period = std::move(rhs.m_rouse_period);
}

return *this;
}

/**
 * @brief Destructor. Requests shutdown of the rousing thread via stop().
 * @note stop() is safe to call even if start() was never invoked or stop()
 *       already ran — presumably it guards on the atomic m_state (its body
 *       is collapsed in this view; confirm against the full source).
 */
inline Rouser::~Rouser()
{
    stop();
}

template <typename Task, template<typename> class Queue>
inline void Rouser::start(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
inline void Rouser::start(std::shared_ptr<ThreadPoolState<Task, Queue>> state)
{
auto expectedState = State::Initialized;
if (!m_state.compare_exchange_strong(expectedState, State::Running, std::memory_order_acq_rel))
throw std::runtime_error("Cannot start Rouser: it has previously been started or stopped.");

m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, std::ref(workers), std::ref(idle_workers), std::ref(num_busy_waiters));
m_thread = std::thread(&Rouser::threadFunc<Task, Queue>, this, state);
}

inline void Rouser::stop()
Expand All @@ -151,16 +127,16 @@ inline void Rouser::stop()
}

template <typename Task, template<typename> class Queue>
inline void Rouser::threadFunc(std::vector<std::unique_ptr<Worker<Task, Queue>>>& workers, SlottedBag<Queue>& idle_workers, std::atomic<size_t>& num_busy_waiters)
inline void Rouser::threadFunc(std::shared_ptr<ThreadPoolState<Task, Queue>> shared_state)
{
while (m_state.load(std::memory_order_acquire) == State::Running)
{
// Try to wake up a thread if there are no current busy waiters.
if (num_busy_waiters.load(std::memory_order_acquire) == 0)
if (shared_state->numBusyWaiters().load(std::memory_order_acquire) == 0)
{
auto result = idle_workers.tryEmptyAny();
auto result = shared_state->idleWorkers().tryEmptyAny();
if (result.first)
workers[result.second]->wake();
shared_state->workers()[result.second]->wake();
}

// Sleep.
Expand Down
65 changes: 31 additions & 34 deletions include/thread_pool/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <thread_pool/mpmc_bounded_queue.hpp>
#include <thread_pool/slotted_bag.hpp>
#include <thread_pool/thread_pool_options.hpp>
#include <thread_pool/thread_pool_state.hpp>
#include <thread_pool/worker.hpp>
#include <thread_pool/rouser.hpp>

Expand Down Expand Up @@ -53,15 +54,11 @@ class GenericThreadPool final

/**
* @brief Move ctor implementation.
* @note Be very careful when invoking this while the thread pool is
* active, or in an otherwise undefined state.
*/
GenericThreadPool(GenericThreadPool&& rhs) noexcept;

/**
* @brief Move assignment implementation.
* @note Be very careful when invoking this while the thread pool is
* active, or in an otherwise undefined state.
*/
GenericThreadPool& operator=(GenericThreadPool&& rhs) noexcept;

Expand Down Expand Up @@ -109,35 +106,31 @@ class GenericThreadPool final
*/
size_t getWorkerId();

SlottedBag<Queue> m_idle_workers;
WorkerVector m_workers;
Rouser m_rouser;
size_t m_failed_wakeup_retry_cap;
std::atomic<size_t> m_next_worker;
std::atomic<size_t> m_num_busy_waiters;
std::shared_ptr<Rouser> m_rouser;
std::shared_ptr<ThreadPoolState<Task, Queue>> m_state;
};


/// Implementation

template <typename Task, template<typename> class Queue>
inline GenericThreadPool<Task, Queue>::GenericThreadPool(ThreadPoolOptions options)
: m_idle_workers(options.threadCount())
, m_workers(options.threadCount())
, m_rouser(options.rousePeriod())
, m_failed_wakeup_retry_cap(options.failedWakeupRetryCap())
: m_failed_wakeup_retry_cap(options.failedWakeupRetryCap())
, m_next_worker(0)
, m_num_busy_waiters(0)
, m_rouser(std::make_shared<Rouser>(options.rousePeriod()))
, m_state(ThreadPoolState<Task, Queue>::create(options))
{
// Instantiate all workers.
for (auto it = m_workers.begin(); it != m_workers.end(); ++it)
for (auto it = m_state->workers().begin(); it != m_state->workers().end(); ++it)
it->reset(new Worker<Task, Queue>(options.busyWaitOptions(), options.queueSize()));

// Initialize all worker threads.
for (size_t i = 0; i < m_workers.size(); ++i)
m_workers[i]->start(i, m_workers, m_idle_workers, m_num_busy_waiters);
for (size_t i = 0; i < m_state->workers().size(); ++i)
m_state->workers()[i]->start(i, m_state);

m_rouser.start(m_workers, m_idle_workers, m_num_busy_waiters);
m_rouser->start(m_state);
}

template <typename Task, template<typename> class Queue>
Expand All @@ -151,12 +144,10 @@ inline GenericThreadPool<Task, Queue>& GenericThreadPool<Task, Queue>::operator=
{
if (this != &rhs)
{
m_idle_workers = std::move(rhs.m_idle_workers);
m_workers = std::move(rhs.m_workers);
m_rouser = std::move(rhs.m_rouser);
m_failed_wakeup_retry_cap = rhs.m_failed_wakeup_retry_cap;
m_next_worker = rhs.m_next_worker.load();
m_num_busy_waiters = rhs.m_num_busy_waiters.load();
m_rouser = std::move(rhs.m_rouser);
m_state = std::move(rhs.m_state);
}

return *this;
Expand All @@ -165,16 +156,22 @@ inline GenericThreadPool<Task, Queue>& GenericThreadPool<Task, Queue>::operator=
template <typename Task, template<typename> class Queue>
inline GenericThreadPool<Task, Queue>::~GenericThreadPool()
{
m_rouser.stop();
if (!m_state || !m_rouser)
return;

for (auto& worker_ptr : m_workers)
m_rouser->stop();

for (auto& worker_ptr : m_state->workers())
worker_ptr->stop();
}

/**
 * @brief Attempts to post a task to the thread pool without blocking.
 * @param handler The callable to be executed by a worker thread.
 * @return true if the task was successfully enqueued, false otherwise.
 * @throws std::runtime_error if this pool has been moved from (i.e. its
 *         shared state or rouser pointer is null).
 */
template <typename Task, template<typename> class Queue>
template <typename Handler>
inline bool GenericThreadPool<Task, Queue>::tryPost(Handler&& handler)
{
    // Only a live (non-moved-from) pool owns the shared state needed to post.
    const bool alive = static_cast<bool>(m_state) && static_cast<bool>(m_rouser);
    if (alive)
        return tryPostImpl(std::forward<Handler>(handler), m_failed_wakeup_retry_cap);

    throw std::runtime_error("Attempting to invoke post on a moved object.");
}

Expand All @@ -195,13 +192,13 @@ inline bool GenericThreadPool<Task, Queue>::tryPostImpl(Handler&& handler, size_
// is fully utilized (num active workers = argmin(num tasks, num total workers)).
// If there aren't busy waiters, let's see if we have any idling threads.
// These incur higher overhead to wake up than the busy waiters.
if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)
if (m_state->numBusyWaiters().load(std::memory_order_acquire) == 0)
{
auto result = m_idle_workers.tryEmptyAny();
auto result = m_state->idleWorkers().tryEmptyAny();
if (result.first)
{
auto success = m_workers[result.second]->tryPost(std::forward<Handler>(handler));
m_workers[result.second]->wake();
auto success = m_state->workers()[result.second]->tryPost(std::forward<Handler>(handler));
m_state->workers()[result.second]->wake();

// The above post will only fail if the idle worker's queue is full, which is an extremely
// low probability scenario. In that case, we wake the worker and let it get to work on
Expand All @@ -219,24 +216,24 @@ inline bool GenericThreadPool<Task, Queue>::tryPostImpl(Handler&& handler, size_
auto initialWorkerId = id;
do
{
if (m_workers[id]->tryPost(std::forward<Handler>(handler)))
if (m_state->workers()[id]->tryPost(std::forward<Handler>(handler)))
{
// The following section increases the probability that tasks will not be dropped.
// This is a soft constraint, the strict task dropping bound is covered by the Rouser
// thread's functionality. This code experimentally lowers task response time under
// low thread pool utilization without incurring significant performance penalties at
// high thread pool utilization.
if (m_num_busy_waiters.load(std::memory_order_acquire) == 0)
if (m_state->numBusyWaiters().load(std::memory_order_acquire) == 0)
{
auto result = m_idle_workers.tryEmptyAny();
auto result = m_state->idleWorkers().tryEmptyAny();
if (result.first)
m_workers[result.second]->wake();
m_state->workers()[result.second]->wake();
}

return true;
}

++id %= m_workers.size();
++id %= m_state->workers().size();
} while (id != initialWorkerId);

// All Queues in our thread pool are full during one whole iteration.
Expand All @@ -249,8 +246,8 @@ inline size_t GenericThreadPool<Task, Queue>::getWorkerId()
{
auto id = Worker<Task, Queue>::getWorkerIdForCurrentThread();

if (id > m_workers.size())
id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_workers.size();
if (id > m_state->workers().size())
id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_state->workers().size();

return id;
}
Expand Down
Loading