Lock-free, work-stealing queue #54

Draft · wants to merge 6 commits into base: master
examples/mandelbrot/source/main.cpp (2 changes: 1 addition & 1 deletion)

@@ -20,7 +20,7 @@ void mandelbrot_threadpool(int image_width, int image_height, int max_iterations

std::cout << "calculating mandelbrot" << std::endl;

-dp::thread_pool pool;
+dp::thread_pool pool{};
std::vector<std::future<std::vector<rgb>>> futures;
futures.reserve(source.height());
const auto start = std::chrono::steady_clock::now();
include/thread_pool/thread_pool.h (16 changes: 9 additions & 7 deletions)

@@ -16,11 +16,13 @@
#endif

#include "thread_pool/thread_safe_queue.h"
#include "thread_pool/work_stealing_deque.h"

namespace dp {
namespace details {

-#ifdef __cpp_lib_move_only_function
+// TODO: use std::move_only_function; the work-stealing deque can't hold move-only types yet
+#if __cpp_lib_move_only_function
using default_function_type = std::move_only_function<void()>;
#else
using default_function_type = std::function<void()>;
@@ -59,7 +61,7 @@ namespace dp {

do {
// invoke the task
-while (auto task = tasks_[id].tasks.pop_front()) {
+while (auto task = tasks_[id].tasks.pop_top()) {
// decrement the unassigned tasks as the task is now going
// to be executed
unassigned_tasks_.fetch_sub(1, std::memory_order_release);
@@ -74,7 +76,7 @@
// try to steal a task
for (std::size_t j = 1; j < tasks_.size(); ++j) {
const std::size_t index = (id + j) % tasks_.size();
-if (auto task = tasks_[index].tasks.steal()) {
+if (auto task = tasks_[index].tasks.pop_top()) {
// steal a task
unassigned_tasks_.fetch_sub(1, std::memory_order_release);
std::invoke(std::move(task.value()));
@@ -141,8 +143,8 @@
typename ReturnType = std::invoke_result_t<Function &&, Args &&...>>
requires std::invocable<Function, Args...>
[[nodiscard]] std::future<ReturnType> enqueue(Function f, Args... args) {
-#ifdef __cpp_lib_move_only_function
-// we can do this in C++23 because we now have support for move only functions
+#if 0 // __cpp_lib_move_only_function
+// we can do this in C++23 because we now have support for move-only functions
std::promise<ReturnType> promise;
auto future = promise.get_future();
auto task = [func = std::move(f), ... largs = std::move(args),
@@ -262,12 +264,12 @@
}

// assign work
-tasks_[i].tasks.push_back(std::forward<Function>(f));
+tasks_[i].tasks.push_bottom(std::forward<Function>(f));
tasks_[i].signal.release();
}

struct task_item {
-dp::thread_safe_queue<FunctionType> tasks{};
+dp::work_stealing_deque<FunctionType> tasks{};
std::binary_semaphore signal{0};
};

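For context, here is a stripped-down sketch of the scheduling pattern this diff adopts: each worker first drains its own deque, then scans the other workers' deques round-robin and steals from the top. The names worker_loop, deques, my_id, and stop are illustrative, not the pool's real API; the actual pool also parks idle workers on each task_item's binary_semaphore.

#include <atomic>
#include <cstddef>
#include <functional>
#include <vector>

#include "thread_pool/work_stealing_deque.h"

// Hypothetical standalone worker loop mirroring the diff above.
void worker_loop(std::vector<dp::work_stealing_deque<std::function<void()>>>& deques,
                 const std::size_t my_id, const std::atomic_bool& stop) {
    while (!stop.load(std::memory_order_acquire)) {
        // 1) drain our own deque first
        while (auto task = deques[my_id].pop_top()) {
            std::invoke(std::move(task.value()));
        }
        // 2) nothing local: scan the other deques round-robin and steal one task
        for (std::size_t j = 1; j < deques.size(); ++j) {
            const std::size_t victim = (my_id + j) % deques.size();
            if (auto task = deques[victim].pop_top()) {
                std::invoke(std::move(task.value()));
                break;  // got one; go back to our own deque
            }
        }
    }
}

One design note: a Chase-Lev owner conventionally takes from the bottom of its own deque (take_bottom()) for LIFO cache locality; the diff calls pop_top() on the worker's own queue as well, which is still safe because pop_top() may be called from any thread.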
include/thread_pool/work_stealing_deque.h (new file, 205 additions)

@@ -0,0 +1,205 @@
#pragma once

#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

namespace dp {

#ifdef __cpp_lib_hardware_interference_size
using std::hardware_destructive_interference_size;
#else
// fallback when the standard library doesn't provide the constant; note that
// 2 * sizeof(std::max_align_t) is typically 32 bytes on x86-64, while the real
// destructive interference (cache line) size there is 64 bytes
inline constexpr std::size_t hardware_destructive_interference_size =
    2 * sizeof(std::max_align_t);
#endif

/**
 * @brief Chase-Lev work-stealing deque
 * @details Supports a single producer and multiple consumers. The producer owns the
 * bottom of the deque and pushes/pops there; consumers (thieves) take from the top.
 * The queue is "lock-free" in the sense that it never uses mutexes or locks directly.
 *
 * This is an implementation of the deque described in "Dynamic Circular Work-Stealing
 * Deque" by Chase and Lev, with the memory-ordering treatment from "Correct and
 * Efficient Work-Stealing for Weak Memory Models".
 *
 * It draws on the following implementations:
 * - https://github.com/ConorWilliams/ConcurrentDeque/blob/main/include/riften/deque.hpp
 * - https://github.com/taskflow/work-stealing-queue/blob/master/wsq.hpp
 *
 * I've made some minor edits and changes based on new C++23 features and other personal
 * coding style choices/preferences.
 */
template <typename T>
requires std::is_destructible_v<T>
class work_stealing_deque final {
/**
* @brief Simple circular array buffer that can regrow
*/
class circular_buffer final {
std::int64_t size_;
std::int64_t mask_;
std::unique_ptr<T[]> buffer_ = std::make_unique_for_overwrite<T[]>(size_);

public:
explicit circular_buffer(const std::int64_t size) : size_(size), mask_(size - 1) {
    // size must be a power of 2 so that (index & mask_) == (index % size)
    assert(size > 0 && (size & (size - 1)) == 0);
}

[[nodiscard]] std::int64_t capacity() const noexcept { return size_; }

void store(const std::size_t index, T&& value) noexcept
requires std::is_move_assignable_v<T>
{
buffer_[index & mask_] = std::move(value);
}

T&& load(const std::size_t index) noexcept {
if constexpr (std::is_move_constructible_v<T>) {
return std::move(buffer_[index & mask_]);
} else {
return buffer_[index & mask_];
}
}

/**
 * @brief Allocate a buffer of twice the capacity and move [start, end) into it.
 * The caller takes ownership of the returned buffer.
 * @param start The start index
 * @param end The end index
 */
circular_buffer* resize(const std::size_t start, const std::size_t end) {
    auto* temp = new circular_buffer(size_ * 2);
    for (std::size_t i = start; i != end; ++i) {
        temp->store(i, load(i));
    }
    return temp;
}
};
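// Note: top_ and bottom_ (below) are monotonically increasing logical indices that
// are never wrapped; circular_buffer wraps them via `index & mask_`, which equals
// `index % size_` precisely because size_ is a power of two. For example, with
// size_ == 8 (mask_ == 7), store(9, v) writes buffer_[9 & 7] == buffer_[1].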

constexpr static std::size_t default_count = 1024;
alignas(hardware_destructive_interference_size) std::atomic_int64_t top_;
alignas(hardware_destructive_interference_size) std::atomic_int64_t bottom_;
alignas(hardware_destructive_interference_size) std::atomic<circular_buffer*> buffer_;

// retired buffers; they are kept alive until destruction because concurrent
// thieves may still be reading from them
std::vector<std::unique_ptr<circular_buffer>> garbage_;

static constexpr std::memory_order relaxed = std::memory_order_relaxed;
static constexpr std::memory_order acquire = std::memory_order_acquire;
static constexpr std::memory_order consume = std::memory_order_consume;
static constexpr std::memory_order release = std::memory_order_release;
static constexpr std::memory_order seq_cst = std::memory_order_seq_cst;

public:
explicit work_stealing_deque(const std::size_t& capacity = default_count)
: top_(0), bottom_(0), buffer_(new circular_buffer(capacity)) {}

// queue is non-copyable
work_stealing_deque(work_stealing_deque&) = delete;
work_stealing_deque& operator=(work_stealing_deque&) = delete;

[[nodiscard]] std::size_t capacity() const { return buffer_.load(relaxed)->capacity(); }
[[nodiscard]] std::size_t size() const {
const auto bottom = bottom_.load(relaxed);
const auto top = top_.load(relaxed);
return static_cast<std::size_t>(bottom >= top ? bottom - top : 0);
}

[[nodiscard]] bool empty() const { return !size(); }

template <typename... Args>
void emplace(Args&&... args) {
// construct first in case it throws
T value(std::forward<Args>(args)...);
push_bottom(std::move(value));
}

void push_bottom(T&& value) {
auto bottom = bottom_.load(relaxed);
auto top = top_.load(acquire);
auto* buffer = buffer_.load(relaxed);

// grow if full; retire the old buffer to garbage_ rather than deleting it,
// since concurrent thieves may still be reading from it
if (buffer->capacity() - 1 < (bottom - top)) {
    garbage_.emplace_back(std::exchange(buffer, buffer->resize(top, bottom)));
    buffer_.store(buffer, relaxed);
}

buffer->store(bottom, std::forward<T>(value));

// release fence: the store into the buffer above cannot be reordered after
// the store to bottom_ below, so thieves never see an index for an unwritten slot
std::atomic_thread_fence(release);

bottom_.store(bottom + 1, relaxed);
}

/**
 * @brief Take from the bottom of the queue. Must only be called by the owning
 * (producer) thread.
 * @return The item, or std::nullopt if the deque was empty or the last item
 * was lost to a concurrent steal.
 */
std::optional<T> take_bottom() {
auto bottom = bottom_.load(relaxed) - 1;
auto* buffer = buffer_.load(relaxed);

// publish the decrement first so concurrent thieves see the bottom slot as claimed
bottom_.store(bottom, relaxed);

// full fence: the store to bottom_ above must be visible to any thief before
// we read top_ below
std::atomic_thread_fence(seq_cst);

std::optional<T> item = std::nullopt;

auto top = top_.load(relaxed);
if (top <= bottom) {
    // queue isn't empty
    item = buffer->load(bottom);
    if (top == bottom) {
        // exactly one item left: race the thieves with a CAS on top_, since a
        // concurrent pop_top() may be trying to take the same slot
        if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
            // lost the race to a thief
            item = std::nullopt;
        }
        // the deque is now empty either way; restore bottom
        bottom_.store(bottom + 1, relaxed);
    }
} else {
    // deque was already empty; undo the decrement
    bottom_.store(bottom + 1, relaxed);
}

return item;
}
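// Illustrative one-item race, step by step (say top_ == 5 and bottom_ == 5 after
// the decrement above, so index 5 holds the last task):
//   - the owner (take_bottom) and a thief (pop_top) both read the slot, then both
//     attempt CAS(top_: 5 -> 6) with seq_cst ordering;
//   - exactly one CAS succeeds and that caller keeps the task; the loser returns
//     std::nullopt;
//   - the owner restores bottom_ to 6 either way, leaving an empty deque
//     (top_ == bottom_ == 6).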

/**
 * @brief Steal from the top of the queue. Safe to call from any thread.
 * @return The stolen item, or std::nullopt if the deque was empty or the
 * steal lost a race with another consumer.
 */
std::optional<T> pop_top() {
auto top = top_.load(acquire);
// full fence: the load of top_ above cannot be reordered with the load of
// bottom_ below
std::atomic_thread_fence(seq_cst);
auto bottom = bottom_.load(acquire);
std::optional<T> item;

if (top < bottom) {
// non-empty queue
auto* buffer = buffer_.load(consume);
item = buffer->load(top);

if (!top_.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) {
// failed the race
item = std::nullopt;
}
}
// item is still std::nullopt here if the deque looked empty or we lost the race
return item;
}
};
} // namespace dp
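To make the ownership contract concrete, here is a minimal usage sketch (not part of the PR): exactly one owner thread calls emplace/push_bottom and take_bottom, while any number of thief threads may call pop_top concurrently. It assumes the header above is on the include path.

#include <atomic>
#include <iostream>
#include <thread>

#include "thread_pool/work_stealing_deque.h"

int main() {
    dp::work_stealing_deque<int> deque;
    std::atomic_bool done{false};
    std::atomic_int stolen{0};

    // thief thread: pop_top() is the only mutating call safe from non-owner threads
    std::thread thief([&] {
        while (!done.load(std::memory_order_acquire)) {
            if (deque.pop_top()) {
                stolen.fetch_add(1, std::memory_order_relaxed);
            }
        }
    });

    // owner thread: pushes to the bottom, then drains its own end
    int taken = 0;
    for (int i = 0; i < 1000; ++i) {
        deque.emplace(i);
    }
    while (deque.take_bottom()) {
        ++taken;
    }

    done.store(true, std::memory_order_release);
    thief.join();
    // every task is consumed exactly once, so taken + stolen == 1000
    std::cout << "taken: " << taken << ", stolen: " << stolen.load() << '\n';
}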