From 39efec6e9e9df7306f306d90cce4b73297cf6e4d Mon Sep 17 00:00:00 2001 From: wokron Date: Tue, 10 Feb 2026 21:59:25 +0800 Subject: [PATCH 1/3] refactor finish_handle to support handle_cqe() func --- include/condy/awaiters.hpp | 2 +- include/condy/concepts.hpp | 9 +- include/condy/finish_handles.hpp | 151 ++++++++++++++++++------------ include/condy/runtime.hpp | 30 +----- include/condy/work_type.hpp | 2 - tests/test_awaiter_operations.cpp | 2 +- tests/test_op_awaiter.cpp | 14 ++- tests/test_op_finish_handle.cpp | 34 +++++-- tests/test_ring.cpp | 10 +- tests/test_work_type.cpp | 3 +- 10 files changed, 143 insertions(+), 114 deletions(-) diff --git a/include/condy/awaiters.hpp b/include/condy/awaiters.hpp index 9b86a3bb..0296c0e3 100644 --- a/include/condy/awaiters.hpp +++ b/include/condy/awaiters.hpp @@ -100,7 +100,7 @@ class OpAwaiterBase { prep_func_(sqe); sqe->flags |= static_cast(flags); io_uring_sqe_set_data( - sqe, encode_work(&finish_handle_.get(), Handle::work_type)); + sqe, encode_work(&finish_handle_.get(), WorkType::Common)); } protected: diff --git a/include/condy/concepts.hpp b/include/condy/concepts.hpp index fce4a1fb..7e351949 100644 --- a/include/condy/concepts.hpp +++ b/include/condy/concepts.hpp @@ -26,10 +26,11 @@ concept HandleLike = requires(T handle, Invoker *invoker) { }; template -concept OpFinishHandleLike = HandleLike && requires(T handle) { - { handle.invoke() } -> std::same_as; - { handle.set_result(0, 0) } -> std::same_as; -}; +concept OpFinishHandleLike = + HandleLike && requires(T handle, io_uring_cqe *cqe) { + { handle.invoke() } -> std::same_as; + { handle.handle_cqe(cqe) } -> std::same_as; + }; template concept AwaiterLike = requires(T awaiter) { diff --git a/include/condy/finish_handles.hpp b/include/condy/finish_handles.hpp index 1c0eda44..89a5323a 100644 --- a/include/condy/finish_handles.hpp +++ b/include/condy/finish_handles.hpp @@ -22,30 +22,32 @@ #include #include -// Cancel method for finish handles -#define DEFINE_CANCEL_METHOD_() \ - void cancel() { \ - auto *ring = detail::Context::current().ring(); \ - io_uring_sqe *sqe = ring->get_sqe(); \ - io_uring_prep_cancel(sqe, encode_work(this, work_type), 0); \ - io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore)); \ - io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); \ - } - namespace condy { class Ring; class OpFinishHandle : public InvokerAdapter { public: - static constexpr WorkType work_type = WorkType::Common; using ReturnType = int32_t; + struct Action { + bool queue_work; + bool op_finish; + }; + using HandleCQEFunc = Action (*)(void *, io_uring_cqe *); + + void cancel() { + auto *ring = detail::Context::current().ring(); + io_uring_sqe *sqe = ring->get_sqe(); + io_uring_prep_cancel(sqe, this, 0); + io_uring_sqe_set_data(sqe, encode_work(nullptr, WorkType::Ignore)); + io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); + } - DEFINE_CANCEL_METHOD_(); + Action handle_cqe(io_uring_cqe *cqe) { return handle_func_(this, cqe); } - void set_result(int32_t res, uint32_t flags) { - res_ = res; - flags_ = flags; + Action handle_cqe_impl(io_uring_cqe *cqe) { + res_ = cqe->res; + return {.queue_work = true, .op_finish = true}; } void invoke() { @@ -57,68 +59,61 @@ class OpFinishHandle : public InvokerAdapter { void set_invoker(Invoker *invoker) { invoker_ = invoker; } -protected: - int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set - uint32_t flags_ = 0; - Invoker *invoker_ = nullptr; -}; - -class ExtendOpFinishHandle : public OpFinishHandle { -public: - using ExtendFunc = void (*)(void *, int32_t); - - void invoke_extend(int32_t res) { - assert(extend_func_ != nullptr); - extend_func_(this, res); +private: + static Action handle_cqe_static_(void *data, io_uring_cqe *cqe) { + auto *self = static_cast(data); + return self->handle_cqe_impl(cqe); } protected: - ExtendFunc extend_func_ = nullptr; + HandleCQEFunc handle_func_ = handle_cqe_static_; + Invoker *invoker_ = nullptr; + int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set }; template class MultiShotMixin : public HandleBase { public: - static constexpr WorkType work_type = WorkType::MultiShot; - template MultiShotMixin(Func func, Args &&...args) : HandleBase(std::forward(args)...), func_(std::move(func)) { - this->extend_func_ = [](void *data, int32_t) { - auto *self = static_cast *>(data); - self->invoke_multishot_(); - }; + this->handle_func_ = handle_cqe_static_; } - DEFINE_CANCEL_METHOD_(); + OpFinishHandle::Action + handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ { + if (cqe->flags & IORING_CQE_F_MORE) { + HandleBase::handle_cqe_impl(cqe); + func_(HandleBase::extract_result()); + return {.queue_work = false, .op_finish = false}; + } else { + HandleBase::handle_cqe_impl(cqe); + return {.queue_work = true, .op_finish = true}; + } + } private: - void invoke_multishot_() { func_(HandleBase::extract_result()); } + static OpFinishHandle::Action handle_cqe_static_(void *data, + io_uring_cqe *cqe) { + auto *self = static_cast(data); + return self->handle_cqe_impl(cqe); + } -private: +protected: Func func_; }; template -using MultiShotOpFinishHandle = - MultiShotMixin; +using MultiShotOpFinishHandle = MultiShotMixin; template class ZeroCopyMixin : public HandleBase { public: - static constexpr WorkType work_type = WorkType::ZeroCopy; - template ZeroCopyMixin(Func func, Args &&...args) : HandleBase(std::forward(args)...), free_func_(std::move(func)) { - this->func_ = [](void *data) { - auto *self = static_cast *>(data); - self->invoke(); - }; - this->extend_func_ = [](void *data, int32_t res) { - auto *self = static_cast *>(data); - self->invoke_notify_(res); - }; + this->func_ = invoke_static_; + this->handle_func_ = handle_cqe_static_; } void invoke() /* fake override */ { @@ -131,10 +126,24 @@ class ZeroCopyMixin : public HandleBase { } } - DEFINE_CANCEL_METHOD_(); + OpFinishHandle::Action + handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ { + if (cqe->flags & IORING_CQE_F_MORE) { + HandleBase::handle_cqe_impl(cqe); + return {.queue_work = true, .op_finish = false}; + } else { + if (cqe->flags & IORING_CQE_F_NOTIF) { + notify_(cqe->res); + return {.queue_work = false, .op_finish = true}; + } else { + HandleBase::handle_cqe_impl(cqe); + return {.queue_work = true, .op_finish = true}; + } + } + } private: - void invoke_notify_(int32_t res) { + void notify_(int32_t res) { notify_res_ = res; notified_ = true; if (resumed_) { @@ -143,7 +152,18 @@ class ZeroCopyMixin : public HandleBase { } } -private: + static void invoke_static_(void *data) { + auto *self = static_cast(data); + self->invoke(); + } + + static OpFinishHandle::Action handle_cqe_static_(void *data, + io_uring_cqe *cqe) { + auto *self = static_cast(data); + return self->handle_cqe_impl(cqe); + } + +protected: Func free_func_; int32_t notify_res_ = -ENOTRECOVERABLE; // Use these flags to handle race between invoke and notify @@ -152,7 +172,7 @@ class ZeroCopyMixin : public HandleBase { }; template -using ZeroCopyOpFinishHandle = ZeroCopyMixin; +using ZeroCopyOpFinishHandle = ZeroCopyMixin; template class SelectBufferMixin : public HandleBase { @@ -161,7 +181,16 @@ class SelectBufferMixin : public HandleBase { template SelectBufferMixin(Br *buffers, Args &&...args) - : HandleBase(std::forward(args)...), buffers_(buffers) {} + : HandleBase(std::forward(args)...), buffers_(buffers) { + this->handle_func_ = handle_cqe_static_; + } + + OpFinishHandle::Action + handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ { + HandleBase::handle_cqe_impl(cqe); + this->flags_ = cqe->flags; + return {.queue_work = true, .op_finish = true}; + } ReturnType extract_result() { int res = this->res_; @@ -170,6 +199,14 @@ class SelectBufferMixin : public HandleBase { } private: + static OpFinishHandle::Action handle_cqe_static_(void *data, + io_uring_cqe *cqe) { + auto *self = static_cast(data); + return self->handle_cqe_impl(cqe); + } + +protected: + uint32_t flags_ = 0; Br *buffers_; }; @@ -178,7 +215,7 @@ using SelectBufferOpFinishHandle = SelectBufferMixin; template using MultiShotSelectBufferOpFinishHandle = - MultiShotMixin>; + MultiShotMixin>; template class RangedParallelFinishHandle { public: @@ -437,6 +474,4 @@ class WhenAnyFinishHandle : public ParallelAnyFinishHandle { } }; -#undef DEFINE_CANCEL_METHOD_ - } // namespace condy diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 77990421..5624a3ea 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -361,35 +361,15 @@ class Runtime { tsan_acquire(data); local_queue_.push_back(work); } - } else if (type == WorkType::MultiShot) { - auto *handle = static_cast(data); - handle->set_result(cqe->res, cqe->flags); - if (cqe->flags & IORING_CQE_F_MORE) { - handle->invoke_extend(0); // res not used here - } else { + } else if (type == WorkType::Common) { + auto *handle = static_cast(data); + auto action = handle->handle_cqe(cqe); + if (action.op_finish) { pending_works_--; - local_queue_.push_back(handle); } - } else if (type == WorkType::ZeroCopy) { - auto *handle = static_cast(data); - if (cqe->flags & IORING_CQE_F_MORE) { - handle->set_result(cqe->res, cqe->flags); + if (action.queue_work) { local_queue_.push_back(handle); - } else { - pending_works_--; - if (cqe->flags & IORING_CQE_F_NOTIF) { - handle->invoke_extend(cqe->res); - } else { - handle->set_result(cqe->res, cqe->flags); - local_queue_.push_back(handle); - handle->invoke_extend(0); - } } - } else if (type == WorkType::Common) { - auto *handle = static_cast(data); - handle->set_result(cqe->res, cqe->flags); - pending_works_--; - local_queue_.push_back(handle); } else { assert(false && "Invalid work type"); } diff --git a/include/condy/work_type.hpp b/include/condy/work_type.hpp index 4f60b6cd..dc1d7cee 100644 --- a/include/condy/work_type.hpp +++ b/include/condy/work_type.hpp @@ -16,8 +16,6 @@ enum class WorkType : uint8_t { Notify, SendFd, Schedule, - MultiShot, - ZeroCopy, }; inline std::pair decode_work(void *ptr) { diff --git a/tests/test_awaiter_operations.cpp b/tests/test_awaiter_operations.cpp index 598f08d6..3ecc14d9 100644 --- a/tests/test_awaiter_operations.cpp +++ b/tests/test_awaiter_operations.cpp @@ -22,7 +22,7 @@ void event_loop(size_t &unfinished) { return; } auto handle_ptr = static_cast(data); - handle_ptr->set_result(cqe->res, cqe->flags); + handle_ptr->handle_cqe(cqe); (*handle_ptr)(); }); } diff --git a/tests/test_op_awaiter.cpp b/tests/test_op_awaiter.cpp index 6f512bb0..cf183df2 100644 --- a/tests/test_op_awaiter.cpp +++ b/tests/test_op_awaiter.cpp @@ -1,3 +1,4 @@ +#include "condy/finish_handles.hpp" #include "condy/provided_buffers.hpp" #include #include @@ -18,7 +19,7 @@ void event_loop(size_t &unfinished) { return; } auto handle_ptr = static_cast(data); - handle_ptr->set_result(cqe->res, cqe->flags); + handle_ptr->handle_cqe(cqe); (*handle_ptr)(); }); } @@ -163,10 +164,13 @@ void mock_multishot_event_loop(size_t &unfinished) { if (type == condy::WorkType::Ignore) { return; } - auto handle_ptr = static_cast(data); - handle_ptr->set_result(42, 0); - handle_ptr->invoke_extend(0); // Multishot - handle_ptr->set_result(cqe->res, cqe->flags); + auto handle_ptr = static_cast(data); + // Mock Multishot + io_uring_cqe mock_cqe = *cqe; + mock_cqe.res = 42; + mock_cqe.flags |= IORING_CQE_F_MORE; + handle_ptr->handle_cqe(&mock_cqe); + handle_ptr->handle_cqe(cqe); (*handle_ptr)(); }); } diff --git a/tests/test_op_finish_handle.cpp b/tests/test_op_finish_handle.cpp index 56ab452e..09777bfa 100644 --- a/tests/test_op_finish_handle.cpp +++ b/tests/test_op_finish_handle.cpp @@ -30,7 +30,7 @@ void event_loop(size_t &unfinished) { return; } auto handle_ptr = static_cast(data); - handle_ptr->set_result(cqe->res, cqe->flags); + handle_ptr->handle_cqe(cqe); (*handle_ptr)(); }); } @@ -62,7 +62,9 @@ TEST_CASE("test op_finish_handle - basic usage") { ring.reap_completions([](io_uring_cqe *cqe) { auto handle_ptr = static_cast(io_uring_cqe_get_data(cqe)); - handle_ptr->set_result(42, 0); + io_uring_cqe mock_cqe = *cqe; + mock_cqe.res = 42; + handle_ptr->handle_cqe(&mock_cqe); (*handle_ptr)(); }); @@ -164,13 +166,16 @@ TEST_CASE("test op_finish_handle - multishot op") { invoker(); }; - condy::MultiShotMixin handle( - func); + condy::MultiShotMixin handle(func); REQUIRE(!invoker.finished); - handle.set_result(1, 0); - handle.invoke_extend(0); // Multishot + io_uring_cqe cqe{}; + cqe.res = 1; + cqe.flags |= IORING_CQE_F_MORE; // Indicate more results to come + auto act = handle.handle_cqe(&cqe); // Multishot REQUIRE(invoker.finished); REQUIRE(invoker.result == 1); + REQUIRE(!act.op_finish); + REQUIRE(!act.queue_work); } TEST_CASE("test op_finish_handle - zero copy op") { @@ -180,15 +185,24 @@ TEST_CASE("test op_finish_handle - zero copy op") { auto func = [&](int r) { res = r; }; auto *handle = - new condy::ZeroCopyMixin( - func); + new condy::ZeroCopyMixin(func); handle->set_invoker(&invoker); REQUIRE(!invoker.finished); - handle->set_result(1, 0); + io_uring_cqe cqe{}; + cqe.res = 1; + cqe.flags |= IORING_CQE_F_MORE; // Indicate more results to come + auto act1 = handle->handle_cqe(&cqe); + REQUIRE(act1.queue_work); + REQUIRE(!act1.op_finish); (*handle)(); REQUIRE(invoker.finished); REQUIRE(handle->extract_result() == 1); REQUIRE(res == -1); - handle->invoke_extend(2); // Notify + io_uring_cqe cqe2{}; + cqe2.res = 2; + cqe2.flags |= IORING_CQE_F_NOTIF; + auto act2 = handle->handle_cqe(&cqe2); + REQUIRE(act2.op_finish); + REQUIRE(!act2.queue_work); REQUIRE(res == 2); } \ No newline at end of file diff --git a/tests/test_ring.cpp b/tests/test_ring.cpp index 3d382eea..2ee0ee64 100644 --- a/tests/test_ring.cpp +++ b/tests/test_ring.cpp @@ -23,9 +23,8 @@ TEST_CASE("test ring - register and complete ops") { ring.init(8, ¶ms); constexpr size_t num_ops = 4; - OpFinishHandle handles[num_ops]; + std::vector handles(num_ops); for (size_t i = 0; i < num_ops; i++) { - handles[i].set_result(-1, 0); auto *sqe = ring.get_sqe(); io_uring_prep_nop(sqe); io_uring_sqe_set_data(sqe, &handles[i]); @@ -40,7 +39,7 @@ TEST_CASE("test ring - register and complete ops") { OpFinishHandle *handle = reinterpret_cast(io_uring_cqe_get_data(cqe)); REQUIRE(handle != nullptr); - handle->set_result(cqe->res, 0); + handle->handle_cqe(cqe); count++; }); } @@ -66,9 +65,8 @@ TEST_CASE("test ring - cancel ops") { .tv_nsec = 0, }; constexpr size_t num_ops = 8; - OpFinishHandle handles[num_ops]; + std::vector handles(num_ops); for (size_t i = 0; i < num_ops; i++) { - handles[i].set_result(-1, 0); auto *sqe = ring.get_sqe(); if (i % 2 == 0) { io_uring_prep_nop(sqe); @@ -101,7 +99,7 @@ TEST_CASE("test ring - cancel ops") { OpFinishHandle *handle = reinterpret_cast(io_uring_cqe_get_data(cqe)); REQUIRE(handle != nullptr); - handle->set_result(cqe->res, 0); + handle->handle_cqe(cqe); if (cqe->res == -ECANCELED) { canceled_count++; } diff --git a/tests/test_work_type.cpp b/tests/test_work_type.cpp index 994ddf0a..f52b0027 100644 --- a/tests/test_work_type.cpp +++ b/tests/test_work_type.cpp @@ -26,7 +26,6 @@ TEST_CASE("test work_type - encode and decode") { test_type(condy::WorkType::Common); test_type(condy::WorkType::Ignore); test_type(condy::WorkType::Notify); + test_type(condy::WorkType::SendFd); test_type(condy::WorkType::Schedule); - test_type(condy::WorkType::MultiShot); - test_type(condy::WorkType::ZeroCopy); } \ No newline at end of file From 250c991c27744cac9d16e4807808ae007f303d43 Mon Sep 17 00:00:00 2001 From: wokron Date: Wed, 11 Feb 2026 15:55:13 +0800 Subject: [PATCH 2/3] update zc handle logic --- include/condy/finish_handles.hpp | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/include/condy/finish_handles.hpp b/include/condy/finish_handles.hpp index 89a5323a..f0b0741a 100644 --- a/include/condy/finish_handles.hpp +++ b/include/condy/finish_handles.hpp @@ -120,10 +120,10 @@ class ZeroCopyMixin : public HandleBase { assert(this->invoker_ != nullptr); (*this->invoker_)(); resumed_ = true; - if (notified_) { - free_func_(notify_res_); - delete this; - } + // Invocation of free_func_ should be delayed until the operation is + // finished since user may adjust the behavior of free_func_ based on + // the result of the operation. + maybe_free_(); } OpFinishHandle::Action @@ -136,6 +136,10 @@ class ZeroCopyMixin : public HandleBase { notify_(cqe->res); return {.queue_work = false, .op_finish = true}; } else { + // Only one cqe means the operation is finished without + // notification. This is rare but possible. + // https://github.com/axboe/liburing/issues/1462 + notify_(0); HandleBase::handle_cqe_impl(cqe); return {.queue_work = true, .op_finish = true}; } @@ -143,15 +147,20 @@ class ZeroCopyMixin : public HandleBase { } private: - void notify_(int32_t res) { - notify_res_ = res; - notified_ = true; - if (resumed_) { + void maybe_free_() { + if (resumed_ && notified_) { free_func_(notify_res_); delete this; } } + void notify_(int32_t res) { + assert(res != -ENOTRECOVERABLE); + notify_res_ = res; + notified_ = true; + maybe_free_(); + } + static void invoke_static_(void *data) { auto *self = static_cast(data); self->invoke(); From 4e176ae05e718107ae9832e177d7a7466dd3e482 Mon Sep 17 00:00:00 2001 From: wokron Date: Wed, 11 Feb 2026 17:23:44 +0800 Subject: [PATCH 3/3] add more zc testcase --- tests/test_async_operations.3.cpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_async_operations.3.cpp b/tests/test_async_operations.3.cpp index 9fca900a..b1d1503e 100644 --- a/tests/test_async_operations.3.cpp +++ b/tests/test_async_operations.3.cpp @@ -264,6 +264,7 @@ TEST_CASE("test async_operations - test send - zero copy") { auto func = [&]() -> condy::Coro { size_t n = co_await condy::async_send_zc( sv[1], condy::buffer(msg), 0, 0, [&](auto) { called = true; }); + REQUIRE(!called); // Callback should not be called REQUIRE(n == msg.size()); }; condy::sync_wait(func()); @@ -278,6 +279,22 @@ TEST_CASE("test async_operations - test send - zero copy") { close(sv[1]); } +TEST_CASE("test async_operations - test send - zero copy failed") { + auto msg = generate_data(1024); + bool called = false; + auto func = [&]() -> condy::Coro { + int r = co_await condy::async_send_zc(-1, condy::buffer(msg), 0, 0, + [&](auto r) { + REQUIRE(r == 0); + called = true; + }); + REQUIRE(!called); // Callback should not be called + REQUIRE(r == -EBADF); + }; + condy::sync_wait(func()); + REQUIRE(called); +} + TEST_CASE("test async_operations - test send - zero copy fixed buffer") { int sv[2]; create_tcp_socketpair(sv); @@ -296,6 +313,7 @@ TEST_CASE("test async_operations - test send - zero copy fixed buffer") { size_t n = co_await condy::async_send_zc( sv[1], condy::fixed(0, condy::buffer(msg)), 0, 0, [&](auto) { called = true; }); + REQUIRE(!called); // Callback should not be called REQUIRE(n == msg.size()); }; condy::sync_wait(func()); @@ -505,6 +523,7 @@ TEST_CASE("test async_operations - test sendto - zero copy") { size_t n = co_await condy::async_sendto_zc( sender_fd, condy::buffer(msg), 0, (sockaddr *)&recv_addr, sizeof(recv_addr), 0, [&](auto) { called = true; }); + REQUIRE(!called); // Callback should not be called REQUIRE(n == msg.size()); }; condy::sync_wait(func()); @@ -550,6 +569,7 @@ TEST_CASE("test async_operations - test sendto - zero copy fixed buffer") { sender_fd, condy::fixed(0, condy::buffer(msg)), 0, (sockaddr *)&recv_addr, sizeof(recv_addr), 0, [&](auto) { called = true; }); + REQUIRE(!called); // Callback should not be called REQUIRE(n == msg.size()); }; condy::sync_wait(func());