diff --git a/.cmake-format b/.cmake-format index c111c97..323ac76 100644 --- a/.cmake-format +++ b/.cmake-format @@ -2,6 +2,7 @@ format: tab_size: 4 line_width: 100 dangle_parens: true + max_pargs_hwrap: 4 parse: additional_commands: diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml index 82fd25c..a4baa0d 100644 --- a/.github/workflows/style.yml +++ b/.github/workflows/style.yml @@ -29,7 +29,7 @@ jobs: pip3 install cmake_format==0.6.11 pyyaml - name: configure - run: cmake -G Ninja -S . -B build -DTP_BUILD_EXAMPLES=OFF + run: cmake -G Ninja -S . -B build -DTP_BUILD_EXAMPLES=OFF -DTP_BUILD_BENCHMARKS=OFF - name: check style run: cmake --build build --target check-format diff --git a/.github/workflows/ubuntu.yml b/.github/workflows/ubuntu.yml index 9e76e53..7f3261e 100644 --- a/.github/workflows/ubuntu.yml +++ b/.github/workflows/ubuntu.yml @@ -33,7 +33,7 @@ jobs: - name: Set up Clang uses: egor-tensin/setup-clang@v1 with: - version: 13 + version: 14 platform: x64 - name: set up GCC diff --git a/.gitignore b/.gitignore index cee2d7e..f857967 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ .vs out CMakeFiles +.idea +cmake-build* diff --git a/CMakeLists.txt b/CMakeLists.txt index 8228675..035eb00 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,5 @@ cmake_minimum_required(VERSION 3.14 FATAL_ERROR) -# ---- Project ---- - -# Note: update this to your new project's name and version project( ThreadPool VERSION 0.4.1 @@ -19,9 +16,14 @@ if(PROJECT_SOURCE_DIR STREQUAL PROJECT_BINARY_DIR) ) endif() +find_program(CCACHE_EXE ccache) +if(EXISTS ${CCACHE_EXE}) + message(STATUS "Found ccache ${CCACHE_EXE}") + set(CMAKE_CXX_COMPILER_LAUNCHER ${CCACHE_EXE}) +endif() + # ---- Add dependencies via CPM ---- # see https://github.com/TheLartians/CPM.cmake for more info - include(cmake/CPM.cmake) # PackageProject.cmake will be used to make our target installable @@ -60,6 +62,12 @@ target_link_libraries(${PROJECT_NAME} INTERFACE Threads::Threads) # 
being a cross-platform target, we enforce standards conformance on MSVC target_compile_options(${PROJECT_NAME} INTERFACE "$<$:/permissive->") +target_compile_options(${PROJECT_NAME} INTERFACE "$<$:-fcoroutines>") + +if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libstdc++") + set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lc++") +endif() target_include_directories( ${PROJECT_NAME} INTERFACE $ diff --git a/README.md b/README.md index 23c7712..a89d9f7 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,8 @@ thread-pool [![say thanks](https://img.shields.io/badge/Say%20Thanks-👍-1EAEDB.svg)](https://github.com/DeveloperPaul123/periodic-function/stargazers) [![Discord](https://img.shields.io/discord/652515194572111872)](https://discord.gg/CX2ybByRnt) +![License](https://img.shields.io/github/license/DeveloperPaul123/thread-pool?color=blue) +![Release](https://img.shields.io/github/v/release/DeveloperPaul123/thread-pool) [![Ubuntu](https://github.com/DeveloperPaul123/thread-pool/actions/workflows/ubuntu.yml/badge.svg)](https://github.com/DeveloperPaul123/thread-pool/actions/workflows/ubuntu.yml) [![Windows](https://github.com/DeveloperPaul123/thread-pool/actions/workflows/windows.yml/badge.svg)](https://github.com/DeveloperPaul123/thread-pool/actions/workflows/windows.yml) @@ -80,12 +82,12 @@ See the `./benchmark` folder for the benchmark code. The benchmarks are set up t Matrix sizes are all square (MxM). Each multiplication is `(MxM) * (MxM)` where `*` refers to a matrix multiplication operation. Times recorded were the best of at least 3 runs. 
| Matrix Size | Number of multiplications | `std::async` time (ms) | `dp::thread_pool` time (ms) | -|:---:|:---:|:---:|:---:| -| 8 | 25,000 | 77.9 | 65.3 | -| 64 | 5,000 | 100 | 65.2 | -| 256 | 250 | 295 | 59.2 | -| 512 | 75 | 713 | 60.4 | -| 1024 | 10 | 1160 | 55.8 | +|:-----------:|:-------------------------:|:----------------------:|:---------------------------:| +| 8 | 25,000 | 77.9 | 65.3 | +| 64 | 5,000 | 100 | 65.2 | +| 256 | 250 | 295 | 59.2 | +| 512 | 75 | 713 | 60.4 | +| 1024 | 10 | 1160 | 55.8 | ## Building @@ -105,10 +107,10 @@ cmake --build build ### Build Options -| Option | Description | Default | -|:-------|:------------|:--------:| -| `TP_BUILD_TESTS` | Turn on to build unit tests. Required for formatting build targets. | ON | -| `TP_BUILD_EXAMPLES` | Turn on to build examples | ON | +| Option | Description | Default | +|:--------------------|:--------------------------------------------------------------------|:-------:| +| `TP_BUILD_TESTS` | Turn on to build unit tests. Required for formatting build targets. 
| ON | +| `TP_BUILD_EXAMPLES` | Turn on to build examples | ON | ### Run clang-format diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 6205011..7fd33e1 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -12,7 +12,14 @@ CPMAddPackage( NAME benchmark GITHUB_REPOSITORY google/benchmark VERSION 1.6.1 - OPTIONS "BENCHMARK_ENABLE_TESTING OFF" + OPTIONS "BENCHMARK_ENABLE_TESTING OFF" "CMAKE_CROSSCOMPILING ON" "HAVE_STD_REGEX OFF" + "HAVE_POSIX_REGEX OFF" +) + +CPMAddPackage( + NAME cppcoro + GITHUB_REPOSITORY andreasbuhr/cppcoro + GIT_TAG 10bbcdbf2be3ad3aa56febcf4c7662d771460a99 ) if(benchmark_ADDED) @@ -26,7 +33,7 @@ file(GLOB sources CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/source/*.cpp) file(GLOB headers CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/include/*.h) add_executable(${PROJECT_NAME} ${headers} ${sources}) -target_link_libraries(${PROJECT_NAME} benchmark ThreadPool::ThreadPool) +target_link_libraries(${PROJECT_NAME} benchmark ThreadPool::ThreadPool cppcoro) set_target_properties(${PROJECT_NAME} PROPERTIES CXX_STANDARD 20) target_include_directories(${PROJECT_NAME} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) diff --git a/benchmark/include/utilities.h b/benchmark/include/utilities.h index c16969a..cfee63d 100644 --- a/benchmark/include/utilities.h +++ b/benchmark/include/utilities.h @@ -2,11 +2,14 @@ #include #include +#include #include #include #include #include +#include "thread_pool/thread_pool.h" + inline std::size_t index(std::size_t row, std::size_t col, std::size_t width) { return row * width + col; } @@ -22,6 +25,12 @@ inline void multiply_array(std::span a, std::span b, std::span re } } +inline cppcoro::task<> co_multiply_array(std::span a, std::span b, std::span result, + dp::thread_pool<>& pool) { + co_await pool.schedule(); + multiply_array(a, b, result); +} + template using multiplication_pair = std::pair, std::vector>; @@ -30,6 +39,7 @@ template const std::int64_t& array_size, const std::int64_t& 
number_of_multiplications) { static std::uniform_int_distribution distribution(std::numeric_limits::min(), std::numeric_limits::max()); + // yes, predictable values static std::default_random_engine generator{}; std::vector> computations; diff --git a/benchmark/source/thread_pool.cpp b/benchmark/source/thread_pool.cpp index 40676d7..5001315 100644 --- a/benchmark/source/thread_pool.cpp +++ b/benchmark/source/thread_pool.cpp @@ -1,9 +1,12 @@ #include #include +#include +#include + #include "utilities.h" -static void BM_array_multiplication(benchmark::State& state) { +static void BM_array_multiplication_thread_pool(benchmark::State& state) { const auto array_size = state.range(0); const std::size_t multiplications_to_perform = state.range(1); @@ -28,14 +31,100 @@ static void BM_array_multiplication(benchmark::State& state) { } } -BENCHMARK(BM_array_multiplication) +BENCHMARK(BM_array_multiplication_thread_pool) +#if defined(NDEBUG) ->Args({8, 25'000}) ->Args({64, 5'000}) ->Args({256, 250}) ->Args({512, 75}) ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif ->Unit(benchmark::kMillisecond) ->ReportAggregatesOnly(true) ->MeasureProcessCPUTime() ->UseRealTime() ->Name("dp::thread_pool array mult"); + +static void BM_array_multiplication_batch_thread_pool(benchmark::State& state) { + const auto array_size = state.range(0); + const std::size_t multiplications_to_perform = state.range(1); + + // generate the data + const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); + + // create our tasks + std::vector> tasks{}; + tasks.reserve(computations.size()); + + // task that is run on a new thread + auto thread_task = [](multiplication_pair pair) { + std::vector result(pair.first.size()); + multiply_array(pair.first, pair.second, result); + }; + + for (const auto& computation : computations) { + auto task = [comp = computation, execution_task = thread_task]() { execution_task(comp); }; + tasks.emplace_back(task); + } + + // create our thread 
pool using the default size + dp::thread_pool pool{}; + + for (auto _ : state) { + pool.enqueue(tasks.begin(), tasks.end()); + } +} + +BENCHMARK(BM_array_multiplication_batch_thread_pool) +#if defined(NDEBUG) + ->Args({8, 25'000}) + ->Args({64, 5'000}) + ->Args({256, 250}) + ->Args({512, 75}) + ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif + ->Unit(benchmark::kMillisecond) + ->ReportAggregatesOnly(true) + ->MeasureProcessCPUTime() + ->UseRealTime() + ->Name("dp::thread_pool batched array mult"); + +inline cppcoro::task mult_task(multiplication_pair pair, dp::thread_pool<>& pool) { + std::vector result(pair.first.size()); + co_await co_multiply_array(pair.first, pair.second, result, pool); +} + +static void BM_array_multiplication_thread_pool_coroutine(benchmark::State& state) { + const auto array_size = state.range(0); + const std::size_t multiplications_to_perform = state.range(1); + const auto computations = generate_benchmark_data(array_size, multiplications_to_perform); + dp::thread_pool pool{}; + std::vector> tasks; + for (auto _ : state) { + for (auto mult_pair : computations) { + tasks.emplace_back(mult_task(mult_pair, pool)); + } + } + + cppcoro::sync_wait(cppcoro::when_all(std::move(tasks))); +} + +BENCHMARK(BM_array_multiplication_thread_pool_coroutine) +#if defined(NDEBUG) + ->Args({8, 25'000}) + ->Args({64, 5'000}) + ->Args({256, 250}) + ->Args({512, 75}) + ->Args({1024, 10}) +#else + ->Args({8, 50}) +#endif + ->Unit(benchmark::kMillisecond) + ->ReportAggregatesOnly(true) + ->MeasureProcessCPUTime() + ->UseRealTime() + ->Name("dp::thread_pool coroutine array mult"); diff --git a/cmake/CPM.cmake b/cmake/CPM.cmake index 59208e4..bacb192 100644 --- a/cmake/CPM.cmake +++ b/cmake/CPM.cmake @@ -1,4 +1,4 @@ -set(CPM_DOWNLOAD_VERSION 0.32.0) +set(CPM_DOWNLOAD_VERSION 0.35.6) if(CPM_SOURCE_CACHE) # Expand relative path. 
This is important if the provided path contains a tilde (~) diff --git a/include/thread_pool/thread_pool.h b/include/thread_pool/thread_pool.h index c296fc8..2e78b35 100644 --- a/include/thread_pool/thread_pool.h +++ b/include/thread_pool/thread_pool.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -9,19 +10,15 @@ #include #include #include +#include #include "thread_pool/thread_safe_queue.h" namespace dp { + namespace details { - // leave clang detection out for now as there is not support for std::move_only_function -#if defined(__GNUC__) && !defined(__clang_major__) -# define GCC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) -#else -# define GCC_VERSION 0 -#endif -#if (defined(_MSC_VER) && (_MSC_VER >= 1930) && (_MSVC_LANG > 202002L)) || (GCC_VERSION >= 120000) -# define TP_HAS_MOVE_ONLY_FUNCTION_SUPPORT + +#if __cpp_lib_move_only_function using default_function_type = std::move_only_function; #else using default_function_type = std::function; @@ -46,8 +43,8 @@ namespace dp { do { // invoke the task while (auto task = tasks_[id].tasks.pop()) { + pending_tasks_.fetch_sub(1, std::memory_order_release); try { - pending_tasks_.fetch_sub(1, std::memory_order_release); std::invoke(std::move(task.value())); } catch (...) { } @@ -57,9 +54,12 @@ namespace dp { for (std::size_t j = 1; j < tasks_.size(); ++j) { const std::size_t index = (id + j) % tasks_.size(); if (auto task = tasks_[index].tasks.steal()) { - // steal a task pending_tasks_.fetch_sub(1, std::memory_order_release); - std::invoke(std::move(task.value())); + try { + // invoke the stolen task + std::invoke(std::move(task.value())); + } catch (...) { + } // stop stealing once we have invoked a stolen task break; } @@ -100,7 +100,7 @@ namespace dp { typename ReturnType = std::invoke_result_t> requires std::invocable [[nodiscard]] std::future enqueue(Function f, Args... 
args) { -#ifdef TP_HAS_MOVE_ONLY_FUNCTION_SUPPORT +#if __cpp_lib_move_only_function // we can do this in C++23 because we now have support for move only functions std::promise promise; auto future = promise.get_future(); @@ -143,6 +143,25 @@ namespace dp { #endif } + /** + * @brief Enqueue a list of tasks into the thread pool. + * @tparam Iterator An iterator type + * @tparam IteratorType The underlying value type of the iterator + * @param begin The start of the task range + * @param end The end of the task range + */ + template ::value_type> + requires std::input_iterator && std::invocable && + std::is_same_v> + void enqueue(Iterator &&begin, Iterator &&end) { + // simple range check + if (begin >= end) return; + + // enqueue all the tasks + enqueue_tasks(std::forward(begin), std::forward(end)); + } + /** * @brief Enqueue a task to be executed in the thread pool that returns void. * @tparam Function An invokable type. @@ -165,15 +184,79 @@ namespace dp { })); } + /** + * @brief Allows you to schedule coroutines to run on the thread pool. + */ + [[nodiscard]] auto schedule() { + /// @brief Simple awaitable type that we can return. 
+ struct scheduled_operation { + dp::thread_pool<> *thread_pool_; + static bool await_ready() { return false; } + void await_suspend(std::coroutine_handle<> handle) { + if (thread_pool_) { + thread_pool_->enqueue_detach([](std::coroutine_handle<> h) { h.resume(); }, + handle); + } + } + static void await_resume() {} + }; + + return scheduled_operation{this}; + } + private: + template + [[maybe_unused]] auto assign_task_to_thread(Function &&f) -> std::size_t { + tasks_[index_].tasks.push(std::forward(f)); + const auto i = index_; // by value: index_ is advanced below + if (++index_ >= tasks_.size()) index_ = 0; + return i; + } + template void enqueue_task(Function &&f) { - const std::size_t i = count_++ % tasks_.size(); pending_tasks_.fetch_add(1, std::memory_order_relaxed); - tasks_[i].tasks.push(std::forward(f)); - tasks_[i].signal.release(); + const auto &assigned_index = assign_task_to_thread(std::forward(f)); + tasks_[assigned_index].signal.release(); + } + + template ::value_type> + void enqueue_tasks(Iterator &&begin, Iterator &&end) { + // get the count of tasks + const auto &tasks = std::distance(begin, end); + pending_tasks_.fetch_add(tasks, std::memory_order_relaxed); + + // get the number of threads once and re-use + const auto &task_size = tasks_.size(); + + // start index of where we're adding tasks + std::optional start = std::nullopt; + // number of threads to wake (by value: std::min returns a reference) + const auto count = ::std::min(tasks, task_size); + + for (auto it = begin; it < end; ++it) { + // push the task + const auto &assigned_index = assign_task_to_thread(std::move([f = *it]() { + // suppress exceptions + try { + std::invoke(f); + } catch (...) { + } + })); + if (!start) start = assigned_index; + } + + // release all the needed signals to wake all threads + for (std::size_t j = 0; j < count; j++) { + const auto &index = (start.value() + j) % task_size; + tasks_[index].signal.release(); + } } + /** + * @brief Task item that holds list of tasks and signal semaphore for each thread. 
+ */ struct task_item { dp::thread_safe_queue tasks{}; std::binary_semaphore signal{0}; @@ -181,7 +264,7 @@ namespace dp { std::vector threads_; std::deque tasks_; - std::size_t count_{}; + std::size_t index_{0}; // round-robin cursor into tasks_ std::atomic_int_fast64_t pending_tasks_{}; }; diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 5076989..1556933 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -2,6 +2,7 @@ #include #include +#include #include auto multiply(int a, int b) { return a * b; } @@ -130,19 +131,19 @@ TEST_CASE("Ensure task load is spread evenly across threads") { } TEST_CASE("Ensure task exception doesn't kill worker thread") { - auto throw_task = [](int) -> int { throw std::logic_error("Error occurred."); }; + auto throw_task = [](int) -> int { throw std::logic_error(std::string("Error occurred.")); }; auto regular_task = [](int input) -> int { return input * 2; }; std::atomic_uint_fast64_t count(0); - auto throw_no_return = []() { throw std::logic_error("Error occurred."); }; + auto throw_no_return = []() { throw std::logic_error(std::string("Error occurred.")); }; auto no_throw_no_return = [&count]() { std::this_thread::sleep_for(std::chrono::seconds(1)); count += 1; }; { - dp::thread_pool pool; + dp::thread_pool pool(4); auto throw_future = pool.enqueue(throw_task, 1); auto no_throw_future = pool.enqueue(regular_task, 2); @@ -157,3 +158,24 @@ TEST_CASE("Ensure task exception doesn't kill worker thread") { CHECK_EQ(count.load(), 1); } + +TEST_CASE("Ensure work completes upon destruction when batching tasks") { + std::atomic counter; + constexpr auto total_tasks = 30; + { + dp::thread_pool pool(4); + std::vector> tasks; + tasks.reserve(total_tasks); + for (auto i = 0; i < total_tasks; i++) { + auto task = [i, &counter]() { + std::this_thread::sleep_for(std::chrono::milliseconds((i + 1) * 100)); + ++counter; + }; + tasks.emplace_back(task); + } + + pool.enqueue(tasks.begin(), tasks.end()); + } + + 
CHECK_EQ(counter.load(), total_tasks); +}