diff --git a/src/libraries/thread_description/include/m/thread_description/thread_description.h b/src/libraries/thread_description/include/m/thread_description/thread_description.h index 3b56fd33..1769436b 100644 --- a/src/libraries/thread_description/include/m/thread_description/thread_description.h +++ b/src/libraries/thread_description/include/m/thread_description/thread_description.h @@ -13,6 +13,7 @@ #include #include +#include namespace m { @@ -49,6 +50,17 @@ namespace m thread_description_impl::set_thread_description(description, m_saved_state); } + explicit thread_description(m::cwzstring description) + { + std::wstring_view view{}; + if (description != nullptr) + { + view = std::wstring_view(description); + } + + thread_description_impl::set_thread_description(view, m_saved_state); + } + void set(std::wstring_view description) { diff --git a/src/libraries/threadpool/include/m/threadpool/work_queue.h b/src/libraries/threadpool/include/m/threadpool/work_queue.h index 55a4ba16..e5b71a17 100644 --- a/src/libraries/threadpool/include/m/threadpool/work_queue.h +++ b/src/libraries/threadpool/include/m/threadpool/work_queue.h @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -14,6 +15,7 @@ #include #include +#include #include #include #include @@ -32,7 +34,7 @@ namespace m class work_item { public: - virtual ~work_item() {} + virtual ~work_item() = default; utc_time_point enqueue_time() @@ -75,7 +77,7 @@ namespace m return do_state(); } - std::wstring + m::not_null description() { return do_description(); @@ -141,7 +143,7 @@ namespace m virtual work_item_state do_state() = 0; - virtual std::wstring + virtual m::not_null do_description() = 0; virtual bool @@ -160,246 +162,54 @@ namespace m do_wait_until(m::time_point tp) = 0; }; - namespace work_queue_impl - { - inline std::atomic work_item_id_counter{0}; - - class work_item : public m::work_item - { - public: - work_item(work_item const&) = delete; - work_item(work_item&&) = delete; - - void - work() - { - do_work(); - } - - protected: - work_item(std::wstring description); - work_item(); - ~work_item() = default; - - utc_time_point - do_enqueue_time() override - { - auto l = std::unique_lock(m_mutex); - return m_work_item_times.m_enqueue_time; - } - - std::optional - do_start_time() override - { - auto l = std::unique_lock(m_mutex); - return m_work_item_times.m_start_time; - } - - std::optional - do_end_time() override - { - auto l = std::unique_lock(m_mutex); - return m_work_item_times.m_end_time; - } - - work_item_times - do_times() override - { - auto l = std::unique_lock(m_mutex); - return m_work_item_times; - } - - work_item_state - do_state() override - { - auto l = std::unique_lock(m_mutex); - return m_work_item_state; - } - - std::wstring - do_description() override - { - // description is not under the lock, it does not change after construction - // so we do not take the mutex - return m_description; - } - - bool - do_try_cancel() override - { - auto l = std::unique_lock(m_mutex); - return false; - } - - uint64_t - do_id() override - { - return m_id; - } - - virtual void - do_work() = 0; - - work_item_id_type m_id; // immutable once constructed - std::wstring m_description; // immutable once constructed - std::mutex m_mutex; - work_item_times m_work_item_times; - work_item_state m_work_item_state; - }; - - /// - /// The `work_item` class is the concrete implementation - /// of the `work_item` interface type, from which individual - /// work_item - /// - template - requires(std::is_void_v || std::regular) - class runnable_work_item : public work_item - { - public: - runnable_work_item() = delete; - runnable_work_item(runnable_work_item const&) = delete; - runnable_work_item(runnable_work_item&&) = delete; - template - requires(std::invocable) - runnable_work_item(std::wstring description, Fn&& fn, Args&&... args): - work_item(std::move(description)), - m_packaged_task(std::forward(fn), std::forward(args)...), - m_future(m_packaged_task.get_future()) - {} - - template - requires(std::invocable) - runnable_work_item(Fn&& fn, Args&&... args): - m_packaged_task(std::forward(fn), std::forward(args)...), - m_future(m_packaged_task.get_future()) - {} - - ~runnable_work_item() {} - - /// - /// gets the result of the task having run. Will throw any - /// exception that had occurred during the execution of the task. - /// - /// - R - get() - { - return m_future.get(); - } - - private: - void - do_work() noexcept override - { - // Before execution: - // - // If the work item was canceled, just bail out. - // - // Make sure that state is queued. - // - // Record the start time - // set the state to running. - // - { - auto l = std::unique_lock(m_mutex); - - if (m_work_item_state == work_item_state::canceled) - return; - - M_INTERNAL_ERROR_CHECK(m_work_item_state == work_item_state::queued); - - m_work_item_times.m_start_time = m::clock::now(); - m_work_item_state = work_item_state::running; - } - - m_packaged_task(); - - // After execution: - // - // Make sure that state is still running. - // - // Record the end time - // set the state to done. - // - { - auto l = std::unique_lock(m_mutex); - - M_INTERNAL_ERROR_CHECK(m_work_item_state == work_item_state::running); - - m_work_item_times.m_end_time = m::clock::now(); - m_work_item_state = work_item_state::done; - } - } - - void - do_wait() override - { - m_future.wait(); - } - - bool - do_wait_for(std::chrono::milliseconds const d) override - { - auto const future_status = m_future.wait_for(d); - - switch (future_status) - { - default: M_UNREACHABLE_CODE(); break; - - case std::future_status::deferred: return true; - case std::future_status::ready: return true; - case std::future_status::timeout: return false; - } - } - - bool - do_wait_until(m::time_point const tp) override - { - auto const future_status = m_future.wait_until(tp); - - switch (future_status) - { - default: M_UNREACHABLE_CODE(); break; - - case std::future_status::deferred: return true; - case std::future_status::ready: return true; - case std::future_status::timeout: return false; - } - } - - std::packaged_task m_packaged_task; - std::future m_future; - }; - } // namespace work_queue_impl - /// - /// A work queue is a separate + /// A work queue is a separate execution unit for work items. Work queues may in + /// the future have policies applied to them regarding the number of concurrent + /// work items, or how they are scheduled to processors, etc. /// class work_queue { public: - virtual ~work_queue() {} + virtual ~work_queue() = default; - template - requires(std::invocable) + template + requires(std::invocable) + [[nodiscard]] std::shared_ptr - enqueue(Fn&& fn, Args&&... args) + enqueue(Fn&& fn) { - auto wi = std::make_shared>( - std::forward(fn), std::forward(args)...); + auto const static empty_wsstring = m::wsstring(L""sv); - do_enqueue(wi); + return do_enqueue(std::packaged_task(std::forward(fn)), empty_wsstring); + } - return wi; + template + requires(std::invocable) + [[nodiscard]] + std::shared_ptr + enqueue(Fn&& fn, std::wformat_string fmt, Args&&... args) + { + return do_enqueue(std::packaged_task(std::forward(fn)), + m::wsstring(std::vformat( + fmt.get(), std::make_wformat_args(std::forward(args)...)))); } + /// + /// The `queue_size` member function returns the number of work items + /// currently in the queue. Not all work items in the queue are necessarily + /// ready to be executed. + /// + /// The count of work items in the queue. std::size_t queue_size() { return do_queue_size(); } + /// + /// Gets the number of currently running items. + /// + /// The count of running items. std::size_t running() { @@ -426,22 +236,18 @@ namespace m return do_wait_for(std::chrono::duration_cast(dur)); } - /// - /// Waits for the work items to complete, or for `when` to arrive. Returns `true` - /// if the work item completes, `false` if the time comes without the - /// work item completing. - /// - /// - /// - /// The time point to wait until. - /// `true` if the task completed before the time point, `false` if the - /// task did not complete by `when`. - template - bool - wait_until(std::chrono::time_point when) - { - return do_wait_until(m::time_point_cast(when)); - } + // + // "wait" and "wait_until" are omitted, intentionally. This is because + // waiting indefinitely for a potentially large number of work items + // to complete is fraught with peril and not of the good kind. + // + // If you want to wait indefinitely, use wait_for() with some + // reasonable duration (1sec? 5sec? 60sec?) in a loop, logging some + // progress regarding the number of remaining work items in the queue + // (`running()`, `queue_size()`) as you proceed, in order to assist + // diagnosibility of why a "hang" is occurring in your production + // scenarios. + // private: virtual std::size_t @@ -453,11 +259,8 @@ namespace m virtual bool do_wait_for(std::chrono::milliseconds dur) = 0; - virtual bool - do_wait_until(m::time_point when) = 0; - - virtual void - do_enqueue(std::shared_ptr wi) = 0; + virtual std::shared_ptr + do_enqueue(std::packaged_task&& task, m::wsstring const& description) = 0; }; } // namespace m diff --git a/src/libraries/threadpool/src/work_item.cpp b/src/libraries/threadpool/src/work_item.cpp index 723a3900..fafb9815 100644 --- a/src/libraries/threadpool/src/work_item.cpp +++ b/src/libraries/threadpool/src/work_item.cpp @@ -10,20 +10,154 @@ #include #include +#include "work_item.h" #include "work_queue_base.h" namespace m::work_queue_impl { - work_item::work_item(): - m_id(work_queue_impl::work_item_id_counter.fetch_add(1)), - m_work_item_state(work_item_state::queued) - {} - - work_item::work_item(std::wstring description): + work_item::work_item(m::wsstring description, std::packaged_task&& task): m_id(work_queue_impl::work_item_id_counter.fetch_add(1)), m_description(std::move(description)), - m_work_item_state(work_item_state::queued) - {} + m_work_item_state(work_item_state::queued), + m_packaged_task(std::move(task)), + m_future(m_packaged_task.get_future()) + { + m_work_item_times.m_enqueue_time = utc_time_point::clock::now(); + } + + utc_time_point + work_item::do_enqueue_time() + { + auto l = std::unique_lock(m_mutex); + return m_work_item_times.m_enqueue_time; + } + + std::optional + work_item::do_start_time() + { + auto l = std::unique_lock(m_mutex); + return m_work_item_times.m_start_time; + } + + std::optional + work_item::do_end_time() + { + auto l = std::unique_lock(m_mutex); + return m_work_item_times.m_end_time; + } + + work_item_times + work_item::do_times() + { + auto l = std::unique_lock(m_mutex); + return m_work_item_times; + } + + work_item_state + work_item::do_state() + { + auto l = std::unique_lock(m_mutex); + return m_work_item_state; + } + + m::not_null + work_item::do_description() + { + // description is not under the lock, it does not change after construction + // so we do not take the mutex + return m_description.c_str(); + } + + bool + work_item::do_try_cancel() + { + auto l = std::unique_lock(m_mutex); + return false; + } + + uint64_t + work_item::do_id() + { + return m_id; + } + + void + work_item::do_work() noexcept + { + // Before execution: + // + // If the work item was canceled, just bail out. + // + // Make sure that state is queued. + // + // Record the start time + // set the state to running. + // + { + auto l = std::unique_lock(m_mutex); + + if (m_work_item_state == work_item_state::canceled) + return; + + M_INTERNAL_ERROR_CHECK(m_work_item_state == work_item_state::queued); + + m_work_item_times.m_start_time = m::clock::now(); + m_work_item_state = work_item_state::running; + } + + m_packaged_task(); + + // After execution: + // + // Make sure that state is still running. + // + // Record the end time + // set the state to done. + // + { + auto l = std::unique_lock(m_mutex); + + M_INTERNAL_ERROR_CHECK(m_work_item_state == work_item_state::running); + + m_work_item_times.m_end_time = m::clock::now(); + m_work_item_state = work_item_state::done; + } + } + + void + work_item::do_wait() + { + m_future.wait(); + } + + bool + work_item::do_wait_for(std::chrono::milliseconds const d) + { + auto const future_status = m_future.wait_for(d); + + switch (future_status) + { + default: M_UNREACHABLE_CODE(); break; + + case std::future_status::deferred: return true; + case std::future_status::ready: return true; + case std::future_status::timeout: return false; + } + } + + bool + work_item::do_wait_until(m::time_point const tp) + { + auto const future_status = m_future.wait_until(tp); + + switch (future_status) + { + default: M_UNREACHABLE_CODE(); break; + case std::future_status::deferred: return true; + case std::future_status::ready: return true; + case std::future_status::timeout: return false; + } + } -} // namespace m::threadpool_impl +} // namespace m::work_queue_impl diff --git a/src/libraries/threadpool/src/work_item.h b/src/libraries/threadpool/src/work_item.h new file mode 100644 index 00000000..af6ae145 --- /dev/null +++ b/src/libraries/threadpool/src/work_item.h @@ -0,0 +1,90 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace m::work_queue_impl +{ + inline std::atomic work_item_id_counter{1}; + + class work_item : public m::work_item + { + public: + work_item() = delete; + work_item(work_item const&) = delete; + work_item(work_item&&) = delete; + + work_item(m::wsstring description, std::packaged_task&& task); + + ~work_item() = default; + + void + work() + { + do_work(); + } + + protected: + utc_time_point + do_enqueue_time() override; + + std::optional + do_start_time() override; + + std::optional + do_end_time() override; + + work_item_times + do_times() override; + + work_item_state + do_state() override; + + m::not_null + do_description() override; + + bool + do_try_cancel() override; + + uint64_t + do_id() override; + + virtual void + do_work() noexcept; + + void + do_wait() override; + + bool + do_wait_for(std::chrono::milliseconds const d) override; + + bool + do_wait_until(m::time_point const tp) override; + + work_item_id_type m_id; // immutable once constructed + m::wsstring m_description; // immutable once constructed + std::mutex m_mutex; + work_item_times m_work_item_times; + work_item_state m_work_item_state; + std::packaged_task m_packaged_task; + std::future m_future; + }; +} // namespace m::work_queue_impl diff --git a/src/libraries/threadpool/src/work_queue_base.cpp b/src/libraries/threadpool/src/work_queue_base.cpp index 7f652a12..e09784a6 100644 --- a/src/libraries/threadpool/src/work_queue_base.cpp +++ b/src/libraries/threadpool/src/work_queue_base.cpp @@ -47,17 +47,11 @@ namespace m::threadpool_impl l, dur, [this] { return m_ready_queue.empty() && m_running_work_items.empty(); }); } - bool - work_queue_base::do_wait_until(m::time_point when) + std::shared_ptr + work_queue_base::do_enqueue(std::packaged_task&& task, m::wsstring const& description) { - auto l = std::unique_lock(m_mutex); - return m_cv.wait_until( - l, when, [this] { return m_ready_queue.empty() && m_running_work_items.empty(); }); - } + auto wi = std::make_shared(description, std::move(task)); - void - work_queue_base::do_enqueue(std::shared_ptr wi) - { auto l = std::unique_lock(m_mutex); if (!m_platform_initialized) @@ -73,6 +67,8 @@ namespace m::threadpool_impl on_new_work_item(wi); m_cv.notify_all(); + + return wi; } } // namespace m::threadpool_impl diff --git a/src/libraries/threadpool/src/work_queue_base.h b/src/libraries/threadpool/src/work_queue_base.h index 04cd3635..0d110e39 100644 --- a/src/libraries/threadpool/src/work_queue_base.h +++ b/src/libraries/threadpool/src/work_queue_base.h @@ -13,6 +13,8 @@ #include #include +#include "work_item.h" + namespace m::threadpool_impl { /// @@ -43,8 +45,8 @@ namespace m::threadpool_impl class work_queue_base : public m::work_queue { protected: - work_queue_base() = default; - ~work_queue_base() = default; + work_queue_base() = default; + ~work_queue_base() = default; work_queue_base(work_queue_base const&) = delete; work_queue_base(work_queue_base&&) noexcept = delete; work_queue_base(work_queue_execution_policy wqep, std::wstring description); @@ -67,11 +69,8 @@ namespace m::threadpool_impl bool do_wait_for(std::chrono::milliseconds dur) override; - bool - do_wait_until(m::time_point when) override; - - void - do_enqueue(std::shared_ptr wi) override; + std::shared_ptr + do_enqueue(std::packaged_task&& task, m::wsstring const& description) override; virtual void perform_platform_initialization() = 0; diff --git a/src/libraries/threadpool/test/test_work_queue.cpp b/src/libraries/threadpool/test/test_work_queue.cpp index 3eb44cb6..2648041f 100644 --- a/src/libraries/threadpool/test/test_work_queue.cpp +++ b/src/libraries/threadpool/test/test_work_queue.cpp @@ -3,6 +3,7 @@ #include +#include #include #include #include @@ -28,7 +29,23 @@ TEST(WorkQueue, BasicCreation) TEST(WorkQueue, Queue1) { auto q = m::threadpool->create_work_queue(); - auto wi = q->enqueue([] { m::println("Hello, world!"); }); + auto wi = q->enqueue([] { m::println("Hello, world!"); }); + q->wait_for(5s); +} + +TEST(WorkQueue, QueueWithDescriptions) +{ + auto q = m::threadpool->create_work_queue(); + constexpr std::size_t n = 5; + + std::array, n> work_items; + for (std::size_t i = 0; i < work_items.size(); i++) + { + work_items[i] = q->enqueue( + [x = i] { m::println("Hello there number {}", x); }, + L"Work item number {}", i); + } + q->wait_for(5s); } @@ -40,7 +57,7 @@ TEST(WorkQueue, QueueN20) for (std::size_t i = 0; i < n; i++) { - work_items[i] = q->enqueue([x = i] { m::println("Hello there number {}", x); }); + work_items[i] = q->enqueue([x = i] { m::println("Hello there number {}", x); }); } q->wait_for(5s); @@ -60,7 +77,7 @@ TEST(WorkQueue, QueueNBig) for (std::size_t i = 0; i < n; i++) { - work_items[i] = q->enqueue([p = &flags[i]] { *p = 1; }); + work_items[i] = q->enqueue([p = &flags[i]] { *p = 1; }); } auto const after_queue = m::clock::now();