diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c86b17d41..ceaa60f3a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,13 +12,12 @@ name: CI on: pull_request: - branches: [ master, develop, develop-2 ] + branches: [ master, develop* ] workflow_dispatch: push: branches: - master - - develop - - develop-2 + - develop* - bugfix/** - feature/** - fix/** @@ -96,11 +95,37 @@ jobs: liburing-dev curl zip unzip tar pkg-config + # Use the capy branch with the same name as this build's branch + # (for pull requests: the source branch, then the target branch); + # fall back to capy's default branch.
+ - name: Resolve Capy branch + id: capy-ref + shell: bash + # Refs go through env (never inlined into the script) so a crafted + # branch name cannot inject shell commands. + env: + HEAD_REF: ${{ github.head_ref }} + BASE_REF: ${{ github.base_ref }} + REF_NAME: ${{ github.ref_name }} + run: | + url=https://github.com/cppalliance/capy.git + ref= + for candidate in "$HEAD_REF" "$BASE_REF" "$REF_NAME"; do + if [ -n "$candidate" ] && git ls-remote --exit-code --heads "$url" "refs/heads/$candidate" > /dev/null; then + ref="$candidate" + break + fi + done + if [ -z "$ref" ]; then + ref=$(git ls-remote --symref "$url" HEAD | awk '/^ref:/ { sub("refs/heads/", "", $2); print $2 }') + fi + echo "ref=$ref" >> "$GITHUB_OUTPUT" + - name: Clone Capy uses: actions/checkout@v4 with: repository: cppalliance/capy - ref: ${{ (github.ref_name == 'master' && github.ref_name) || 'develop' }} + ref: ${{ steps.capy-ref.outputs.ref }} path: capy-root # Test dependency of corosio @@ -768,11 +793,37 @@ jobs: with: path: corosio-root + # Use the capy branch with the same name as this build's branch + # (for pull requests: the source branch, then the target branch); + # fall back to capy's default branch. + - name: Resolve Capy branch + id: capy-ref + shell: bash + # Refs go through env (never inlined into the script) so a crafted + # branch name cannot inject shell commands. + env: + HEAD_REF: ${{ github.head_ref }} + BASE_REF: ${{ github.base_ref }} + REF_NAME: ${{ github.ref_name }} + run: | + url=https://github.com/cppalliance/capy.git + ref= + for candidate in "$HEAD_REF" "$BASE_REF" "$REF_NAME"; do + if [ -n "$candidate" ] && git ls-remote --exit-code --heads "$url" "refs/heads/$candidate" > /dev/null; then + ref="$candidate" + break + fi + done + if [ -z "$ref" ]; then + ref=$(git ls-remote --symref "$url" HEAD | awk '/^ref:/ { sub("refs/heads/", "", $2); print $2 }') + fi + echo "ref=$ref" >> "$GITHUB_OUTPUT" + - name: Clone Capy uses: actions/checkout@v4 with: repository: cppalliance/capy - ref: ${{ (github.ref_name == 'master' && github.ref_name) || 'develop' }} + ref: ${{ steps.capy-ref.outputs.ref }} path: capy-root - name: Clone Boost diff --git a/.github/workflows/code-coverage.yml b/.github/workflows/code-coverage.yml index 96c1a47bc..4c07b6f4e 100644 --- a/.github/workflows/code-coverage.yml +++ b/.github/workflows/code-coverage.yml @@ -22,7 +22,7 @@ on: push: branches: - master - - develop + - develop* paths: - 'src/**' - 'include/**' @@ -150,11 +150,37 @@ jobs: echo "CMAKE_WOLFSSL_INCLUDE=${vcpkg_installed}/include" >> $GITHUB_ENV echo "CMAKE_WOLFSSL_LIBRARY=${vcpkg_installed}/lib/libwolfssl.a" >> $GITHUB_ENV + # Use the capy branch with the same name as this build's branch + # (for pull requests: the source branch, then the target branch); + # fall back to capy's default branch.
+ - name: Resolve Capy branch + id: capy-ref + shell: bash + # Refs go through env (never inlined into the script) so a crafted + # branch name cannot inject shell commands. + env: + HEAD_REF: ${{ github.head_ref }} + BASE_REF: ${{ github.base_ref }} + REF_NAME: ${{ github.ref_name }} + run: | + url=https://github.com/cppalliance/capy.git + ref= + for candidate in "$HEAD_REF" "$BASE_REF" "$REF_NAME"; do + if [ -n "$candidate" ] && git ls-remote --exit-code --heads "$url" "refs/heads/$candidate" > /dev/null; then + ref="$candidate" + break + fi + done + if [ -z "$ref" ]; then + ref=$(git ls-remote --symref "$url" HEAD | awk '/^ref:/ { sub("refs/heads/", "", $2); print $2 }') + fi + echo "ref=$ref" >> "$GITHUB_OUTPUT" + - name: Clone Capy uses: actions/checkout@v6 with: repository: cppalliance/capy - ref: ${{ (github.ref_name == 'master' && github.ref_name) || 'develop' }} + ref: ${{ steps.capy-ref.outputs.ref }} path: capy-root - name: Clone Boost @@ -371,11 +397,37 @@ jobs: echo "CMAKE_WOLFSSL_LIBRARY=${vcpkg_installed}/lib/libwolfssl.a" >> $GITHUB_ENV echo "CMAKE_OPENSSL_ROOT=C:/msys64/mingw64" >> $GITHUB_ENV + # Use the capy branch with the same name as this build's branch + # (for pull requests: the source branch, then the target branch); + # fall back to capy's default branch.
+ - name: Resolve Capy branch + id: capy-ref + shell: bash + # Refs go through env (never inlined into the script) so a crafted + # branch name cannot inject shell commands. + env: + HEAD_REF: ${{ github.head_ref }} + BASE_REF: ${{ github.base_ref }} + REF_NAME: ${{ github.ref_name }} + run: | + url=https://github.com/cppalliance/capy.git + ref= + for candidate in "$HEAD_REF" "$BASE_REF" "$REF_NAME"; do + if [ -n "$candidate" ] && git ls-remote --exit-code --heads "$url" "refs/heads/$candidate" > /dev/null; then + ref="$candidate" + break + fi + done + if [ -z "$ref" ]; then + ref=$(git ls-remote --symref "$url" HEAD | awk '/^ref:/ { sub("refs/heads/", "", $2); print $2 }') + fi + echo "ref=$ref" >> "$GITHUB_OUTPUT" + - name: Clone Capy uses: actions/checkout@v6 with: repository: cppalliance/capy - ref: ${{ (github.ref_name == 'master' && github.ref_name) || 'develop' }} + ref: ${{ steps.capy-ref.outputs.ref }} path: capy-root - name: Clone Boost diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index e48b13452..f9df35b41 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -13,7 +13,7 @@ on: push: branches: - master - - develop + - develop* paths: - 'doc/**' - '*.adoc' @@ -42,11 +42,37 @@ jobs: with: path: corosio-root + # Use the capy branch with the same name as this build's branch + # (for pull requests: the source branch, then the target branch); + # fall back to capy's default branch.
+ - name: Resolve Capy branch + id: capy-ref + shell: bash + # Refs go through env (never inlined into the script) so a crafted + # branch name cannot inject shell commands. + env: + HEAD_REF: ${{ github.head_ref }} + BASE_REF: ${{ github.base_ref }} + REF_NAME: ${{ github.ref_name }} + run: | + url=https://github.com/cppalliance/capy.git + ref= + for candidate in "$HEAD_REF" "$BASE_REF" "$REF_NAME"; do + if [ -n "$candidate" ] && git ls-remote --exit-code --heads "$url" "refs/heads/$candidate" > /dev/null; then + ref="$candidate" + break + fi + done + if [ -z "$ref" ]; then + ref=$(git ls-remote --symref "$url" HEAD | awk '/^ref:/ { sub("refs/heads/", "", $2); print $2 }') + fi + echo "ref=$ref" >> "$GITHUB_OUTPUT" + - name: Clone Capy uses: actions/checkout@v4 with: repository: cppalliance/capy - ref: ${{ (github.ref_name == 'master' && github.ref_name) || 'develop' }} + ref: ${{ steps.capy-ref.outputs.ref }} path: capy-root - name: Clone Boost diff --git a/example/client/http_client.cpp b/example/client/http_client.cpp index 53a9b3d75..6e47342ea 100644 --- a/example/client/http_client.cpp +++ b/example/client/http_client.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include @@ -40,13 +39,22 @@ do_request( stream, capy::const_buffer(request.data(), request.size())); ec) throw std::system_error(ec); - // Read the entire response until EOF + // Read the entire response until EOF, one fixed chunk at a time std::string response; - auto [ec, n] = co_await capy::read( - stream, capy::string_dynamic_buffer(&response)); - // EOF is expected when server closes connection - if (ec && ec != capy::error::eof) - throw std::system_error(ec); + for (;;) + { + char chunk[4096]; + auto [ec, n] = co_await capy::read( + stream, capy::mutable_buffer(chunk, sizeof(chunk))); + response.append(chunk, n); + if (ec) + { + // EOF is expected when the server closes the connection + if (ec != capy::error::eof) + throw std::system_error(ec); + break; + } + } std::cout << response << std::endl; } diff --git a/example/https-client/https_client.cpp b/example/https-client/https_client.cpp index 24e369812..a6971c145 100644 ---
a/example/https-client/https_client.cpp +++ b/example/https-client/https_client.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -42,13 +41,22 @@ do_request( stream, capy::const_buffer(request.data(), request.size())); ec) throw std::system_error(ec); - // Read entire response until EOF + // Read the entire response until EOF, one fixed chunk at a time std::string response; - auto [ec, n] = co_await capy::read( - stream, capy::string_dynamic_buffer(&response)); - // EOF is expected when server closes connection - if (ec && ec != capy::error::eof) - throw std::system_error(ec); + for (;;) + { + char chunk[4096]; + auto [ec, n] = co_await capy::read( + stream, capy::mutable_buffer(chunk, sizeof(chunk))); + response.append(chunk, n); + if (ec) + { + // EOF is expected when the server closes the connection + if (ec != capy::error::eof) + throw std::system_error(ec); + break; + } + } std::cout << response << std::endl; } diff --git a/include/boost/corosio/detail/continuation_op.hpp b/include/boost/corosio/detail/continuation_op.hpp deleted file mode 100644 index 6c6331157..000000000 --- a/include/boost/corosio/detail/continuation_op.hpp +++ /dev/null @@ -1,122 +0,0 @@ -// -// Copyright (c) 2026 Michael Vandeberg -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -#ifndef BOOST_COROSIO_DETAIL_CONTINUATION_OP_HPP -#define BOOST_COROSIO_DETAIL_CONTINUATION_OP_HPP - -#include -#include - -#include -#include -#include - -namespace boost::corosio::detail { - -/* Scheduler operation that resumes a capy::continuation. - - Embeds a continuation alongside a scheduler_op so the - scheduler can queue it in the same FIFO as I/O completions - without a heap allocation. 
The continuation lives in the - caller's coroutine frame (awaitable or op struct); this - wrapper gives it a scheduler_op identity. - - io_context::executor_type::post(continuation&) uses - try_from_continuation() to recover the enclosing - continuation_op via a magic tag. The tag is read through - memcpy (not through a continuation_op*) so that UBSan - does not flag the speculative pointer arithmetic when the - continuation is not actually inside a continuation_op. -*/ -struct continuation_op final : scheduler_op -{ - static constexpr std::uint32_t magic_ = 0xC0710Au; - - std::uint32_t tag_ = magic_; - capy::continuation cont; - - continuation_op() noexcept : scheduler_op(&do_complete) {} - - // Reactor backends (epoll, select, kqueue) dispatch through - // virtual operator()(). IOCP dispatches through func_ which - // routes to do_complete below. - void operator()() override - { - // ThreadSanitizer cannot instrument standalone fences; this acquire - // fence pairs with the scheduler's release and is intentional. - BOOST_COROSIO_GCC_WARNING_PUSH - BOOST_COROSIO_GCC_WARNING_DISABLE("-Wtsan") - std::atomic_thread_fence(std::memory_order_acquire); - BOOST_COROSIO_GCC_WARNING_POP - cont.h.resume(); - } - - void destroy() override - { - if (cont.h) - cont.h.destroy(); - } - -private: - // IOCP completion entry point. owner == nullptr means destroy. - static void do_complete( - void* owner, - scheduler_op* base, - std::uint32_t, - std::uint32_t) - { - auto* self = static_cast(base); - if (!owner) - { - if (self->cont.h) - self->cont.h.destroy(); - return; - } - BOOST_COROSIO_GCC_WARNING_PUSH - BOOST_COROSIO_GCC_WARNING_DISABLE("-Wtsan") - std::atomic_thread_fence(std::memory_order_acquire); - BOOST_COROSIO_GCC_WARNING_POP - self->cont.h.resume(); - } - -public: - - // Recover the enclosing continuation_op from its cont member. - // Returns nullptr if the continuation is not tagged (bare - // capy::continuation from capy internals like run_async). 
- static continuation_op* try_from_continuation( - capy::continuation& c) noexcept - { - // offsetof on non-standard-layout is conditionally-supported; - // suppress the warning — all targeted compilers handle this - // correctly and the self-relative arithmetic is move-safe. -#if defined(__GNUC__) || defined(__clang__) -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Winvalid-offsetof" -#endif - constexpr auto cont_off = offsetof(continuation_op, cont); - constexpr auto tag_off = offsetof(continuation_op, tag_); -#if defined(__GNUC__) || defined(__clang__) -#pragma GCC diagnostic pop -#endif - // Read the tag through memcpy from a char*, not through a - // continuation_op*. This avoids UBSan's vptr check when - // the continuation is not actually inside a continuation_op. - auto* base = reinterpret_cast(&c) - cont_off; - std::uint32_t tag; - std::memcpy(&tag, base + tag_off, sizeof(tag)); - if (tag != magic_) - return nullptr; - return reinterpret_cast(base); - } -}; - -} // namespace boost::corosio::detail - -#endif diff --git a/include/boost/corosio/detail/ready_queue.hpp b/include/boost/corosio/detail/ready_queue.hpp new file mode 100644 index 000000000..986472f3e --- /dev/null +++ b/include/boost/corosio/detail/ready_queue.hpp @@ -0,0 +1,167 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_DETAIL_READY_QUEUE_HPP +#define BOOST_COROSIO_DETAIL_READY_QUEUE_HPP + +#include +#include + +#include +#include + +namespace boost::corosio::detail { + +// A queue entry is a tagged pointer: low bit selects the node kind, the rest +// is the address. We steal a LOW bit (guaranteed zero by alignment), never a +// high bit (which would depend on a fragile platform canonical-address +// assumption). 
Only bit 0 is used; the asserts below guarantee it is free. +static_assert(alignof(scheduler_op) >= 2); +static_assert(alignof(capy::continuation) >= 2); +static_assert(sizeof(void*) == sizeof(std::uintptr_t)); +static_assert(sizeof(capy::continuation::reserved) >= sizeof(void*)); + +inline constexpr std::uintptr_t ready_cont_bit = 1; + +/// Return true if a queue entry refers to a continuation (vs a scheduler_op). +inline bool +ready_is_continuation(std::uintptr_t e) noexcept +{ + return (e & ready_cont_bit) != 0; +} + +/// Recover the scheduler_op from an op-tagged entry. +inline scheduler_op* +ready_as_op(std::uintptr_t e) noexcept +{ + return std::bit_cast(e & ~ready_cont_bit); +} + +/// Recover the continuation from a continuation-tagged entry. +inline capy::continuation* +ready_as_cont(std::uintptr_t e) noexcept +{ + return std::bit_cast(e & ~ready_cont_bit); +} + +/** A unified intrusive FIFO of scheduler_ops and continuations. + + Carries both completion handlers (`scheduler_op`, dispatched via + `(*op)()`) and posted coroutine resumptions (`capy::continuation`, + dispatched via `h.resume()`) in one ordered queue, with no per-entry + allocation. The next-link lives in the node: `scheduler_op::q_next_` + for ops, `capy::continuation::reserved` for continuations. + + @par Thread Safety + Not thread-safe; external synchronization required (the schedulers + hold their dispatch mutex while touching it). +*/ +class ready_queue +{ + std::uintptr_t head_ = 0; // tagged first entry, 0 when empty + std::uintptr_t tail_ = 0; // tagged last entry, 0 when empty + + // Read a node's next-link by value. A continuation's link lives in its + // void* `reserved` slot; bit_cast keeps us from forming a uintptr_t + // lvalue over that void* object (which would violate strict aliasing). + // + // GCC 12/13 false-positive: when inlining proves an entry refers to a + // continuation, -Warray-bounds still diagnoses the untaken scheduler_op + // branch against the smaller object.
Fixed in GCC 14. + BOOST_COROSIO_GCC_WARNING_PUSH + BOOST_COROSIO_GCC_WARNING_DISABLE("-Warray-bounds") + static std::uintptr_t + next_of(std::uintptr_t e) noexcept + { + if (ready_is_continuation(e)) + return std::bit_cast(ready_as_cont(e)->reserved); + return ready_as_op(e)->q_next_; + } + + static void + set_next(std::uintptr_t e, std::uintptr_t nxt) noexcept + { + if (ready_is_continuation(e)) + ready_as_cont(e)->reserved = std::bit_cast(nxt); + else + ready_as_op(e)->q_next_ = nxt; + } + BOOST_COROSIO_GCC_WARNING_POP + + void + push_entry(std::uintptr_t e) noexcept + { + set_next(e, 0); + if (tail_) + set_next(tail_, e); + else + head_ = e; + tail_ = e; + } + +public: + ready_queue() = default; + + ready_queue(ready_queue&& o) noexcept + : head_(o.head_) + , tail_(o.tail_) + { + o.head_ = 0; + o.tail_ = 0; + } + + ready_queue(ready_queue const&) = delete; + ready_queue& operator=(ready_queue const&) = delete; + ready_queue& operator=(ready_queue&&) = delete; + + /// Return true if the queue holds no entries. + bool empty() const noexcept { return head_ == 0; } + + /// Append a scheduler_op to the back of the queue. + void push(scheduler_op* op) noexcept + { + push_entry(std::bit_cast(op)); + } + + /// Append a continuation to the back of the queue. + void push(capy::continuation& c) noexcept + { + push_entry(std::bit_cast(&c) | ready_cont_bit); + } + + /// Move all entries of @p other to the back in O(1); @p other is emptied. + void splice(ready_queue& other) noexcept + { + if (other.empty()) + return; + if (tail_) + set_next(tail_, other.head_); + else + head_ = other.head_; + tail_ = other.tail_; + other.head_ = 0; + other.tail_ = 0; + } + + /// Remove and return the front entry as a tagged value, or 0 when empty. 
+ std::uintptr_t pop() noexcept + { + auto e = head_; + if (!e) + return 0; + head_ = next_of(e); + if (!head_) + tail_ = 0; + return e; + } +}; + +} // namespace boost::corosio::detail + +#endif diff --git a/include/boost/corosio/detail/scheduler.hpp b/include/boost/corosio/detail/scheduler.hpp index e0555e3a2..ec6f19196 100644 --- a/include/boost/corosio/detail/scheduler.hpp +++ b/include/boost/corosio/detail/scheduler.hpp @@ -12,6 +12,7 @@ #define BOOST_COROSIO_DETAIL_SCHEDULER_HPP #include +#include #include #include @@ -38,6 +39,9 @@ struct BOOST_COROSIO_DECL scheduler /// Post a scheduler operation for deferred execution. virtual void post(scheduler_op*) const = 0; + /// Post a continuation for deferred execution (zero-allocation). + virtual void post(capy::continuation&) const = 0; + /// Increment the outstanding work count. virtual void work_started() noexcept = 0; diff --git a/include/boost/corosio/detail/scheduler_op.hpp b/include/boost/corosio/detail/scheduler_op.hpp index 4edfcd214..0bc72a7ea 100644 --- a/include/boost/corosio/detail/scheduler_op.hpp +++ b/include/boost/corosio/detail/scheduler_op.hpp @@ -14,7 +14,6 @@ #include #include -#include #include #include @@ -114,9 +113,12 @@ class scheduler_op : public intrusive_queue::node func_type func_; - // Pad to 32 bytes so derived structs (descriptor_state, epoll_op) - // keep hot fields on optimal cache line boundaries - std::byte reserved_[sizeof(void*)] = {}; +public: + // Tagged next-link for ready_queue (low bit selects node kind). Reuses + // the former pointer-sized padding, so size/alignment are unchanged. Used + // only while this op is in the ready queue; node.next_ is used while it + // is pending in an op_queue. An op is in one or the other, never both.
+ std::uintptr_t q_next_ = 0; }; using op_queue = intrusive_queue; diff --git a/include/boost/corosio/io/io_timer.hpp b/include/boost/corosio/io/io_timer.hpp index 6701cc6a7..f69d3799f 100644 --- a/include/boost/corosio/io/io_timer.hpp +++ b/include/boost/corosio/io/io_timer.hpp @@ -12,7 +12,7 @@ #define BOOST_COROSIO_IO_IO_TIMER_HPP #include -#include +#include #include #include #include @@ -47,7 +47,7 @@ class BOOST_COROSIO_DECL io_timer : public io_object io_timer& t_; std::stop_token token_; mutable std::error_code ec_; - detail::continuation_op cont_op_; + capy::continuation cont_; explicit wait_awaitable(io_timer& t) noexcept : t_(t) {} @@ -67,7 +67,7 @@ class BOOST_COROSIO_DECL io_timer : public io_object -> std::coroutine_handle<> { token_ = env->stop_token; - cont_op_.cont.h = h; + cont_.h = h; auto& impl = t_.get(); // Inline fast path: already expired and not in the heap if (impl.heap_index_ == implementation::npos && @@ -78,10 +78,10 @@ class BOOST_COROSIO_DECL io_timer : public io_object token_ = {}; // match normal path so await_resume // returns ec_, not a stale stop check auto d = env->executor; - d.post(cont_op_.cont); + d.post(cont_); return std::noop_coroutine(); } - return impl.wait(h, env->executor, std::move(token_), &ec_, &cont_op_.cont); + return impl.wait(h, env->executor, std::move(token_), &ec_, &cont_); } }; diff --git a/include/boost/corosio/io_context.hpp b/include/boost/corosio/io_context.hpp index 2fb36b7d8..275d37046 100644 --- a/include/boost/corosio/io_context.hpp +++ b/include/boost/corosio/io_context.hpp @@ -13,7 +13,6 @@ #define BOOST_COROSIO_IO_CONTEXT_HPP #include -#include #include #include #include @@ -567,12 +566,10 @@ class io_context::executor_type /** Dispatch a continuation. Returns a handle for symmetric transfer. If called from - within `run()`, returns `c.h`. Otherwise posts the - enclosing continuation_op as a scheduler_op for later - execution and returns `std::noop_coroutine()`. + within `run()`, returns `c.h`. 
Otherwise posts `c` for + later execution and returns `std::noop_coroutine()`. - @param c The continuation to dispatch. Must be the `cont` - member of a `detail::continuation_op`. + @param c The continuation to dispatch. @return A handle for symmetric transfer or `std::noop_coroutine()`. */ @@ -586,25 +583,19 @@ class io_context::executor_type /** Post a continuation for deferred execution. - If the continuation is backed by a continuation_op - (tagged), posts it directly as a scheduler_op — zero - heap allocation. Otherwise falls back to the - heap-allocating post(coroutine_handle<>) path. + Enqueues `c` directly on the scheduler's ready queue. + No heap allocation occurs. */ void post(capy::continuation& c) const { - auto* op = detail::continuation_op::try_from_continuation(c); - if (op) - ctx_->sched_->post(op); - else - ctx_->sched_->post(c.h); + ctx_->sched_->post(c); } /** Post a bare coroutine handle for deferred execution. - Heap-allocates a scheduler_op to wrap the handle. Prefer - posting through a continuation_op-backed continuation when - the continuation has suitable lifetime. + Heap-allocates a scheduler_op to wrap the handle. A caller + that already owns a `scheduler_op` can post it directly via + the `post(scheduler_op*)` overload to avoid the allocation. @param h The coroutine handle to post. 
*/ diff --git a/include/boost/corosio/native/detail/coro_op.hpp b/include/boost/corosio/native/detail/coro_op.hpp index 5801014c0..ecdf59815 100644 --- a/include/boost/corosio/native/detail/coro_op.hpp +++ b/include/boost/corosio/native/detail/coro_op.hpp @@ -11,7 +11,7 @@ #define BOOST_COROSIO_NATIVE_DETAIL_CORO_OP_HPP #include -#include +#include #include #include @@ -74,7 +74,7 @@ struct coro_op : scheduler_op }; std::coroutine_handle<> h; - detail::continuation_op cont_op; + capy::continuation cont; capy::executor_ref ex; std::error_code* ec_out = nullptr; std::size_t* bytes_out = nullptr; diff --git a/include/boost/corosio/native/detail/coro_op_complete.hpp b/include/boost/corosio/native/detail/coro_op_complete.hpp index fe479fffc..9954332f3 100644 --- a/include/boost/corosio/native/detail/coro_op_complete.hpp +++ b/include/boost/corosio/native/detail/coro_op_complete.hpp @@ -124,8 +124,8 @@ coro_drain_if_shutdown(void* owner, coro_op* self) noexcept inline void coro_resume(coro_op* self) noexcept { - self->cont_op.cont.h = self->h; - auto next = dispatch_coro(self->ex, self->cont_op.cont); + self->cont.h = self->h; + auto next = dispatch_coro(self->ex, self->cont); auto suicide = std::move(self->impl_ptr); next.resume(); // suicide drops here; may destroy impl + self. 
diff --git a/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp b/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp index aed650031..b44785202 100644 --- a/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp +++ b/include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp @@ -346,7 +346,7 @@ epoll_scheduler::run_task( detail::throw_system_error(make_err(errno), "epoll_wait"); bool check_timers = false; - op_queue local_ops; + ready_queue local_ops; for (int i = 0; i < nfds; ++i) { @@ -390,8 +390,7 @@ epoll_scheduler::run_task( lock.lock(); - if (!local_ops.empty()) - completed_ops_.splice(local_ops); + completed_ops_.splice(local_ops); } } // namespace boost::corosio::detail diff --git a/include/boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp b/include/boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp index fb607a605..84bff11bb 100644 --- a/include/boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp +++ b/include/boost/corosio/native/detail/io_uring/io_uring_acceptor_ops.hpp @@ -77,7 +77,7 @@ struct uring_multi_accept_op : io_uring_op } static void do_cqe(io_uring_op* base, int res, unsigned flags, - op_queue& /*local*/) noexcept + ready_queue& /*local*/) noexcept { auto* self = static_cast(base); bool more = (flags & IORING_CQE_F_MORE) != 0; @@ -138,7 +138,7 @@ struct uring_accept_op : io_uring_op {} static void do_cqe(io_uring_op*, int, unsigned, - op_queue&) noexcept + ready_queue&) noexcept { // Unreachable: this op never receives a CQE. } @@ -165,8 +165,8 @@ struct uring_accept_op : io_uring_op *self->ec_out = was_cancelled ? 
std::error_code(capy::error::canceled) : make_err(self->err); - self->cont_op.cont.h = self->h; - auto next = dispatch_coro(self->ex, self->cont_op.cont); + self->cont.h = self->h; + auto next = dispatch_coro(self->ex, self->cont); delete self; next.resume(); return; @@ -184,8 +184,8 @@ struct uring_accept_op : io_uring_op if (self->ec_out) *self->ec_out = {}; - self->cont_op.cont.h = self->h; - auto next = dispatch_coro(self->ex, self->cont_op.cont); + self->cont.h = self->h; + auto next = dispatch_coro(self->ex, self->cont); delete self; next.resume(); } diff --git a/include/boost/corosio/native/detail/io_uring/io_uring_dgram_ops.hpp b/include/boost/corosio/native/detail/io_uring/io_uring_dgram_ops.hpp index 4324b2767..04e26f608 100644 --- a/include/boost/corosio/native/detail/io_uring/io_uring_dgram_ops.hpp +++ b/include/boost/corosio/native/detail/io_uring/io_uring_dgram_ops.hpp @@ -118,7 +118,7 @@ struct uring_dgram_send_op : io_uring_op } static void do_cqe( - io_uring_op* base, int res, unsigned flags, op_queue& local) noexcept + io_uring_op* base, int res, unsigned flags, ready_queue& local) noexcept { auto* self = static_cast(base); self->res = res; @@ -272,7 +272,7 @@ struct uring_dgram_recv_op : io_uring_op } static void do_cqe( - io_uring_op* base, int res, unsigned flags, op_queue& local) noexcept + io_uring_op* base, int res, unsigned flags, ready_queue& local) noexcept { auto* self = static_cast(base); self->res = res; diff --git a/include/boost/corosio/native/detail/io_uring/io_uring_file_ops.hpp b/include/boost/corosio/native/detail/io_uring/io_uring_file_ops.hpp index e9ea08f4a..99a26e339 100644 --- a/include/boost/corosio/native/detail/io_uring/io_uring_file_ops.hpp +++ b/include/boost/corosio/native/detail/io_uring/io_uring_file_ops.hpp @@ -100,7 +100,7 @@ struct uring_file_read_op_base : io_uring_op static void do_cqe( io_uring_op* base, int res, unsigned flags, - op_queue& local) noexcept + ready_queue& local) noexcept { auto* self = 
static_cast(base); self->res = res; @@ -117,8 +117,8 @@ struct uring_file_read_op_base : io_uring_op if (self->bytes_out) *self->bytes_out = self->res >= 0 ? static_cast(self->res) : 0u; - self->cont_op.cont.h = self->h; - return dispatch_coro(self->ex, self->cont_op.cont); + self->cont.h = self->h; + return dispatch_coro(self->ex, self->cont); } }; @@ -238,7 +238,7 @@ struct uring_file_write_op_base : io_uring_op static void do_cqe( io_uring_op* base, int res, unsigned flags, - op_queue& local) noexcept + ready_queue& local) noexcept { auto* self = static_cast(base); self->res = res; @@ -253,8 +253,8 @@ struct uring_file_write_op_base : io_uring_op if (self->bytes_out) *self->bytes_out = self->res >= 0 ? static_cast(self->res) : 0u; - self->cont_op.cont.h = self->h; - return dispatch_coro(self->ex, self->cont_op.cont); + self->cont.h = self->h; + return dispatch_coro(self->ex, self->cont); } }; diff --git a/include/boost/corosio/native/detail/io_uring/io_uring_op.hpp b/include/boost/corosio/native/detail/io_uring/io_uring_op.hpp index 1420d28b3..cfe42c672 100644 --- a/include/boost/corosio/native/detail/io_uring/io_uring_op.hpp +++ b/include/boost/corosio/native/detail/io_uring/io_uring_op.hpp @@ -15,6 +15,7 @@ #if BOOST_COROSIO_HAS_IO_URING #include +#include // Forward declare to avoid circular include with io_uring_scheduler.hpp. namespace boost::corosio::detail { class io_uring_scheduler; } @@ -42,7 +43,7 @@ struct io_uring_op : coro_op /// process_completions can splice the batch into completed_ops_ /// atomically and do_one dispatches one handler at a time. using cqe_func_type = - void (*)(io_uring_op*, int res, unsigned flags, op_queue& local) noexcept; + void (*)(io_uring_op*, int res, unsigned flags, ready_queue& local) noexcept; /// SQE-preparation dispatcher type. Called by the leader during /// its drain step to fill an SQE for this op. 
Concrete op types diff --git a/include/boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp b/include/boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp index 31f15b9c5..33f724743 100644 --- a/include/boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp +++ b/include/boost/corosio/native/detail/io_uring/io_uring_scheduler.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -76,9 +77,9 @@ class BOOST_COROSIO_DECL io_uring_scheduler final void shutdown() override; - // scheduler virtuals — definitions in Task 6 void post(std::coroutine_handle<>) const override; void post(scheduler_op*) const override; + void post(capy::continuation&) const override; bool running_in_this_thread() const noexcept override; void stop() override; @@ -287,7 +288,7 @@ class BOOST_COROSIO_DECL io_uring_scheduler final mutable mutex_type dispatch_mutex_{true}; mutable mutex_type ring_mutex_{true}; mutable event_type cond_{true}; - mutable op_queue completed_ops_; + mutable ready_queue completed_ops_; // outstanding_work_ and io_uring_inflight_ are both atomic // counters updated at high frequency on different paths: // - outstanding_work_ : every work_started / work_finished call, @@ -526,11 +527,21 @@ io_uring_scheduler::shutdown() // broken explicitly inside each service before the scheduler // shutdown runs. lock_type lock(dispatch_mutex_); - while (auto* op = completed_ops_.pop()) + while (auto e = completed_ops_.pop()) { - lock.unlock(); - op->destroy(); - lock.lock(); + if (ready_is_continuation(e)) + { + lock.unlock(); + if (auto h = ready_as_cont(e)->h) + h.destroy(); + lock.lock(); + } + else + { + lock.unlock(); + ready_as_op(e)->destroy(); + lock.lock(); + } } cond_.notify_all(); } @@ -636,12 +647,6 @@ io_uring_scheduler::post(std::coroutine_handle<> h) const { auto saved = h_; delete this; - // TSan cannot instrument standalone fences; this acquire - // pairs with the posting thread's release and is intentional. 
- BOOST_COROSIO_GCC_WARNING_PUSH - BOOST_COROSIO_GCC_WARNING_DISABLE("-Wtsan") - std::atomic_thread_fence(std::memory_order_acquire); - BOOST_COROSIO_GCC_WARNING_POP saved.resume(); } @@ -686,6 +691,23 @@ io_uring_scheduler::post(scheduler_op* op) const interrupt_reactor(); } +inline void +io_uring_scheduler::post(capy::continuation& c) const +{ + lazy_init_ring(); + outstanding_work_.fetch_add(1, std::memory_order_relaxed); + bool wake_leader; + { + lock_type lock(dispatch_mutex_); + completed_ops_.push(c); + wake_leader = task_running_; + if (!wake_leader) + cond_.notify_one(); + } + if (wake_leader) + interrupt_reactor(); +} + // Thread-local stack of frames for io_uring schedulers being run on the // current thread. Holds the running-scheduler pointer (for // running_in_this_thread reporting) and the inline completion budget @@ -927,7 +949,7 @@ io_uring_scheduler::do_one(long timeout_us) if (stopped_.load(std::memory_order_acquire)) return 0; - if (auto* op = completed_ops_.pop()) + if (auto e = completed_ops_.pop()) { // Hand off any remaining queued work to a follower so we // dispatch in parallel. @@ -936,7 +958,10 @@ io_uring_scheduler::do_one(long timeout_us) lock.unlock(); // Speculative follow-ups in the handler share this budget. reset_inline_budget(); - (*op)(); + if (ready_is_continuation(e)) + ready_as_cont(e)->h.resume(); + else + (*ready_as_op(e))(); work_finished(); return 1; } @@ -1080,7 +1105,7 @@ io_uring_scheduler::process_completions() // Collect completed I/O ops locally; splice into completed_ops_ // after the loop so do_one dispatches them one at a time. 
- op_queue local_ops; + ready_queue local_ops; std::int64_t inflight_dec = 0; io_uring_for_each_cqe(&ring_, head, cqe) diff --git a/include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp b/include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp index 7c4f4e65e..ffa2ed2a6 100644 --- a/include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp +++ b/include/boost/corosio/native/detail/io_uring/io_uring_socket_ops.hpp @@ -176,7 +176,7 @@ struct uring_read_op : io_uring_op static void do_cqe( io_uring_op* base, int res, unsigned flags, - op_queue& local) noexcept + ready_queue& local) noexcept { auto* self = static_cast(base); self->res = res; @@ -286,7 +286,7 @@ struct uring_write_op : io_uring_op static void do_cqe( io_uring_op* base, int res, unsigned flags, - op_queue& local) noexcept + ready_queue& local) noexcept { auto* self = static_cast(base); self->res = res; @@ -387,7 +387,7 @@ struct uring_connect_op : io_uring_op static void do_cqe( io_uring_op* base, int res, unsigned flags, - op_queue& local) noexcept + ready_queue& local) noexcept { auto* self = static_cast(base); self->res = res; @@ -552,7 +552,7 @@ struct uring_wait_op : io_uring_op static void do_cqe( io_uring_op* base, int res, unsigned flags, - op_queue& local) noexcept + ready_queue& local) noexcept { auto* self = static_cast(base); self->res = res; @@ -643,7 +643,7 @@ struct uring_local_connect_op : io_uring_op static void do_cqe( io_uring_op* base, int res, unsigned flags, - op_queue& local) noexcept + ready_queue& local) noexcept { auto* self = static_cast(base); self->res = res; diff --git a/include/boost/corosio/native/detail/io_uring/io_uring_types.hpp b/include/boost/corosio/native/detail/io_uring/io_uring_types.hpp index 81dc7927a..5a59b9eaf 100644 --- a/include/boost/corosio/native/detail/io_uring/io_uring_types.hpp +++ b/include/boost/corosio/native/detail/io_uring/io_uring_types.hpp @@ -191,8 +191,8 @@ class BOOST_COROSIO_DECL io_uring_tcp_socket 
final n < 0 ? 0u : static_cast(n), empty_buf); if (bytes) *bytes = (n < 0) ? 0u : static_cast(n); - rd_.cont_op.cont.h = h; - return dispatch_coro(ex, rd_.cont_op.cont); + rd_.cont.h = h; + return dispatch_coro(ex, rd_.cont); } rd_.prepare(h, ex, ec, bytes, fd_, sched_, shared_from_this(), &spec_, buffers, token); @@ -264,8 +264,8 @@ class BOOST_COROSIO_DECL io_uring_tcp_socket final /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false); if (bytes) *bytes = (n < 0) ? 0u : static_cast(n); - wr_.cont_op.cont.h = h; - return dispatch_coro(ex, wr_.cont_op.cont); + wr_.cont.h = h; + return dispatch_coro(ex, wr_.cont); } wr_.prepare(h, ex, ec, bytes, fd_, sched_, shared_from_this(), &spec_, buffers, token); @@ -311,8 +311,8 @@ class BOOST_COROSIO_DECL io_uring_tcp_socket final if (sched_->try_consume_inline_budget()) { if (ec) *ec = capy::error::canceled; - conn_.cont_op.cont.h = h; - return dispatch_coro(ex, conn_.cont_op.cont); + conn_.cont.h = h; + return dispatch_coro(ex, conn_.cont); } conn_.addrlen = to_sockaddr(ep, family_, conn_.addr); conn_.prepare(h, ex, ec, fd_, sched_, shared_from_this(), @@ -940,8 +940,8 @@ class BOOST_COROSIO_DECL io_uring_local_stream_socket final n < 0 ? 0u : static_cast(n), empty_buf); if (bytes) *bytes = (n < 0) ? 0u : static_cast(n); - rd_.cont_op.cont.h = h; - return dispatch_coro(ex, rd_.cont_op.cont); + rd_.cont.h = h; + return dispatch_coro(ex, rd_.cont); } rd_.prepare(h, ex, ec, bytes, fd_, sched_, shared_from_this(), &spec_, buffers, token); @@ -1013,8 +1013,8 @@ class BOOST_COROSIO_DECL io_uring_local_stream_socket final /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false); if (bytes) *bytes = (n < 0) ? 
0u : static_cast(n); - wr_.cont_op.cont.h = h; - return dispatch_coro(ex, wr_.cont_op.cont); + wr_.cont.h = h; + return dispatch_coro(ex, wr_.cont); } wr_.prepare(h, ex, ec, bytes, fd_, sched_, shared_from_this(), &spec_, buffers, token); @@ -1060,8 +1060,8 @@ class BOOST_COROSIO_DECL io_uring_local_stream_socket final if (sched_->try_consume_inline_budget()) { if (ec) *ec = capy::error::canceled; - conn_.cont_op.cont.h = h; - return dispatch_coro(ex, conn_.cont_op.cont); + conn_.cont.h = h; + return dispatch_coro(ex, conn_.cont); } conn_.addrlen = to_sockaddr(ep, conn_.addr); conn_.prepare(h, ex, ec, fd_, sched_, shared_from_this(), @@ -1701,8 +1701,8 @@ class BOOST_COROSIO_DECL io_uring_udp_socket final if (sched_->try_consume_inline_budget()) { if (ec) *ec = capy::error::canceled; - conn_.cont_op.cont.h = h; - return dispatch_coro(ex, conn_.cont_op.cont); + conn_.cont.h = h; + return dispatch_coro(ex, conn_.cont); } conn_.addrlen = to_sockaddr(ep, family_, conn_.addr); conn_.prepare(h, ex, ec, fd_, sched_, shared_from_this(), @@ -1841,8 +1841,8 @@ class BOOST_COROSIO_DECL io_uring_udp_socket final /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false); if (bytes) *bytes = (n < 0) ? 0u : static_cast(n); - send_.cont_op.cont.h = h; - return dispatch_coro(ex, send_.cont_op.cont); + send_.cont.h = h; + return dispatch_coro(ex, send_.cont); } send_.prepare(h, ex, ec, bytes, fd_, sched_, shared_from_this(), &spec_, buffers, dest_len, dest_storage, @@ -1930,8 +1930,8 @@ class BOOST_COROSIO_DECL io_uring_udp_socket final *bytes = (n < 0) ? 
0u : static_cast(n); if (n >= 0 && want_source && source_out && !empty_buf) *source_out = sockaddr_to_endpoint(src_storage); - recv_.cont_op.cont.h = h; - return dispatch_coro(ex, recv_.cont_op.cont); + recv_.cont.h = h; + return dispatch_coro(ex, recv_.cont); } recv_.prepare(h, ex, ec, bytes, fd_, sched_, shared_from_this(), &spec_, buffers, source_out, @@ -2216,8 +2216,8 @@ class BOOST_COROSIO_DECL io_uring_local_datagram_socket final if (sched_->try_consume_inline_budget()) { if (ec) *ec = capy::error::canceled; - conn_.cont_op.cont.h = h; - return dispatch_coro(ex, conn_.cont_op.cont); + conn_.cont.h = h; + return dispatch_coro(ex, conn_.cont); } conn_.addrlen = to_sockaddr(ep, conn_.addr); conn_.prepare(h, ex, ec, fd_, sched_, shared_from_this(), @@ -2389,8 +2389,8 @@ class BOOST_COROSIO_DECL io_uring_local_datagram_socket final /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false); if (bytes) *bytes = (n < 0) ? 0u : static_cast(n); - send_.cont_op.cont.h = h; - return dispatch_coro(ex, send_.cont_op.cont); + send_.cont.h = h; + return dispatch_coro(ex, send_.cont); } send_.prepare(h, ex, ec, bytes, fd_, sched_, shared_from_this(), &spec_, buffers, dest_len, dest_storage, @@ -2478,8 +2478,8 @@ class BOOST_COROSIO_DECL io_uring_local_datagram_socket final *bytes = (n < 0) ? 
0u : static_cast(n); if (n >= 0 && want_source && source_out && !empty_buf) *source_out = sockaddr_to_local_endpoint(src_storage, src_namelen); - recv_.cont_op.cont.h = h; - return dispatch_coro(ex, recv_.cont_op.cont); + recv_.cont.h = h; + return dispatch_coro(ex, recv_.cont); } recv_.prepare(h, ex, ec, bytes, fd_, sched_, shared_from_this(), &spec_, buffers, source_out, diff --git a/include/boost/corosio/native/detail/iocp/win_completion_key.hpp b/include/boost/corosio/native/detail/iocp/win_completion_key.hpp index 78f9262ee..8bb31c452 100644 --- a/include/boost/corosio/native/detail/iocp/win_completion_key.hpp +++ b/include/boost/corosio/native/detail/iocp/win_completion_key.hpp @@ -45,7 +45,11 @@ enum completion_key : ULONG_PTR key_result_stored = 3, /** Posted scheduler_op*. OVERLAPPED* is actually a scheduler_op*. */ - key_posted = 4 + key_posted = 4, + + /** Posted capy::continuation*. OVERLAPPED* is actually a + capy::continuation*; resume its handle. */ + key_continuation = 5 }; } // namespace boost::corosio::detail diff --git a/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp b/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp index 4f7da1965..ecb6383b1 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp @@ -206,11 +206,11 @@ local_stream_accept_op::do_complete( *op->impl_out = nullptr; } - op->cont_op.cont.h = op->h; + op->cont.h = op->h; auto saved_ex = op->ex; auto prevent_premature_destruction = std::move(op->acceptor_ptr); - dispatch_coro(saved_ex, op->cont_op.cont).resume(); + dispatch_coro(saved_ex, op->cont).resume(); } inline void diff --git a/include/boost/corosio/native/detail/iocp/win_overlapped_op.hpp b/include/boost/corosio/native/detail/iocp/win_overlapped_op.hpp index 0557ae828..58f05aaa3 100644 --- 
a/include/boost/corosio/native/detail/iocp/win_overlapped_op.hpp +++ b/include/boost/corosio/native/detail/iocp/win_overlapped_op.hpp @@ -118,8 +118,8 @@ struct overlapped_op if (bytes_out) *bytes_out = static_cast(bytes_transferred); - cont_op.cont.h = h; - dispatch_coro(ex, cont_op.cont).resume(); + cont.h = h; + dispatch_coro(ex, cont).resume(); } /** Disarm cancellation and abandon the coroutine handle. */ diff --git a/include/boost/corosio/native/detail/iocp/win_resolver_service.hpp b/include/boost/corosio/native/detail/iocp/win_resolver_service.hpp index b23268869..dfda81821 100644 --- a/include/boost/corosio/native/detail/iocp/win_resolver_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_resolver_service.hpp @@ -275,8 +275,8 @@ resolve_op::do_complete( op->cancel_handle = nullptr; - op->cont_op.cont.h = op->h; - dispatch_coro(op->ex, op->cont_op.cont).resume(); + op->cont.h = op->h; + dispatch_coro(op->ex, op->cont).resume(); } // reverse_resolve_op @@ -320,8 +320,8 @@ reverse_resolve_op::do_complete( op->ep, std::move(op->stored_host), std::move(op->stored_service)); } - op->cont_op.cont.h = op->h; - dispatch_coro(op->ex, op->cont_op.cont).resume(); + op->cont.h = op->h; + dispatch_coro(op->ex, op->cont).resume(); } // win_resolver diff --git a/include/boost/corosio/native/detail/iocp/win_scheduler.hpp b/include/boost/corosio/native/detail/iocp/win_scheduler.hpp index c88bba2f1..d6c9e38e5 100644 --- a/include/boost/corosio/native/detail/iocp/win_scheduler.hpp +++ b/include/boost/corosio/native/detail/iocp/win_scheduler.hpp @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -64,6 +65,7 @@ class BOOST_COROSIO_DECL win_scheduler final void shutdown() override; void post(std::coroutine_handle<> h) const override; void post(scheduler_op* h) const override; + void post(capy::continuation&) const override; bool running_in_this_thread() const noexcept override; void stop() override; bool stopped() const noexcept override; @@ -171,6 
+173,8 @@ class BOOST_COROSIO_DECL win_scheduler final - key_wake_dispatch (1): Timer wakeup, check dispatch_required_ - key_shutdown (2): Stop signal - key_result_stored (3): Results pre-stored in OVERLAPPED + - key_posted: Carries a scheduler_op* in the OVERLAPPED pointer + - key_continuation: Carries a capy::continuation* in the OVERLAPPED pointer */ namespace iocp { @@ -277,7 +281,6 @@ win_scheduler::post(std::coroutine_handle<> h) const } auto coro = self->h_; delete self; - std::atomic_thread_fence(std::memory_order_acquire); coro.resume(); } @@ -314,6 +317,22 @@ win_scheduler::post(scheduler_op* h) const } } +inline void +win_scheduler::post(capy::continuation& c) const +{ + ::InterlockedIncrement(&outstanding_work_); + + if (!::PostQueuedCompletionStatus( + iocp_, 0, key_continuation, reinterpret_cast(&c))) + { + // completed_ops_ is an op_queue and cannot carry a raw continuation, + // so on the rare PQCS failure fall back to the allocating handle + // path. Drop the increment first; post(c.h) does its own accounting. + ::InterlockedDecrement(&outstanding_work_); + post(c.h); + } +} + inline bool win_scheduler::running_in_this_thread() const noexcept { @@ -589,6 +608,15 @@ win_scheduler::do_one(unsigned long timeout_ms) return 1; } + case key_continuation: + { + // Posted continuation: overlapped is actually a continuation* + auto* c = reinterpret_cast(overlapped); + c->h.resume(); + work_finished(); + return 1; + } + default: continue; } @@ -725,6 +753,13 @@ win_scheduler::shutdown() auto* op = reinterpret_cast(overlapped); op->destroy(); } + else if (key == key_continuation) + { + // Drain without resuming: destroy the parked frame. 
+ auto* c = reinterpret_cast(overlapped); + if (c->h) + c->h.destroy(); + } else { auto* op = overlapped_to_op(overlapped); diff --git a/include/boost/corosio/native/detail/iocp/win_signal.hpp b/include/boost/corosio/native/detail/iocp/win_signal.hpp index a0fe30c71..e7038eeec 100644 --- a/include/boost/corosio/native/detail/iocp/win_signal.hpp +++ b/include/boost/corosio/native/detail/iocp/win_signal.hpp @@ -17,7 +17,6 @@ #include #include -#include #include #include #include @@ -45,7 +44,7 @@ enum struct signal_op : scheduler_op { std::coroutine_handle<> h; - detail::continuation_op cont_op; + capy::continuation cont; capy::executor_ref d; std::error_code* ec_out = nullptr; int* signal_out = nullptr; diff --git a/include/boost/corosio/native/detail/iocp/win_signals.hpp b/include/boost/corosio/native/detail/iocp/win_signals.hpp index 9134b8f80..88a5ea5e7 100644 --- a/include/boost/corosio/native/detail/iocp/win_signals.hpp +++ b/include/boost/corosio/native/detail/iocp/win_signals.hpp @@ -307,8 +307,8 @@ signal_op::do_complete( auto* service = op->svc; op->svc = nullptr; - op->cont_op.cont.h = op->h; - dispatch_coro(op->d, op->cont_op.cont).resume(); + op->cont.h = op->h; + dispatch_coro(op->d, op->cont).resume(); if (service) service->work_finished(); @@ -341,8 +341,8 @@ win_signal::wait( *ec = make_error_code(capy::error::canceled); if (signal_out) *signal_out = 0; - pending_op_.cont_op.cont.h = h; - dispatch_coro(d, pending_op_.cont_op.cont).resume(); + pending_op_.cont.h = h; + dispatch_coro(d, pending_op_.cont).resume(); // completion is always posted to scheduler queue, never inline. 
return std::noop_coroutine(); } @@ -616,8 +616,8 @@ win_signals::cancel_wait(win_signal& impl) *op->ec_out = make_error_code(capy::error::canceled); if (op->signal_out) *op->signal_out = 0; - op->cont_op.cont.h = op->h; - dispatch_coro(op->d, op->cont_op.cont).resume(); + op->cont.h = op->h; + dispatch_coro(op->d, op->cont).resume(); sched_.work_finished(); } } @@ -639,7 +639,7 @@ win_signals::start_wait(win_signal& impl, signal_op* op) *op->ec_out = make_error_code(capy::error::canceled); if (op->signal_out) *op->signal_out = 0; - op->cont_op.cont.h = op->h; + op->cont.h = op->h; } else { @@ -671,7 +671,7 @@ win_signals::start_wait(win_signal& impl, signal_op* op) // Dispatch outside the lock to avoid deadlock if the resumed // coroutine re-enters cancel()/add()/remove() if (was_cancelled) - dispatch_coro(op->d, op->cont_op.cont).resume(); + dispatch_coro(op->d, op->cont).resume(); } inline void diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp index 40bc78aa5..91d10e9a9 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp @@ -287,11 +287,11 @@ accept_op::do_complete( *op->impl_out = nullptr; } - op->cont_op.cont.h = op->h; + op->cont.h = op->h; auto saved_ex = op->ex; auto prevent_premature_destruction = std::move(op->acceptor_ptr); - dispatch_coro(saved_ex, op->cont_op.cont).resume(); + dispatch_coro(saved_ex, op->cont).resume(); } // acceptor_wait_op completion handler diff --git a/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp b/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp index 60ae2f21a..a394d5d4c 100644 --- a/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp +++ b/include/boost/corosio/native/detail/kqueue/kqueue_scheduler.hpp @@ -337,7 +337,7 @@ kqueue_scheduler::run_task( if (nev < 0 && saved_errno 
!= EINTR) detail::throw_system_error(make_err(saved_errno), "kevent"); - op_queue local_ops; + ready_queue local_ops; for (int i = 0; i < nev; ++i) { @@ -385,8 +385,7 @@ kqueue_scheduler::run_task( lock.lock(); - if (!local_ops.empty()) - completed_ops_.splice(local_ops); + completed_ops_.splice(local_ops); } } // namespace boost::corosio::detail diff --git a/include/boost/corosio/native/detail/posix/posix_resolver.hpp b/include/boost/corosio/native/detail/posix/posix_resolver.hpp index 237737484..6edd438dc 100644 --- a/include/boost/corosio/native/detail/posix/posix_resolver.hpp +++ b/include/boost/corosio/native/detail/posix/posix_resolver.hpp @@ -176,7 +176,7 @@ class posix_resolver final // Coroutine state std::coroutine_handle<> h; - detail::continuation_op cont_op; + capy::continuation cont; capy::executor_ref ex; posix_resolver* impl = nullptr; @@ -221,7 +221,7 @@ class posix_resolver final // Coroutine state std::coroutine_handle<> h; - detail::continuation_op cont_op; + capy::continuation cont; capy::executor_ref ex; posix_resolver* impl = nullptr; diff --git a/include/boost/corosio/native/detail/posix/posix_resolver_service.hpp b/include/boost/corosio/native/detail/posix/posix_resolver_service.hpp index 97b29572e..55b8d1a48 100644 --- a/include/boost/corosio/native/detail/posix/posix_resolver_service.hpp +++ b/include/boost/corosio/native/detail/posix/posix_resolver_service.hpp @@ -273,8 +273,8 @@ posix_resolver::resolve_op::operator()() *out = std::move(stored_results); impl->svc_.work_finished(); - cont_op.cont.h = h; - dispatch_coro(ex, cont_op.cont).resume(); + cont.h = h; + dispatch_coro(ex, cont).resume(); } inline void @@ -339,8 +339,8 @@ posix_resolver::reverse_resolve_op::operator()() } impl->svc_.work_finished(); - cont_op.cont.h = h; - dispatch_coro(ex, cont_op.cont).resume(); + cont.h = h; + dispatch_coro(ex, cont).resume(); } inline void @@ -381,8 +381,8 @@ posix_resolver::resolve( if (svc_.single_threaded()) { *ec = 
std::make_error_code(std::errc::operation_not_supported); - op_.cont_op.cont.h = h; - return dispatch_coro(ex, op_.cont_op.cont); + op_.cont.h = h; + return dispatch_coro(ex, op_.cont); } auto& op = op_; @@ -427,8 +427,8 @@ posix_resolver::reverse_resolve( if (svc_.single_threaded()) { *ec = std::make_error_code(std::errc::operation_not_supported); - reverse_op_.cont_op.cont.h = h; - return dispatch_coro(ex, reverse_op_.cont_op.cont); + reverse_op_.cont.h = h; + return dispatch_coro(ex, reverse_op_.cont); } auto& op = reverse_op_; diff --git a/include/boost/corosio/native/detail/posix/posix_signal.hpp b/include/boost/corosio/native/detail/posix/posix_signal.hpp index deb4fc4b9..8024938ff 100644 --- a/include/boost/corosio/native/detail/posix/posix_signal.hpp +++ b/include/boost/corosio/native/detail/posix/posix_signal.hpp @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include #include @@ -44,7 +44,7 @@ enum struct signal_op : scheduler_op { std::coroutine_handle<> h; - detail::continuation_op cont_op; + capy::continuation cont; capy::executor_ref d; std::error_code* ec_out = nullptr; int* signal_out = nullptr; diff --git a/include/boost/corosio/native/detail/posix/posix_signal_service.hpp b/include/boost/corosio/native/detail/posix/posix_signal_service.hpp index 913cf4a44..7957c8e55 100644 --- a/include/boost/corosio/native/detail/posix/posix_signal_service.hpp +++ b/include/boost/corosio/native/detail/posix/posix_signal_service.hpp @@ -313,8 +313,8 @@ signal_op::operator()() auto* service = svc; svc = nullptr; - cont_op.cont.h = h; - d.post(cont_op.cont); + cont.h = h; + d.post(cont); // Balance the work_started() from start_wait if (service) @@ -354,8 +354,8 @@ posix_signal::wait( *ec = make_error_code(capy::error::canceled); if (signal_out) *signal_out = 0; - pending_op_.cont_op.cont.h = h; - d.post(pending_op_.cont_op.cont); + pending_op_.cont.h = h; + d.post(pending_op_.cont); // completion is always posted to scheduler queue, never inline. 
return std::noop_coroutine(); } @@ -656,8 +656,8 @@ posix_signal_service::cancel_wait(posix_signal& impl) *op->ec_out = make_error_code(capy::error::canceled); if (op->signal_out) *op->signal_out = 0; - op->cont_op.cont.h = op->h; - op->d.post(op->cont_op.cont); + op->cont.h = op->h; + op->d.post(op->cont); sched_->work_finished(); } } @@ -676,8 +676,8 @@ posix_signal_service::start_wait(posix_signal& impl, signal_op* op) *op->ec_out = make_error_code(capy::error::canceled); if (op->signal_out) *op->signal_out = 0; - op->cont_op.cont.h = op->h; - op->d.post(op->cont_op.cont); + op->cont.h = op->h; + op->d.post(op->cont); return; } diff --git a/include/boost/corosio/native/detail/posix/posix_stream_file.hpp b/include/boost/corosio/native/detail/posix/posix_stream_file.hpp index a83a1febc..ca5b40c4d 100644 --- a/include/boost/corosio/native/detail/posix/posix_stream_file.hpp +++ b/include/boost/corosio/native/detail/posix/posix_stream_file.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -105,7 +104,7 @@ class posix_stream_file final // Coroutine state std::coroutine_handle<> h; - detail::continuation_op cont_op; + capy::continuation cont; capy::executor_ref ex; // Output pointers @@ -426,8 +425,8 @@ posix_stream_file::file_op::operator()() // the parent posix_stream_file (which embeds this file_op) alive. 
auto prevent_destroy = std::move(impl_ref); ex.on_work_finished(); - cont_op.cont.h = h; - dispatch_coro(ex, cont_op.cont).resume(); + cont.h = h; + dispatch_coro(ex, cont).resume(); } inline void diff --git a/include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp b/include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp index e24ab65ea..bf6dfe4b7 100644 --- a/include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp +++ b/include/boost/corosio/native/detail/posix/posix_stream_file_service.hpp @@ -174,8 +174,8 @@ posix_stream_file::read_some( { *ec = {}; *bytes_out = 0; - op.cont_op.cont.h = h; - return dispatch_coro(ex, op.cont_op.cont); + op.cont.h = h; + return dispatch_coro(ex, op.cont); } for (int i = 0; i < op.iovec_count; ++i) @@ -258,8 +258,8 @@ posix_stream_file::write_some( { *ec = {}; *bytes_out = 0; - op.cont_op.cont.h = h; - return dispatch_coro(ex, op.cont_op.cont); + op.cont.h = h; + return dispatch_coro(ex, op.cont); } for (int i = 0; i < op.iovec_count; ++i) diff --git a/include/boost/corosio/native/detail/reactor/reactor_backend.hpp b/include/boost/corosio/native/detail/reactor/reactor_backend.hpp index 74674f5b2..2378a9294 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_backend.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_backend.hpp @@ -98,8 +98,8 @@ reactor_acceptor_implsvc_.scheduler().try_consume_inline_budget()) { *ec = err ? make_err(err) : std::error_code{}; - op.cont_op.cont.h = h; - return dispatch_coro(ex, op.cont_op.cont); + op.cont.h = h; + return dispatch_coro(ex, op.cont); } op.reset(); op.h = h; @@ -824,8 +824,8 @@ reactor_datagram_socket< { *ec = err ? make_err(err) : std::error_code{}; *bytes_out = bytes; - op.cont_op.cont.h = h; - return dispatch_coro(ex, op.cont_op.cont); + op.cont.h = h; + return dispatch_coro(ex, op.cont); } op.h = h; op.ex = ex; @@ -937,8 +937,8 @@ reactor_datagram_socket< { *ec = err ? 
make_err(err) : std::error_code{}; *bytes_out = bytes; - op.cont_op.cont.h = h; - return dispatch_coro(ex, op.cont_op.cont); + op.cont.h = h; + return dispatch_coro(ex, op.cont); } op.h = h; op.ex = ex; @@ -1006,8 +1006,8 @@ reactor_datagram_socket< if (this->svc_.scheduler().try_consume_inline_budget()) { *ec = std::error_code{}; - op.cont_op.cont.h = h; - return dispatch_coro(ex, op.cont_op.cont); + op.cont.h = h; + return dispatch_coro(ex, op.cont); } op.reset(); op.wait_event = reactor_event_write; diff --git a/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp b/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp index 6ba5731da..f52a24c00 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp @@ -12,6 +12,7 @@ #include #include +#include #include @@ -148,7 +149,7 @@ inline void reactor_descriptor_state::invoke_deferred_io() { std::shared_ptr prevent_impl_destruction; - op_queue local_ops; + ready_queue local_ops; { conditionally_enabled_mutex::scoped_lock lock(mutex); @@ -292,8 +293,9 @@ reactor_descriptor_state::invoke_deferred_io() } // Execute first handler inline — the scheduler's work_cleanup - // accounts for this as the "consumed" work item - scheduler_op* first = local_ops.pop(); + // accounts for this as the "consumed" work item. local_ops holds + // only ops, so the popped entry decodes directly. 
+ scheduler_op* first = ready_as_op(local_ops.pop()); if (first) { scheduler_->post_deferred_completions(local_ops); diff --git a/include/boost/corosio/native/detail/reactor/reactor_op.hpp b/include/boost/corosio/native/detail/reactor/reactor_op.hpp index 34916f6aa..bc89bd57a 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_op.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_op.hpp @@ -13,16 +13,12 @@ #include #include #include -#include #include #include -#include #include -#include #include #include -#include #include @@ -50,7 +46,7 @@ namespace boost::corosio::detail { template struct reactor_op : reactor_op_base { - // The op envelope — coroutine handle h, cont_op, executor ex, ec_out, + // The op envelope — coroutine handle h, cont, executor ex, ec_out, // bytes_out, cancelled, stop_cb (+ its canceller), impl_ptr — lives in // coro_op (via reactor_op_base) and is shared with io_uring/IOCP. // reactor_op adds only the reactor-specific routing state below. diff --git a/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp b/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp index 1a46974a5..6e7cabf92 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -50,7 +51,7 @@ struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context reactor_scheduler_context* next; /// Private work queue for reduced contention. - op_queue private_queue; + ready_queue private_queue; /// Unflushed work count for the private queue. 
std::int64_t private_outstanding_work; @@ -107,7 +108,7 @@ inline bool reactor_drain_private_queue( reactor_scheduler_context* ctx, std::atomic& outstanding_work, - op_queue& completed_ops) noexcept + ready_queue& completed_ops) noexcept { if (!ctx || ctx->private_queue.empty()) return false; @@ -154,6 +155,9 @@ class reactor_scheduler /// Post a scheduler operation for deferred execution. void post(scheduler_op* h) const override; + /// Post a continuation for deferred execution. + void post(capy::continuation&) const override; + /// Return true if called from a thread running this scheduler. bool running_in_this_thread() const noexcept override; @@ -216,7 +220,7 @@ class reactor_scheduler @param queue The private queue to drain. @param count Private work count to flush before draining. */ - void drain_thread_queue(op_queue& queue, std::int64_t count) const; + void drain_thread_queue(ready_queue& queue, std::int64_t count) const; /** Post completed operations for deferred invocation. @@ -230,7 +234,7 @@ class reactor_scheduler @param ops Queue of operations to post. */ - void post_deferred_completions(op_queue& ops) const; + void post_deferred_completions(ready_queue& ops) const; /** Apply runtime configuration to the scheduler. @@ -299,7 +303,7 @@ class reactor_scheduler mutable mutex_type mutex_{true}; mutable event_type cond_{true}; - mutable op_queue completed_ops_; + mutable ready_queue completed_ops_; mutable std::atomic outstanding_work_{0}; std::atomic stopped_{false}; mutable std::atomic task_running_{false}; @@ -490,13 +494,6 @@ reactor_scheduler::post(std::coroutine_handle<> h) const { auto saved = h_; delete this; - // Ensure stores from the posting thread are visible. TSan - // cannot instrument standalone fences; this acquire pairs - // with the posting thread's release and is intentional. 
- BOOST_COROSIO_GCC_WARNING_PUSH - BOOST_COROSIO_GCC_WARNING_DISABLE("-Wtsan") - std::atomic_thread_fence(std::memory_order_acquire); - BOOST_COROSIO_GCC_WARNING_POP saved.resume(); } @@ -541,6 +538,23 @@ reactor_scheduler::post(scheduler_op* h) const wake_one_thread_and_unlock(lock); } +inline void +reactor_scheduler::post(capy::continuation& c) const +{ + if (auto* ctx = reactor_find_context(this)) + { + ++ctx->private_outstanding_work; + ctx->private_queue.push(c); + return; + } + + outstanding_work_.fetch_add(1, std::memory_order_relaxed); + + lock_type lock(mutex_); + completed_ops_.push(c); + wake_one_thread_and_unlock(lock); +} + inline bool reactor_scheduler::running_in_this_thread() const noexcept { @@ -686,7 +700,7 @@ reactor_scheduler::compensating_work_started() const noexcept inline void reactor_scheduler::drain_thread_queue( - op_queue& queue, std::int64_t count) const + ready_queue& queue, std::int64_t count) const { if (count > 0) outstanding_work_.fetch_add(count, std::memory_order_relaxed); @@ -698,7 +712,7 @@ reactor_scheduler::drain_thread_queue( } inline void -reactor_scheduler::post_deferred_completions(op_queue& ops) const +reactor_scheduler::post_deferred_completions(ready_queue& ops) const { if (ops.empty()) return; @@ -719,13 +733,24 @@ reactor_scheduler::shutdown_drain() { lock_type lock(mutex_); - while (auto* h = completed_ops_.pop()) + while (auto e = completed_ops_.pop()) { - if (h == &task_op_) - continue; - lock.unlock(); - h->destroy(); - lock.lock(); + if (ready_is_continuation(e)) + { + lock.unlock(); + if (auto h = ready_as_cont(e)->h) + h.destroy(); + lock.lock(); + } + else + { + auto* op = ready_as_op(e); + if (op == &task_op_) + continue; + lock.unlock(); + op->destroy(); + lock.lock(); + } } signal_all(lock); @@ -866,7 +891,8 @@ reactor_scheduler::do_one( if (stopped_.load(std::memory_order_acquire)) return 0; - scheduler_op* op = completed_ops_.pop(); + std::uintptr_t e = completed_ops_.pop(); + scheduler_op* op = 
ready_is_continuation(e) ? nullptr : ready_as_op(e); // Handle reactor sentinel — time to poll for I/O if (op == &task_op_) @@ -906,8 +932,8 @@ reactor_scheduler::do_one( continue; } - // Handle operation - if (op != nullptr) + // Handle ready entry (op or continuation) + if (e != 0) { bool more = !completed_ops_.empty(); @@ -922,7 +948,10 @@ reactor_scheduler::do_one( work_cleanup on_exit{this, &lock, ctx}; (void)on_exit; - (*op)(); + if (ready_is_continuation(e)) + ready_as_cont(e)->h.resume(); + else + (*op)(); return 1; } diff --git a/include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp b/include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp index 217813299..8de63d0de 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp @@ -375,8 +375,8 @@ reactor_stream_socketsvc_.scheduler().try_consume_inline_budget()) { *ec = err ? make_err(err) : std::error_code{}; - op.cont_op.cont.h = h; - return dispatch_coro(ex, op.cont_op.cont); + op.cont.h = h; + return dispatch_coro(ex, op.cont); } op.reset(); op.h = h; @@ -487,8 +487,8 @@ reactor_stream_socketsvc_.scheduler().try_consume_inline_budget()) { *ec = std::error_code{}; - op.cont_op.cont.h = h; - return dispatch_coro(ex, op.cont_op.cont); + op.cont.h = h; + return dispatch_coro(ex, op.cont); } op.reset(); op.wait_event = reactor_event_write; diff --git a/include/boost/corosio/native/detail/select/select_scheduler.hpp b/include/boost/corosio/native/detail/select/select_scheduler.hpp index 07d5d5600..e403b2e0b 100644 --- a/include/boost/corosio/native/detail/select/select_scheduler.hpp +++ b/include/boost/corosio/native/detail/select/select_scheduler.hpp @@ -383,7 +383,7 @@ select_scheduler::run_task( // Process timers outside the lock timer_svc_->process_expired(); - op_queue local_ops; + ready_queue local_ops; if (ready > 0) { @@ -425,8 +425,7 @@ select_scheduler::run_task( 
lock.lock(); - if (!local_ops.empty()) - completed_ops_.splice(local_ops); + completed_ops_.splice(local_ops); } } // namespace boost::corosio::detail diff --git a/include/boost/corosio/native/native_timer.hpp b/include/boost/corosio/native/native_timer.hpp index b0716e9a3..93c5a76e5 100644 --- a/include/boost/corosio/native/native_timer.hpp +++ b/include/boost/corosio/native/native_timer.hpp @@ -53,7 +53,7 @@ class native_timer : public timer native_timer& self_; std::stop_token token_; mutable std::error_code ec_; - detail::continuation_op cont_op_; + capy::continuation cont_; explicit native_wait_awaitable(native_timer& self) noexcept : self_(self) @@ -76,7 +76,7 @@ class native_timer : public timer -> std::coroutine_handle<> { token_ = env->stop_token; - cont_op_.cont.h = h; + cont_.h = h; auto& impl = self_.get_impl(); // Fast path: already expired and not in the heap if (impl.heap_index_ == timer::implementation::npos && @@ -85,10 +85,10 @@ class native_timer : public timer { ec_ = {}; auto d = env->executor; - d.post(cont_op_.cont); + d.post(cont_); return std::noop_coroutine(); } - return impl.wait(h, env->executor, std::move(token_), &ec_, &cont_op_.cont); + return impl.wait(h, env->executor, std::move(token_), &ec_, &cont_); } }; diff --git a/include/boost/corosio/tcp_server.hpp b/include/boost/corosio/tcp_server.hpp index b0fd02b9c..d959c4cda 100644 --- a/include/boost/corosio/tcp_server.hpp +++ b/include/boost/corosio/tcp_server.hpp @@ -159,7 +159,7 @@ class BOOST_COROSIO_DECL tcp_server { waiter* next; std::coroutine_handle<> h; - detail::continuation_op cont_op; + capy::continuation cont; worker_base* w; }; @@ -348,7 +348,7 @@ class BOOST_COROSIO_DECL tcp_server { tcp_server& self_; worker_base& w_; - detail::continuation_op cont_op_; + capy::continuation cont_; public: push_awaitable(tcp_server& self, worker_base& w) noexcept @@ -366,8 +366,8 @@ class BOOST_COROSIO_DECL tcp_server await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept 
{ // Symmetric transfer to server's executor - cont_op_.cont.h = h; - return self_.ex_.dispatch(cont_op_.cont); + cont_.h = h; + return self_.ex_.dispatch(cont_); } void await_resume() noexcept @@ -380,8 +380,8 @@ class BOOST_COROSIO_DECL tcp_server auto* wait = self_.waiters_; self_.waiters_ = wait->next; wait->w = &w_; - wait->cont_op.cont.h = wait->h; - self_.ex_.post(wait->cont_op.cont); + wait->cont.h = wait->h; + self_.ex_.post(wait->cont); } else { @@ -438,8 +438,8 @@ class BOOST_COROSIO_DECL tcp_server auto* wait = waiters_; waiters_ = wait->next; wait->w = &w; - wait->cont_op.cont.h = wait->h; - ex_.post(wait->cont_op.cont); + wait->cont.h = wait->h; + ex_.post(wait->cont); } else { diff --git a/perf/bench/corosio/http_server_bench.cpp b/perf/bench/corosio/http_server_bench.cpp index bca9b13c0..bca5ba3be 100644 --- a/perf/bench/corosio/http_server_bench.cpp +++ b/perf/bench/corosio/http_server_bench.cpp @@ -15,17 +15,18 @@ #include #include #include -#include #include #include -#include #include #include #include #include #include +#include +#include #include +#include #include #include "../common/http_protocol.hpp" @@ -37,6 +38,31 @@ namespace capy = boost::capy; namespace corosio_bench { namespace { +// Read into `buf` until `delim` appears; return the index just past it. +// capy dropped composed read_until, and this benchmark only needs to locate +// the header terminator; bytes past the delimiter stay buffered in `buf`. +template +capy::task> +read_until(Sock& sock, std::string& buf, std::string_view delim) +{ + std::size_t scan_from = 0; + for (;;) + { + if (auto pos = buf.find(delim, scan_from); pos != std::string::npos) + co_return {std::error_code{}, pos + delim.size()}; + // A full delimiter can only straddle the last delim.size()-1 bytes, + // so the next scan need not re-examine earlier bytes. + scan_from = + buf.size() >= delim.size() ? 
buf.size() - (delim.size() - 1) : 0; + char chunk[4096]; + auto [ec, n] = + co_await sock.read_some(capy::mutable_buffer(chunk, sizeof(chunk))); + buf.append(chunk, n); + if (ec) + co_return {ec, buf.size()}; + } +} + template capy::task<> server_task(corosio::native_tcp_socket& sock) @@ -45,8 +71,7 @@ server_task(corosio::native_tcp_socket& sock) for (;;) { - auto [ec, n] = co_await capy::read_until( - sock, capy::dynamic_buffer(buf), "\r\n\r\n"); + auto [ec, n] = co_await read_until(sock, buf, "\r\n\r\n"); if (ec) co_return; @@ -80,8 +105,7 @@ client_task( if (wec) co_return; - auto [ec, header_end] = co_await capy::read_until( - sock, capy::dynamic_buffer(buf), "\r\n\r\n"); + auto [ec, header_end] = co_await read_until(sock, buf, "\r\n\r\n"); if (ec) co_return; diff --git a/test/unit/context.hpp b/test/unit/context.hpp index e26f0e158..9cb31d6be 100644 --- a/test/unit/context.hpp +++ b/test/unit/context.hpp @@ -93,6 +93,14 @@ COROSIO_TEST_KQUEUE_(impl, name) \ COROSIO_TEST_SELECT_(impl, name) +// All backends except IOCP (which posts via a separate tagged-PQCS path +// and is not zero-alloc until that task is done). +#define COROSIO_NON_IOCP_BACKEND_TESTS(impl, name) \ + COROSIO_TEST_EPOLL_(impl, name) \ + COROSIO_TEST_KQUEUE_(impl, name) \ + COROSIO_TEST_SELECT_(impl, name) \ + COROSIO_TEST_IO_URING_(impl, name) + // Tests that destroy the io_context with ops still parked abandon the // suspended coroutine frames: op destroy() must not resume or destroy // them (resuming runs user code during teardown; destroying recurses diff --git a/test/unit/continuation_post.cpp b/test/unit/continuation_post.cpp new file mode 100644 index 000000000..d32020280 --- /dev/null +++ b/test/unit/continuation_post.cpp @@ -0,0 +1,154 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "context.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +namespace { + +struct bare_post_awaitable +{ + capy::continuation* cont; + + bool await_ready() const noexcept { return false; } + + void await_suspend( + std::coroutine_handle<> h, + capy::io_env const* env) noexcept + { + cont->h = h; + env->executor.post(*cont); + } + + void await_resume() const noexcept {} +}; + +inline capy::task<> +bare_post_task(capy::continuation* cont, bool& ran) +{ + co_await bare_post_awaitable{cont}; + ran = true; +} + +// TSan's runtime already replaces global operator new/delete, so the +// counting replacements below cannot link under -fsanitize=thread (and +// the counts would reflect the sanitizer's allocator anyway). +#if defined(__SANITIZE_THREAD__) +# define COROSIO_TEST_HAS_TSAN 1 +#elif defined(__has_feature) +# if __has_feature(thread_sanitizer) +# define COROSIO_TEST_HAS_TSAN 1 +# endif +#endif + +#ifndef COROSIO_TEST_HAS_TSAN +std::atomic alloc_armed{false}; +std::atomic alloc_count{0}; +#endif + +} // namespace + +} // namespace boost::corosio + +#ifndef COROSIO_TEST_HAS_TSAN +void* operator new(std::size_t n) +{ + if (boost::corosio::alloc_armed.load(std::memory_order_relaxed)) + boost::corosio::alloc_count.fetch_add(1, std::memory_order_relaxed); + if (void* p = std::malloc(n ? 
n : 1)) + return p; + throw std::bad_alloc{}; +} + +void operator delete(void* p) noexcept { std::free(p); } +void operator delete(void* p, std::size_t) noexcept { std::free(p); } +#endif + +namespace boost::corosio { + +template +struct continuation_post_test +{ + void testBarePostIsSafe() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + bool ran = false; + + // Allocate the continuation at the start of a fresh heap block + // so that any read before &cont lands in the ASan redzone — the + // worst-case layout for callers that subtract a struct offset + // from the continuation address. + auto cont = std::make_unique(); + + capy::run_async(ex)(bare_post_task(cont.get(), ran)); + ioc.run(); + + BOOST_TEST(ran); + } + + void run() + { + testBarePostIsSafe(); + } +}; + +#ifndef COROSIO_TEST_HAS_TSAN +// Assert post(continuation&) allocates nothing on backends that enqueue +// continuations directly on the ready queue (reactor + io_uring). +template +struct continuation_zero_alloc_test +{ + void testPostIsZeroAlloc() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + capy::continuation cont{}; + cont.h = std::noop_coroutine(); + + alloc_count.store(0); + alloc_armed.store(true, std::memory_order_relaxed); + ex.post(cont); + alloc_armed.store(false, std::memory_order_relaxed); + + BOOST_TEST(alloc_count.load() == 0LL); + ioc.run(); + } + + void run() + { + testPostIsZeroAlloc(); + } +}; +#endif + +COROSIO_BACKEND_TESTS(continuation_post_test, "boost.corosio.continuation_post") +#ifndef COROSIO_TEST_HAS_TSAN +COROSIO_NON_IOCP_BACKEND_TESTS( + continuation_zero_alloc_test, + "boost.corosio.continuation_post.zero_alloc") +#endif + +} // namespace boost::corosio diff --git a/test/unit/io_context.cpp b/test/unit/io_context.cpp index 97d96ec09..38be71691 100644 --- a/test/unit/io_context.cpp +++ b/test/unit/io_context.cpp @@ -10,8 +10,6 @@ // Test that header file is self-contained. 
#include -#include - #include #include #include @@ -229,9 +227,8 @@ make_check_coro(bool& result, io_context::executor_type& ex) } // Helper: post a bare coroutine handle (heap-allocating path). -// Test-only: production code embeds continuation_op in long-lived -// structures; this helper uses the executor's post(coroutine_handle<>) -// overload since the handle has no enclosing continuation_op. +// Test-only: uses the heap-allocating overload since the handle has +// no enclosing capy::continuation. inline void post_coro(io_context::executor_type& ex, std::coroutine_handle<> h) { @@ -711,31 +708,28 @@ struct io_context_test BOOST_TEST_EQ(destroyed, 3); } - // Exercises continuation_op::destroy() — invoked when shutdown drains - // queued continuation_op posts. The tagged-post path through - // executor::post(capy::continuation&) routes to scheduler::post(scheduler_op*) - // which enqueues without heap allocation; on shutdown the queue is drained - // and destroy() must release each continuation's coroutine frame. + // Exercises scheduler shutdown drain on posted continuations. void testContinuationOpDestroyOnShutdown() { int destroyed = 0; - // Allocate the continuation_ops outside the io_context scope so the - // ops outlive the scheduler that points at them. - detail::continuation_op op1; - detail::continuation_op op2; - op1.cont.h = make_destroy_coro(destroyed); - op2.cont.h = make_destroy_coro(destroyed); + // Continuations live outside the io_context scope; the scheduler + // links them through their own reserved slot and destroys their + // coroutine frames on shutdown. + capy::continuation cont1; + capy::continuation cont2; + cont1.h = make_destroy_coro(destroyed); + cont2.h = make_destroy_coro(destroyed); { io_context ioc(Backend); auto ex = ioc.get_executor(); - ex.post(op1.cont); - ex.post(op2.cont); + ex.post(cont1); + ex.post(cont2); - // io_context destructor drains scheduler queue and calls - // continuation_op::destroy() on each. 
+ // io_context destructor drains the ready queue and calls + // h.destroy() on each queued continuation's handle. } BOOST_TEST_EQ(destroyed, 2); diff --git a/test/unit/ready_queue.cpp b/test/unit/ready_queue.cpp new file mode 100644 index 000000000..7bad8780f --- /dev/null +++ b/test/unit/ready_queue.cpp @@ -0,0 +1,94 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#include +#include +#include + +#include "test_suite.hpp" + +using namespace boost::corosio::detail; + +namespace { + +struct fake_op : scheduler_op +{ + int id; + explicit fake_op(int i) : id(i) {} + void operator()() override {} +}; + +} // namespace + +struct ready_queue_test +{ + void testFifoOpsOnly() + { + ready_queue q; + BOOST_TEST(q.empty()); + fake_op a{1}, b{2}; + q.push(&a); + q.push(&b); + BOOST_TEST(!q.empty()); + + auto e1 = q.pop(); + BOOST_TEST(!ready_is_continuation(e1)); + BOOST_TEST(ready_as_op(e1) == &a); + + auto e2 = q.pop(); + BOOST_TEST(ready_as_op(e2) == &b); + + BOOST_TEST(q.pop() == 0u); + BOOST_TEST(q.empty()); + } + + void testMixedOrderPreserved() + { + ready_queue q; + fake_op a{1}; + boost::capy::continuation c{}; + fake_op b{2}; + q.push(&a); + q.push(c); + q.push(&b); + + auto e1 = q.pop(); + BOOST_TEST(!ready_is_continuation(e1)); + BOOST_TEST(ready_as_op(e1) == &a); + + auto e2 = q.pop(); + BOOST_TEST(ready_is_continuation(e2)); + BOOST_TEST(ready_as_cont(e2) == &c); + + auto e3 = q.pop(); + BOOST_TEST(ready_as_op(e3) == &b); + } + + void testSpliceAppendsInOrder() + { + ready_queue q1, q2; + fake_op a{1}, b{2}; + q1.push(&a); + q2.push(&b); + q1.splice(q2); + BOOST_TEST(q2.empty()); + BOOST_TEST(ready_as_op(q1.pop()) == &a); + BOOST_TEST(ready_as_op(q1.pop()) == &b); + BOOST_TEST(q1.pop() == 0u); + } + + void run() + { + testFifoOpsOnly(); 
+ testMixedOrderPreserved(); + testSpliceAppendsInOrder(); + } +}; + +TEST_SUITE(ready_queue_test, "boost.corosio.ready_queue"); diff --git a/test/unit/tcp_socket.cpp b/test/unit/tcp_socket.cpp index 28e9a6d09..b5303d14c 100644 --- a/test/unit/tcp_socket.cpp +++ b/test/unit/tcp_socket.cpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -882,15 +881,13 @@ struct tcp_socket_test auto task = [](tcp_socket& a, tcp_socket& b) -> capy::task<> { std::string send_data = "Hello, this is a test message!"; (void)co_await capy::write(a, capy::make_buffer(send_data)); - a.close(); - // Read into string until EOF using dynamic buffer - std::string result; - auto [ec, n] = - co_await capy::read(b, capy::string_dynamic_buffer(&result)); + char buf[64] = {}; + auto [ec, n] = co_await capy::read( + b, capy::mutable_buffer(buf, send_data.size())); BOOST_TEST(!ec); BOOST_TEST_EQ(n, send_data.size()); - BOOST_TEST_EQ(result, send_data); + BOOST_TEST_EQ(std::string_view(buf, n), send_data); }; capy::run_async(ioc.get_executor())(task(s1, s2));