diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index 343dbb0..17a10af 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -1,20 +1,21 @@ { - "configurations": [ - { - "name": "Linux", - "includePath": [ - "${workspaceFolder}/**" - ], - "defines": [], - "compilerPath": "/usr/bin/clang++", - "cStandard": "c11", - "cppStandard": "c++20", - "intelliSenseMode": "clang-x64", - "compilerArgs": [ - "-fcoroutines-ts", - "-stdlib=libc++" - ] - } - ], - "version": 4 -} + "configurations": [ + { + "name": "Linux", + "includePath": [ + "${workspaceFolder}/**", + "${workspaceFolder}/include" + ], + "defines": [], + "compilerPath": "/usr/bin/clang++", + "cStandard": "c17", + "cppStandard": "c++20", + "intelliSenseMode": "linux-clang-arm64", + "compilerArgs": [ + "-fcoroutines" + ], + "configurationProvider": "ms-vscode.cmake-tools" + } + ], + "version": 4 +} \ No newline at end of file diff --git a/demo/echo_server.cpp b/demo/echo_server.cpp index a6f9685..d683a16 100644 --- a/demo/echo_server.cpp +++ b/demo/echo_server.cpp @@ -20,8 +20,8 @@ enum { int runningCoroutines = 0; uio::task<> accept_connection(uio::io_service& service, int serverfd) { - while (int clientfd = co_await service.accept(serverfd, nullptr, nullptr)) { - [](uio::io_service& service, int clientfd) -> uio::task<> { + service.accept_multishot(serverfd, nullptr, nullptr).set_callback([&service, serverfd](int clientfd, unsigned flags) -> uio::task<> { + fmt::print("sockfd {} is accepted; number of running coroutines: {}\n", clientfd, ++runningCoroutines); #if USE_SPLICE @@ -63,8 +63,9 @@ uio::task<> accept_connection(uio::io_service& service, int serverfd) { co_await service.close(clientfd); fmt::print("sockfd {} is closed; number of running coroutines: {}\n", clientfd, --runningCoroutines); - }(service, clientfd); - } + }); + + co_return; } int main(int argc, char *argv[]) { diff --git a/include/liburing/io_service.hpp b/include/liburing/io_service.hpp index 416cacf..149f830 100644 --- a/include/liburing/io_service.hpp +++ b/include/liburing/io_service.hpp @@ -430,6 +430,24 @@ class io_service { return await_work(sqe, iflags); } + /** Accept connections on a socket asynchronously + * @see accept4(2) + * @see io_uring_enter(2) IORING_ACCEPT_MULTISHOT + * @param iflags IOSQE_* flags + * @return + */ + sqe_awaitable accept_multishot( + int fd, + sockaddr *addr, + socklen_t *addrlen, + int flags = 0, + uint8_t iflags = 0 + ) noexcept { + auto* sqe = io_uring_get_sqe_safe(); + io_uring_prep_multishot_accept(sqe, fd, addr, addrlen, flags); + return await_work(sqe, iflags); + } + /** Initiate a connection on a socket asynchronously * @see connect(2) * @see io_uring_enter(2) IORING_OP_CONNECT @@ -721,7 +739,7 @@ class io_service { io_uring_for_each_cqe(&ring, head, cqe) { ++cqe_count; auto coro = static_cast(io_uring_cqe_get_data(cqe)); - if (coro) coro->resolve(cqe->res); + if (coro) coro->resolve(cqe->res, cqe->flags); } printf_if_verbose(__FILE__ ": Found %u cqe(s), looping...\n", cqe_count); diff --git a/include/liburing/sqe_awaitable.hpp b/include/liburing/sqe_awaitable.hpp index 41212a9..4939536 100644 --- a/include/liburing/sqe_awaitable.hpp +++ b/include/liburing/sqe_awaitable.hpp @@ -9,59 +9,42 @@ namespace uio { struct resolver { - virtual void resolve(int result) noexcept = 0; + virtual void resolve(int32_t result, uint32_t flags) noexcept = 0; }; struct resume_resolver final: resolver { friend struct sqe_awaitable; - void resolve(int result) noexcept override { + void resolve(int32_t result, uint32_t flags) noexcept override { this->result = result; + this->flags = flags; handle.resume(); } private: std::coroutine_handle<> handle; - int result = 0; + uint32_t result = 0; + uint32_t flags = 0; }; static_assert(std::is_trivially_destructible_v); -struct deferred_resolver final: resolver { - void resolve(int result) noexcept override { - this->result = result; - } - -#ifndef NDEBUG - ~deferred_resolver() { - assert(!!result && "deferred_resolver is destructed before it's resolved"); - } -#endif - - std::optional result; -}; - struct callback_resolver final: resolver { - callback_resolver(std::function&& cb): cb(std::move(cb)) {} + callback_resolver(std::function&& cb): cb(std::move(cb)) {} - void resolve(int result) noexcept override { - this->cb(result); - delete this; + void resolve(int32_t result, uint32_t flags) noexcept override { + this->cb(result, flags); + if (!(flags & IORING_CQE_F_MORE)) delete this; } private: - std::function cb; + std::function cb; }; struct sqe_awaitable { // TODO: use cancel_token to implement cancellation sqe_awaitable(io_uring_sqe* sqe) noexcept: sqe(sqe) {} - // User MUST keep resolver alive before the operation is finished - void set_deferred(deferred_resolver& resolver) { - io_uring_sqe_set_data(sqe, &resolver); - } - - void set_callback(std::function cb) { + void set_callback(std::function cb) { io_uring_sqe_set_data(sqe, new callback_resolver(std::move(cb))); } @@ -72,14 +55,15 @@ struct sqe_awaitable { await_sqe(io_uring_sqe* sqe): sqe(sqe) {} - constexpr bool await_ready() const noexcept { return false; } + constexpr bool await_ready() const noexcept { return !!sqe; } void await_suspend(std::coroutine_handle<> handle) noexcept { resolver.handle = handle; - io_uring_sqe_set_data(sqe, &resolver); + if (sqe) io_uring_sqe_set_data(sqe, &resolver); + sqe = nullptr; } - constexpr int await_resume() const noexcept { return resolver.result; } + constexpr int await_resume() noexcept { return resolver.result; } }; return await_sqe(sqe);