From 9d27555678b39de765ab76c8cd17459e80cabc58 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 19 Aug 2022 17:03:01 -0400 Subject: [PATCH 01/34] Update CPM version --- cmake/CPM.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/CPM.cmake b/cmake/CPM.cmake index 59208e4..6aacba2 100644 --- a/cmake/CPM.cmake +++ b/cmake/CPM.cmake @@ -1,4 +1,4 @@ -set(CPM_DOWNLOAD_VERSION 0.32.0) +set(CPM_DOWNLOAD_VERSION 0.35.5) if(CPM_SOURCE_CACHE) # Expand relative path. This is important if the provided path contains a tilde (~) From da808131500efb4ed7f4e693d99cd57d576f2542 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 19 Aug 2022 17:03:45 -0400 Subject: [PATCH 02/34] Add basic support for running coroutines on pool Added a basic awaitable type and function to allow for running coroutines on the thread pool --- include/thread_pool/thread_pool.h | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index c296fc8..acf5e40 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -165,6 +166,26 @@ namespace dp { })); } + /** + * @brief Allows you to schedule coroutines to run on the thread pool. + */ + auto schedule() { + /// @brief Simple awaitable type that we can return. + struct scheduled_operation { + dp::thread_pool<> *thread_pool_; + bool await_ready() { return false; }; + void await_suspend(std::coroutine_handle<> handle) { + if (thread_pool_) { + thread_pool_->enqueue_detach([](std::coroutine_handle<> h) { h.resume(); }, + handle); + } + } + void await_resume() {} + }; + + return scheduled_operation{this}; + } + private: template void enqueue_task(Function &&f) { From 83bec54426f356c6b6806115061153a24e7c3297 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 19 Aug 2022 17:04:14 -0400 Subject: [PATCH 03/34] WIP: Add coroutine thread pool benchmark --- benchmark/CMakeLists.txt | 8 ++++++- benchmark/include/utilities.h | 10 ++++++++ benchmark/source/thread_pool.cpp | 41 +++++++++++++++++++++++++++++--- 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 6205011..2c62c7d 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -15,6 +15,12 @@ CPMAddPackage( OPTIONS "BENCHMARK_ENABLE_TESTING OFF" ) +CPMAddPackage( + NAME cppcoro + GITHUB_REPOSITORY andreasbuhr/cppcoro + GIT_TAG 10bbcdbf2be3ad3aa56febcf4c7662d771460a99 +) + if(benchmark_ADDED) # patch benchmark target set_target_properties(benchmark PROPERTIES CXX_STANDARD 20) @@ -26,7 +32,7 @@ file(GLOB sources CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/source/*.cpp) file(GLOB headers CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/include/*.h) add_executable(${PROJECT_NAME} ${headers} ${sources}) -target_link_libraries(${PROJECT_NAME} benchmark ThreadPool::ThreadPool) +target_link_libraries(${PROJECT_NAME} benchmark ThreadPool::ThreadPool cppcoro) set_target_properties(${PROJECT_NAME} PROPERTIES CXX_STANDARD 20) target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) diff --git a/benchmark/include/utilities.h b/benchmark/include/utilities.h index c16969a..cfee63d 100644 --- a/benchmark/include/utilities.h +++ b/benchmark/include/utilities.h @@ -2,11 +2,14 @@ #include #include +#include #include #include #include #include +#include "thread_pool/thread_pool.h" + inline std::size_t index(std::size_t row, std::size_t col, std::size_t width) { return row * width + col; } @@ -22,6 +25,12 @@ inline void multiply_array(std::span a, std::span b, std::span re } } +inline cppcoro::task<> co_multiply_array(std::span a, std::span b, std::span result, + dp::thread_pool<>& pool) { + co_await pool.schedule(); + multiply_array(a, b, result); +} + template using multiplication_pair = std::pair, std::vector>; @@ -30,6 +39,7 @@ template const std::int64_t& array_size, const std::int64_t& number_of_multiplications) { static std::uniform_int_distribution distribution(std::numeric_limits::min(), std::numeric_limits::max()); + // yes, predictable values static std::default_random_engine generator{}; std::vector> computations; diff --git a/benchmark/source/thread_pool.cpp b/benchmark/source/thread_pool.cpp index 40676d7..333efe4 100644 --- a/benchmark/source/thread_pool.cpp +++ b/benchmark/source/thread_pool.cpp @@ -1,14 +1,18 @@ #include #include +#include +#include + #include "utilities.h" -static void BM_array_multiplication(benchmark::State& state) { +static void BM_array_multiplication_thread_pool(benchmark::State& state) { const auto array_size = state.range(0); const std::size_t multiplications_to_perform = state.range(1); // generate the data - const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); + const auto computations = generate_benchmark_data(array_size, + multiplications_to_perform); // task that is run on a new thread auto thread_task = [](multiplication_pair pair) { @@ -28,7 +32,7 @@ static void BM_array_multiplication(benchmark::State& state) { } } -BENCHMARK(BM_array_multiplication) +BENCHMARK(BM_array_multiplication_thread_pool) ->Args({8, 25'000}) ->Args({64, 5'000}) ->Args({256, 250}) @@ -39,3 +43,34 @@ BENCHMARK(BM_array_multiplication) ->MeasureProcessCPUTime() ->UseRealTime() ->Name("dp::thread_pool array mult"); + +inline cppcoro::task<> mult_task(multiplication_pair pair, dp::thread_pool<>& pool) { + std::vector result(pair.first.size()); + co_await co_multiply_array(pair.first, pair.second, result, pool); +} + +static void BM_array_multiplication_thread_pool_coroutine(benchmark::State& state) { + const auto array_size = state.range(0); + const std::size_t multiplications_to_perform = state.range(1); + const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); + dp::thread_pool pool{}; + std::vector> tasks; + for (auto _ : state) { + for (auto mult_pair : computations) { + tasks.emplace_back(mult_task(mult_pair, pool)); + } + } + cppcoro::sync_wait(cppcoro::when_all(std::move(tasks))); +} + +BENCHMARK(BM_array_multiplication_thread_pool_coroutine) + ->Args({8, 25'000}) + ->Args({64, 5'000}) + ->Args({256, 250}) + ->Args({512, 75}) + ->Args({1024, 10}) + ->Unit(benchmark::kMillisecond) + ->ReportAggregatesOnly(true) + ->MeasureProcessCPUTime() + ->UseRealTime() + ->Name("dp::thread_pool coroutine array mult"); From c493f52e4406b57cbc1cf5c3f8151f110036dec8 Mon Sep 17 00:00:00 2001 From: Paul T Date: Tue, 13 Sep 2022 11:54:56 -0400 Subject: [PATCH 04/34] Add bulk enqueue function Add ability to enqueue a range of void() functions --- CMakeLists.txt | 5 ++- include/thread_pool/thread_pool.h | 57 +++++++++++++++++++++++++++++-- test/source/thread_pool.cpp | 21 ++++++++++++ 3 files changed, 77 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8228675..05a0d35 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,5 @@ cmake_minimum_required(VERSION 3.14 FATAL_ERROR) -# ---- Project ---- - -# Note: update this to your new project's name and version project( ThreadPool VERSION 0.4.1 @@ -60,6 +57,8 @@ target_link_libraries(${PROJECT_NAME} INTERFACE Threads::Threads) # being a cross-platform target, we enforce standards conformance on MSVC target_compile_options(${PROJECT_NAME} INTERFACE "$<$:/permissive->") +target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines>") +target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines-ts>") target_include_directories( ${PROJECT_NAME} INTERFACE $ diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index acf5e40..52009c1 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -144,6 +144,25 @@ namespace dp { #endif } + /** + * @brief Enqueue a list of tasks into the thread pool. + * @tparam Iterator An iterator type + * @tparam IteratorType The underlying value type of the iterator + * @param begin The start of the task range + * @param end The end of the task range + */ + template ::value_type> + requires std::input_iterator && std::invocable && + std::is_same_v> + void enqueue(Iterator begin, Iterator end) { + // simple range check + if (begin >= end) return; + + // enqueue all the tasks + enqueue_tasks(begin, end); + } + /** * @brief Enqueue a task to be executed in the thread pool that returns void. * @tparam Function An invokable type. @@ -167,20 +186,21 @@ namespace dp { } /** - * @brief Allows you to schedule coroutines to run on the thread pool. + * @brief Allows you to schedule coroutines to run on the thread pool. */ auto schedule() { /// @brief Simple awaitable type that we can return. struct scheduled_operation { dp::thread_pool<> *thread_pool_; - bool await_ready() { return false; }; + static bool await_ready() { return false; } void await_suspend(std::coroutine_handle<> handle) { if (thread_pool_) { thread_pool_->enqueue_detach([](std::coroutine_handle<> h) { h.resume(); }, handle); } } - void await_resume() {} + + static void await_resume() {} }; return scheduled_operation{this}; @@ -195,6 +215,37 @@ namespace dp { tasks_[i].signal.release(); } + template + void enqueue_tasks(Iterator begin, Iterator end) { + // get the count of tasks + const auto &tasks = std::distance(begin, end); + pending_tasks_.fetch_add(tasks, std::memory_order_relaxed); + + // get the number of threads once and re-use + const auto &task_size = tasks_.size(); + + // split tasks among all threads, go through tasks [begin, end) + auto i = count_++ % task_size; + + // start index of where we're adding tasks + const auto &start = i; + // total count of threads we need to wake up. + const auto &count = ::std::min(tasks, task_size); + + for (auto it = begin; it < end; ++it) { + // push the task + tasks_[i].tasks.push(std::move(*it)); + // recalculate the index + i = count_++ % task_size; + } + + // release all the needed signals to wake all threads + for (std::size_t j = 0; j < count; j++) { + const auto &index = (start + j) % task_size; + tasks_[index].signal.release(); + } + } + struct task_item { dp::thread_safe_queue tasks{}; std::binary_semaphore signal{0}; diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 5076989..5c8d80d 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -157,3 +157,24 @@ TEST_CASE("Ensure task exception doesn't kill worker thread") { CHECK_EQ(count.load(), 1); } + +TEST_CASE("Ensure work completes upon destruction when batching tasks") { + std::atomic counter; + constexpr auto total_tasks = 30; + { + dp::thread_pool pool(4); + std::vector> tasks; + tasks.reserve(total_tasks); + for (auto i = 0; i < total_tasks; i++) { + auto task = [i, &counter]() { + std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); + ++counter; + }; + tasks.emplace_back(task); + } + + pool.enqueue(tasks.begin(), tasks.end()); + } + + CHECK_EQ(counter.load(), total_tasks); +} From e8b9e6d80d3252f8b85c7d313632bbccf866f0e0 Mon Sep 17 00:00:00 2001 From: Paul T Date: Tue, 13 Sep 2022 12:03:07 -0400 Subject: [PATCH 05/34] Do not build benchmarks for style check CI --- .github/workflows/style.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml index 82fd25c..a4baa0d 100644 --- a/.github/workflows/style.yml +++ b/.github/workflows/style.yml @@ -29,7 +29,7 @@ jobs: pip3 install cmake_format==0.6.11 pyyaml - name: configure - run: cmake -G Ninja -S . -B build -DTP_BUILD_EXAMPLES=OFF + run: cmake -G Ninja -S . -B build -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF - name: check style run: cmake --build build --target check-format From 3be61eac0a0028a38f72e11ef937c23946abe865 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 11:33:15 -0400 Subject: [PATCH 06/34] Add ccache support --- CMakeLists.txt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 05a0d35..f4bfcc8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,9 +16,14 @@ if(PROJECT_SOURCE_DIR STREQUAL PROJECT_BINARY_DIR) ) endif() +find_program(CCACHE_EXE ccache) +if(EXISTS ${CCACHE_EXE}) + message(STATUS "Found ccache ${CCACHE_EXE}") + set(CMAKE_CXX_COMPILER_LAUNCHER ${CCACHE_EXE}) +endif() + # ---- Add dependencies via CPM ---- # see https://github.com/TheLartians/CPM.cmake for more info - include(cmake/CPM.cmake) # PackageProject.cmake will be used to make our target installable From e42fa8f3c4e1c3211891eead8427e5e02d12c677 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:05:35 -0400 Subject: [PATCH 07/34] Try to correct errors in tests --- test/source/thread_pool.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 5c8d80d..1556933 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -2,6 +2,7 @@ #include #include +#include #include auto multiply(int a, int b) { return a * b; } @@ -130,19 +131,19 @@ TEST_CASE("Ensure task load is spread evenly across threads") { } TEST_CASE("Ensure task exception doesn't kill worker thread") { - auto throw_task = [](int) -> int { throw std::logic_error("Error occurred."); }; + auto throw_task = [](int) -> int { throw std::logic_error(std::string("Error occurred.")); }; auto regular_task = [](int input) -> int { return input * 2; }; std::atomic_uint_fast64_t count(0); - auto throw_no_return = []() { throw std::logic_error("Error occurred."); }; + auto throw_no_return = []() { throw std::logic_error(std::string("Error occurred.")); }; auto no_throw_no_return = [&count]() { std::this_thread::sleep_for(std::chrono::seconds(1)); count += 1; }; { - dp::thread_pool pool; + dp::thread_pool pool(4); auto throw_future = pool.enqueue(throw_task, 1); auto no_throw_future = pool.enqueue(regular_task, 2); From 107305eaf72ff476705e1c8b898acbbe36efcf49 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:05:46 -0400 Subject: [PATCH 08/34] Another CPM update --- cmake/CPM.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/CPM.cmake b/cmake/CPM.cmake index 6aacba2..bacb192 100644 --- a/cmake/CPM.cmake +++ b/cmake/CPM.cmake @@ -1,4 +1,4 @@ -set(CPM_DOWNLOAD_VERSION 0.35.5) +set(CPM_DOWNLOAD_VERSION 0.35.6) if(CPM_SOURCE_CACHE) # Expand relative path. This is important if the provided path contains a tilde (~) From fa20ee662ef6271d2e78c7d98aa3a72aca9b1b3e Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:06:42 -0400 Subject: [PATCH 09/34] Add benchmark for batched task execution Also changed benchmarks so that only 1 runs in debug mode for the dp::thread_pool --- benchmark/source/thread_pool.cpp | 60 ++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/benchmark/source/thread_pool.cpp b/benchmark/source/thread_pool.cpp index 333efe4..5001315 100644 --- a/benchmark/source/thread_pool.cpp +++ b/benchmark/source/thread_pool.cpp @@ -11,8 +11,7 @@ static void BM_array_multiplication_thread_pool(benchmark::State& state) { const std::size_t multiplications_to_perform = state.range(1); // generate the data - const auto computations = generate_benchmark_data(array_size, - multiplications_to_perform); + const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); // task that is run on a new thread auto thread_task = [](multiplication_pair pair) { @@ -33,18 +32,68 @@ static void BM_array_multiplication_thread_pool(benchmark::State& state) { } BENCHMARK(BM_array_multiplication_thread_pool) +#if defined(NDEBUG) ->Args({8, 25'000}) ->Args({64, 5'000}) ->Args({256, 250}) ->Args({512, 75}) ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif ->Unit(benchmark::kMillisecond) ->ReportAggregatesOnly(true) ->MeasureProcessCPUTime() ->UseRealTime() ->Name("dp::thread_pool array mult"); -inline cppcoro::task<> mult_task(multiplication_pair pair, dp::thread_pool<>& pool) { +static void BM_array_multiplication_batch_thread_pool(benchmark::State& state) { + const auto array_size = state.range(0); + const std::size_t multiplications_to_perform = state.range(1); + + // generate the data + const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); + + // create our tasks + std::vector> tasks{}; + tasks.reserve(computations.size()); + + // task that is run on a new thread + auto thread_task = [](multiplication_pair pair) { + std::vector result(pair.first.size()); + multiply_array(pair.first, pair.second, result); + }; + + for (const auto& computation : computations) { + auto task = [comp = computation, execution_task = thread_task]() { execution_task(comp); }; + tasks.emplace_back(task); + } + + // create our thread pool using the default size + dp::thread_pool pool{}; + + for (auto _ : state) { + pool.enqueue(tasks.begin(), tasks.end()); + } +} + +BENCHMARK(BM_array_multiplication_batch_thread_pool) +#if defined(NDEBUG) + ->Args({8, 25'000}) + ->Args({64, 5'000}) + ->Args({256, 250}) + ->Args({512, 75}) + ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif + ->Unit(benchmark::kMillisecond) + ->ReportAggregatesOnly(true) + ->MeasureProcessCPUTime() + ->UseRealTime() + ->Name("dp::thread_pool batched array mult"); + +inline cppcoro::task mult_task(multiplication_pair pair, dp::thread_pool<>& pool) { std::vector result(pair.first.size()); co_await co_multiply_array(pair.first, pair.second, result, pool); } @@ -60,15 +109,20 @@ static void BM_array_multiplication_thread_pool_coroutine(benchmark::State& stat tasks.emplace_back(mult_task(mult_pair, pool)); } } + cppcoro::sync_wait(cppcoro::when_all(std::move(tasks))); } BENCHMARK(BM_array_multiplication_thread_pool_coroutine) +#if defined(NDEBUG) ->Args({8, 25'000}) ->Args({64, 5'000}) ->Args({256, 250}) ->Args({512, 75}) ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif ->Unit(benchmark::kMillisecond) ->ReportAggregatesOnly(true) ->MeasureProcessCPUTime() From 94fc4667047f3ec873b1b05106994ef503d2e04d Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:07:19 -0400 Subject: [PATCH 10/34] Build fixes for coroutines and clang --- CMakeLists.txt | 6 +++++- benchmark/CMakeLists.txt | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f4bfcc8..035eb00 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,7 +63,11 @@ target_link_libraries(${PROJECT_NAME} INTERFACE Threads::Threads) # being a cross-platform target, we enforce standards conformance on MSVC target_compile_options(${PROJECT_NAME} INTERFACE "$<$:/permissive->") target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines>") -target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines-ts>") + +if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libstdc++") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lc++") +endif() target_include_directories( ${PROJECT_NAME} INTERFACE $ diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 2c62c7d..05b08c0 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -12,7 +12,11 @@ CPMAddPackage( NAME benchmark GITHUB_REPOSITORY google/benchmark VERSION 1.6.1 - OPTIONS "BENCHMARK_ENABLE_TESTING OFF" + OPTIONS + "BENCHMARK_ENABLE_TESTING OFF" + "CMAKE_CROSSCOMPILING ON" + "HAVE_STD_REGEX OFF" + "HAVE_POSIX_REGEX OFF" ) CPMAddPackage( From 691414b2b7f517109b0b79bb266e0b7c86dde76e Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:11:24 -0400 Subject: [PATCH 11/34] Improve batched task execution and remove modulus Removed modulus calculation for assigning tasks to thread queues. --- include/thread_pool/thread_pool.h | 42 +++++++++++++++++++------------ 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 52009c1..1bc7918 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -155,12 +155,12 @@ namespace dp { typename IteratorType = typename std::iterator_traits::value_type> requires std::input_iterator && std::invocable && std::is_same_v> - void enqueue(Iterator begin, Iterator end) { + void enqueue(Iterator &&begin, Iterator &&end) { // simple range check if (begin >= end) return; // enqueue all the tasks - enqueue_tasks(begin, end); + enqueue_tasks(std::forward(begin), std::forward(end)); } /** @@ -207,16 +207,24 @@ namespace dp { } private: + template + [[maybe_unused]] auto assign_task_to_thread(Function &&f) -> std::size_t { + tasks_[index_].tasks.push(std::forward(f)); + const auto &i = index_; + if (++index_ >= tasks_.size()) index_ = 0; + return i; + } + template void enqueue_task(Function &&f) { - const std::size_t i = count_++ % tasks_.size(); pending_tasks_.fetch_add(1, std::memory_order_relaxed); - tasks_[i].tasks.push(std::forward(f)); - tasks_[i].signal.release(); + const auto &assigned_index = assign_task_to_thread(std::forward(f)); + tasks_[assigned_index].signal.release(); } - template - void enqueue_tasks(Iterator begin, Iterator end) { + template ::value_type> + void enqueue_tasks(Iterator &&begin, Iterator &&end) { // get the count of tasks const auto &tasks = std::distance(begin, end); pending_tasks_.fetch_add(tasks, std::memory_order_relaxed); @@ -224,24 +232,26 @@ namespace dp { // get the number of threads once and re-use const auto &task_size = tasks_.size(); - // split tasks among all threads, go through tasks [begin, end) - auto i = count_++ % task_size; - // start index of where we're adding tasks - const auto &start = i; + std::optional start = std::nullopt; // total count of threads we need to wake up. const auto &count = ::std::min(tasks, task_size); for (auto it = begin; it < end; ++it) { // push the task - tasks_[i].tasks.push(std::move(*it)); - // recalculate the index - i = count_++ % task_size; + const auto &assigned_index = assign_task_to_thread(std::move([f = *it]() { + // suppress exceptions + try { + std::invoke(f); + } catch (...) { + } + })); + if (!start) start = assigned_index; } // release all the needed signals to wake all threads for (std::size_t j = 0; j < count; j++) { - const auto &index = (start + j) % task_size; + const auto &index = (start.value() + j) % task_size; tasks_[index].signal.release(); } } @@ -253,7 +263,7 @@ namespace dp { std::vector threads_; std::deque tasks_; - std::size_t count_{}; + std::uint_fast8_t index_{0}; std::atomic_int_fast64_t pending_tasks_{}; }; From dceb94fb0620984c314f75e045c305b681fb6b2b Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:12:11 -0400 Subject: [PATCH 12/34] Further safety for function invocation in thread --- include/thread_pool/thread_pool.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 1bc7918..04fa83f 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -47,8 +47,8 @@ namespace dp { do { // invoke the task while (auto task = tasks_[id].tasks.pop()) { + pending_tasks_.fetch_sub(1, std::memory_order_release); try { - pending_tasks_.fetch_sub(1, std::memory_order_release); std::invoke(std::move(task.value())); } catch (...) { } @@ -58,9 +58,12 @@ namespace dp { for (std::size_t j = 1; j < tasks_.size(); ++j) { const std::size_t index = (id + j) % tasks_.size(); if (auto task = tasks_[index].tasks.steal()) { - // steal a task pending_tasks_.fetch_sub(1, std::memory_order_release); - std::invoke(std::move(task.value())); + try { + // invoke the stolen task + std::invoke(std::move(task.value())); + } catch (...) { + } // stop stealing once we have invoked a stolen task break; } From 0b50e1fb38e45fc5d925f3d420aba6bbad987aa6 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:12:48 -0400 Subject: [PATCH 13/34] Make schedule() nodiscard --- include/thread_pool/thread_pool.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 04fa83f..1f8e257 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -191,7 +191,7 @@ namespace dp { /** * @brief Allows you to schedule coroutines to run on the thread pool. */ - auto schedule() { + [[nodiscard]] auto schedule() { /// @brief Simple awaitable type that we can return. struct scheduled_operation { dp::thread_pool<> *thread_pool_; @@ -202,7 +202,6 @@ namespace dp { handle); } } - static void await_resume() {} }; From adce8c81b37890cf49389b91986a54c5f31db658 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:12:56 -0400 Subject: [PATCH 14/34] Documentation update --- include/thread_pool/thread_pool.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 1f8e257..840cdf5 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -14,6 +14,7 @@ #include "thread_pool/thread_safe_queue.h" namespace dp { + namespace details { // leave clang detection out for now as there is not support for std::move_only_function #if defined(__GNUC__) && !defined(__clang_major__) @@ -258,6 +259,9 @@ namespace dp { } } + /** + * @brief Task item that holds list of tasks and signal semaphore for each thread. + */ struct task_item { dp::thread_safe_queue tasks{}; std::binary_semaphore signal{0}; From 136faeed4e4776631264b982c93d8f74fea7d994 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:13:13 -0400 Subject: [PATCH 15/34] Update to Clang 14 on CI --- .github/workflows/ubuntu.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index 9e76e53..7f3261e 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -33,7 +33,7 @@ jobs: - name: Set up Clang uses: egor-tensin/setup-clang@v1 with: - version: 13 + version: 14 platform: x64 - name: set up GCC From 2f7116261a3917ae7698460db6b3ea42548a0bf2 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:22:09 -0400 Subject: [PATCH 16/34] Formatting updates --- .cmake-format | 1 + benchmark/CMakeLists.txt | 7 ++----- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.cmake-format b/.cmake-format index c111c97..323ac76 100644 --- a/.cmake-format +++ b/.cmake-format @@ -2,6 +2,7 @@ format: tab_size: 4 line_width: 100 dangle_parens: true + max_pargs_hwrap: 4 parse: additional_commands: diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 05b08c0..7fd33e1 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -12,11 +12,8 @@ CPMAddPackage( NAME benchmark GITHUB_REPOSITORY google/benchmark VERSION 1.6.1 - OPTIONS - "BENCHMARK_ENABLE_TESTING OFF" - "CMAKE_CROSSCOMPILING ON" - "HAVE_STD_REGEX OFF" - "HAVE_POSIX_REGEX OFF" + OPTIONS "BENCHMARK_ENABLE_TESTING OFF" "CMAKE_CROSSCOMPILING ON" "HAVE_STD_REGEX OFF" + "HAVE_POSIX_REGEX OFF" ) CPMAddPackage( From 67c4c65ff739b7c06a78f9f54bd2c2baec8560ee Mon Sep 17 00:00:00 2001 From: Paul T Date: Tue, 27 Sep 2022 11:09:27 -0400 Subject: [PATCH 17/34] Add badges to README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 23c7712..1ed349e 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ thread-pool [![say thanks](https://img.shields.io/badge/Say%20Thanks-👍-1EAEDB.svg)](https://github.com/DeveloperPaul123/periodic-function/stargazers) [![Discord](https://img.shields.io/discord/652515194572111872)](https://discord.gg/CX2ybByRnt) +![License](https://img.shields.io/github/license/DeveloperPaul123/thread-pool?color=blue) +![Release](https://img.shields.io/github/v/release/DeveloperPaul123/thread-pool) [![Ubuntu](https://github.com/DeveloperPaul123/thread-pool/actions/workflows/ubuntu.yml/badge.svg)](https://github.com/DeveloperPaul123/thread-pool/actions/workflows/ubuntu.yml) [![Windows](https://github.com/DeveloperPaul123/thread-pool/actions/workflows/windows.yml/badge.svg)](https://github.com/DeveloperPaul123/thread-pool/actions/workflows/windows.yml) From 28c66097a951cf59e60988b2a7bfb7322781f2e0 Mon Sep 17 00:00:00 2001 From: yangli Date: Sun, 2 Oct 2022 14:38:42 -0500 Subject: [PATCH 18/34] use feature test macro to check feature available --- .gitignore | 2 ++ README.md | 20 ++++++++++---------- include/thread_pool/thread_pool.h | 13 ++++--------- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/.gitignore b/.gitignore index cee2d7e..f857967 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ .vs out CMakeFiles +.idea +cmake-build* diff --git a/README.md b/README.md index 1ed349e..a89d9f7 100644 --- a/README.md +++ b/README.md @@ -82,12 +82,12 @@ See the `./benchmark` folder for the benchmark code. The benchmarks are set up t Matrix sizes are all square (MxM). Each multiplication is `(MxM) * (MxM)` where `*` refers to a matrix multiplication operation. Times recorded were the best of at least 3 runs. | Matrix Size | Number of multiplications | `std::async` time (ms) | `dp::thread_pool` time (ms) | -|:---:|:---:|:---:|:---:| -| 8 | 25,000 | 77.9 | 65.3 | -| 64 | 5,000 | 100 | 65.2 | -| 256 | 250 | 295 | 59.2 | -| 512 | 75 | 713 | 60.4 | -| 1024 | 10 | 1160 | 55.8 | +|:-----------:|:-------------------------:|:----------------------:|:---------------------------:| +| 8 | 25,000 | 77.9 | 65.3 | +| 64 | 5,000 | 100 | 65.2 | +| 256 | 250 | 295 | 59.2 | +| 512 | 75 | 713 | 60.4 | +| 1024 | 10 | 1160 | 55.8 | ## Building @@ -107,10 +107,10 @@ cmake --build build ### Build Options -| Option | Description | Default | -|:-------|:------------|:--------:| -| `TP_BUILD_TESTS` | Turn on to build unit tests. Required for formatting build targets. | ON | -| `TP_BUILD_EXAMPLES` | Turn on to build examples | ON | +| Option | Description | Default | +|:--------------------|:--------------------------------------------------------------------|:-------:| +| `TP_BUILD_TESTS` | Turn on to build unit tests. Required for formatting build targets. | ON | +| `TP_BUILD_EXAMPLES` | Turn on to build examples | ON | ### Run clang-format diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index c296fc8..ab475d3 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -9,19 +9,14 @@ #include #include #include +#include #include "thread_pool/thread_safe_queue.h" namespace dp { namespace details { - // leave clang detection out for now as there is not support for std::move_only_function -#if defined(__GNUC__) && !defined(__clang_major__) -# define GCC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) -#else -# define GCC_VERSION 0 -#endif -#if (defined(_MSC_VER) && (_MSC_VER >= 1930) && (_MSVC_LANG > 202002L)) || (GCC_VERSION >= 120000) -# define TP_HAS_MOVE_ONLY_FUNCTION_SUPPORT + +#if __cpp_lib_move_only_function using default_function_type = std::move_only_function; #else using default_function_type = std::function; @@ -100,7 +95,7 @@ namespace dp { typename ReturnType = std::invoke_result_t> requires std::invocable [[nodiscard]] std::future enqueue(Function f, Args... args) { -#ifdef TP_HAS_MOVE_ONLY_FUNCTION_SUPPORT +#if __cpp_lib_move_only_function // we can do this in C++23 because we now have support for move only functions std::promise promise; auto future = promise.get_future(); From e7dc22c8194b6e95a2fa98e031c6a73e5f638087 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 19 Aug 2022 17:03:01 -0400 Subject: [PATCH 19/34] Update CPM version --- cmake/CPM.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/CPM.cmake b/cmake/CPM.cmake index 59208e4..6aacba2 100644 --- a/cmake/CPM.cmake +++ b/cmake/CPM.cmake @@ -1,4 +1,4 @@ -set(CPM_DOWNLOAD_VERSION 0.32.0) +set(CPM_DOWNLOAD_VERSION 0.35.5) if(CPM_SOURCE_CACHE) # Expand relative path. This is important if the provided path contains a tilde (~) From bd3986d8c53efa165345520c27762154207530a7 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 19 Aug 2022 17:03:45 -0400 Subject: [PATCH 20/34] Add basic support for running coroutines on pool Added a basic awaitable type and function to allow for running coroutines on the thread pool --- include/thread_pool/thread_pool.h | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index ab475d3..ec4c3f6 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -160,6 +161,26 @@ namespace dp { })); } + /** + * @brief Allows you to schedule coroutines to run on the thread pool. + */ + auto schedule() { + /// @brief Simple awaitable type that we can return. + struct scheduled_operation { + dp::thread_pool<> *thread_pool_; + bool await_ready() { return false; }; + void await_suspend(std::coroutine_handle<> handle) { + if (thread_pool_) { + thread_pool_->enqueue_detach([](std::coroutine_handle<> h) { h.resume(); }, + handle); + } + } + void await_resume() {} + }; + + return scheduled_operation{this}; + } + private: template void enqueue_task(Function &&f) { From efe73867e95f3ffab72245ab386e746aaa7b6496 Mon Sep 17 00:00:00 2001 From: Paul T Date: Fri, 19 Aug 2022 17:04:14 -0400 Subject: [PATCH 21/34] WIP: Add coroutine thread pool benchmark --- benchmark/CMakeLists.txt | 8 ++++++- benchmark/include/utilities.h | 10 ++++++++ benchmark/source/thread_pool.cpp | 41 +++++++++++++++++++++++++++++--- 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 6205011..2c62c7d 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -15,6 +15,12 @@ CPMAddPackage( OPTIONS "BENCHMARK_ENABLE_TESTING OFF" ) +CPMAddPackage( + NAME cppcoro + GITHUB_REPOSITORY andreasbuhr/cppcoro + GIT_TAG 10bbcdbf2be3ad3aa56febcf4c7662d771460a99 +) + if(benchmark_ADDED) # patch benchmark target set_target_properties(benchmark PROPERTIES CXX_STANDARD 20) @@ -26,7 +32,7 @@ file(GLOB sources CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/source/*.cpp) file(GLOB headers CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/include/*.h) add_executable(${PROJECT_NAME} ${headers} ${sources}) -target_link_libraries(${PROJECT_NAME} benchmark ThreadPool::ThreadPool) +target_link_libraries(${PROJECT_NAME} benchmark ThreadPool::ThreadPool cppcoro) set_target_properties(${PROJECT_NAME} PROPERTIES CXX_STANDARD 20) target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) diff --git a/benchmark/include/utilities.h b/benchmark/include/utilities.h index c16969a..cfee63d 100644 --- a/benchmark/include/utilities.h +++ b/benchmark/include/utilities.h @@ -2,11 +2,14 @@ #include #include +#include #include #include #include #include +#include "thread_pool/thread_pool.h" + inline std::size_t index(std::size_t row, std::size_t col, std::size_t width) { return row * width + col; } @@ -22,6 +25,12 @@ inline void multiply_array(std::span a, std::span b, std::span re } } +inline cppcoro::task<> co_multiply_array(std::span a, std::span b, std::span result, + dp::thread_pool<>& pool) { + co_await pool.schedule(); + multiply_array(a, b, result); +} + template using multiplication_pair = std::pair, std::vector>; @@ -30,6 +39,7 @@ template const std::int64_t& array_size, const std::int64_t& number_of_multiplications) { static std::uniform_int_distribution distribution(std::numeric_limits::min(), std::numeric_limits::max()); + // yes, predictable values static std::default_random_engine generator{}; std::vector> computations; diff --git a/benchmark/source/thread_pool.cpp b/benchmark/source/thread_pool.cpp index 40676d7..333efe4 100644 --- a/benchmark/source/thread_pool.cpp +++ b/benchmark/source/thread_pool.cpp @@ -1,14 +1,18 @@ #include #include +#include +#include + #include "utilities.h" -static void BM_array_multiplication(benchmark::State& state) { +static void BM_array_multiplication_thread_pool(benchmark::State& state) { const auto array_size = state.range(0); const std::size_t multiplications_to_perform = state.range(1); // generate the data - const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); + const auto computations = generate_benchmark_data(array_size, + multiplications_to_perform); // task that is run on a new thread auto thread_task = [](multiplication_pair pair) { @@ -28,7 +32,7 @@ static void BM_array_multiplication(benchmark::State& state) { } } -BENCHMARK(BM_array_multiplication) +BENCHMARK(BM_array_multiplication_thread_pool) ->Args({8, 25'000}) ->Args({64, 5'000}) ->Args({256, 250}) @@ -39,3 +43,34 @@ BENCHMARK(BM_array_multiplication) ->MeasureProcessCPUTime() ->UseRealTime() ->Name("dp::thread_pool array mult"); + +inline cppcoro::task<> mult_task(multiplication_pair pair, dp::thread_pool<>& pool) { + std::vector result(pair.first.size()); + co_await co_multiply_array(pair.first, pair.second, result, pool); +} + +static void BM_array_multiplication_thread_pool_coroutine(benchmark::State& state) { + const auto array_size = state.range(0); + const std::size_t multiplications_to_perform = state.range(1); + const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); + dp::thread_pool pool{}; + std::vector> tasks; + for (auto _ : state) { + for (auto mult_pair : computations) { + tasks.emplace_back(mult_task(mult_pair, pool)); + } + } + cppcoro::sync_wait(cppcoro::when_all(std::move(tasks))); +} + +BENCHMARK(BM_array_multiplication_thread_pool_coroutine) + ->Args({8, 25'000}) + ->Args({64, 5'000}) + ->Args({256, 250}) + ->Args({512, 75}) + ->Args({1024, 10}) + ->Unit(benchmark::kMillisecond) + ->ReportAggregatesOnly(true) + ->MeasureProcessCPUTime() + ->UseRealTime() + ->Name("dp::thread_pool coroutine array mult"); From 48ebc32dd2ed646211e45c115e656eacc1207a1e Mon Sep 17 00:00:00 2001 From: Paul T Date: Tue, 13 Sep 2022 11:54:56 -0400 Subject: [PATCH 22/34] Add bulk enqueue function Add ability to enqueue a range of void() functions --- CMakeLists.txt | 5 ++- include/thread_pool/thread_pool.h | 57 +++++++++++++++++++++++++++++-- test/source/thread_pool.cpp | 21 ++++++++++++ 3 files changed, 77 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8228675..05a0d35 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,5 @@ cmake_minimum_required(VERSION 3.14 FATAL_ERROR) -# ---- Project ---- - -# Note: update this to your new project's name and version project( ThreadPool VERSION 0.4.1 @@ -60,6 +57,8 @@ target_link_libraries(${PROJECT_NAME} INTERFACE Threads::Threads) # being a cross-platform target, we enforce standards conformance on MSVC target_compile_options(${PROJECT_NAME} INTERFACE "$<$:/permissive->") +target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines>") +target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines-ts>") target_include_directories( ${PROJECT_NAME} INTERFACE $ diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index ec4c3f6..31c14ba 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -139,6 +139,25 @@ namespace dp { #endif } + /** + * @brief Enqueue a list of tasks into the thread pool. + * @tparam Iterator An iterator type + * @tparam IteratorType The underlying value type of the iterator + * @param begin The start of the task range + * @param end The end of the task range + */ + template ::value_type> + requires std::input_iterator && std::invocable && + std::is_same_v> + void enqueue(Iterator begin, Iterator end) { + // simple range check + if (begin >= end) return; + + // enqueue all the tasks + enqueue_tasks(begin, end); + } + /** * @brief Enqueue a task to be executed in the thread pool that returns void. * @tparam Function An invokable type. @@ -162,20 +181,21 @@ namespace dp { } /** - * @brief Allows you to schedule coroutines to run on the thread pool. + * @brief Allows you to schedule coroutines to run on the thread pool. */ auto schedule() { /// @brief Simple awaitable type that we can return. struct scheduled_operation { dp::thread_pool<> *thread_pool_; - bool await_ready() { return false; }; + static bool await_ready() { return false; } void await_suspend(std::coroutine_handle<> handle) { if (thread_pool_) { thread_pool_->enqueue_detach([](std::coroutine_handle<> h) { h.resume(); }, handle); } } - void await_resume() {} + + static void await_resume() {} }; return scheduled_operation{this}; @@ -190,6 +210,37 @@ namespace dp { tasks_[i].signal.release(); } + template + void enqueue_tasks(Iterator begin, Iterator end) { + // get the count of tasks + const auto &tasks = std::distance(begin, end); + pending_tasks_.fetch_add(tasks, std::memory_order_relaxed); + + // get the number of threads once and re-use + const auto &task_size = tasks_.size(); + + // split tasks among all threads, go through tasks [begin, end) + auto i = count_++ % task_size; + + // start index of where we're adding tasks + const auto &start = i; + // total count of threads we need to wake up. + const auto &count = ::std::min(tasks, task_size); + + for (auto it = begin; it < end; ++it) { + // push the task + tasks_[i].tasks.push(std::move(*it)); + // recalculate the index + i = count_++ % task_size; + } + + // release all the needed signals to wake all threads + for (std::size_t j = 0; j < count; j++) { + const auto &index = (start + j) % task_size; + tasks_[index].signal.release(); + } + } + struct task_item { dp::thread_safe_queue tasks{}; std::binary_semaphore signal{0}; diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 5076989..5c8d80d 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -157,3 +157,24 @@ TEST_CASE("Ensure task exception doesn't kill worker thread") { CHECK_EQ(count.load(), 1); } + +TEST_CASE("Ensure work completes upon destruction when batching tasks") { + std::atomic counter; + constexpr auto total_tasks = 30; + { + dp::thread_pool pool(4); + std::vector> tasks; + tasks.reserve(total_tasks); + for (auto i = 0; i < total_tasks; i++) { + auto task = [i, &counter]() { + std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); + ++counter; + }; + tasks.emplace_back(task); + } + + pool.enqueue(tasks.begin(), tasks.end()); + } + + CHECK_EQ(counter.load(), total_tasks); +} From abf4559be34064049fb903859a12097bf21bfe33 Mon Sep 17 00:00:00 2001 From: Paul T Date: Tue, 13 Sep 2022 12:03:07 -0400 Subject: [PATCH 23/34] Do not build benchmarks for style check CI --- .github/workflows/style.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml index 82fd25c..a4baa0d 100644 --- a/.github/workflows/style.yml +++ b/.github/workflows/style.yml @@ -29,7 +29,7 @@ jobs: pip3 install cmake_format==0.6.11 pyyaml - name: configure - run: cmake -G Ninja -S . -B build -DTP_BUILD_EXAMPLES=OFF + run: cmake -G Ninja -S . -B build -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF - name: check style run: cmake --build build --target check-format From 661b54c834b2eb398c55e1f7068d9c6a0bf1f84e Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 11:33:15 -0400 Subject: [PATCH 24/34] Add ccache support --- CMakeLists.txt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 05a0d35..f4bfcc8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,9 +16,14 @@ if(PROJECT_SOURCE_DIR STREQUAL PROJECT_BINARY_DIR) ) endif() +find_program(CCACHE_EXE ccache) +if(EXISTS ${CCACHE_EXE}) + message(STATUS "Found ccache ${CCACHE_EXE}") + set(CMAKE_CXX_COMPILER_LAUNCHER ${CCACHE_EXE}) +endif() + # ---- Add dependencies via CPM ---- # see https://github.com/TheLartians/CPM.cmake for more info - include(cmake/CPM.cmake) # PackageProject.cmake will be used to make our target installable From c9b6e58d91291a780a37201ba256bebf26741fc1 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:05:35 -0400 Subject: [PATCH 25/34] Try to correct errors in tests --- test/source/thread_pool.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 5c8d80d..1556933 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -2,6 +2,7 @@ #include #include +#include #include auto multiply(int a, int b) { return a * b; } @@ -130,19 +131,19 @@ TEST_CASE("Ensure task load is spread evenly across threads") { } TEST_CASE("Ensure task exception doesn't kill worker thread") { - auto throw_task = [](int) -> int { throw std::logic_error("Error occurred."); }; + auto throw_task = [](int) -> int { throw std::logic_error(std::string("Error occurred.")); }; auto regular_task = [](int input) -> int { return input * 2; }; std::atomic_uint_fast64_t count(0); - auto throw_no_return = []() { throw std::logic_error("Error occurred."); }; + auto throw_no_return = []() { throw std::logic_error(std::string("Error occurred.")); }; auto no_throw_no_return = [&count]() { std::this_thread::sleep_for(std::chrono::seconds(1)); count += 1; }; { - dp::thread_pool pool; + dp::thread_pool pool(4); auto throw_future = pool.enqueue(throw_task, 1); auto no_throw_future = pool.enqueue(regular_task, 2); From 8df59653439eb6e50006736d1b4a400ae97ab26c Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:05:46 -0400 Subject: [PATCH 26/34] Another CPM update --- cmake/CPM.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/CPM.cmake b/cmake/CPM.cmake index 6aacba2..bacb192 100644 --- a/cmake/CPM.cmake +++ b/cmake/CPM.cmake @@ -1,4 +1,4 @@ -set(CPM_DOWNLOAD_VERSION 0.35.5) +set(CPM_DOWNLOAD_VERSION 0.35.6) if(CPM_SOURCE_CACHE) # Expand relative path. This is important if the provided path contains a tilde (~) From 463b9e8b874f91e370cc37659d1dfd1df93157b7 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:06:42 -0400 Subject: [PATCH 27/34] Add benchmark for batched task execution Also changed benchmarks so that only 1 runs in debug mode for the dp::thread_pool --- benchmark/source/thread_pool.cpp | 60 ++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/benchmark/source/thread_pool.cpp b/benchmark/source/thread_pool.cpp index 333efe4..5001315 100644 --- a/benchmark/source/thread_pool.cpp +++ b/benchmark/source/thread_pool.cpp @@ -11,8 +11,7 @@ static void BM_array_multiplication_thread_pool(benchmark::State& state) { const std::size_t multiplications_to_perform = state.range(1); // generate the data - const auto computations = generate_benchmark_data(array_size, - multiplications_to_perform); + const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); // task that is run on a new thread auto thread_task = [](multiplication_pair pair) { @@ -33,18 +32,68 @@ static void BM_array_multiplication_thread_pool(benchmark::State& state) { } BENCHMARK(BM_array_multiplication_thread_pool) +#if defined(NDEBUG) ->Args({8, 25'000}) ->Args({64, 5'000}) ->Args({256, 250}) ->Args({512, 75}) ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif ->Unit(benchmark::kMillisecond) ->ReportAggregatesOnly(true) ->MeasureProcessCPUTime() ->UseRealTime() ->Name("dp::thread_pool array mult"); -inline cppcoro::task<> mult_task(multiplication_pair pair, dp::thread_pool<>& pool) { +static void BM_array_multiplication_batch_thread_pool(benchmark::State& state) { + const auto array_size = state.range(0); + const std::size_t multiplications_to_perform = state.range(1); + + // generate the data + const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); + + // create our tasks + std::vector> tasks{}; + tasks.reserve(computations.size()); + + // task that is run on a new thread + auto thread_task = [](multiplication_pair pair) { + std::vector result(pair.first.size()); + multiply_array(pair.first, pair.second, result); + }; + + for (const auto& computation : computations) { + auto task = [comp = computation, execution_task = thread_task]() { execution_task(comp); }; + tasks.emplace_back(task); + } + + // create our thread pool using the default size + dp::thread_pool pool{}; + + for (auto _ : state) { + pool.enqueue(tasks.begin(), tasks.end()); + } +} + +BENCHMARK(BM_array_multiplication_batch_thread_pool) +#if defined(NDEBUG) + ->Args({8, 25'000}) + ->Args({64, 5'000}) + ->Args({256, 250}) + ->Args({512, 75}) + ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif + ->Unit(benchmark::kMillisecond) + ->ReportAggregatesOnly(true) + ->MeasureProcessCPUTime() + ->UseRealTime() + ->Name("dp::thread_pool batched array mult"); + +inline cppcoro::task mult_task(multiplication_pair pair, dp::thread_pool<>& pool) { std::vector result(pair.first.size()); co_await co_multiply_array(pair.first, pair.second, result, pool); } @@ -60,15 +109,20 @@ static void BM_array_multiplication_thread_pool_coroutine(benchmark::State& stat tasks.emplace_back(mult_task(mult_pair, pool)); } } + cppcoro::sync_wait(cppcoro::when_all(std::move(tasks))); } BENCHMARK(BM_array_multiplication_thread_pool_coroutine) +#if defined(NDEBUG) ->Args({8, 25'000}) ->Args({64, 5'000}) ->Args({256, 250}) ->Args({512, 75}) ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif ->Unit(benchmark::kMillisecond) ->ReportAggregatesOnly(true) ->MeasureProcessCPUTime() From 3d3c35e905a9bc50c35eed56aa422f6bd333524b Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:07:19 -0400 Subject: [PATCH 28/34] Build fixes for coroutines and clang --- CMakeLists.txt | 6 +++++- benchmark/CMakeLists.txt | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f4bfcc8..035eb00 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,7 +63,11 @@ target_link_libraries(${PROJECT_NAME} INTERFACE Threads::Threads) # being a cross-platform target, we enforce standards conformance on MSVC target_compile_options(${PROJECT_NAME} INTERFACE "$<$:/permissive->") target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines>") -target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines-ts>") + +if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libstdc++") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lc++") +endif() target_include_directories( ${PROJECT_NAME} INTERFACE $ diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 2c62c7d..05b08c0 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -12,7 +12,11 @@ CPMAddPackage( NAME benchmark GITHUB_REPOSITORY google/benchmark VERSION 1.6.1 - OPTIONS "BENCHMARK_ENABLE_TESTING OFF" + OPTIONS + "BENCHMARK_ENABLE_TESTING OFF" + "CMAKE_CROSSCOMPILING ON" + "HAVE_STD_REGEX OFF" + "HAVE_POSIX_REGEX OFF" ) CPMAddPackage( From 6eb7c36330143f4437dc78bc9dff63e08282cdb9 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:11:24 -0400 Subject: [PATCH 29/34] Improve batched task execution and remove modulus Removed modulus calculation for assigning tasks to thread queues. --- include/thread_pool/thread_pool.h | 42 +++++++++++++++++++------------ 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 31c14ba..851b57d 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -150,12 +150,12 @@ namespace dp { typename IteratorType = typename std::iterator_traits::value_type> requires std::input_iterator && std::invocable && std::is_same_v> - void enqueue(Iterator begin, Iterator end) { + void enqueue(Iterator &&begin, Iterator &&end) { // simple range check if (begin >= end) return; // enqueue all the tasks - enqueue_tasks(begin, end); + enqueue_tasks(std::forward(begin), std::forward(end)); } /** @@ -202,16 +202,24 @@ namespace dp { } private: + template + [[maybe_unused]] auto assign_task_to_thread(Function &&f) -> std::size_t { + tasks_[index_].tasks.push(std::forward(f)); + const auto &i = index_; + if (++index_ >= tasks_.size()) index_ = 0; + return i; + } + template void enqueue_task(Function &&f) { - const std::size_t i = count_++ % tasks_.size(); pending_tasks_.fetch_add(1, std::memory_order_relaxed); - tasks_[i].tasks.push(std::forward(f)); - tasks_[i].signal.release(); + const auto &assigned_index = assign_task_to_thread(std::forward(f)); + tasks_[assigned_index].signal.release(); } - template - void enqueue_tasks(Iterator begin, Iterator end) { + template ::value_type> + void enqueue_tasks(Iterator &&begin, Iterator &&end) { // get the count of tasks const auto &tasks = std::distance(begin, end); pending_tasks_.fetch_add(tasks, std::memory_order_relaxed); @@ -219,24 +227,26 @@ namespace dp { // get the number of threads once and re-use const auto &task_size = tasks_.size(); - // split tasks among all threads, go through tasks [begin, end) - auto i = count_++ % task_size; - // start index of where we're adding tasks - const auto &start = i; + std::optional start = std::nullopt; // total count of threads we need to wake up. const auto &count = ::std::min(tasks, task_size); for (auto it = begin; it < end; ++it) { // push the task - tasks_[i].tasks.push(std::move(*it)); - // recalculate the index - i = count_++ % task_size; + const auto &assigned_index = assign_task_to_thread(std::move([f = *it]() { + // suppress exceptions + try { + std::invoke(f); + } catch (...) { + } + })); + if (!start) start = assigned_index; } // release all the needed signals to wake all threads for (std::size_t j = 0; j < count; j++) { - const auto &index = (start + j) % task_size; + const auto &index = (start.value() + j) % task_size; tasks_[index].signal.release(); } } @@ -248,7 +258,7 @@ namespace dp { std::vector threads_; std::deque tasks_; - std::size_t count_{}; + std::uint_fast8_t index_{0}; std::atomic_int_fast64_t pending_tasks_{}; }; From ede27fa4e4d7668fa71e8ba1426932a4ca4cea69 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:12:11 -0400 Subject: [PATCH 30/34] Further safety for function invocation in thread --- include/thread_pool/thread_pool.h | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 851b57d..8029f76 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -42,8 +42,8 @@ namespace dp { do { // invoke the task while (auto task = tasks_[id].tasks.pop()) { + pending_tasks_.fetch_sub(1, std::memory_order_release); try { - pending_tasks_.fetch_sub(1, std::memory_order_release); std::invoke(std::move(task.value())); } catch (...) { } @@ -53,9 +53,12 @@ namespace dp { for (std::size_t j = 1; j < tasks_.size(); ++j) { const std::size_t index = (id + j) % tasks_.size(); if (auto task = tasks_[index].tasks.steal()) { - // steal a task pending_tasks_.fetch_sub(1, std::memory_order_release); - std::invoke(std::move(task.value())); + try { + // invoke the stolen task + std::invoke(std::move(task.value())); + } catch (...) { + } // stop stealing once we have invoked a stolen task break; } From bf8d3a1cfe8273692b7e4aeaf12b0899e68cfc4e Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:12:48 -0400 Subject: [PATCH 31/34] Make schedule() nodiscard --- include/thread_pool/thread_pool.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 8029f76..39db5f6 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -186,7 +186,7 @@ namespace dp { /** * @brief Allows you to schedule coroutines to run on the thread pool. */ - auto schedule() { + [[nodiscard]] auto schedule() { /// @brief Simple awaitable type that we can return. struct scheduled_operation { dp::thread_pool<> *thread_pool_; @@ -197,7 +197,6 @@ namespace dp { handle); } } - static void await_resume() {} }; From 1e17b2d11fbf24c38e2514870d1299ad98873e78 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:12:56 -0400 Subject: [PATCH 32/34] Documentation update --- include/thread_pool/thread_pool.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index 39db5f6..2e78b35 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -15,6 +15,7 @@ #include "thread_pool/thread_safe_queue.h" namespace dp { + namespace details { #if __cpp_lib_move_only_function @@ -253,6 +254,9 @@ namespace dp { } } + /** + * @brief Task item that holds list of tasks and signal semaphore for each thread. + */ struct task_item { dp::thread_safe_queue tasks{}; std::binary_semaphore signal{0}; From 6cea89ebc0e0b47917e5deb8bbbb120d25e596c6 Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:13:13 -0400 Subject: [PATCH 33/34] Update to Clang 14 on CI --- .github/workflows/ubuntu.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index 9e76e53..7f3261e 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -33,7 +33,7 @@ jobs: - name: Set up Clang uses: egor-tensin/setup-clang@v1 with: - version: 13 + version: 14 platform: x64 - name: set up GCC From 4e910b807fcbbd3ea5f7f31f7c9a65ff9e91471c Mon Sep 17 00:00:00 2001 From: Paul T Date: Thu, 15 Sep 2022 12:22:09 -0400 Subject: [PATCH 34/34] Formatting updates --- .cmake-format | 1 + benchmark/CMakeLists.txt | 7 ++----- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.cmake-format b/.cmake-format index c111c97..323ac76 100644 --- a/.cmake-format +++ b/.cmake-format @@ -2,6 +2,7 @@ format: tab_size: 4 line_width: 100 dangle_parens: true + max_pargs_hwrap: 4 parse: additional_commands: diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 05b08c0..7fd33e1 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -12,11 +12,8 @@ CPMAddPackage( NAME benchmark GITHUB_REPOSITORY google/benchmark VERSION 1.6.1 - OPTIONS - "BENCHMARK_ENABLE_TESTING OFF" - "CMAKE_CROSSCOMPILING ON" - "HAVE_STD_REGEX OFF" - "HAVE_POSIX_REGEX OFF" + OPTIONS "BENCHMARK_ENABLE_TESTING OFF" "CMAKE_CROSSCOMPILING ON" "HAVE_STD_REGEX OFF" + "HAVE_POSIX_REGEX OFF" ) CPMAddPackage(