diff --git a/src/core/concepts/scheduler.cxx b/src/core/concepts/scheduler.cxx index 137f3197..bd081ce6 100644 --- a/src/core/concepts/scheduler.cxx +++ b/src/core/concepts/scheduler.cxx @@ -22,8 +22,8 @@ using context_t = typename std::remove_cvref_t::context_type; */ export template concept scheduler = - has_context_typedef && requires (Sch scheduler, sched_handle> handle) { - { scheduler.post(handle) } -> std::same_as; + has_context_typedef && requires (Sch &&scheduler, sched_handle> handle) { + { static_cast(scheduler).post(handle) } -> std::same_as; }; } // namespace lf diff --git a/src/core/promise.cxx b/src/core/promise.cxx index 24b1908a..6ce7ff60 100644 --- a/src/core/promise.cxx +++ b/src/core/promise.cxx @@ -137,15 +137,15 @@ constexpr auto final_suspend_full(Context &context, frame_t *frame) noe continue; } - if (owner) { - // We were unable to resume the parent and we were its owner, as the - // resuming thread will take ownership of the parent's we must give it up. - context.stack().release(std::move(release_key)); - } - return parent->handle(); } + if (owner) { + // We were unable to resume the parent and we were its owner; as the + // resuming thread will take ownership of the parent's stack, we must give it up. + context.stack().release(std::move(release_key)); + } + // We did not win the join-race, we cannot dereference the parent pointer now // as the frame may now be freed by the winner. Parent has not reached join // or we are not the last child to complete. We are now out of jobs, we must @@ -182,6 +182,7 @@ constexpr auto final_suspend_trailing(Context &context, frame_t *parent } return final_suspend_full(context, parent); } + return parent->handle(); } @@ -421,6 +422,11 @@ struct join_awaitable { LF_ASSUME(self.frame->joins == k_u16_max); // Outside parallel regions so can touch non-atomically. + // + // A task that completes by responding to cancellation will drop any + // exceptions; however, a task may still throw exceptions even if cancelled. 
+ // Here we must rethrow even if cancelled because we can't re-suspend at + // this point. if constexpr (LF_COMPILER_EXCEPTIONS) { if (self.frame->exception_bit) [[unlikely]] { self.rethrow_exception(); diff --git a/src/core/receiver.cxx b/src/core/receiver.cxx index 73289b7a..3e78b9e2 100644 --- a/src/core/receiver.cxx +++ b/src/core/receiver.cxx @@ -19,103 +19,107 @@ export struct broken_receiver_error final : libfork_exception { } }; +export struct operation_cancelled_error final : libfork_exception { + [[nodiscard]] + constexpr auto what() const noexcept -> const char * override { + return "operation was cancelled"; + } +}; + /** * @brief Shared state between a scheduled task and its receiver handle. - * - * @tparam T The return type of the scheduled coroutine. - * @tparam Stoppable If true, the state owns a stop_source that can be used - * to cancel the root task externally. - * - * Constructors forward arguments for in-place construction of the return value. - * Internal access is gated behind a hidden friend: `get(key_t, receiver_state&)`. */ -export template -class receiver_state { - public: - struct empty {}; +template +struct hidden_receiver_state { - /// Default construction — return value is default-initialised (or empty for void). - constexpr receiver_state() = default; + struct empty_1 {}; + struct empty_2 {}; - /// In-place construction of the return value from arbitrary args. - template - requires (!std::is_void_v) && std::constructible_from - constexpr explicit receiver_state(Args &&...args) : m_return_value(std::forward(args)...) {} + alignas(k_new_align) std::array buffer{}; - /** - * @brief Request that the associated task stop. - * - * Only available when Stoppable=true. Safe to call before scheduling — - * the root frame checks stop_requested() before executing the task body. 
- */ - constexpr auto request_stop() noexcept -> void - requires Stoppable - { - m_stop.request_stop(); - } + [[no_unique_address]] + std::conditional_t, empty_1, T> return_value{}; - private: - template - friend class receiver; + std::exception_ptr exception; + std::atomic_flag ready; - /** - * @brief Internal accessor returned by `get(key_t, receiver_state&)`. - * - * Not reachable by name from outside this translation unit because view - * is a private nested type. Callers use `auto` with the hidden friend. - */ - struct view { - receiver_state *p; + [[no_unique_address]] + std::conditional_t stop; - constexpr void set_exception(std::exception_ptr e) noexcept { p->m_exception = std::move(e); } + constexpr hidden_receiver_state() = default; - constexpr void notify_ready() noexcept { - p->m_ready.test_and_set(); - p->m_ready.notify_one(); - } + template + requires (sizeof...(Args) > 0) && std::constructible_from + constexpr explicit(sizeof...(Args) == 1) hidden_receiver_state(Args &&...args) + : return_value(std::forward(args)...) {} +}; - [[nodiscard]] - constexpr auto return_value_address() noexcept -> T * - requires (!std::is_void_v) - { - return std::addressof(p->m_return_value); - } +/// Convenience alias — used throughout the core partitions. +template +using state_handle = std::shared_ptr>; - [[nodiscard]] - constexpr auto get_stop_token() noexcept -> stop_source::stop_token - requires Stoppable - { - return p->m_stop.token(); - } - }; +/** + * @brief Lightweight move-only handle owning a pre-allocated root task state. + * + * Construction allocates a `hidden_receiver_state` which embeds a + * 1 KiB aligned buffer; the root coroutine frame is placement-constructed + * into that buffer by `schedule`. 
+ * + * Constructors mirror `make_shared` / `allocate_shared`: + * + * recv_state s; // default-init return value + * recv_state s{v1, v2}; // in-place init: T{v1, v2} + * recv_state s{allocator_arg, alloc}; // default-init, custom allocator + * recv_state s{allocator_arg, alloc, v1, v2}; // in-place init + custom allocator + */ +export template +class recv_state { + using state_type = hidden_receiver_state; - /** - * @brief Hidden friend accessor for internal library use. - * - * Only callable via ADL when a `key_t` is available (i.e. by calling `key()`). - * Returns a `view` proxy to manipulate the state's private members. - */ + public: + /// Default: value-initialise via `std::make_shared`. + constexpr recv_state() : m_ptr(std::make_shared()) {} + + /// Value-init from args: forwards `args` to `hidden_receiver_state`'s constructor + /// (in-place construction of the return value) via `std::make_shared`. + template + requires (sizeof...(Args) > 0) && std::constructible_from + constexpr explicit(sizeof...(Args) == 1) recv_state(Args &&...args) + : m_ptr(std::make_shared(std::forward(args)...)) {} + + /// Allocator-aware, default return value: allocate via `std::allocate_shared`. + template + constexpr recv_state(std::allocator_arg_t, Alloc const &alloc) + : m_ptr(std::allocate_shared(alloc)) {} + + /// Allocator-aware with value-init args. + template + requires std::constructible_from + constexpr recv_state(std::allocator_arg_t, Alloc const &alloc, Args &&...args) + : m_ptr(std::allocate_shared(alloc, std::forward(args)...)) {} + + // Move-only. 
+ constexpr recv_state(recv_state &&) noexcept = default; + constexpr auto operator=(recv_state &&) noexcept -> recv_state & = default; + constexpr recv_state(recv_state const &) = delete; + constexpr auto operator=(recv_state const &) -> recv_state & = delete; + + private: [[nodiscard]] - friend constexpr auto get(key_t, receiver_state &self) noexcept -> view { - return {&self}; + friend constexpr auto get(key_t, recv_state &&self) noexcept -> state_handle { + return std::move(self.m_ptr); } - [[no_unique_address]] - std::conditional_t, empty, T> m_return_value{}; - std::exception_ptr m_exception; - std::atomic_flag m_ready; - - [[no_unique_address]] - std::conditional_t m_stop; + state_handle m_ptr; }; export template class receiver { - using state_type = receiver_state; + using state_type = hidden_receiver_state; public: - constexpr receiver(key_t, std::shared_ptr &&state) : m_state(std::move(state)) {} + constexpr receiver(key_t, state_handle state) noexcept : m_state(std::move(state)) {} // Move only constexpr receiver(receiver &&) noexcept = default; @@ -123,57 +127,59 @@ class receiver { constexpr auto operator=(receiver &&) noexcept -> receiver & = default; constexpr auto operator=(const receiver &) -> receiver & = delete; + /** + * @brief Test if connected to a receiver state. + */ [[nodiscard]] constexpr auto valid() const noexcept -> bool { return m_state != nullptr; } + /** + * @brief Test if the associated task has completed (either successfully or with an exception/cancellation). + */ [[nodiscard]] constexpr auto ready() const -> bool { if (!valid()) { LF_THROW(broken_receiver_error{}); } - return m_state->m_ready.test(); + return m_state->ready.test(); } + /** + * @brief Wait for the associated task to complete (either successfully or with an exception/cancellation). + * + * May be called multiple times. 
+ */ constexpr void wait() const { if (!valid()) { LF_THROW(broken_receiver_error{}); } - m_state->m_ready.wait(false); + m_state->ready.wait(false); } /** - * @brief Returns a stop_token for this task's stop source. + * @brief Get a reference to the stop_source for this task, allowing the caller to request cancellation. * - * Only available when Stoppable=true. The token can be used to observe - * whether the associated task has been cancelled. + * Only available when Stoppable=true. */ [[nodiscard]] - constexpr auto token() noexcept -> stop_source::stop_token + constexpr auto stop_source() -> stop_source & requires Stoppable { if (!valid()) { LF_THROW(broken_receiver_error{}); } - return get(key(), *m_state).get_stop_token(); + return m_state->stop; } /** - * @brief Request that the associated task stop. + * @brief Wait for the associated task to complete and return its result, or rethrow. + * + * If the receiver was cancelled this will throw an exception. * - * Only available when Stoppable=true. Thread-safe; may be called - * concurrently with the task executing on worker threads. + * This may only be called once; the state is consumed and the receiver becomes invalid. 
*/ - constexpr auto request_stop() -> void - requires Stoppable - { - if (!valid()) { - LF_THROW(broken_receiver_error{}); - } - m_state->m_stop.request_stop(); - } - [[nodiscard]] constexpr auto get() && -> T { @@ -184,17 +190,23 @@ class receiver { LF_ASSUME(state != nullptr); - if (state->m_exception) { - std::rethrow_exception(state->m_exception); + if (state->exception) { + std::rethrow_exception(state->exception); + } + + if constexpr (Stoppable) { + if (state->stop.stop_requested()) { + LF_THROW(operation_cancelled_error{}); + } } if constexpr (!std::is_void_v) { - return std::move(state->m_return_value); + return std::move(state->return_value); } } private: - std::shared_ptr m_state; + state_handle m_state; }; } // namespace lf diff --git a/src/core/root.cxx b/src/core/root.cxx index 00990260..fa4b6285 100644 --- a/src/core/root.cxx +++ b/src/core/root.cxx @@ -17,7 +17,15 @@ import :task; namespace lf { -// TODO: allocator aware! -> IDEA embed in frame/state +/** + * @brief Thrown if the root coroutine frame is too large for the embedded buffer. + */ +export struct root_alloc_error final : libfork_exception { + [[nodiscard]] + constexpr auto what() const noexcept -> const char * override { + return "root coroutine frame exceeds hidden_receiver_state buffer size"; + } +}; struct get_frame_t {}; @@ -27,6 +35,29 @@ struct root_task { frame_type frame{Checkpoint{}}; + /// Owns a ref to the hidden_receiver_state hosting this frame's buffer. + std::shared_ptr keep_alive; + + template + constexpr explicit promise_type(state_handle const &recv, Args const &...) noexcept + : keep_alive(recv) {} + + template + static auto + operator new(std::size_t size, state_handle const &recv, Args const &...) -> void * { + + LF_ASSUME(recv != nullptr); + + if (size > recv->buffer.size()) { + LF_THROW(root_alloc_error{}); + } + + return recv->buffer.data(); + } + + /// No-op: the buffer is owned by the hidden_receiver_state, not the frame. 
+ static auto operator delete(void * /*ptr*/, std::size_t /*size*/) noexcept -> void {} + struct frame_awaitable : std::suspend_never { frame_type *frame; [[nodiscard]] @@ -54,7 +85,31 @@ struct root_task { constexpr static auto initial_suspend() noexcept -> std::suspend_always { return {}; } - constexpr static auto final_suspend() noexcept -> std::suspend_never { return {}; } + /** + * @brief Custom final_suspend. + * + * The root coroutine frame lives inside the hidden_receiver_state's embedded + * buffer, so the hidden_receiver_state must outlive the frame teardown. + * + * 1. `std::exchange` the keep-alive shared_ptr into a local on the + * host stack, leaving the promise member null. + * 2. `handle.destroy()` — runs parameter + promise destructors (including + * the now-null `keep_alive`) and our no-op `operator delete`. + * No frame-memory access occurs after the handle returns. + * 3. On return, the stack-local `shared_ptr` dies; if its ref + * was the last, it destroys the hidden_receiver_state cleanly — we are + * no longer executing inside the buffer. + */ + struct final_awaiter : std::suspend_always { + void await_suspend(std::coroutine_handle handle) const noexcept { + std::shared_ptr local = std::exchange(handle.promise().keep_alive, nullptr); + LF_ASSUME(local != nullptr); + handle.destroy(); + // `local` released here — possibly freeing hidden_receiver_state on return. + } + }; + + constexpr static auto final_suspend() noexcept -> final_awaiter { return {}; } constexpr static void return_void() noexcept {} @@ -72,18 +127,17 @@ template [[nodiscard]] auto // -root_pkg(std::shared_ptr> recv, Fn fn, Args... args) - -> root_task> { +root_pkg(state_handle recv, Fn fn, Args... args) -> root_task> { // This should be resumed on a valid context. LF_ASSUME(thread_local_context != nullptr); using checkpoint = checkpoint_t; - // This is a pointer to the current root_task's frame + // Pointer to this root_task's own frame. 
frame_type *root = not_null(co_await get_frame_t{}); - // Now we do a manual "call" invocation. + // Manual "call" invocation of the user-supplied task. using result_type = async_result_t; using promise_type = promise_type; @@ -100,7 +154,7 @@ root_pkg(std::shared_ptr> recv, Fn fn, Args... args // Potentially throwing child = get(key(), ctx_invoke_t{}(std::move(fn), std::move(args)...)); } LF_CATCH_ALL { - get(key(), *recv).set_exception(std::current_exception()); + recv->exception = std::current_exception(); goto cleanup; } @@ -113,7 +167,7 @@ root_pkg(std::shared_ptr> recv, Fn fn, Args... args LF_ASSUME(child->frame.kind == category::call); if constexpr (!std::is_void_v>) { - child->return_address = get(key(), *recv).return_value_address(); + child->return_address = std::addressof(recv->return_value); } // Begin normal execution of the child task, it will clean itself @@ -124,21 +178,22 @@ root_pkg(std::shared_ptr> recv, Fn fn, Args... args // // - Normal return // - Exception - // - Cancellation + // - Cancellation (in which case it would have dropped any exceptions) // - // We return any exception stashed unconditionally + // For symmetry with a normal task we unconditionally propagate exceptions here, + // effectively this is an `await_resume`. if constexpr (LF_COMPILER_EXCEPTIONS) { if (root->exception_bit) { // The child threw an exception, propagate it to the receiver. - get(key(), *recv).set_exception(extract_exception(root)); + recv->exception = extract_exception(root); } } cleanup: - // Now do that which we would otherwise do at a final suspend. // Notify the receiver that the task is done. 
- get(key(), *recv).notify_ready(); + recv->ready.test_and_set(); + recv->ready.notify_one(); LF_ASSUME(root->steals == 0); LF_ASSUME(root->joins == k_u16_max); diff --git a/src/core/schedule.cxx b/src/core/schedule.cxx index 5980ea43..8c5e510a 100644 --- a/src/core/schedule.cxx +++ b/src/core/schedule.cxx @@ -20,9 +20,6 @@ import :receiver; namespace lf { -export template -concept schedulable_return = std::is_void_v || (std::default_initializable && std::movable); - export struct schedule_error final : libfork_exception { [[nodiscard]] constexpr auto what() const noexcept -> const char * override { @@ -33,38 +30,24 @@ export struct schedule_error final : libfork_exception { template concept decay_copyable = std::convertible_to>; -template -concept schedulable_decayed = - async_invocable && schedulable_return>; - -export template -concept schedulable = schedulable_decayed, Context, std::decay_t...>; - -template -using invoke_decay_result_t = async_result_t, Context, std::decay_t...>; - -template -using schedule_state_t = receiver_state, Stoppable>; - -export template - requires schedulable -using schedule_result_t = receiver>; - /** - * @brief Schedule a function with a pre-allocated receiver state. + * @brief Schedule a function using a caller-provided `recv_state`. * - * This is the primary overload: the caller provides a shared_ptr to the - * receiver state, allowing custom allocation. The stop_token bound to the - * root frame depends on whether the state is cancellable. + * This will create a root task that stores decayed copies of `Fn` and + * `Args...` in its frame, then post it to the scheduler. The root task must + * then be resumed by a worker which will perform the invocation of `Fn`. * - * This function is strongly exception safe. + * The return address/exception and possibly stop token of the root task are + * bound to the provided `recv_state` and can be observed by the caller via the + * returned `receiver`. + * + * Strongly exception safe. 
*/ export template - requires schedulable, Args...> && - std::same_as, Args...>> + requires async_invocable_to, R, context_t, std::decay_t...> +[[nodiscard("Fire and forget is an anti-pattern")]] constexpr auto -schedule(Sch &&sch, std::shared_ptr> state, Fn &&fn, Args &&...args) - -> receiver { +schedule(Sch &&sch, recv_state state, Fn &&fn, Args &&...args) -> receiver { using context_type = context_t; @@ -72,8 +55,13 @@ schedule(Sch &&sch, std::shared_ptr> state, Fn &&fn LF_THROW(schedule_error{}); } - // Package takes shared ownership of the state; fine if this throws. - root_task task = root_pkg(state, std::forward(fn), std::forward(args)...); + state_handle state_ptr = get(key(), std::move(state)); + + LF_ASSUME(state_ptr != nullptr); + + // root_pkg's operator new may throw root_alloc_error if the frame is + // too large; if so, `state_ptr` goes out of scope and destroys the state. + root_task task = root_pkg(state_ptr, std::forward(fn), std::forward(args)...); LF_ASSUME(task.promise != nullptr); @@ -81,39 +69,45 @@ schedule(Sch &&sch, std::shared_ptr> state, Fn &&fn task.promise->frame.parent = nullptr; if constexpr (Stoppable) { - task.promise->frame.stop_token = get(key(), *state).get_stop_token(); + task.promise->frame.stop_token = state_ptr->stop.token(); } else { task.promise->frame.stop_token = stop_source::stop_token{}; // non-cancellable root } LF_TRY { - // TODO: forward sch + modify concept - sch.post(sched_handle{key(), &task.promise->frame}); + std::forward(sch).post(sched_handle{key(), &task.promise->frame}); // If ^ didn't throw then the root_task will destroy itself at the final suspend. 
} LF_CATCH_ALL { + // Otherwise, if it did throw, we must clean up task.promise->frame.handle().destroy(); LF_RETHROW; } - return {key(), std::move(state)}; + return {key(), std::move(state_ptr)}; } +template +concept schedulable_return = std::is_void_v || (std::default_initializable && std::movable); + +template +concept default_schedulable = + async_invocable && schedulable_return>; + +template +using async_decay_result_t = async_result_t, Context, std::decay_t...>; + /** - * @brief Convenience overload: allocates receiver state via make_shared. + * @brief Convenience overload: default-constructs a non-cancellable recv_state. * - * Defaults to non-cancellable (Stoppable=false). Delegates to the primary - * overload above. + * Uses the default allocator (`make_shared`) for all allocations. */ export template - requires schedulable, Args...> + requires default_schedulable, context_t, std::decay_t...> +[[nodiscard("Fire and forget is an anti-pattern")]] constexpr auto -schedule(Sch &&sch, Fn &&fn, Args &&...args) -> receiver, Args...>> { - - using context_type = context_t; - using R = invoke_decay_result_t; - - auto state = std::make_shared>(); - +schedule(Sch &&sch, Fn &&fn, Args &&...args) -> receiver, Args...>> { + using result_type = async_decay_result_t, Args...>; + recv_state state; return schedule( std::forward(sch), std::move(state), std::forward(fn), std::forward(args)...); } diff --git a/src/core/stop.cxx b/src/core/stop.cxx index 4b3b8c52..6d23d9ca 100644 --- a/src/core/stop.cxx +++ b/src/core/stop.cxx @@ -7,7 +7,7 @@ import libfork.utils; namespace lf { /** - * @brief Similar to a linked-list of std::stop_sorce but with an embedded stop_state. + * @brief Similar to a linked-list of std::stop_source but with an embedded stop_state. */ export class stop_source { public: @@ -72,11 +72,14 @@ export class stop_source { /** * @brief Get a handle to this stop source. 
*/ - constexpr auto token() const noexcept -> stop_token { return stop_token{this}; } + [[nodiscard]] + constexpr auto token() const noexcept -> stop_token { + return stop_token{this}; + } /** * @brief Returns true if any stop source in the ancestor chain has been stopped. - + * * Complexity: O(chain depth). Every task that creates a child_scope adds one * node to the chain, so deeply-nested task hierarchies pay proportionally more * per stop check. diff --git a/test/src/cancel.cpp b/test/src/cancel.cpp index c648d8a7..386e59ee 100644 --- a/test/src/cancel.cpp +++ b/test/src/cancel.cpp @@ -35,8 +35,13 @@ import libfork; // source propagates through the chain to the inner scope. // // H. Stoppable receiver / pre-cancelled root: -// receiver_state::request_stop() before schedule() triggers -// the goto-cleanup fast path in root.cxx — task body never executes. +// recv_state + receiver::request_stop() immediately after +// schedule() — covers the goto-cleanup fast path in root.cxx on +// schedulers where the task has not yet begun running. Racy in +// principle, so the test only asserts completion, not that the body +// was skipped. +// +// I. Stress tests: concurrent cancellation under contention. namespace { @@ -353,17 +358,170 @@ auto test_nested_child_scope_chain(lf::env) -> lf::task // ============================================================ // H. Stoppable receiver / pre-cancelled root. // -// receiver_state::request_stop() before schedule() makes the root -// frame's stop_token immediately satisfied, triggering the goto-cleanup -// fast path in root.cxx so the task body never runs. +// Using recv_state + receiver::request_stop() exercises the +// goto-cleanup fast path in root.cxx when stop is requested before the +// worker resumes the task. 
// ============================================================ template -auto pre_cancelled_root_fn(lf::env, bool *ran) -> lf::task { - *ran = true; +auto pre_cancelled_root_fn(lf::env, std::atomic *ran) -> lf::task { + ran->store(true, std::memory_order_relaxed); co_return; } +// ============================================================ +// I. Stress tests: concurrent cancellation under contention. +// +// These tests fork many tasks across multiple threads to maximize the +// probability of hitting the concurrent paths in final_suspend_full, +// final_suspend_trailing, and join_awaitable::await_suspend with +// stop_requested() == true. +// ============================================================ + +// --- Leaf task: does a tiny amount of work then returns. +struct leaf_work { + template + static auto operator()(lf::env, std::atomic &count) -> lf::task { + count.fetch_add(1, std::memory_order_relaxed); + co_return; + } +}; + +// --- Fan-out many forks, one sibling cancels the scope mid-flight. +// +// With enough forks and threads, some children will be in-flight when +// stop fires. This exercises: +// - final_suspend_trailing: child completes, wins join race, sees stop +// - final_suspend_full: iterative ancestor climb on stopped frames +// - awaitable::await_suspend: children launched after stop are skipped +// - join_awaitable: stop detected at join with steals > 0 + +struct stress_fan_cancel_inner { + template + static auto operator()(lf::env, lf::stop_source &my_stop, std::atomic &count, int width) + -> lf::task { + auto sc = co_await lf::scope(); + + // Fork width children; the middle one (i == width / 2) cancels this scope. + for (int i = 0; i < width; ++i) { + if (i == width / 2) { + co_await sc.fork_drop(cancel_source{}, my_stop, count); + } else { + co_await sc.fork_drop(leaf_work{}, count); + } + } + co_await sc.join(); + // Should not be reached — cancel_source fired mid-fan. 
+ count.fetch_add(10000, std::memory_order_relaxed); + } +}; + +template +auto stress_fan_cancel(lf::env, int width) -> lf::task { + std::atomic count = 0; + auto outer = co_await lf::child_scope(); + co_await outer.call_drop(stress_fan_cancel_inner{}, outer, count, width); + co_await outer.join(); +} + +// --- Deep recursive fork tree with cancellation at a specific depth. +// +// This creates a binary tree of forks. When a node at the target depth +// fires, it cancels the scope. This stresses: +// - final_suspend_full loop: many frames in the ancestor chain may be +// stopped, causing iterative climbing +// - final_suspend_trailing: stolen forks completing concurrently +// - Stack ownership transfer under cancellation + +struct tree_cancel_node { + template + static auto + operator()(lf::env, lf::stop_source &root_stop, std::atomic &count, int depth, int cancel_at) + -> lf::task { + count.fetch_add(1, std::memory_order_relaxed); + + if (depth <= 0) { + co_return; + } + + if (depth == cancel_at) { + root_stop.request_stop(); + co_return; + } + + auto sc = co_await lf::scope(); + co_await sc.fork_drop(tree_cancel_node{}, root_stop, count, depth - 1, cancel_at); + co_await sc.fork_drop(tree_cancel_node{}, root_stop, count, depth - 1, cancel_at); + co_await sc.join(); + } +}; + +template +auto stress_tree_cancel(lf::env, int depth, int cancel_at) -> lf::task { + std::atomic count = 0; + auto outer = co_await lf::child_scope(); + co_await outer.call_drop(tree_cancel_node{}, outer, count, depth, cancel_at); + co_await outer.join(); +} + +// --- Repeated schedule + cancel: exercises root.cxx stop path and receiver. +// +// Rapidly schedules tasks and immediately cancels them. The race between +// the worker picking up the task and the cancellation request stresses +// the root_pkg pre-cancelled path and final_suspend from root frames. 
+ +struct busy_leaf { + template + static auto operator()(lf::env) -> lf::task { + co_return; + } +}; + +// --- Many-fork cancel with nested child_scopes at multiple levels. +// +// An outer child_scope forks N tasks. Each inner task creates its own +// child_scope and forks M children. Mid-way, the outer scope is cancelled. +// This tests chain propagation under concurrent fork completion, hitting +// final_suspend_full's iterative climb through multiple nested stopped +// frames. + +struct nested_inner_worker { + template + static auto operator()(lf::env, std::atomic &count, int width) -> lf::task { + auto sc = co_await lf::scope(); + for (int i = 0; i < width; ++i) { + co_await sc.fork_drop(leaf_work{}, count); + } + co_await sc.join(); + } +}; + +struct nested_cancel_orchestrator { + template + static auto operator()(lf::env, lf::stop_source &root_stop, std::atomic &count, int width) + -> lf::task { + auto sc = co_await lf::scope(); + for (int i = 0; i < width; ++i) { + if (i == width / 2) { + // Cancel after forking half the work + root_stop.request_stop(); + } + co_await sc.fork_drop(nested_inner_worker{}, count, width); + } + co_await sc.join(); + // Should not be reached + count.fetch_add(100000, std::memory_order_relaxed); + } +}; + +template +auto stress_nested_cancel(lf::env, int width) -> lf::task { + std::atomic count = 0; + auto outer = co_await lf::child_scope(); + co_await outer.call_drop(nested_cancel_orchestrator{}, outer, count, width); + co_await outer.join(); +} + // ============================================================ // Run all tests against a given scheduler // ============================================================ @@ -439,14 +597,79 @@ void tests(Sch &scheduler) { REQUIRE(std::move(recv).get()); } - SECTION("stoppable receiver: pre-cancelled root task body never executes") { - bool ran = false; - auto state = std::make_shared>(); - state->request_stop(); + SECTION("stoppable receiver: recv_state + request_stop completes 
cleanly") { + std::atomic ran = false; + lf::recv_state state; auto recv = lf::schedule(scheduler, std::move(state), pre_cancelled_root_fn, &ran); REQUIRE(recv.valid()); + recv.stop_source().request_stop(); + +#if LF_COMPILER_EXCEPTIONS + REQUIRE_THROWS_AS(std::move(recv).get(), lf::operation_cancelled_error); +#else + recv.wait(); +#endif + + // The task body may or may not have run depending on scheduler timing; + // what matters is that the task completes and get() reports the cancellation. + std::ignore = ran.load(); + } + + // --- Stress tests (paths C/D/E under contention) --- + + SECTION("stress: fan-out cancel, width=16") { + auto recv = schedule(scheduler, stress_fan_cancel, 16); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + + SECTION("stress: fan-out cancel, width=64") { + auto recv = schedule(scheduler, stress_fan_cancel, 64); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + + SECTION("stress: tree cancel depth=6, cancel at depth=3") { + auto recv = schedule(scheduler, stress_tree_cancel, 6, 3); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + + SECTION("stress: tree cancel depth=8, cancel at depth=1 (near leaf)") { + auto recv = schedule(scheduler, stress_tree_cancel, 8, 1); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + + SECTION("stress: tree cancel depth=8, cancel at depth=7 (near root)") { + auto recv = schedule(scheduler, stress_tree_cancel, 8, 7); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + + SECTION("stress: nested child_scope cancel, width=8") { + auto recv = schedule(scheduler, stress_nested_cancel, 8); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + + SECTION("stress: nested child_scope cancel, width=32") { + auto recv = schedule(scheduler, stress_nested_cancel, 32); + REQUIRE(recv.valid()); std::move(recv).get(); - REQUIRE(!ran); + } + + SECTION("stress: rapid schedule + cancel") { + for (int i = 0; i < 50; ++i) { + lf::recv_state state; + auto recv = lf::schedule(scheduler, std::move(state), busy_leaf{}); + 
recv.stop_source().request_stop(); +#if LF_COMPILER_EXCEPTIONS + REQUIRE_THROWS_AS(std::move(recv).get(), lf::operation_cancelled_error); +#else + recv.wait(); +#endif + } } #if LF_COMPILER_EXCEPTIONS @@ -500,3 +723,51 @@ TEMPLATE_TEST_CASE("Busy cancel", "[cancel]", mono_busy_thread_pool, poly_busy_t } } } + +namespace { + +// Stress tests repeated at higher thread counts to maximize contention. +template +void stress_tests(Sch &scheduler) { + using Ctx = lf::context_t; + + SECTION("stress: repeated fan cancel") { + for (int rep = 0; rep < 20; ++rep) { + auto recv = schedule(scheduler, stress_fan_cancel, 32); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + } + + SECTION("stress: repeated tree cancel") { + for (int rep = 0; rep < 20; ++rep) { + auto recv = schedule(scheduler, stress_tree_cancel, 7, 3); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + } + + SECTION("stress: repeated nested cancel") { + for (int rep = 0; rep < 20; ++rep) { + auto recv = schedule(scheduler, stress_nested_cancel, 16); + REQUIRE(recv.valid()); + std::move(recv).get(); + } + } +} + +} // namespace + +TEMPLATE_TEST_CASE("Busy cancel stress", "[cancel][stress]", mono_busy_thread_pool, poly_busy_thread_pool) { + + STATIC_REQUIRE(lf::scheduler); + + std::size_t max_thr = std::min(8UZ, static_cast(std::thread::hardware_concurrency())); + + for (std::size_t thr = 2; thr <= max_thr; thr *= 2) { + DYNAMIC_SECTION("threads=" << thr) { + TestType scheduler{thr}; + stress_tests(scheduler); + } + } +} diff --git a/test/src/schedule.cpp b/test/src/schedule.cpp index f4c82aca..1f951a56 100644 --- a/test/src/schedule.cpp +++ b/test/src/schedule.cpp @@ -28,6 +28,14 @@ auto throwing_function(env /*unused*/) -> task { co_return; } +// Task whose argument is a big enough value-type to push the root coroutine +// frame past the 1 KiB embedded buffer in hidden_receiver_state. 
+template +auto big_arg_function(env /*unused*/, std::array /*unused*/) + -> task { + co_return; +} + template void simple_tests(Sch &scheduler) { SECTION("void") { @@ -42,12 +50,47 @@ void simple_tests(Sch &scheduler) { REQUIRE(std::move(recv).get() == true); } + SECTION("explicit recv_state") { + lf::recv_state state; + auto recv = schedule(scheduler, std::move(state), simple_function>); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get() == true); + } + + SECTION("stoppable recv_state") { + lf::recv_state state; + auto recv = schedule(scheduler, std::move(state), simple_function>); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get() == true); + } + + SECTION("recv_state with explicit allocator") { + std::allocator alloc; + lf::recv_state state{std::allocator_arg, alloc}; + auto recv = schedule(scheduler, std::move(state), simple_function>); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get() == true); + } + + SECTION("recv_state with value-init") { + // Pre-initialise the return slot; the task overwrites it. + lf::recv_state state{false}; + auto recv = schedule(scheduler, std::move(state), simple_function>); + REQUIRE(recv.valid()); + REQUIRE(std::move(recv).get() == true); + } + #if LF_COMPILER_EXCEPTIONS SECTION("throwing") { auto recv = schedule(scheduler, throwing_function>); REQUIRE(recv.valid()); REQUIRE_THROWS_AS(std::move(recv).get(), std::runtime_error); } + + SECTION("frame too large -> root_alloc_error") { + std::array big{}; + REQUIRE_THROWS_AS(schedule(scheduler, big_arg_function>, big), lf::root_alloc_error); + } #endif }