Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/core/concepts/scheduler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ using context_t = typename std::remove_cvref_t<T>::context_type;
*/
export template <typename Sch>
concept scheduler =
has_context_typedef<Sch> && requires (Sch scheduler, sched_handle<context_t<Sch>> handle) {
{ scheduler.post(handle) } -> std::same_as<void>;
has_context_typedef<Sch> && requires (Sch &&scheduler, sched_handle<context_t<Sch>> handle) {
{ static_cast<Sch &&>(scheduler).post(handle) } -> std::same_as<void>;
};

} // namespace lf
18 changes: 12 additions & 6 deletions src/core/promise.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ constexpr auto final_suspend_full(Context &context, frame_t<Context> *frame) noe
continue;
}

if (owner) {
// We were unable to resume the parent and we were its owner, as the
// resuming thread will take ownership of the parent's we must give it up.
context.stack().release(std::move(release_key));
}

return parent->handle();
}

if (owner) {
// We were unable to resume the parent and we were its owner, as the
// resuming thread will take ownership of the parent's we must give it up.
context.stack().release(std::move(release_key));
}

// We did not win the join-race, we cannot dereference the parent pointer now
// as the frame may now be freed by the winner. Parent has not reached join
// or we are not the last child to complete. We are now out of jobs, we must
Expand Down Expand Up @@ -182,6 +182,7 @@ constexpr auto final_suspend_trailing(Context &context, frame_t<Context> *parent
}
return final_suspend_full<Context>(context, parent);
}

return parent->handle();
}

