diff --git a/CMakeLists.txt b/CMakeLists.txt index 468c0bb7..69e3e9ff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ option(BOOST_CAPY_BUILD_TESTS "Build boost::capy tests" ${BUILD_TESTING}) option(BOOST_CAPY_BUILD_EXAMPLES "Build boost::capy examples" ${BOOST_CAPY_IS_ROOT}) option(BOOST_CAPY_BUILD_BENCH "Build boost::capy benchmarks" ${BOOST_CAPY_IS_ROOT}) option(BOOST_CAPY_BUILD_P2300_EXAMPLES "Build examples that depend on beman-execution (P2300)" OFF) +option(BOOST_CAPY_BUILD_NVEXEC_EXAMPLES "Build examples that depend on NVIDIA nvexec (CUDA)" OFF) option(BOOST_CAPY_MRDOCS_BUILD "Build the target for MrDocs: see mrdocs.yml" OFF) if(BOOST_CAPY_BUILD_P2300_EXAMPLES) @@ -49,6 +50,24 @@ if(BOOST_CAPY_BUILD_P2300_EXAMPLES) endif() endif() +if(BOOST_CAPY_BUILD_NVEXEC_EXAMPLES) + if(NOT BOOST_CAPY_BUILD_STDEXEC_EXAMPLES) + message(FATAL_ERROR + "BOOST_CAPY_BUILD_NVEXEC_EXAMPLES requires " + "BOOST_CAPY_BUILD_STDEXEC_EXAMPLES=ON") + endif() + if(NOT DEFINED CMAKE_CXX_STANDARD OR CMAKE_CXX_STANDARD LESS 23) + message(FATAL_ERROR + "BOOST_CAPY_BUILD_NVEXEC_EXAMPLES requires CMAKE_CXX_STANDARD >= 23") + endif() + enable_language(CUDA) + find_package(CUDAToolkit REQUIRED) + # Tell NVIDIA/stdexec to build the nvexec target when its + # FetchContent is processed (bench/ and/or the example itself). + set(STDEXEC_ENABLE_CUDA ON CACHE BOOL + "Build nvexec when configuring NVIDIA/stdexec" FORCE) +endif() + set_property(GLOBAL PROPERTY USE_FOLDERS ON) if(BOOST_CAPY_IS_ROOT AND BUILD_SHARED_LIBS) diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index c5b378d0..bae311ea 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -28,6 +28,10 @@ if(BOOST_CAPY_BUILD_P2300_EXAMPLES) add_subdirectory(awaitable-sender) endif() +if(BOOST_CAPY_BUILD_NVEXEC_EXAMPLES) + add_subdirectory(gpu-pipeline) +endif() + if(TARGET Boost::asio) add_subdirectory(asio) endif() diff --git a/example/gpu-pipeline/CMakeLists.txt b/example/gpu-pipeline/CMakeLists.txt new file mode 100644 index 00000000..4873cf04 --- /dev/null +++ b/example/gpu-pipeline/CMakeLists.txt @@ -0,0 +1,58 @@ +# +# Copyright (c) 2026 Steve Gerbino +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +# CUDA was enabled at the top level when the option was flipped on. +# Honor a clean error if the user wired around it. +if(NOT CMAKE_CUDA_COMPILER) + message(FATAL_ERROR + "example/gpu-pipeline requires CUDA; " + "did you set BOOST_CAPY_BUILD_NVEXEC_EXAMPLES?") +endif() + +# Fetch NVIDIA/stdexec independently of bench so the example builds +# even with BOOST_CAPY_BUILD_BENCH=OFF. If bench has already declared +# the same content with the same name, this call is a no-op. +include(FetchContent) +FetchContent_Declare( + stdexec + GIT_REPOSITORY https://github.com/NVIDIA/stdexec + GIT_TAG 307b83c5689ea7c2e5b31561cdc428697705333e + SYSTEM + FIND_PACKAGE_ARGS NAMES stdexec +) +FetchContent_MakeAvailable(stdexec) + +if(NOT TARGET STDEXEC::nvexec) + message(FATAL_ERROR + "STDEXEC::nvexec target not found after configuring stdexec. " + "Ensure CUDA is enabled and STDEXEC_ENABLE_CUDA=ON.") +endif() + +file(GLOB_RECURSE PFILES CONFIGURE_DEPENDS + *.cu *.cuh *.hpp + CMakeLists.txt + README.md) + +source_group(TREE ${CMAKE_CURRENT_SOURCE_DIR} PREFIX "" FILES ${PFILES}) + +add_executable(capy_example_gpu_pipeline ${PFILES}) + +set_target_properties(capy_example_gpu_pipeline PROPERTIES + FOLDER "examples" + CUDA_STANDARD 20 + CUDA_STANDARD_REQUIRED ON + CUDA_SEPARABLE_COMPILATION OFF) + +target_compile_features(capy_example_gpu_pipeline PRIVATE cxx_std_23) + +target_link_libraries(capy_example_gpu_pipeline PRIVATE + Boost::capy + STDEXEC::stdexec + STDEXEC::nvexec + CUDA::cudart) diff --git a/example/gpu-pipeline/Jamfile b/example/gpu-pipeline/Jamfile new file mode 100644 index 00000000..bbc1545a --- /dev/null +++ b/example/gpu-pipeline/Jamfile @@ -0,0 +1,12 @@ +# +# Copyright (c) 2026 Steve Gerbino +# +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +# +# Official repository: https://github.com/cppalliance/capy +# + +# This example requires CUDA, nvc++, and NVIDIA/stdexec (nvexec). +# It is built only via the CMake build (BOOST_CAPY_BUILD_NVEXEC_EXAMPLES=ON); +# the b2 build does not currently provide CUDA support for capy. diff --git a/example/gpu-pipeline/README.md b/example/gpu-pipeline/README.md new file mode 100644 index 00000000..4ffb7ae4 --- /dev/null +++ b/example/gpu-pipeline/README.md @@ -0,0 +1,88 @@ +# GPU pipeline example + +This example demonstrates that `boost::capy::await_sender` and +`boost::capy::as_sender` compose with NVIDIA's `nvexec::stream_scheduler`, +not just with CPU schedulers. Two scenes: + +1. **Scene 1 (Direction 1).** A `boost::capy::task` running on + `boost::capy::thread_pool` `co_await`s a sender whose terminal action is + a real `__global__` SAXPY kernel scheduled on `nvexec::stream_scheduler`. + When the CUDA stream signals completion, the coroutine resumes on the + capy executor with the kernel's result. + +2. **Scene 2 (Direction 2).** `boost::capy::test::stream::read_some` is + exposed as a stdexec sender via `boost::capy::as_sender`, composed with + `stdexec::upon_error`, and driven by `stdexec::sync_wait`. Two runs: a + happy-path read, and a peer-close that exercises the `upon_error` arm. + + The example wraps `read_some` (a raw IoAwaitable) rather than + `boost::capy::read` (a `task>`). The bridge's `start()` + does not perform symmetric transfer to a wrapped task's own coroutine + handle, so wrapping a task in `as_sender` hangs. Wrapping a raw + IoAwaitable works because its `await_suspend` is either ready-with-data + or returns `noop_coroutine()` after stashing the continuation for the + peer to resume. + +The bridge headers (`awaitable_sender.hpp`, `sender_awaitable.hpp`) are +copied verbatim from `bench/stdexec/`; the bridge in the bench was already +written against NVIDIA/stdexec. + +## Prerequisites + +- NVIDIA GPU and driver visible to `nvidia-smi`. +- CUDA toolkit. On Arch: `pacman -S cuda`. CUDA 13.x works. +- A C++23-capable compiler with both `` support and CUDA + device-side compilation. Verified locally with clang 22 as host *and* + CUDA compiler. +- `CMAKE_CXX_STANDARD=23`. + +nvc++ from the NVHPC SDK is the nominally blessed compiler for nvexec, +but nvc++ 26.3 does not enable C++20 coroutines (no `__cpp_impl_coroutine`, +`co_return` parses as undefined). capy is built on coroutines, so nvc++ +cannot compile capy at present. Clang-cuda is the working alternative. + +## Building and running + +``` +CXX=clang++ cmake -S . -B build \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_CXX_STANDARD=23 \ + -DCMAKE_CUDA_COMPILER=clang++ \ + -DCMAKE_CUDA_HOST_COMPILER=clang++ \ + -DCMAKE_CUDA_ARCHITECTURES=89 \ + -DCUDAToolkit_ROOT=/opt/cuda \ + -DBOOST_CAPY_BUILD_STDEXEC_EXAMPLES=ON \ + -DBOOST_CAPY_BUILD_NVEXEC_EXAMPLES=ON +cmake --build build --config Release --target capy_example_gpu_pipeline +./build/example/gpu-pipeline/capy_example_gpu_pipeline +``` + +Replace `89` with your GPU's compute capability (`nvidia-smi +--query-gpu=compute_cap --format=csv,noheader`). + +## Expected output + +The exact thread ids vary, but the structure is fixed: + +``` +main thread: +--- scene 1: await_sender( gpu sender ) --- + scene1: pre-await on thread + scene1: post-await on thread + scene1: y[0] = 5 +--- scene 2a: as_sender( read_some ) happy --- + scene2 happy: read 13 bytes +--- scene 2b: as_sender( read_some ) error --- + scene2 error: upon_error fired with "eof" (n=0) +all scenes passed +``` + +Exit status is 0 on success and non-zero on any failed assertion or CUDA +error. + +## Scope + +Correctness only. No performance measurement; no GPU-side cancellation; +no multi-device topologies. See +`docs/superpowers/specs/2026-05-27-stdexec-gpu-example-design.md` for the +full scope statement. diff --git a/example/gpu-pipeline/awaitable_sender.hpp b/example/gpu-pipeline/awaitable_sender.hpp new file mode 100644 index 00000000..f13050bd --- /dev/null +++ b/example/gpu-pipeline/awaitable_sender.hpp @@ -0,0 +1,568 @@ +// +// Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_BENCH_STDEXEC_AWAITABLE_SENDER_HPP +#define BOOST_CAPY_BENCH_STDEXEC_AWAITABLE_SENDER_HPP + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace boost::capy { + +// Query CPO for obtaining a Capy-compatible executor +// from a P2300 environment. The returned object must +// satisfy Capy's Executor concept. Environments that +// host IoAwaitables via the as_sender bridge must +// answer this query. +struct get_io_executor_t +{ + static consteval auto query( + stdexec::forwarding_query_t) noexcept -> bool + { + return true; + } + + template + requires requires(Env const& env) { + env.query( + std::declval()); + } + auto operator()(Env const& env) const noexcept + { + return env.query(*this); + } +}; + +inline constexpr get_io_executor_t get_io_executor{}; + +namespace detail { + +template +struct has_tuple_protocol : std::false_type {}; + +template +struct has_tuple_protocol::type, + typename std::tuple_element<0, T>::type>> + : std::true_type {}; + +template::value> +struct is_ec_outcome : std::is_same {}; + +template +struct is_ec_outcome + : std::bool_constant< + std::tuple_size_v == 1 && + std::is_same_v< + std::tuple_element_t<0, T>, + std::error_code>> +{}; + +template +constexpr bool is_ec_outcome_v = + std::is_same_v || + is_ec_outcome::value; + +template::value> +struct is_compound_ec_result : std::false_type {}; + +template +struct is_compound_ec_result + : std::bool_constant< + std::tuple_size_v >= 2 && + std::is_same_v< + std::tuple_element_t<0, T>, + std::error_code>> +{}; + +template +constexpr bool is_compound_ec_result_v = + is_compound_ec_result::value; + +struct frame_cb +{ + void (*resume)(frame_cb*); + void (*destroy)(frame_cb*); + void* data; +}; + +// Return the concrete executor by value, trying get_io_executor +// on the env first, then falling back to the start scheduler. +template +auto resolve_executor(Env const& env) +{ + if constexpr (requires { get_io_executor(env); }) + return get_io_executor(env); + else + return stdexec::get_start_scheduler(env) + .query(get_io_executor_t{}); +} + +} // namespace detail + +/** Sender that wraps an IoAwaitable. + + When connected or co_awaited, the bridge queries + the receiver's or promise's environment for a + Capy-compatible executor via get_io_executor. + The executor is stored by value in the operation + state and used to construct the io_env passed to + the IoAwaitable's await_suspend. + + @tparam IoAw The IoAwaitable type. +*/ +template +struct awaitable_sender +{ + using sender_concept = stdexec::sender_tag; + + using result_type = decltype( + std::declval&>().await_resume()); + + static auto make_sigs() + { + if constexpr (std::is_void_v) + return stdexec::completion_signatures< + stdexec::set_value_t(), + stdexec::set_error_t(std::exception_ptr), + stdexec::set_stopped_t()>{}; + else if constexpr ( + detail::is_compound_ec_result_v) + return stdexec::completion_signatures< + stdexec::set_value_t( + std::tuple_element_t<1, result_type>), + stdexec::set_error_t(std::error_code), + stdexec::set_error_t(std::exception_ptr), + stdexec::set_stopped_t()>{}; + else if constexpr ( + detail::is_ec_outcome_v) + return stdexec::completion_signatures< + stdexec::set_value_t(), + stdexec::set_error_t(std::error_code), + stdexec::set_error_t(std::exception_ptr), + stdexec::set_stopped_t()>{}; + else + return stdexec::completion_signatures< + stdexec::set_value_t(result_type), + stdexec::set_error_t(std::exception_ptr), + stdexec::set_stopped_t()>{}; + } + + using completion_signatures = decltype(make_sigs()); + + IoAw aw_; + + template + struct op_state + { + using operation_state_concept = + stdexec::operation_state_tag; + + // Concrete executor type deduced from the receiver's + // environment. Stored by value to avoid the dangling + // pointer that executor_ref would produce when the + // source is a temporary (scheduler query or prop). + using executor_type = decltype( + detail::resolve_executor( + stdexec::get_env( + std::declval()))); + + IoAw aw_; + Receiver rcvr_; + executor_type ex_; + io_env env_; + detail::frame_cb cb_; + + op_state(IoAw aw, Receiver rcvr) + : aw_(std::move(aw)) + , rcvr_(std::move(rcvr)) + , ex_{} + , cb_{} + { + } + + op_state(op_state const&) = delete; + op_state(op_state&&) = delete; + op_state& operator=(op_state const&) = delete; + op_state& operator=(op_state&&) = delete; + + static void + on_resume(detail::frame_cb* p) noexcept + { + auto* self = static_cast(p->data); + self->complete(); + } + + static void + on_destroy(detail::frame_cb*) noexcept + { + } + + void complete() noexcept + { + try + { + if constexpr (std::is_void_v) + { + aw_.await_resume(); + if(env_.stop_token.stop_requested()) + stdexec::set_stopped( + std::move(rcvr_)); + else + stdexec::set_value( + std::move(rcvr_)); + } + else if constexpr ( + detail::is_compound_ec_result_v) + { + auto result = aw_.await_resume(); + if(env_.stop_token.stop_requested()) + { + stdexec::set_stopped( + std::move(rcvr_)); + } + else + { + auto ec = get<0>(result); + if(!ec) + stdexec::set_value( + std::move(rcvr_), + get<1>(std::move(result))); + else + stdexec::set_error( + std::move(rcvr_), ec); + } + } + else if constexpr ( + detail::is_ec_outcome_v) + { + auto result = aw_.await_resume(); + if(env_.stop_token.stop_requested()) + { + stdexec::set_stopped( + std::move(rcvr_)); + } + else + { + std::error_code ec; + if constexpr (std::is_same_v< + result_type, std::error_code>) + ec = result; + else + ec = get<0>(result); + if(!ec) + stdexec::set_value( + std::move(rcvr_)); + else + stdexec::set_error( + std::move(rcvr_), ec); + } + } + else + { + auto result = aw_.await_resume(); + if(env_.stop_token.stop_requested()) + stdexec::set_stopped( + std::move(rcvr_)); + else + stdexec::set_value( + std::move(rcvr_), + std::move(result)); + } + } + catch(...) + { + stdexec::set_error( + std::move(rcvr_), + std::current_exception()); + } + } + + void start() noexcept + { + auto renv = stdexec::get_env(rcvr_); + ex_ = detail::resolve_executor(renv); + + std::stop_token st; + if constexpr (requires { + { renv.query(stdexec::get_stop_token_t{}) } + -> std::convertible_to; }) + { + st = renv.query( + stdexec::get_stop_token_t{}); + } + + env_ = io_env{ex_, st, nullptr}; + + if(aw_.await_ready()) + { + complete(); + return; + } + + cb_.resume = &on_resume; + cb_.destroy = &on_destroy; + cb_.data = this; + + auto h = std::coroutine_handle<>::from_address( + static_cast(&cb_)); + + auto resumed = detail::call_await_suspend( + &aw_, h, &env_); + if(resumed == h) + complete(); + } + }; + + template + auto connect(Receiver rcvr) && + -> op_state + { + return op_state( + std::move(aw_), std::move(rcvr)); + } + + template + auto connect(Receiver rcvr) const& + -> op_state + { + return op_state(aw_, std::move(rcvr)); + } + + // Bypass stdexec's sender_awaitable when co_awaited + // from a coroutine that provides get_io_executor or + // a start scheduler with get_io_executor. Adapts the + // IoAwaitable's 2-arg await_suspend to the standard + // 1-arg protocol. + template + auto as_awaitable(Promise& promise) && + { + auto penv = promise.get_env(); + auto ex = detail::resolve_executor(penv); + + std::stop_token st; + if constexpr (requires { + { penv.query(stdexec::get_stop_token_t{}) } + -> std::convertible_to; }) + { + st = penv.query( + stdexec::get_stop_token_t{}); + } + + using executor_type = decltype(ex); + + struct aw + { + IoAw aw_; + executor_type ex_; + std::stop_token st_; + io_env env_; + + bool await_ready() noexcept + { + return aw_.await_ready(); + } + + std::coroutine_handle<> + await_suspend(std::coroutine_handle<> h) + { + env_ = io_env{ex_, st_, nullptr}; + return aw_.await_suspend(h, &env_); + } + + auto await_resume() + { + return aw_.await_resume(); + } + }; + + return aw{std::move(aw_), std::move(ex), st, {}}; + } +}; + +/** Create a stdexec sender from an IoAwaitable. + + The bridge routes the awaitable's result through sender + channels based on its type: + + - `void` - calls `set_value()`. + - `error_code` (or a single-element tuple-like whose + element 0 is `error_code`) - calls `set_value()` + when the code is zero, `set_error(ec)` otherwise. + - Any other single value `T` - calls `set_value(T)`. + - Compound results whose element 0 is `error_code` + with additional elements are rejected at compile + time. Wrap the operation in a `task` + that inspects the compound result and returns the + error code. + + When connected or co_awaited, the bridge queries the + receiver's or promise's environment for a Capy executor + via get_io_executor. The environment must answer this + query with an object satisfying Capy's Executor concept. + + @param aw The IoAwaitable to wrap. + @return A sender whose completion channels reflect + the awaitable's result type. +*/ +template +auto as_sender(IoAw&& aw) +{ + return awaitable_sender>{ + std::forward(aw)}; +} + +// split_ec: sender adapter that routes error_code to +// set_value() or set_error(ec) at runtime. + +namespace detail { + +template +struct split_ec_sender +{ + using sender_concept = stdexec::sender_tag; + + using completion_signatures = + stdexec::completion_signatures< + stdexec::set_value_t(), + stdexec::set_error_t(std::error_code), + stdexec::set_error_t(std::exception_ptr), + stdexec::set_stopped_t()>; + + Sender sndr_; + + template + struct ec_receiver + { + using receiver_concept = stdexec::receiver_tag; + + Receiver rcvr_; + + auto get_env() const noexcept + { + return stdexec::get_env(rcvr_); + } + + void set_value(std::error_code ec) && noexcept + { + if (!ec) + stdexec::set_value( + std::move(rcvr_)); + else + stdexec::set_error( + std::move(rcvr_), ec); + } + + void set_value() && noexcept + { + stdexec::set_value( + std::move(rcvr_)); + } + + template + void set_error(E&& e) && noexcept + { + stdexec::set_error( + std::move(rcvr_), + std::forward(e)); + } + + void set_stopped() && noexcept + { + stdexec::set_stopped( + std::move(rcvr_)); + } + }; + + template + struct op_state + { + using operation_state_concept = + stdexec::operation_state_tag; + + using inner_op_t = decltype( + stdexec::connect( + std::declval(), + std::declval>())); + + inner_op_t op_; + + op_state(Sender sndr, Receiver rcvr) + : op_(stdexec::connect( + std::move(sndr), + ec_receiver{std::move(rcvr)})) + { + } + + op_state(op_state const&) = delete; + op_state(op_state&&) = delete; + op_state& operator=(op_state const&) = delete; + op_state& operator=(op_state&&) = delete; + + void start() noexcept + { + stdexec::start(op_); + } + }; + + template + auto connect(Receiver rcvr) && + -> op_state + { + return op_state( + std::move(sndr_), std::move(rcvr)); + } + + template + auto connect(Receiver rcvr) const& + -> op_state + { + return op_state( + sndr_, std::move(rcvr)); + } +}; + +} // namespace detail + +/** Split an `error_code` value channel into success and error channels. + + Takes a sender that completes with `set_value(error_code)` and + routes it at runtime: `set_value()` when the code is zero, + `set_error(ec)` otherwise. No exceptions. + + @param sndr The predecessor sender. + @return A sender completing with `set_value()`, + `set_error(error_code)`, or `set_stopped()`. +*/ +template +auto split_ec(Sender&& sndr) +{ + return detail::split_ec_sender< + std::decay_t>{ + std::forward(sndr)}; +} + +} // namespace boost::capy + +#endif diff --git a/example/gpu-pipeline/gpu_pipeline.cu b/example/gpu-pipeline/gpu_pipeline.cu new file mode 100644 index 00000000..471c2fc4 --- /dev/null +++ b/example/gpu-pipeline/gpu_pipeline.cu @@ -0,0 +1,320 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// +// Scene 1 (Direction 1): a capy coroutine awaits a sender whose +// terminal action is a real CUDA __global__ kernel scheduled on +// nvexec::stream_scheduler. +// +// Scene 2 (Direction 2): a capy IoAwaitable (capy::read over a +// deterministic in-process stream pair) is exposed as a stdexec +// sender, then composed with stdexec::upon_error, and consumed +// via stdexec::sync_wait. Both the happy path and an injected-eof +// path are exercised. +// + +#include "awaitable_sender.hpp" +#include "sender_awaitable.hpp" + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace capy = boost::capy; +namespace ex = stdexec; + +namespace { + +void cuda_check(cudaError_t e, char const* where) +{ + if (e != cudaSuccess) + { + std::cerr << where << ": " << cudaGetErrorString(e) << '\n'; + std::abort(); + } +} + +// Scene 1: capy coroutine awaits a nvexec-scheduled SAXPY kernel. +// Returns the host-side value at y[0] after kernel completion. +// +// Pipeline: +// just(N, a, x, y) +// | continues_on(gpu) switch onto nvexec stream +// | nvexec::launch(<<>>) __global__ kernel on stream +// | continues_on(cpu) transfer completion back to host +// +// The trailing continues_on(cpu) is load-bearing: the as-written +// nvexec adapters complete on device, but the bridge's +// bridge_receiver is host-only. The host hop must happen before the +// bridge connects. +capy::task +scene1(nvexec::stream_scheduler gpu, + stdexec::scheduler auto cpu) +{ + constexpr int N = 1 << 16; + constexpr int BLOCK = 256; + constexpr int GRID = (N + BLOCK - 1) / BLOCK; + constexpr float a = 3.0f; + + float* d_x = nullptr; + float* d_y = nullptr; + cuda_check(cudaMalloc(&d_x, N * sizeof(float)), "cudaMalloc x"); + cuda_check(cudaMalloc(&d_y, N * sizeof(float)), "cudaMalloc y"); + + std::vector h_x(N, 1.0f); + std::vector h_y(N, 2.0f); + cuda_check(cudaMemcpy(d_x, h_x.data(), + N * sizeof(float), cudaMemcpyHostToDevice), "H2D x"); + cuda_check(cudaMemcpy(d_y, h_y.data(), + N * sizeof(float), cudaMemcpyHostToDevice), "H2D y"); + + auto const enter_tid = std::this_thread::get_id(); + std::cout + << " scene1: pre-await on thread " + << enter_tid << '\n'; + + co_await capy::await_sender( + ex::just(N, a, d_x, d_y) + | ex::continues_on(gpu) + | nvexec::launch({.grid_size = GRID, .block_size = BLOCK}, + [] (cudaStream_t, int n, float k, float const* x, float* y) { + int i = blockIdx.x * blockDim.x + threadIdx.x; + if (i < n) + y[i] = k * x[i] + y[i]; + }) + | ex::continues_on(cpu)); + + auto const resume_tid = std::this_thread::get_id(); + std::cout + << " scene1: post-await on thread " + << resume_tid << '\n'; + + // The resume thread is a capy worker that never touched CUDA; it + // has no current context. cudaSetDevice establishes one before + // the cleanup calls run. + cuda_check(cudaSetDevice(0), "cudaSetDevice"); + + float h_y0 = 0.0f; + cuda_check(cudaMemcpy(&h_y0, d_y, + sizeof(float), cudaMemcpyDeviceToHost), "D2H y[0]"); + + cuda_check(cudaFree(d_x), "cudaFree x"); + cuda_check(cudaFree(d_y), "cudaFree y"); + + co_return h_y0; +} + +// Adapter run_async-like driver: kicks off scene1 on the capy +// thread_pool, blocks the caller until it completes, and returns +// the result via the supplied storage. +void +run_scene1(capy::thread_pool& pool, float& out) +{ + std::latch done(1); + std::exception_ptr err; + + auto on_ok = [&](float v) noexcept { + out = v; + done.count_down(); + }; + auto on_err = [&](std::exception_ptr ep) noexcept { + err = ep; + done.count_down(); + }; + + nvexec::stream_context stream_ctx; + exec::static_thread_pool cpu_pool(1); + capy::run_async( + pool.get_executor(), + on_ok, + on_err)(scene1( + stream_ctx.get_scheduler(), + cpu_pool.get_scheduler())); + + done.wait(); + if (err) + std::rethrow_exception(err); +} + +// Scene 2: capy::read exposed as a stdexec sender, composed with +// stdexec::upon_error, driven by sync_wait. write_env injects the +// capy executor that the as_sender bridge needs to drive the +// underlying IoAwaitable. +// stream::read_some returns a raw IoAwaitable, which the bridge +// expects. (capy::read returns a task>, and the +// bridge's start() does not perform symmetric transfer to the +// task's own handle, so wrapping a task hangs.) +void +scene2_happy_path(capy::thread_pool& pool) +{ + constexpr std::string_view payload = "payload bytes"; + + auto [a, b] = capy::test::make_stream_pair(); + b.provide(payload); + + char buf[64]; + auto sndr = ex::write_env( + capy::as_sender( + a.read_some(capy::mutable_buffer(buf, sizeof buf))), + ex::prop{capy::get_io_executor, pool.get_executor()}) + | ex::upon_error([](auto e) noexcept -> std::size_t { + if constexpr (std::is_same_v< + std::decay_t, std::error_code>) + { + std::cerr + << " scene2 happy: unexpected error: " + << e.message() << '\n'; + } + std::abort(); + }); + + auto result = ex::sync_wait(std::move(sndr)); + assert(result.has_value()); + auto const [n] = *result; + assert(n == payload.size()); + assert(std::string_view(buf, n) == payload); + + std::cout + << " scene2 happy: read " << n + << " bytes\n"; +} + +void +scene2_error_path(capy::thread_pool& pool) +{ + auto [a, b] = capy::test::make_stream_pair(); + b.close(); + + char buf[64]; + bool fired = false; + std::error_code observed; + + auto sndr = ex::write_env( + capy::as_sender( + a.read_some(capy::mutable_buffer(buf, sizeof buf))), + ex::prop{capy::get_io_executor, pool.get_executor()}) + | ex::upon_error([&](auto e) noexcept -> std::size_t { + if constexpr (std::is_same_v< + std::decay_t, std::error_code>) + { + fired = true; + observed = e; + } + return 0; + }); + + auto result = ex::sync_wait(std::move(sndr)); + assert(result.has_value()); + auto const [n] = *result; + + assert(fired); + assert(observed); + std::cout + << " scene2 error: upon_error fired with \"" + << observed.message() << "\" (n=" << n << ")\n"; +} + +} // namespace + +// Minimal "send a value through the GPU, get it back" coroutine. +// Sanity check that the smallest plausible shape compiles and runs. +namespace mini { + +capy::task +gpu_add_one(int input, + nvexec::stream_scheduler gpu, + stdexec::scheduler auto cpu) +{ + int* d_out = nullptr; + cudaMalloc(&d_out, sizeof(int)); + + co_await capy::await_sender( + ex::just(input, d_out) + | ex::continues_on(gpu) + | nvexec::launch({.grid_size = 1, .block_size = 1}, + [](cudaStream_t, int x, int* y) { *y = x + 1; }) + | ex::continues_on(cpu)); + + cudaSetDevice(0); + int h_out; + cudaMemcpy(&h_out, d_out, sizeof(int), + cudaMemcpyDeviceToHost); + cudaFree(d_out); + co_return h_out; +} + +void +run(capy::thread_pool& pool, int input, int& out) +{ + std::latch done(1); + std::exception_ptr err; + + nvexec::stream_context stream_ctx; + exec::static_thread_pool cpu_pool(1); + capy::run_async( + pool.get_executor(), + [&](int v) noexcept { out = v; done.count_down(); }, + [&](std::exception_ptr ep) noexcept { + err = ep; done.count_down(); })( + gpu_add_one(input, + stream_ctx.get_scheduler(), + cpu_pool.get_scheduler())); + + done.wait(); + if (err) std::rethrow_exception(err); +} + +} // namespace mini + +int main() +{ + std::cout + << "main thread: " + << std::this_thread::get_id() << '\n'; + + capy::thread_pool pool; + + std::cout << "--- scene 0: minimal gpu_add_one ---\n"; + int out = 0; + mini::run(pool, 41, out); + std::cout << " scene 0: 41 + 1 -> " << out << '\n'; + assert(out == 42); + + std::cout << "--- scene 1: await_sender( gpu sender ) ---\n"; + float y0 = 0.0f; + run_scene1(pool, y0); + std::cout << " scene1: y[0] = " << y0 << '\n'; + // a*x + y = 3*1 + 2 = 5 + assert(y0 == 5.0f); + + std::cout << "--- scene 2a: as_sender( read_some ) happy ---\n"; + scene2_happy_path(pool); + + std::cout << "--- scene 2b: as_sender( read_some ) error ---\n"; + scene2_error_path(pool); + + std::cout << "all scenes passed\n"; + return 0; +} diff --git a/example/gpu-pipeline/sender_awaitable.hpp b/example/gpu-pipeline/sender_awaitable.hpp new file mode 100644 index 00000000..c27ac505 --- /dev/null +++ b/example/gpu-pipeline/sender_awaitable.hpp @@ -0,0 +1,429 @@ +// +// Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com) +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_BENCH_STDEXEC_SENDER_AWAITABLE_HPP +#define BOOST_CAPY_BENCH_STDEXEC_SENDER_AWAITABLE_HPP + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost::capy { + +namespace detail { + +struct stopped_t {}; + +struct operation_cancelled {}; + +struct bridge_env +{ + std::stop_token st_; + + auto query( + stdexec::get_stop_token_t const&) + const noexcept + { + return st_; + } +}; + +template +using sender_single_value_t = + stdexec::value_types_of_t< + Sender, + bridge_env, + std::tuple, + std::type_identity_t>; + +// Detect whether a sender can complete with +// set_error(std::error_code). +template +struct has_error_code_completion +{ + template + struct checker + { + static constexpr bool value = + (std::is_same_v< + Es, std::error_code> || ...); + }; + + static constexpr bool value = + stdexec::error_types_of_t< + Sender, + bridge_env, + checker>::value; +}; + +template +constexpr bool has_error_code_v = + has_error_code_completion::value; + +// Variant when sender can complete with +// set_error(error_code): separate slot so +// error_code is not wrapped in exception_ptr. +template +using ec_result_variant = std::variant< + std::monostate, + ValueTuple, + std::error_code, + std::exception_ptr, + stopped_t>; + +// Variant when sender does not complete with +// set_error(error_code). +template +using no_ec_result_variant = std::variant< + std::monostate, + ValueTuple, + std::exception_ptr, + stopped_t>; + +template +using result_variant = std::conditional_t< + HasEc, + ec_result_variant, + no_ec_result_variant>; + +// Bridge receiver that stores the sender's +// completion result and resumes the coroutine. +// Uses an atomic flag shared with await_suspend +// to handle synchronous completion safely: +// whichever side (set_value or await_suspend) +// arrives second is responsible for resumption. +template +struct bridge_receiver +{ + using receiver_concept = + stdexec::receiver_t; + + result_variant* result_; + std::coroutine_handle<> cont_; + std::stop_token st_; + std::atomic* done_; + + auto get_env() const noexcept -> bridge_env + { + return {st_}; + } + + void resume_if_ready() noexcept + { + if(done_->exchange( + true, std::memory_order_acq_rel)) + cont_.resume(); + } + + template + void set_value(Args&&... args) && noexcept + { + result_->template emplace<1>( + std::forward(args)...); + resume_if_ready(); + } + + template + void set_error(E&& e) && noexcept + { + if constexpr ( + HasEc && + std::is_same_v< + std::decay_t, + std::error_code>) + result_->template emplace<2>( + std::forward(e)); + else if constexpr ( + std::is_same_v< + std::decay_t, + std::exception_ptr>) + { + constexpr auto idx = HasEc ? 3 : 2; + result_->template emplace( + std::forward(e)); + } + else + { + constexpr auto idx = HasEc ? 3 : 2; + result_->template emplace( + std::make_exception_ptr( + std::forward(e))); + } + resume_if_ready(); + } + + void set_stopped() && noexcept + { + constexpr auto idx = HasEc ? 4 : 3; + result_->template emplace( + stopped_t{}); + resume_if_ready(); + } +}; + +} // namespace detail + +/** Awaitable that bridges a stdexec sender + into a Capy coroutine. + + Satisfies IoAwaitable. When co_awaited inside + a capy::task, connects the sender to a bridge + receiver, starts the operation, and resumes + the coroutine when the sender completes. + + Uses an atomic exchange protocol to handle + senders that complete synchronously during + start(): whichever side arrives second + (receiver or await_suspend) resumes the + coroutine. + + The bridge inspects the sender's error + completion signatures at compile time. If the + sender can complete with + set_error(std::error_code), await_resume + returns io_result so the error code is a + value, not an exception. Otherwise + await_resume returns the value directly and + genuine exceptions are rethrown. + + @tparam Sender The stdexec sender type. +*/ +template +struct [[nodiscard]] sender_awaitable +{ + static constexpr bool has_ec = + detail::has_error_code_v; + + using value_tuple = + detail::sender_single_value_t; + using variant_type = + detail::result_variant< + value_tuple, has_ec>; + using receiver_type = + detail::bridge_receiver< + value_tuple, has_ec>; + using op_state_type = decltype( + stdexec::connect( + std::declval(), + std::declval())); + + Sender sndr_; + variant_type result_{}; + + alignas(op_state_type) + unsigned char op_buf_[sizeof(op_state_type)]; + bool op_constructed_ = false; + std::atomic done_{false}; + + explicit sender_awaitable(Sender sndr) + : sndr_(std::move(sndr)) + { + } + + sender_awaitable(sender_awaitable&& o) + noexcept( + std::is_nothrow_move_constructible_v< + Sender>) + : sndr_(std::move(o.sndr_)) + { + } + + sender_awaitable( + sender_awaitable const&) = delete; + sender_awaitable& operator=( + sender_awaitable const&) = delete; + sender_awaitable& operator=( + sender_awaitable&&) = delete; + + ~sender_awaitable() + { + if(op_constructed_) + std::launder( + reinterpret_cast( + op_buf_))->~op_state_type(); + } + + bool await_ready() const noexcept + { + return false; + } + + std::coroutine_handle<> + await_suspend( + std::coroutine_handle<> h, + io_env const* env) + { + ::new(op_buf_) op_state_type( + stdexec::connect( + std::move(sndr_), + receiver_type{ + &result_, h, + env->stop_token, &done_})); + op_constructed_ = true; + stdexec::start( + *std::launder( + reinterpret_cast< + op_state_type*>( + op_buf_))); + + // If the sender completed during start(), + // the receiver already stored the result. + // Return h to resume without suspending. + if(done_.exchange( + true, std::memory_order_acq_rel)) + return h; + return std::noop_coroutine(); + } + + auto await_resume() + { + if constexpr (has_ec) + return await_resume_ec(); + else + return await_resume_no_ec(); + } + +private: + // Sender can complete with + // set_error(error_code). Return io_result + // so the error code is a value, not an + // exception. + auto await_resume_ec() + { + // exception_ptr at index 3 + if(result_.index() == 3) + std::rethrow_exception( + std::get<3>(result_)); + + if constexpr ( + std::tuple_size_v< + value_tuple> == 0) + { + // stopped at index 4 + if(result_.index() == 4) + return io_result<>{ + make_error_code( + error::canceled)}; + if(result_.index() == 2) + return io_result<>{ + std::get<2>(result_)}; + return io_result<>{}; + } + else if constexpr ( + std::tuple_size_v< + value_tuple> == 1) + { + using T = std::tuple_element_t< + 0, value_tuple>; + if(result_.index() == 4) + return io_result{ + make_error_code( + error::canceled), T{}}; + if(result_.index() == 2) + return io_result{ + std::get<2>(result_), T{}}; + return io_result{ + {}, + std::get<0>( + std::get<1>( + std::move(result_)))}; + } + else + { + if(result_.index() == 4) + return io_result{ + make_error_code( + error::canceled), value_tuple{}}; + if(result_.index() == 2) + return io_result{ + std::get<2>(result_), value_tuple{}}; + return io_result{ + {}, + std::get<1>( + std::move(result_))}; + } + } + + // Sender does not complete with + // set_error(error_code). Return the value + // directly; rethrow exceptions. + auto await_resume_no_ec() + { + // exception_ptr at index 2 + if(result_.index() == 2) + std::rethrow_exception( + std::get<2>(result_)); + // stopped at index 3 + if(result_.index() == 3) + throw detail::operation_cancelled{}; + + if constexpr ( + std::tuple_size_v< + value_tuple> == 0) + return; + else if constexpr ( + std::tuple_size_v< + value_tuple> == 1) + return std::get<0>( + std::get<1>( + std::move(result_))); + else + return std::get<1>( + std::move(result_)); + } +}; + +/** Create an IoAwaitable from a stdexec sender. + + If the sender can complete with + set_error(std::error_code), the returned + awaitable yields io_result so the error code + is a value, not an exception. Otherwise the + awaitable yields the value directly. + + @par Example + @code + capy::task compute(auto sched) + { + auto result = co_await await_sender( + stdexec::schedule(sched) + | stdexec::then( + [] { return 42; })); + co_return result; + } + @endcode + + @param sndr The sender to bridge. + @return An IoAwaitable that can be co_awaited + in a capy::task. +*/ +template +auto await_sender(Sender&& sndr) +{ + return sender_awaitable< + std::decay_t>( + std::forward(sndr)); +} + +} // namespace boost::capy + +#endif