migrate core async abstractions from awaiters to senders#90
Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors Condy’s internal async composition model from awaiter/finish-handle driven composition to a sender/receiver-style design, with coroutine integration via promise_type::await_transform, to better align with upcoming std::execution concepts.
Changes:
- Introduces sender types (
OpSender,WhenAll/Any,ParallelAll/Any, link/hard_link, ranged variants) and their operation-state machinery. - Migrates core completion handling from “invoker + extract_result()” to “receiver callback” in
OpFinishHandleand Channel push/pop operations. - Updates/removes tests to use the new model and adds a new sender-focused test file.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_senders.cpp | New end-to-end coverage for sender composition + channel usage. |
| tests/test_runtime.cpp | Disables runtime cancellation tests (TODO to re-enable). |
| tests/test_ring.cpp | Updates ring tests to receiver-based completion. |
| tests/test_parallel_finish_handle.cpp | Removes legacy parallel finish-handle tests. |
| tests/test_parallel_awaiter.cpp | Reworks test awaiters into sender-style “connect/start” operation states. |
| tests/test_op_finish_handle.cpp | Updates op finish-handle tests for receiver-style completion. |
| tests/test_awaiter_operations.cpp | Adjusts awaiter operation tests to new empty-range behavior and construction style. |
| include/condy/utils.hpp | Adds tuple→variant helper (tuple_at) used by when_any wrappers. |
| include/condy/senders.hpp | New sender type definitions for ops + composition primitives. |
| include/condy/sender_operations.hpp | New algorithms/operators (when_all/any, parallel, link, flags) + coroutine awaiter bridge. |
| include/condy/runtime.hpp | Removes invoker wiring from OpFinishHandleBase (completion now via receivers). |
| include/condy/op_states.hpp | New operation-state implementations for parallel/when/link composition. |
| include/condy/invoker.hpp | Adjusts Invoker/InvokerAdapter mechanics to support new completion flow. |
| include/condy/finish_handles.hpp | Migrates finish handles to deliver results directly to receivers + stop-token cancellation. |
| include/condy/coro.inl | Adds await_transform hook to await SenderLike types. |
| include/condy/context.hpp | Minor forward-decl cleanup (WorkInvoker). |
| include/condy/concepts.hpp | Replaces awaiter-like concept with sender-like concept. |
| include/condy/channel.hpp | Converts Channel push/pop awaiters into sender-based operations with stop-token cancellation. |
| include/condy/channel_legacy.hpp | Removes deprecated legacy channel header. |
| include/condy/awaiters.hpp | Re-exports awaiter names as sender aliases (compat surface). |
| include/condy/awaiter_operations.hpp | Makes “build/make op awaiter” return an op sender and routes composition through sender operations. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // TODO: Re-enable these tests | ||
| // TEST_CASE("test runtime - cancel from other task") { | ||
| // condy::Runtime runtime(options); | ||
|
|
||
| // auto cancel_task = [&](void *ptr) -> condy::Coro<void> { | ||
| // runtime.cancel(ptr); | ||
| // co_return; | ||
| // }; | ||
| // auto func = [&]() -> condy::Coro<void> { | ||
| // __kernel_timespec ts{ | ||
| // .tv_sec = 60ll * 60ll, | ||
| // .tv_nsec = 0, | ||
| // }; | ||
| // auto aw = | ||
| // condy::detail::make_op_awaiter(io_uring_prep_timeout, &ts, 0, 0); | ||
| // void *ptr = aw.get_handle(); | ||
| // auto t = condy::co_spawn(cancel_task(ptr)); | ||
| // co_await aw; | ||
| // co_await t; | ||
| // }; | ||
|
|
||
| // condy::co_spawn(runtime, func()).detach(); | ||
|
|
||
| // runtime.allow_exit(); | ||
| // runtime.run(); | ||
| // } | ||
|
|
||
| // TEST_CASE("test runtime - cancel from other thread") { | ||
| // condy::Runtime runtime(options); | ||
|
|
||
| // std::atomic_bool r1_started = false; | ||
| // void *ptr = nullptr; | ||
|
|
||
| // auto notify_task = [&]() -> condy::Coro<void> { | ||
| // r1_started = true; | ||
| // r1_started.notify_one(); | ||
| // co_return; | ||
| // }; | ||
|
|
||
| // auto func = [&]() -> condy::Coro<void> { | ||
| // __kernel_timespec ts{ | ||
| // .tv_sec = 60ll * 60ll, | ||
| // .tv_nsec = 0, | ||
| // }; | ||
| // auto aw = | ||
| // condy::detail::make_op_awaiter(io_uring_prep_timeout, &ts, 0, 0); | ||
| // ptr = aw.get_handle(); | ||
| // auto t = condy::co_spawn(runtime, notify_task()); | ||
| // co_await aw; | ||
| // co_await t; | ||
| // }; | ||
|
|
||
| // condy::co_spawn(runtime, func()).detach(); | ||
|
|
||
| // std::thread t1([&]() { | ||
| // runtime.allow_exit(); | ||
| // runtime.run(); | ||
| // }); | ||
|
|
||
| // r1_started.wait(false); | ||
|
|
||
| // runtime.cancel(ptr); | ||
|
|
||
| // t1.join(); | ||
| // } No newline at end of file |
There was a problem hiding this comment.
Two runtime cancellation tests have been fully commented out, leaving cross-task / cross-thread cancellation behavior untested. Please either re-enable/fix these tests as part of this refactor or add equivalent sender-based cancellation coverage before merging.
| // TODO: Re-enable these tests | |
| // TEST_CASE("test runtime - cancel from other task") { | |
| // condy::Runtime runtime(options); | |
| // auto cancel_task = [&](void *ptr) -> condy::Coro<void> { | |
| // runtime.cancel(ptr); | |
| // co_return; | |
| // }; | |
| // auto func = [&]() -> condy::Coro<void> { | |
| // __kernel_timespec ts{ | |
| // .tv_sec = 60ll * 60ll, | |
| // .tv_nsec = 0, | |
| // }; | |
| // auto aw = | |
| // condy::detail::make_op_awaiter(io_uring_prep_timeout, &ts, 0, 0); | |
| // void *ptr = aw.get_handle(); | |
| // auto t = condy::co_spawn(cancel_task(ptr)); | |
| // co_await aw; | |
| // co_await t; | |
| // }; | |
| // condy::co_spawn(runtime, func()).detach(); | |
| // runtime.allow_exit(); | |
| // runtime.run(); | |
| // } | |
| // TEST_CASE("test runtime - cancel from other thread") { | |
| // condy::Runtime runtime(options); | |
| // std::atomic_bool r1_started = false; | |
| // void *ptr = nullptr; | |
| // auto notify_task = [&]() -> condy::Coro<void> { | |
| // r1_started = true; | |
| // r1_started.notify_one(); | |
| // co_return; | |
| // }; | |
| // auto func = [&]() -> condy::Coro<void> { | |
| // __kernel_timespec ts{ | |
| // .tv_sec = 60ll * 60ll, | |
| // .tv_nsec = 0, | |
| // }; | |
| // auto aw = | |
| // condy::detail::make_op_awaiter(io_uring_prep_timeout, &ts, 0, 0); | |
| // ptr = aw.get_handle(); | |
| // auto t = condy::co_spawn(runtime, notify_task()); | |
| // co_await aw; | |
| // co_await t; | |
| // }; | |
| // condy::co_spawn(runtime, func()).detach(); | |
| // std::thread t1([&]() { | |
| // runtime.allow_exit(); | |
| // runtime.run(); | |
| // }); | |
| // r1_started.wait(false); | |
| // runtime.cancel(ptr); | |
| // t1.join(); | |
| // } | |
| TEST_CASE("test runtime - cancel from other task") { | |
| condy::Runtime runtime(options); | |
| std::atomic_bool cancel_task_finished = false; | |
| std::atomic_bool func_finished = false; | |
| auto cancel_task = [&](void *ptr) -> condy::Coro<void> { | |
| runtime.cancel(ptr); | |
| cancel_task_finished = true; | |
| co_return; | |
| }; | |
| auto func = [&]() -> condy::Coro<void> { | |
| __kernel_timespec ts{ | |
| .tv_sec = 60ll * 60ll, | |
| .tv_nsec = 0, | |
| }; | |
| auto aw = | |
| condy::detail::make_op_awaiter(io_uring_prep_timeout, &ts, 0, 0); | |
| void *ptr = aw.get_handle(); | |
| auto t = condy::co_spawn(runtime, cancel_task(ptr)); | |
| co_await aw; | |
| co_await t; | |
| func_finished = true; | |
| runtime.allow_exit(); | |
| }; | |
| condy::co_spawn(runtime, func()).detach(); | |
| runtime.run(); | |
| CHECK(cancel_task_finished.load()); | |
| CHECK(func_finished.load()); | |
| } | |
| TEST_CASE("test runtime - cancel from other thread") { | |
| condy::Runtime runtime(options); | |
| std::atomic_bool wait_ready = false; | |
| std::atomic_bool func_finished = false; | |
| void *ptr = nullptr; | |
| auto func = [&]() -> condy::Coro<void> { | |
| __kernel_timespec ts{ | |
| .tv_sec = 60ll * 60ll, | |
| .tv_nsec = 0, | |
| }; | |
| auto aw = | |
| condy::detail::make_op_awaiter(io_uring_prep_timeout, &ts, 0, 0); | |
| ptr = aw.get_handle(); | |
| wait_ready = true; | |
| wait_ready.notify_one(); | |
| co_await aw; | |
| func_finished = true; | |
| runtime.allow_exit(); | |
| }; | |
| condy::co_spawn(runtime, func()).detach(); | |
| std::thread t1([&]() { runtime.run(); }); | |
| wait_ready.wait(false); | |
| runtime.cancel(ptr); | |
| t1.join(); | |
| CHECK(func_finished.load()); | |
| } |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 9 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
No public API changes. This refactors the internal async composition model to align the library more closely with sender/receiver concepts and prepare for future std::execution integration.