Expand Down Expand Up @@ -421,6 +422,11 @@ struct join_awaitable {
LF_ASSUME(self.frame->joins == k_u16_max);

// Outside parallel regions so can touch non-atomically.
//
// A task that completes by responding to cancellation will drop any
// exceptions however, a task may still throw exceptions even if cancelled.
// Here we must rethrow even if cancelled because we can't re-suspend at
// this point.
if constexpr (LF_COMPILER_EXCEPTIONS) {
if (self.frame->exception_bit) [[unlikely]] {
self.rethrow_exception();
Expand Down
206 changes: 109 additions & 97 deletions src/core/receiver.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,161 +19,167 @@ export struct broken_receiver_error final : libfork_exception {
}
};

export struct operation_cancelled_error final : libfork_exception {
[[nodiscard]]
constexpr auto what() const noexcept -> const char * override {
return "operation was cancelled";
}
};

/**
* @brief Shared state between a scheduled task and its receiver handle.
*
* @tparam T The return type of the scheduled coroutine.
* @tparam Stoppable If true, the state owns a stop_source that can be used
* to cancel the root task externally.
*
* Constructors forward arguments for in-place construction of the return value.
* Internal access is gated behind a hidden friend: `get(key_t, receiver_state&)`.
*/
export template <typename T, bool Stoppable = false>
class receiver_state {
public:
struct empty {};
template <typename T, bool Stoppable = false>
struct hidden_receiver_state {

/// Default construction — return value is default-initialised (or empty for void).
constexpr receiver_state() = default;
struct empty_1 {};
struct empty_2 {};

/// In-place construction of the return value from arbitrary args.
template <typename... Args>
requires (!std::is_void_v<T>) && std::constructible_from<T, Args...>
constexpr explicit receiver_state(Args &&...args) : m_return_value(std::forward<Args>(args)...) {}
alignas(k_new_align) std::array<std::byte, 1024> buffer{};

/**
* @brief Request that the associated task stop.
*
* Only available when Stoppable=true. Safe to call before scheduling —
* the root frame checks stop_requested() before executing the task body.
*/
constexpr auto request_stop() noexcept -> void
requires Stoppable
{
m_stop.request_stop();
}
[[no_unique_address]]
std::conditional_t<std::is_void_v<T>, empty_1, T> return_value{};

private:
template <typename U, bool C>
friend class receiver;
std::exception_ptr exception;
std::atomic_flag ready;

/**
* @brief Internal accessor returned by `get(key_t, receiver_state&)`.
*
* Not reachable by name from outside this translation unit because view
* is a private nested type. Callers use `auto` with the hidden friend.
*/
struct view {
receiver_state *p;
[[no_unique_address]]
std::conditional_t<Stoppable, stop_source, empty_2> stop;

constexpr void set_exception(std::exception_ptr e) noexcept { p->m_exception = std::move(e); }
constexpr hidden_receiver_state() = default;

constexpr void notify_ready() noexcept {
p->m_ready.test_and_set();
p->m_ready.notify_one();
}
template <typename... Args>
requires (sizeof...(Args) > 0) && std::constructible_from<T, Args...>
constexpr explicit(sizeof...(Args) == 1) hidden_receiver_state(Args &&...args)
: return_value(std::forward<Args>(args)...) {}
};

[[nodiscard]]
constexpr auto return_value_address() noexcept -> T *
requires (!std::is_void_v<T>)
{
return std::addressof(p->m_return_value);
}
/// Convenience alias — used throughout the core partitions.
template <typename T, bool Stoppable = false>
using state_handle = std::shared_ptr<hidden_receiver_state<T, Stoppable>>;

[[nodiscard]]
constexpr auto get_stop_token() noexcept -> stop_source::stop_token
requires Stoppable
{
return p->m_stop.token();
}
};
/**
* @brief Lightweight move-only handle owning a pre-allocated root task state.
*
* Construction allocates a `hidden_receiver_state<T, Stoppable>` which embeds a
* 1 KiB aligned buffer; the root coroutine frame is placement-constructed
* into that buffer by `schedule`.
*
* Constructors mirror `make_shared` / `allocate_shared`:
*
* recv_state<T> s; // default-init return value
* recv_state<T> s{v1, v2}; // in-place init: T{v1, v2}
* recv_state<T> s{allocator_arg, alloc}; // default-init, custom allocator
* recv_state<T> s{allocator_arg, alloc, v1, v2}; // in-place init + custom allocator
*/
export template <typename T, bool Stoppable = false>
class recv_state {
using state_type = hidden_receiver_state<T, Stoppable>;

/**
* @brief Hidden friend accessor for internal library use.
*
* Only callable via ADL when a `key_t` is available (i.e. by calling `key()`).
* Returns a `view` proxy to manipulate the state's private members.
*/
public:
/// Default: value-initialise via `std::make_shared`.
constexpr recv_state() : m_ptr(std::make_shared<state_type>()) {}

/// Value-init from args: forwards `args` to `hidden_receiver_state`'s constructor
/// (in-place construction of the return value) via `std::make_shared`.
template <typename... Args>
requires (sizeof...(Args) > 0) && std::constructible_from<state_type, Args...>
constexpr explicit(sizeof...(Args) == 1) recv_state(Args &&...args)
: m_ptr(std::make_shared<state_type>(std::forward<Args>(args)...)) {}

/// Allocator-aware, default return value: allocate via `std::allocate_shared`.
template <simple_allocator Alloc>
constexpr recv_state(std::allocator_arg_t, Alloc const &alloc)
: m_ptr(std::allocate_shared<state_type>(alloc)) {}

/// Allocator-aware with value-init args.
template <simple_allocator Alloc, typename... Args>
requires std::constructible_from<state_type, Args...>
constexpr recv_state(std::allocator_arg_t, Alloc const &alloc, Args &&...args)
: m_ptr(std::allocate_shared<state_type>(alloc, std::forward<Args>(args)...)) {}

// Move-only.
constexpr recv_state(recv_state &&) noexcept = default;
constexpr auto operator=(recv_state &&) noexcept -> recv_state & = default;
constexpr recv_state(recv_state const &) = delete;
constexpr auto operator=(recv_state const &) -> recv_state & = delete;

private:
[[nodiscard]]
friend constexpr auto get(key_t, receiver_state &self) noexcept -> view {
return {&self};
friend constexpr auto get(key_t, recv_state &&self) noexcept -> state_handle<T, Stoppable> {
return std::move(self.m_ptr);
}

[[no_unique_address]]
std::conditional_t<std::is_void_v<T>, empty, T> m_return_value{};
std::exception_ptr m_exception;
std::atomic_flag m_ready;

[[no_unique_address]]
std::conditional_t<Stoppable, stop_source, empty> m_stop;
state_handle<T, Stoppable> m_ptr;
};

export template <typename T, bool Stoppable = false>
class receiver {

using state_type = receiver_state<T, Stoppable>;
using state_type = hidden_receiver_state<T, Stoppable>;

public:
constexpr receiver(key_t, std::shared_ptr<state_type> &&state) : m_state(std::move(state)) {}
constexpr receiver(key_t, state_handle<T, Stoppable> state) noexcept : m_state(std::move(state)) {}

// Move only
constexpr receiver(receiver &&) noexcept = default;
constexpr receiver(const receiver &) = delete;
constexpr auto operator=(receiver &&) noexcept -> receiver & = default;
constexpr auto operator=(const receiver &) -> receiver & = delete;

/**
* @brief Test if connected to a receiver state.
*/
[[nodiscard]]
constexpr auto valid() const noexcept -> bool {
return m_state != nullptr;
}

/**
* @brief Test if the associated task has completed (either successfully or with an exception/cancellation).
*/
[[nodiscard]]
constexpr auto ready() const -> bool {
if (!valid()) {
LF_THROW(broken_receiver_error{});
}
return m_state->m_ready.test();
return m_state->ready.test();
}

/**
* @brief Wait for the associated task to complete (either successfully or with an exception/cancellation).
*
* May be called multiple times.
*/
constexpr void wait() const {
if (!valid()) {
LF_THROW(broken_receiver_error{});
}
m_state->m_ready.wait(false);
m_state->ready.wait(false);
}

/**
* @brief Returns a stop_token for this task's stop source.
* @brief Get a reference to the stop_source for this task, allowing the caller to request cancellation.
*
* Only available when Stoppable=true. The token can be used to observe
* whether the associated task has been cancelled.
* Only available when Stoppable=true.
*/
[[nodiscard]]
constexpr auto token() noexcept -> stop_source::stop_token
constexpr auto stop_source() -> stop_source &
requires Stoppable
{
if (!valid()) {
LF_THROW(broken_receiver_error{});
}
return get(key(), *m_state).get_stop_token();
return m_state->stop;
Comment thread
ConorWilliams marked this conversation as resolved.
}

/**
* @brief Request that the associated task stop.
* @brief Wait for the associated task to complete and return its result, or rethrow.
*
* If the receiver was cancelled this will throw an exception.
*
* Only available when Stoppable=true. Thread-safe; may be called
* concurrently with the task executing on worker threads.
* This may only be called once; the state is consumed and the receiver becomes invalid.
*/
constexpr auto request_stop() -> void
requires Stoppable
{
if (!valid()) {
LF_THROW(broken_receiver_error{});
}
m_state->m_stop.request_stop();
}

[[nodiscard]]
constexpr auto get() && -> T {

Expand All @@ -184,17 +190,23 @@ class receiver {

LF_ASSUME(state != nullptr);

if (state->m_exception) {
std::rethrow_exception(state->m_exception);
if (state->exception) {
std::rethrow_exception(state->exception);
}

if constexpr (Stoppable) {
if (state->stop.stop_requested()) {
LF_THROW(operation_cancelled_error{});
}
Comment thread
ConorWilliams marked this conversation as resolved.
}

if constexpr (!std::is_void_v<T>) {
return std::move(state->m_return_value);
return std::move(state->return_value);
}
}

private:
std::shared_ptr<state_type> m_state;
state_handle<T, Stoppable> m_state;
};

} // namespace lf
Loading
Loading