diff --git a/include/condy/awaiter_operations.hpp b/include/condy/awaiter_operations.hpp index 2143c5e5..3d5759bc 100644 --- a/include/condy/awaiter_operations.hpp +++ b/include/condy/awaiter_operations.hpp @@ -11,6 +11,7 @@ #include "condy/awaiters.hpp" #include "condy/concepts.hpp" +#include "condy/cqe_handler.hpp" namespace condy { @@ -25,7 +26,8 @@ auto make_op_awaiter(Func &&func, Args &&...args) { ... args = std::forward(args)](auto sqe) { func(sqe, args...); }; - return OpAwaiter(std::move(prep_func)); + return OpAwaiter( + std::move(prep_func)); } #if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13 @@ -38,7 +40,8 @@ auto make_op_awaiter128(Func &&func, Args &&...args) { ... args = std::forward(args)](auto sqe) { func(sqe, args...); }; - return OpAwaiter128(std::move(prep_func)); + return OpAwaiter( + std::move(prep_func)); } #endif @@ -52,8 +55,9 @@ auto make_multishot_op_awaiter(MultiShotFunc &&multishot_func, Func &&func, ... args = std::forward(args)](auto sqe) { func(sqe, args...); }; - return MultiShotOpAwaiter, decltype(prep_func)>( - std::forward(multishot_func), std::move(prep_func)); + return MultiShotOpAwaiter>( + std::move(prep_func), std::forward(multishot_func)); } /** @@ -67,9 +71,8 @@ auto make_select_buffer_op_awaiter(Br *buffers, Func &&func, Args &&...args) { sqe->flags |= IOSQE_BUFFER_SELECT; sqe->buf_group = bgid; }; - auto op = SelectBufferOpAwaiter( - buffers, std::move(prep_func)); - return op; + return OpAwaiter>( + std::move(prep_func), buffers); } /** @@ -86,11 +89,10 @@ auto make_multishot_select_buffer_op_awaiter(MultiShotFunc &&multishot_func, sqe->flags |= IOSQE_BUFFER_SELECT; sqe->buf_group = bgid; }; - auto op = MultiShotSelectBufferOpAwaiter, Br, - decltype(prep_func)>( - std::forward(multishot_func), buffers, - std::move(prep_func)); - return op; + return MultiShotOpAwaiter, + std::decay_t>( + std::move(prep_func), std::forward(multishot_func), + buffers); } #if !IO_URING_CHECK_VERSION(2, 7) // >= 2.7 @@ -107,9 +109,8 @@ auto make_bundle_select_buffer_op_awaiter(Br *buffers, Func &&func, sqe->buf_group = bgid; sqe->ioprio |= IORING_RECVSEND_BUNDLE; }; - auto op = SelectBufferOpAwaiter( - buffers, std::move(prep_func)); - return op; + return OpAwaiter>( + std::move(prep_func), buffers); } #endif @@ -128,11 +129,10 @@ auto make_multishot_bundle_select_buffer_op_awaiter( sqe->buf_group = bgid; sqe->ioprio |= IORING_RECVSEND_BUNDLE; }; - auto op = MultiShotSelectBufferOpAwaiter, Br, - decltype(prep_func)>( - std::forward(multishot_func), buffers, - std::move(prep_func)); - return op; + return MultiShotOpAwaiter, + std::decay_t>( + std::move(prep_func), std::forward(multishot_func), + buffers); } #endif @@ -146,8 +146,9 @@ auto make_zero_copy_op_awaiter(FreeFunc &&free_func, Func &&func, ... args = std::forward(args)](auto sqe) { func(sqe, args...); }; - return ZeroCopyOpAwaiter, decltype(prep_func)>( - std::forward(free_func), std::move(prep_func)); + return ZeroCopyOpAwaiter>( + std::move(prep_func), std::forward(free_func)); } /** diff --git a/include/condy/awaiters.hpp b/include/condy/awaiters.hpp index 0296c0e3..62c227c7 100644 --- a/include/condy/awaiters.hpp +++ b/include/condy/awaiters.hpp @@ -108,64 +108,47 @@ class OpAwaiterBase { HandleBox finish_handle_; }; -template -class [[nodiscard]] OpAwaiter : public OpAwaiterBase { +template +class [[nodiscard]] OpAwaiter + : public OpAwaiterBase, PrepFunc, SQE128> { public: - using Base = OpAwaiterBase; - OpAwaiter(Func func) : Base(OpFinishHandle(), func) {} -}; - -#if !IO_URING_CHECK_VERSION(2, 13) // >= 2.13 -template -class [[nodiscard]] OpAwaiter128 - : public OpAwaiterBase { -public: - using Base = OpAwaiterBase; - OpAwaiter128(Func func) : Base(OpFinishHandle(), func) {} + using Base = OpAwaiterBase, PrepFunc, SQE128>; + template + OpAwaiter(PrepFunc func, Args &&...args) + : Base(HandleBox( + OpFinishHandle(std::forward(args)...)), + func) {} }; -#endif -template +template class [[nodiscard]] MultiShotOpAwaiter - : public OpAwaiterBase, Func> { + : public OpAwaiterBase, + PrepFunc, SQE128> { public: - using Base = OpAwaiterBase, Func>; - MultiShotOpAwaiter(MultiShotFunc multishot_func, Func func) - : Base( - MultiShotOpFinishHandle(std::move(multishot_func)), - func) {} + using Base = + OpAwaiterBase, + PrepFunc, SQE128>; + template + MultiShotOpAwaiter(PrepFunc func, MultiShotFunc multishot_func, + Args &&...args) + : Base(HandleBox(MultiShotOpFinishHandle( + std::move(multishot_func), std::forward(args)...)), + func) {} }; -template +template class [[nodiscard]] ZeroCopyOpAwaiter - : public OpAwaiterBase, Func> { + : public OpAwaiterBase, + PrepFunc, SQE128> { public: - using Base = OpAwaiterBase, Func>; - ZeroCopyOpAwaiter(FreeFunc free_func, Func func) - : Base(ZeroCopyOpFinishHandle(std::move(free_func)), func) {} -}; - -template -class [[nodiscard]] SelectBufferOpAwaiter - : public OpAwaiterBase, Func> { -public: - using Base = OpAwaiterBase, Func>; - SelectBufferOpAwaiter(Br *buffers, Func func) - : Base(SelectBufferOpFinishHandle
(buffers), func) {} -}; - -template -class [[nodiscard]] MultiShotSelectBufferOpAwaiter - : public OpAwaiterBase< - MultiShotSelectBufferOpFinishHandle, Func> { -public: - using Base = - OpAwaiterBase, - Func>; - MultiShotSelectBufferOpAwaiter(MultiShotFunc multishot_func, Br *buffers, - Func func) - : Base(MultiShotSelectBufferOpFinishHandle( - std::move(multishot_func), buffers), + using Base = OpAwaiterBase, + PrepFunc, SQE128>; + template + ZeroCopyOpAwaiter(PrepFunc func, FreeFunc free_func, Args &&...args) + : Base(HandleBox(ZeroCopyOpFinishHandle( + std::move(free_func), std::forward(args)...)), func) {} }; diff --git a/include/condy/concepts.hpp b/include/condy/concepts.hpp index 7e351949..64db5956 100644 --- a/include/condy/concepts.hpp +++ b/include/condy/concepts.hpp @@ -14,8 +14,9 @@ namespace condy { namespace detail { struct FixedFd; +struct Action; -} +} // namespace detail template concept HandleLike = requires(T handle, Invoker *invoker) { @@ -29,9 +30,16 @@ template concept OpFinishHandleLike = HandleLike && requires(T handle, io_uring_cqe *cqe) { { handle.invoke() } -> std::same_as; - { handle.handle_cqe(cqe) } -> std::same_as; + { handle.handle_cqe(cqe) } -> std::same_as; }; +template +concept CQEHandlerLike = requires(T handler, io_uring_cqe *cqe) { + typename T::ReturnType; + { handler.handle_cqe(cqe) } -> std::same_as; + { handler.extract_result() } -> std::same_as; +}; + template concept AwaiterLike = requires(T awaiter) { typename T::HandleType; diff --git a/include/condy/cqe_handler.hpp b/include/condy/cqe_handler.hpp new file mode 100644 index 00000000..486ef96b --- /dev/null +++ b/include/condy/cqe_handler.hpp @@ -0,0 +1,52 @@ +/** + * @file cqe_handler.hpp + * @brief Definitions of CQE handlers + * @details This file defines a series of CQE handlers, which are responsible + * for processing the completion of asynchronous operations. Each handler + * defines a `handle_cqe` method to process the CQE and an `extract_result` + * method to retrieve the result of the operation. + */ + +#pragma once + +#include "condy/concepts.hpp" +#include +#include +#include + +namespace condy { + +class SimpleCQEHandler { +public: + using ReturnType = int32_t; + + void handle_cqe(io_uring_cqe *cqe) { res_ = cqe->res; } + + ReturnType extract_result() { return res_; } + +private: + int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set +}; + +template class SelectBufferCQEHandler { +public: + using ReturnType = std::pair; + + SelectBufferCQEHandler(Br *buffers) : buffers_(buffers) {} + + void handle_cqe(io_uring_cqe *cqe) { + res_ = cqe->res; + flags_ = cqe->flags; + } + + ReturnType extract_result() { + return std::make_pair(res_, buffers_->handle_finish(res_, flags_)); + } + +private: + int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set + uint32_t flags_ = 0; + Br *buffers_; +}; + +} // namespace condy \ No newline at end of file diff --git a/include/condy/finish_handles.hpp b/include/condy/finish_handles.hpp index f0b0741a..fb7120a3 100644 --- a/include/condy/finish_handles.hpp +++ b/include/condy/finish_handles.hpp @@ -26,14 +26,19 @@ namespace condy { class Ring; -class OpFinishHandle : public InvokerAdapter { +namespace detail { + +struct Action { + bool queue_work; + bool op_finish; +}; + +} // namespace detail + +class OpFinishHandleBase + : public InvokerAdapter { public: - using ReturnType = int32_t; - struct Action { - bool queue_work; - bool op_finish; - }; - using HandleCQEFunc = Action (*)(void *, io_uring_cqe *); + using HandleCQEFunc = detail::Action (*)(void *, io_uring_cqe *); void cancel() { auto *ring = detail::Context::current().ring(); @@ -43,11 +48,9 @@ class OpFinishHandle : public InvokerAdapter { io_uring_sqe_set_flags(sqe, IOSQE_CQE_SKIP_SUCCESS); } - Action handle_cqe(io_uring_cqe *cqe) { return handle_func_(this, cqe); } - - Action handle_cqe_impl(io_uring_cqe *cqe) { - res_ = cqe->res; - return {.queue_work = true, .op_finish = true}; + detail::Action handle_cqe(io_uring_cqe *cqe) { + assert(handle_func_ != nullptr); + return handle_func_(this, cqe); } void invoke() { @@ -55,20 +58,41 @@ class OpFinishHandle : public InvokerAdapter { (*invoker_)(); } - ReturnType extract_result() { return res_; } - void set_invoker(Invoker *invoker) { invoker_ = invoker; } +protected: + OpFinishHandleBase() = default; + +protected: + HandleCQEFunc handle_func_ = nullptr; + Invoker *invoker_ = nullptr; +}; + +template +class OpFinishHandle : public OpFinishHandleBase { +public: + using ReturnType = typename CQEHandler::ReturnType; + + template + OpFinishHandle(Args &&...args) : cqe_handler_(std::forward(args)...) { + this->handle_func_ = handle_cqe_static_; + } + + detail::Action handle_cqe_impl(io_uring_cqe *cqe) { + cqe_handler_.handle_cqe(cqe); + return {.queue_work = true, .op_finish = true}; + } + + ReturnType extract_result() { return cqe_handler_.extract_result(); } + private: - static Action handle_cqe_static_(void *data, io_uring_cqe *cqe) { + static detail::Action handle_cqe_static_(void *data, io_uring_cqe *cqe) { auto *self = static_cast(data); return self->handle_cqe_impl(cqe); } protected: - HandleCQEFunc handle_func_ = handle_cqe_static_; - Invoker *invoker_ = nullptr; - int32_t res_ = -ENOTRECOVERABLE; // Internal error if not set + CQEHandler cqe_handler_; }; template @@ -80,8 +104,7 @@ class MultiShotMixin : public HandleBase { this->handle_func_ = handle_cqe_static_; } - OpFinishHandle::Action - handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ { + detail::Action handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ { if (cqe->flags & IORING_CQE_F_MORE) { HandleBase::handle_cqe_impl(cqe); func_(HandleBase::extract_result()); @@ -93,8 +116,7 @@ class MultiShotMixin : public HandleBase { } private: - static OpFinishHandle::Action handle_cqe_static_(void *data, - io_uring_cqe *cqe) { + static detail::Action handle_cqe_static_(void *data, io_uring_cqe *cqe) { auto *self = static_cast(data); return self->handle_cqe_impl(cqe); } @@ -103,8 +125,9 @@ class MultiShotMixin : public HandleBase { Func func_; }; -template -using MultiShotOpFinishHandle = MultiShotMixin; +template +using MultiShotOpFinishHandle = + MultiShotMixin>; template class ZeroCopyMixin : public HandleBase { @@ -126,8 +149,7 @@ class ZeroCopyMixin : public HandleBase { maybe_free_(); } - OpFinishHandle::Action - handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ { + detail::Action handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ { if (cqe->flags & IORING_CQE_F_MORE) { HandleBase::handle_cqe_impl(cqe); return {.queue_work = true, .op_finish = false}; @@ -166,8 +188,7 @@ class ZeroCopyMixin : public HandleBase { self->invoke(); } - static OpFinishHandle::Action handle_cqe_static_(void *data, - io_uring_cqe *cqe) { + static detail::Action handle_cqe_static_(void *data, io_uring_cqe *cqe) { auto *self = static_cast(data); return self->handle_cqe_impl(cqe); } @@ -180,51 +201,9 @@ class ZeroCopyMixin : public HandleBase { bool notified_ = false; }; -template -using ZeroCopyOpFinishHandle = ZeroCopyMixin; - -template -class SelectBufferMixin : public HandleBase { -public: - using ReturnType = std::pair; - - template - SelectBufferMixin(Br *buffers, Args &&...args) - : HandleBase(std::forward(args)...), buffers_(buffers) { - this->handle_func_ = handle_cqe_static_; - } - - OpFinishHandle::Action - handle_cqe_impl(io_uring_cqe *cqe) /* fake override */ { - HandleBase::handle_cqe_impl(cqe); - this->flags_ = cqe->flags; - return {.queue_work = true, .op_finish = true}; - } - - ReturnType extract_result() { - int res = this->res_; - return std::make_pair( - res, buffers_->handle_finish(this->res_, this->flags_)); - } - -private: - static OpFinishHandle::Action handle_cqe_static_(void *data, - io_uring_cqe *cqe) { - auto *self = static_cast(data); - return self->handle_cqe_impl(cqe); - } - -protected: - uint32_t flags_ = 0; - Br *buffers_; -}; - -template -using SelectBufferOpFinishHandle = SelectBufferMixin; - -template -using MultiShotSelectBufferOpFinishHandle = - MultiShotMixin>; +template +using ZeroCopyOpFinishHandle = + ZeroCopyMixin>; template class RangedParallelFinishHandle { public: diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 5624a3ea..08cc6a85 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -362,7 +362,7 @@ class Runtime { local_queue_.push_back(work); } } else if (type == WorkType::Common) { - auto *handle = static_cast(data); + auto *handle = static_cast(data); auto action = handle->handle_cqe(cqe); if (action.op_finish) { pending_works_--; diff --git a/tests/test_awaiter_operations.cpp b/tests/test_awaiter_operations.cpp index 3ecc14d9..392774ba 100644 --- a/tests/test_awaiter_operations.cpp +++ b/tests/test_awaiter_operations.cpp @@ -21,7 +21,7 @@ void event_loop(size_t &unfinished) { if (type == condy::WorkType::Ignore) { return; } - auto handle_ptr = static_cast(data); + auto handle_ptr = static_cast(data); handle_ptr->handle_cqe(cqe); (*handle_ptr)(); }); diff --git a/tests/test_op_awaiter.cpp b/tests/test_op_awaiter.cpp index cf183df2..a2f7acda 100644 --- a/tests/test_op_awaiter.cpp +++ b/tests/test_op_awaiter.cpp @@ -18,7 +18,7 @@ void event_loop(size_t &unfinished) { if (type == condy::WorkType::Ignore) { return; } - auto handle_ptr = static_cast(data); + auto handle_ptr = static_cast(data); handle_ptr->handle_cqe(cqe); (*handle_ptr)(); }); @@ -164,7 +164,7 @@ void mock_multishot_event_loop(size_t &unfinished) { if (type == condy::WorkType::Ignore) { return; } - auto handle_ptr = static_cast(data); + auto handle_ptr = static_cast(data); // Mock Multishot io_uring_cqe mock_cqe = *cqe; mock_cqe.res = 42; diff --git a/tests/test_op_finish_handle.cpp b/tests/test_op_finish_handle.cpp index 09777bfa..189171ce 100644 --- a/tests/test_op_finish_handle.cpp +++ b/tests/test_op_finish_handle.cpp @@ -1,4 +1,5 @@ #include "condy/context.hpp" +#include "condy/cqe_handler.hpp" #include "condy/finish_handles.hpp" #include "condy/invoker.hpp" #include "condy/runtime.hpp" @@ -29,7 +30,7 @@ void event_loop(size_t &unfinished) { if (type == condy::WorkType::Ignore) { return; } - auto handle_ptr = static_cast(data); + auto handle_ptr = static_cast(data); handle_ptr->handle_cqe(cqe); (*handle_ptr)(); }); @@ -51,7 +52,7 @@ TEST_CASE("test op_finish_handle - basic usage") { context.init(&ring, &runtime); SetFinishInvoker invoker; - condy::OpFinishHandle handle; + condy::OpFinishHandle handle; handle.set_invoker(&invoker); auto *sqe = ring.get_sqe(); @@ -60,8 +61,8 @@ TEST_CASE("test op_finish_handle - basic usage") { ring.submit(); ring.reap_completions([](io_uring_cqe *cqe) { - auto handle_ptr = - static_cast(io_uring_cqe_get_data(cqe)); + auto handle_ptr = static_cast( + io_uring_cqe_get_data(cqe)); io_uring_cqe mock_cqe = *cqe; mock_cqe.res = 42; handle_ptr->handle_cqe(&mock_cqe); @@ -85,7 +86,7 @@ TEST_CASE("test op_finish_handle - concurrent ops") { auto &context = condy::detail::Context::current(); context.init(&ring, &runtime); - condy::OpFinishHandle handle1, handle2; + condy::OpFinishHandle handle1, handle2; handle1.set_invoker(&invoker); handle2.set_invoker(&invoker); @@ -115,9 +116,10 @@ TEST_CASE("test op_finish_handle - cancel op") { auto &context = condy::detail::Context::current(); context.init(&ring, &runtime); - condy::OpFinishHandle handle1, handle2; - condy::ParallelFinishHandle + condy::OpFinishHandle handle1, handle2; + condy::ParallelFinishHandle, + condy::OpFinishHandle> finish_handle; finish_handle.init(&handle1, &handle2); finish_handle.set_invoker(&invoker); @@ -166,7 +168,9 @@ TEST_CASE("test op_finish_handle - multishot op") { invoker(); }; - condy::MultiShotMixin handle(func); + condy::MultiShotMixin> + handle(func); REQUIRE(!invoker.finished); io_uring_cqe cqe{}; cqe.res = 1; @@ -184,8 +188,8 @@ TEST_CASE("test op_finish_handle - zero copy op") { int res = -1; auto func = [&](int r) { res = r; }; - auto *handle = - new condy::ZeroCopyMixin(func); + auto *handle = new condy::ZeroCopyMixin< + decltype(func), condy::OpFinishHandle>(func); handle->set_invoker(&invoker); REQUIRE(!invoker.finished); io_uring_cqe cqe{}; diff --git a/tests/test_ring.cpp b/tests/test_ring.cpp index 2ee0ee64..33949c12 100644 --- a/tests/test_ring.cpp +++ b/tests/test_ring.cpp @@ -1,3 +1,4 @@ +#include "condy/cqe_handler.hpp" #include "condy/finish_handles.hpp" #include "condy/ring.hpp" #include @@ -23,7 +24,7 @@ TEST_CASE("test ring - register and complete ops") { ring.init(8, ¶ms); constexpr size_t num_ops = 4; - std::vector handles(num_ops); + std::vector> handles(num_ops); for (size_t i = 0; i < num_ops; i++) { auto *sqe = ring.get_sqe(); io_uring_prep_nop(sqe); @@ -36,8 +37,8 @@ TEST_CASE("test ring - register and complete ops") { while (reaped < num_ops) { ring.submit(); reaped += ring.reap_completions([&](io_uring_cqe *cqe) { - OpFinishHandle *handle = - reinterpret_cast(io_uring_cqe_get_data(cqe)); + auto *handle = reinterpret_cast( + io_uring_cqe_get_data(cqe)); REQUIRE(handle != nullptr); handle->handle_cqe(cqe); count++; @@ -65,7 +66,7 @@ TEST_CASE("test ring - cancel ops") { .tv_nsec = 0, }; constexpr size_t num_ops = 8; - std::vector handles(num_ops); + std::vector> handles(num_ops); for (size_t i = 0; i < num_ops; i++) { auto *sqe = ring.get_sqe(); if (i % 2 == 0) { @@ -96,8 +97,8 @@ TEST_CASE("test ring - cancel ops") { condy::encode_work(nullptr, condy::WorkType::Ignore)) { return; } - OpFinishHandle *handle = - reinterpret_cast(io_uring_cqe_get_data(cqe)); + auto *handle = reinterpret_cast( + io_uring_cqe_get_data(cqe)); REQUIRE(handle != nullptr); handle->handle_cqe(cqe); if (cqe->res == -ECANCELED) {