Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <string_view>

#include <m/utility/pointers.h>
#include <m/utility/zstring.h>

namespace m
{
Expand Down Expand Up @@ -49,6 +50,17 @@ namespace m
thread_description_impl::set_thread_description(description, m_saved_state);
}

explicit thread_description(m::cwzstring description)
{
std::wstring_view view{};
if (description != nullptr)
{
view = std::wstring_view(description);
}

thread_description_impl::set_thread_description(view, m_saved_state);
}

void
set(std::wstring_view description)
{
Expand Down
295 changes: 49 additions & 246 deletions src/libraries/threadpool/include/m/threadpool/work_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <atomic>
#include <chrono>
#include <format>
#include <functional>
#include <future>
#include <memory>
Expand All @@ -14,6 +15,7 @@

#include <m/chrono/chrono.h>
#include <m/error_handling/macros.h>
#include <m/sstring/sstring.h>
#include <m/threadpool/types.h>
#include <m/threadpool/work_item_state.h>
#include <m/utility/pointers.h>
Expand All @@ -32,7 +34,7 @@ namespace m
class work_item
{
public:
virtual ~work_item() {}
virtual ~work_item() = default;

utc_time_point
enqueue_time()
Expand Down Expand Up @@ -75,7 +77,7 @@ namespace m
return do_state();
}

std::wstring
m::not_null<m::cwzstring>
description()
{
return do_description();
Expand Down Expand Up @@ -141,7 +143,7 @@ namespace m
virtual work_item_state
do_state() = 0;

virtual std::wstring
virtual m::not_null<m::cwzstring>
do_description() = 0;

virtual bool
Expand All @@ -160,246 +162,54 @@ namespace m
do_wait_until(m::time_point tp) = 0;
};

namespace work_queue_impl
{
inline std::atomic<uint64_t> work_item_id_counter{0};

class work_item : public m::work_item
{
public:
work_item(work_item const&) = delete;
work_item(work_item&&) = delete;

void
work()
{
do_work();
}

protected:
work_item(std::wstring description);
work_item();
~work_item() = default;

utc_time_point
do_enqueue_time() override
{
auto l = std::unique_lock(m_mutex);
return m_work_item_times.m_enqueue_time;
}

std::optional<utc_time_point>
do_start_time() override
{
auto l = std::unique_lock(m_mutex);
return m_work_item_times.m_start_time;
}

std::optional<utc_time_point>
do_end_time() override
{
auto l = std::unique_lock(m_mutex);
return m_work_item_times.m_end_time;
}

work_item_times
do_times() override
{
auto l = std::unique_lock(m_mutex);
return m_work_item_times;
}

work_item_state
do_state() override
{
auto l = std::unique_lock(m_mutex);
return m_work_item_state;
}

std::wstring
do_description() override
{
// description is not under the lock, it does not change after construction
// so we do not take the mutex
return m_description;
}

bool
do_try_cancel() override
{
auto l = std::unique_lock(m_mutex);
return false;
}

uint64_t
do_id() override
{
return m_id;
}

virtual void
do_work() = 0;

work_item_id_type m_id; // immutable once constructed
std::wstring m_description; // immutable once constructed
std::mutex m_mutex;
work_item_times m_work_item_times;
work_item_state m_work_item_state;
};

/// <summary>
/// The `work_item` class is the concrete implementation
/// of the `work_item` interface type, from which individual
/// work_item
/// </summary>
template <typename R, typename... Args>
requires(std::is_void_v<R> || std::regular<R>)
class runnable_work_item : public work_item
{
public:
runnable_work_item() = delete;
runnable_work_item(runnable_work_item const&) = delete;
runnable_work_item(runnable_work_item&&) = delete;
template <typename Fn>
requires(std::invocable<Fn, Args...>)
runnable_work_item(std::wstring description, Fn&& fn, Args&&... args):
work_item(std::move(description)),
m_packaged_task(std::forward<Fn>(fn), std::forward<Args>(args)...),
m_future(m_packaged_task.get_future())
{}

template <typename Fn>
requires(std::invocable<Fn, Args...>)
runnable_work_item(Fn&& fn, Args&&... args):
m_packaged_task(std::forward<Fn>(fn), std::forward<Args>(args)...),
m_future(m_packaged_task.get_future())
{}

~runnable_work_item() {}

/// <summary>
/// gets the result of the task having run. Will throw any
/// exception that had occurred during the execution of the task.
/// </summary>
/// <returns></returns>
R
get()
{
return m_future.get();
}

private:
void
do_work() noexcept override
{
// Before execution:
//
// If the work item was canceled, just bail out.
//
// Make sure that state is queued.
//
// Record the start time
// set the state to running.
//
{
auto l = std::unique_lock(m_mutex);

if (m_work_item_state == work_item_state::canceled)
return;

M_INTERNAL_ERROR_CHECK(m_work_item_state == work_item_state::queued);

m_work_item_times.m_start_time = m::clock::now();
m_work_item_state = work_item_state::running;
}

m_packaged_task();

// After execution:
//
// Make sure that state is still running.
//
// Record the end time
// set the state to done.
//
{
auto l = std::unique_lock(m_mutex);

M_INTERNAL_ERROR_CHECK(m_work_item_state == work_item_state::running);

m_work_item_times.m_end_time = m::clock::now();
m_work_item_state = work_item_state::done;
}
}

void
do_wait() override
{
m_future.wait();
}

bool
do_wait_for(std::chrono::milliseconds const d) override
{
auto const future_status = m_future.wait_for(d);

switch (future_status)
{
default: M_UNREACHABLE_CODE(); break;

case std::future_status::deferred: return true;
case std::future_status::ready: return true;
case std::future_status::timeout: return false;
}
}

bool
do_wait_until(m::time_point const tp) override
{
auto const future_status = m_future.wait_until(tp);

switch (future_status)
{
default: M_UNREACHABLE_CODE(); break;

case std::future_status::deferred: return true;
case std::future_status::ready: return true;
case std::future_status::timeout: return false;
}
}

std::packaged_task<R(Args...)> m_packaged_task;
std::future<R> m_future;
};
} // namespace work_queue_impl

/// <summary>
/// A work queue is a separate
/// A work queue is a separate execution unit for work items. Work queues may in
/// the future have policies applied to them regarding the number of concurrent
/// work items, or how they are scheduled to processors, etc.
/// </summary>
class work_queue
{
public:
virtual ~work_queue() {}
virtual ~work_queue() = default;

template <typename R, typename Fn, typename... Args>
requires(std::invocable<Fn, Args...>)
template <typename Fn>
requires(std::invocable<Fn>)
[[nodiscard]]
std::shared_ptr<work_item>
enqueue(Fn&& fn, Args&&... args)
enqueue(Fn&& fn)
{
auto wi = std::make_shared<work_queue_impl::runnable_work_item<R, Args...>>(
std::forward<Fn>(fn), std::forward<Args>(args)...);
auto const static empty_wsstring = m::wsstring(L""sv);

do_enqueue(wi);
return do_enqueue(std::packaged_task<void()>(std::forward<Fn>(fn)), empty_wsstring);
}

return wi;
template <typename Fn, typename... Args>
requires(std::invocable<Fn>)
[[nodiscard]]
std::shared_ptr<work_item>
enqueue(Fn&& fn, std::wformat_string<Args...> fmt, Args&&... args)
{
return do_enqueue(std::packaged_task<void()>(std::forward<Fn>(fn)),
m::wsstring(std::vformat(
fmt.get(), std::make_wformat_args(std::forward<Args>(args)...))));
}
Comment thread
EmJayGee marked this conversation as resolved.

/// <summary>
/// The `queue_size` member function returns the number of work items
/// currently in the queue. Not all work items in the queue are necessarily
/// ready to be executed.
/// </summary>
/// <returns>The count of work items in the queue.</returns>
std::size_t
queue_size()
{
return do_queue_size();
}

/// <summary>
/// Gets the number of currently running items.
/// </summary>
/// <returns>The count of running items.</returns>
std::size_t
running()
{
Expand All @@ -426,22 +236,18 @@ namespace m
return do_wait_for(std::chrono::duration_cast<std::chrono::milliseconds>(dur));
}

/// <summary>
/// Waits for the work items to complete, or for `when` to arrive. Returns `true`
/// if the work item completes, `false` if the time comes without the
/// work item completing.
/// </summary>
/// <typeparam name="Clock"></typeparam>
/// <typeparam name="Duration"></typeparam>
/// <param name="when">The time point to wait until.</param>
/// <returns>`true` if the task completed before the time point, `false` if the
/// task did not complete by `when`.</returns>
template <typename Clock, typename Duration>
bool
wait_until(std::chrono::time_point<Clock, Duration> when)
{
return do_wait_until(m::time_point_cast(when));
}
//
// "wait" and "wait_until" are omitted, intentionally. This is because
// waiting indefinitely for a potentially large number of work items
// to complete is fraught with peril and not of the good kind.
//
// If you want to wait indefinitely, use wait_for() with some
// reasonable duration (1sec? 5sec? 60sec?) in a loop, logging some
// progress regarding the number of remaining work items in the queue
// (`running()`, `queue_size()`) as you proceed, in order to assist
// diagnosibility of why a "hang" is occurring in your production
// scenarios.
//

private:
virtual std::size_t
Expand All @@ -453,11 +259,8 @@ namespace m
virtual bool
do_wait_for(std::chrono::milliseconds dur) = 0;

virtual bool
do_wait_until(m::time_point when) = 0;

virtual void
do_enqueue(std::shared_ptr<m::work_queue_impl::work_item> wi) = 0;
virtual std::shared_ptr<work_item>
do_enqueue(std::packaged_task<void()>&& task, m::wsstring const& description) = 0;
};

} // namespace m
Loading
Loading