From 43fbde5fc8d24323faa9eab366e1564c56cc8d49 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Fri, 16 Jun 2023 15:52:02 +0200 Subject: [PATCH 1/9] Implement failing test --- test/exec/test_io_uring_context.cpp | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/test/exec/test_io_uring_context.cpp b/test/exec/test_io_uring_context.cpp index a131647fa..97ba1484e 100644 --- a/test/exec/test_io_uring_context.cpp +++ b/test/exec/test_io_uring_context.cpp @@ -301,14 +301,22 @@ TEST_CASE("io_uring_context schedule_after -1s", "[types][io_uring][schedulers]" scope_guard guard{[&]() noexcept { context.request_stop(); }}; - bool is_called = false; + bool is_called_1 = false; + bool is_called_2 = false; + auto start = std::chrono::steady_clock::now(); sync_wait(when_any( schedule_after(scheduler, -1s) | then([&] { CHECK(io_thread.get_id() == std::this_thread::get_id()); - is_called = true; + is_called_1 = true; }), - schedule_after(scheduler, 5ms))); - CHECK(is_called); + schedule_after(scheduler, 5ms) | then([&] { + is_called_2 = true; + }))); + auto end = std::chrono::steady_clock::now(); + auto diff = end - start; + CHECK(diff < 5ms); + CHECK(is_called_1 == true); + CHECK(is_called_2 == false); } } From f014a1c76d2d4f157bfae5d5d50d6df105ca7c71 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Fri, 16 Jun 2023 16:04:50 +0200 Subject: [PATCH 2/9] Fix the test --- include/exec/linux/io_uring_context.hpp | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/include/exec/linux/io_uring_context.hpp b/include/exec/linux/io_uring_context.hpp index b9e16456a..292c3cbb9 100644 --- a/include/exec/linux/io_uring_context.hpp +++ b/include/exec/linux/io_uring_context.hpp @@ -638,11 +638,11 @@ namespace exec { static constexpr __task_vtable __vtable{&__ready_, &__submit_, &__complete_}; template - requires stdexec::constructible_from<_Base, std::in_place_t, _Args...> + requires 
stdexec::constructible_from<_Base, std::in_place_t, __task*, _Args...> __io_task_facade(std::in_place_t, _Args&&... __args) noexcept( - stdexec::__nothrow_constructible_from<_Base, _Args...>) + stdexec::__nothrow_constructible_from<_Base, __task*, _Args...>) : __task{__vtable} - , __base_(std::in_place, (_Args&&) __args...) { + , __base_(std::in_place, static_cast<__task*>(this), (_Args&&) __args...) { } template @@ -732,7 +732,7 @@ namespace exec { } else { __sqe = ::io_uring_sqe{ .opcode = IORING_OP_ASYNC_CANCEL, // - .addr = bit_cast<__u64>(__op_) // + .addr = bit_cast<__u64>(__op_->__parent_) // }; } #else @@ -768,23 +768,27 @@ namespace exec { template struct __impl_base { + __task* __parent_; _Base __base_; template - __impl_base(std::in_place_t, _Args&&... __args) noexcept( + __impl_base(__task* __parent, std::in_place_t, _Args&&... __args) noexcept( stdexec::__nothrow_constructible_from<_Base, _Args...>) - : __base_((_Args&&) __args...) { + : __parent_{__parent} + , __base_((_Args&&) __args...) { } }; template struct __impl_base<_Base, true> { + __task* __parent_; _Base __base_; template - __impl_base(std::in_place_t, _Args&&... __args) noexcept( + __impl_base(__task* __parent, std::in_place_t, _Args&&... __args) noexcept( stdexec::__nothrow_constructible_from<_Base, _Args...>) - : __base_((_Args&&) __args...) { + : __parent_{__parent} + , __base_((_Args&&) __args...) { } void submit_stop(::io_uring_sqe& __sqe) noexcept { @@ -823,9 +827,9 @@ namespace exec { template requires stdexec::constructible_from<_Base, _Args...> - __impl(std::in_place_t, _Args&&... __args) noexcept( + __impl(std::in_place_t, __task* __parent, _Args&&... __args) noexcept( stdexec::__nothrow_constructible_from<_Base, _Args...>) - : __base_t(std::in_place, (_Args&&) __args...) + : __base_t(__parent, std::in_place, (_Args&&) __args...) 
, __stop_operation_{this} { } From 2e7b62bca97aec1f677be2ef0cb83a343b185d24 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Tue, 20 Jun 2023 13:04:20 +0200 Subject: [PATCH 3/9] Give the test more time --- test/exec/test_io_uring_context.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/exec/test_io_uring_context.cpp b/test/exec/test_io_uring_context.cpp index 97ba1484e..954388593 100644 --- a/test/exec/test_io_uring_context.cpp +++ b/test/exec/test_io_uring_context.cpp @@ -309,12 +309,12 @@ TEST_CASE("io_uring_context schedule_after -1s", "[types][io_uring][schedulers]" CHECK(io_thread.get_id() == std::this_thread::get_id()); is_called_1 = true; }), - schedule_after(scheduler, 5ms) | then([&] { + schedule_after(scheduler, 100ms) | then([&] { is_called_2 = true; }))); auto end = std::chrono::steady_clock::now(); auto diff = end - start; - CHECK(diff < 5ms); + CHECK(diff < 100ms); CHECK(is_called_1 == true); CHECK(is_called_2 == false); } From 76788e160db7549e56639256f1ad4b259e1b555a Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Thu, 22 Jun 2023 19:52:12 +0200 Subject: [PATCH 4/9] io_uring_enter only gets the number of new submissions and not total submissions --- include/exec/linux/io_uring_context.hpp | 49 ++++++++++++++----------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/include/exec/linux/io_uring_context.hpp b/include/exec/linux/io_uring_context.hpp index 292c3cbb9..c789aeece 100644 --- a/include/exec/linux/io_uring_context.hpp +++ b/include/exec/linux/io_uring_context.hpp @@ -402,26 +402,28 @@ namespace exec { /// /// This function is not thread-safe and must only be called from the thread that drives the io context. 
void run_some() noexcept { - __n_submitted_ -= __completion_queue_.complete(); + __n_total_submitted_ -= __completion_queue_.complete(); STDEXEC_ASSERT( - 0 <= __n_submitted_ - && __n_submitted_ <= static_cast(__params_.cq_entries)); - __u32 __max_submissions = __params_.cq_entries - static_cast<__u32>(__n_submitted_); + 0 <= __n_total_submitted_ + && __n_total_submitted_ <= static_cast(__params_.cq_entries)); + __u32 __max_submissions = __params_.cq_entries - static_cast<__u32>(__n_total_submitted_); __pending_.append(__requests_.pop_all()); __submission_result __result = __submission_queue_.submit( (__task_queue&&) __pending_, __max_submissions, __stop_source_->stop_requested()); - __n_submitted_ += __result.__n_submitted; - STDEXEC_ASSERT(__n_submitted_ <= static_cast(__params_.cq_entries)); + __n_total_submitted_ += __result.__n_submitted; + __n_newly_submitted_ += __result.__n_submitted; + STDEXEC_ASSERT(__n_total_submitted_ <= static_cast(__params_.cq_entries)); __pending_ = (__task_queue&&) __result.__pending; while (!__result.__ready.empty()) { - __n_submitted_ -= __completion_queue_.complete((__task_queue&&) __result.__ready); - STDEXEC_ASSERT(0 <= __n_submitted_); + __n_total_submitted_ -= __completion_queue_.complete((__task_queue&&) __result.__ready); + STDEXEC_ASSERT(0 <= __n_total_submitted_); __pending_.append(__requests_.pop_all()); - __max_submissions = __params_.cq_entries - static_cast<__u32>(__n_submitted_); + __max_submissions = __params_.cq_entries - static_cast<__u32>(__n_total_submitted_); __result = __submission_queue_.submit( (__task_queue&&) __pending_, __max_submissions, __stop_source_->stop_requested()); - __n_submitted_ += __result.__n_submitted; - STDEXEC_ASSERT(__n_submitted_ <= static_cast(__params_.cq_entries)); + __n_total_submitted_ += __result.__n_submitted; + __n_newly_submitted_ += __result.__n_submitted; + STDEXEC_ASSERT(__n_total_submitted_ <= static_cast(__params_.cq_entries)); __pending_ = (__task_queue&&) 
__result.__pending; } } @@ -446,28 +448,30 @@ namespace exec { __is_running_.store(false, std::memory_order_relaxed); }}; __pending_.append(__requests_.pop_all()); - while (__n_submitted_ > 0 || !__pending_.empty()) { + while (__n_total_submitted_ > 0 || !__pending_.empty()) { run_some(); if ( - __n_submitted_ == 0 - || (__n_submitted_ == 1 && __break_loop_.load(std::memory_order_acquire))) { + __n_total_submitted_ == 0 + || (__n_total_submitted_ == 1 && __break_loop_.load(std::memory_order_acquire))) { __break_loop_.store(false, std::memory_order_relaxed); break; } constexpr int __min_complete = 1; STDEXEC_ASSERT( - 0 <= __n_submitted_ - && __n_submitted_ <= static_cast(__params_.cq_entries)); + 0 <= __n_total_submitted_ + && __n_total_submitted_ <= static_cast(__params_.cq_entries)); int rc = __io_uring_enter( - __ring_fd_, __n_submitted_, __min_complete, IORING_ENTER_GETEVENTS); + __ring_fd_, __n_newly_submitted_, __min_complete, IORING_ENTER_GETEVENTS); __throw_error_code_if(rc < 0, -rc); - __n_submitted_ -= __completion_queue_.complete(); - STDEXEC_ASSERT(0 <= __n_submitted_); + STDEXEC_ASSERT(rc <= __n_newly_submitted_); + __n_newly_submitted_ -= rc; + __n_total_submitted_ -= __completion_queue_.complete(); + STDEXEC_ASSERT(0 <= __n_total_submitted_); __pending_.append(__requests_.pop_all()); } - STDEXEC_ASSERT(__n_submitted_ <= 1); + STDEXEC_ASSERT(__n_total_submitted_ <= 1); if (__stop_source_->stop_requested() && __pending_.empty()) { - STDEXEC_ASSERT(__n_submitted_ == 0); + STDEXEC_ASSERT(__n_total_submitted_ == 0); // try to shutdown the request queue int __n_in_flight_expected = 0; while (!__n_submissions_in_flight_.compare_exchange_weak( @@ -581,7 +585,8 @@ namespace exec { std::atomic __is_running_{false}; std::atomic __n_submissions_in_flight_{0}; std::atomic __break_loop_{false}; - std::ptrdiff_t __n_submitted_{0}; + std::ptrdiff_t __n_total_submitted_{0}; + std::ptrdiff_t __n_newly_submitted_{0}; std::optional __stop_source_{std::in_place}; 
__completion_queue __completion_queue_; __submission_queue __submission_queue_; From 0f75bda9266ed2f39465c0607069de8c4ff3d2ae Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Fri, 23 Jun 2023 15:01:38 +0200 Subject: [PATCH 5/9] print timeout for error message --- test/exec/test_io_uring_context.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/exec/test_io_uring_context.cpp b/test/exec/test_io_uring_context.cpp index 954388593..7beed9505 100644 --- a/test/exec/test_io_uring_context.cpp +++ b/test/exec/test_io_uring_context.cpp @@ -304,17 +304,18 @@ TEST_CASE("io_uring_context schedule_after -1s", "[types][io_uring][schedulers]" bool is_called_1 = false; bool is_called_2 = false; auto start = std::chrono::steady_clock::now(); + auto timeout = 100ms; sync_wait(when_any( schedule_after(scheduler, -1s) | then([&] { CHECK(io_thread.get_id() == std::this_thread::get_id()); is_called_1 = true; }), - schedule_after(scheduler, 100ms) | then([&] { + schedule_after(scheduler, timeout) | then([&] { is_called_2 = true; }))); auto end = std::chrono::steady_clock::now(); - auto diff = end - start; - CHECK(diff < 100ms); + std::chrono::nanoseconds diff = end - start; + CHECK(diff.count() < std::chrono::duration_cast(timeout).count()); CHECK(is_called_1 == true); CHECK(is_called_2 == false); } From 8e6099014884a3951bff4f34ee980215aacc42ed Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Tue, 27 Jun 2023 05:53:53 +0200 Subject: [PATCH 6/9] Probe at io_uring construction whether IORING_OP_ASYNC_CANCEL is supported. 
--- include/exec/linux/io_uring_context.hpp | 202 +++++++++++++++++------- 1 file changed, 147 insertions(+), 55 deletions(-) diff --git a/include/exec/linux/io_uring_context.hpp b/include/exec/linux/io_uring_context.hpp index c789aeece..7b90e882d 100644 --- a/include/exec/linux/io_uring_context.hpp +++ b/include/exec/linux/io_uring_context.hpp @@ -40,10 +40,10 @@ #if LINUX_VERSION_CODE < KERNEL_VERSION(5, 5, 0) #warning "Your kernel is too old to support io_uring with cancellation support." -#include #else #define STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION #endif +#include #if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0) #define STDEXEC_HAS_IORING_OP_READ @@ -327,8 +327,36 @@ namespace exec { void start() noexcept; }; +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + struct __probe_cancelation : __task { + __context* __context_ = nullptr; + + static bool __ready_(__task*) noexcept { + return false; + } + + static void __submit_(__task* __pointer, ::io_uring_sqe& __entry) noexcept { + // __probe_cancelation& __self = *static_cast<__probe_cancelation*>(__pointer); + __entry = ::io_uring_sqe{}; + __entry.addr = bit_cast<__u64>(__pointer); + __entry.opcode = IORING_OP_ASYNC_CANCEL; + } + + static void __complete_(__task* __pointer, const ::io_uring_cqe& __cqe) noexcept; + + void start() noexcept; + + static constexpr __task_vtable __vtable{&__ready_, &__submit_, &__complete_}; + + __probe_cancelation(__context* __ctx) + : __task{__vtable} + , __context_{__ctx} { + } + }; +#endif + class __scheduler; - + enum class until { stopped, empty @@ -340,8 +368,16 @@ namespace exec { : __context_base(std::max(__entries, 2u), __flags) , __completion_queue_{__completion_queue_region_ ? 
__completion_queue_region_ : __submission_queue_region_, __params_} , __submission_queue_{__submission_queue_region_, __submission_queue_entries_, __params_} - , __wakeup_operation_{this, __eventfd_} { + , __wakeup_operation_{this, __eventfd_} +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + , __probe_cancelation_operation_{this} +#endif + { __wakeup_operation_.start(); +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + __probe_cancelation_operation_.start(); + run_until_empty(); +#endif } void wakeup() { @@ -366,6 +402,14 @@ namespace exec { return __is_running_.load(std::memory_order_relaxed); } + bool has_async_cancelation() const noexcept { +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + return __has_async_cancelation_; +#else + return false; +#endif + } + /// @brief Breaks out of the run loop of the io context without stopping the context. void finish() { __break_loop_.store(true, std::memory_order_release); @@ -593,8 +637,27 @@ namespace exec { __task_queue __pending_{}; __atomic_task_queue __requests_{}; __wakeup_operation __wakeup_operation_; +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + friend struct __probe_cancelation; + __probe_cancelation __probe_cancelation_operation_; + bool __has_async_cancelation_ = false; +#endif }; +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + inline void + __probe_cancelation::__complete_(__task* __pointer, const ::io_uring_cqe& __cqe) noexcept { + __probe_cancelation& __self = *static_cast<__probe_cancelation*>(__pointer); + if (__cqe.res != -EINVAL) { + __self.__context_->__has_async_cancelation_ = true; + } + } + + inline void __probe_cancelation::start() noexcept { + __context_->submit(this); + } +#endif + inline void __wakeup_operation::start() noexcept { if (!__context_->__stop_source_->stop_requested()) { __context_->__pending_.push_front(this); @@ -733,11 +796,19 @@ namespace exec { #ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION if constexpr ( requires(_Base* __op, ::io_uring_sqe& __sqe) { 
__op->submit_stop(__sqe); }) { - __op_->submit_stop(__sqe); + if (__op_->context().has_async_cancelation()) { + __sqe = ::io_uring_sqe{ + .opcode = IORING_OP_ASYNC_CANCEL, // + .addr = bit_cast<__u64>(__op_->__parent_) // + }; + } else { + __op_->submit_stop(__sqe); + } } else { + STDEXEC_ASSERT(__op_->context().has_async_cancelation()); __sqe = ::io_uring_sqe{ - .opcode = IORING_OP_ASYNC_CANCEL, // - .addr = bit_cast<__u64>(__op_->__parent_) // + .opcode = IORING_OP_ASYNC_CANCEL, // + .addr = bit_cast<__u64>(__op_->__parent_) // }; } #else @@ -909,24 +980,7 @@ namespace exec { struct __schedule_after_operation { using _Receiver = stdexec::__t<_ReceiverId>; - class __impl : public __stoppable_op_base<_Receiver> { -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - struct __kernel_timespec { - __s64 __tv_sec; - __s64 __tv_nsec; - }; - - __kernel_timespec __duration_; - - static constexpr __kernel_timespec - __duration_to_timespec(std::chrono::nanoseconds dur) noexcept { - auto secs = std::chrono::duration_cast(dur); - dur -= secs; - secs = std::max(secs, std::chrono::seconds{0}); - dur = std::clamp(dur, std::chrono::nanoseconds{0}, std::chrono::nanoseconds{999'999'999}); - return __kernel_timespec{secs.count(), dur.count()}; - } -#else + struct __timerfd_impl { safe_file_descriptor __timerfd_; ::itimerspec __duration_; std::uint64_t __n_expirations_{0}; @@ -947,43 +1001,89 @@ namespace exec { 0 <= __timerspec.it_value.tv_nsec && __timerspec.it_value.tv_nsec < 1'000'000'000); return __timerspec; } -#endif + + __timerfd_impl(std::chrono::nanoseconds __duration) + : __timerfd_{::timerfd_create(CLOCK_REALTIME, 0)} + , __duration_{__duration_to_timespec(__duration)} { + int __rc = ::timerfd_settime( + __timerfd_, TFD_TIMER_ABSTIME | TFD_TIMER_CANCEL_ON_SET, &__duration_, nullptr); + __throw_error_code_if(__rc < 0, errno); + } + }; + + struct __async_cancel_impl { + struct __kernel_timespec { + __s64 __tv_sec; + __s64 __tv_nsec; + }; + + __kernel_timespec __duration_; + + 
static constexpr __kernel_timespec + __duration_to_timespec(std::chrono::nanoseconds dur) noexcept { + auto secs = std::chrono::duration_cast(dur); + dur -= secs; + secs = std::max(secs, std::chrono::seconds{0}); + dur = std::clamp(dur, std::chrono::nanoseconds{0}, std::chrono::nanoseconds{999'999'999}); + return __kernel_timespec{secs.count(), dur.count()}; + } + + __async_cancel_impl(std::chrono::nanoseconds __duration) + : __duration_{__duration_to_timespec(__duration)} { + } + }; + + class __impl : public __stoppable_op_base<_Receiver> { + std::variant<__timerfd_impl, __async_cancel_impl> __impl_; public: static constexpr std::false_type ready() noexcept { return {}; } -#ifndef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION void submit_stop(::io_uring_sqe& __sqe) noexcept { - __duration_.it_value.tv_sec = 1; - __duration_.it_value.tv_nsec = 0; + __timerfd_impl* __impl = std::get_if<0>(&__impl_); + STDEXEC_ASSERT(__impl != nullptr); + __impl->__duration_.it_value.tv_sec = 1; + __impl->__duration_.it_value.tv_nsec = 0; ::timerfd_settime( - __timerfd_, TFD_TIMER_ABSTIME | TFD_TIMER_CANCEL_ON_SET, &__duration_, nullptr); + __impl->__timerfd_, + TFD_TIMER_ABSTIME | TFD_TIMER_CANCEL_ON_SET, + &__impl->__duration_, + nullptr); __sqe = ::io_uring_sqe{.opcode = IORING_OP_NOP}; } -#endif void submit(::io_uring_sqe& __sqe) noexcept { #ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - ::io_uring_sqe __sqe_{}; - __sqe_.opcode = IORING_OP_TIMEOUT; - __sqe_.addr = bit_cast<__u64>(&__duration_); - __sqe_.len = 1; - __sqe = __sqe_; -#else - ::io_uring_sqe __sqe_{}; - __sqe_.opcode = IORING_OP_READV; - __sqe_.fd = __timerfd_; - __sqe_.addr = bit_cast<__u64>(&__iov_); - __sqe_.len = 1; - __sqe = __sqe_; + if (this->context().has_async_cancelation()) { + __async_cancel_impl* __impl = std::get_if<1>(&__impl_); + STDEXEC_ASSERT(__impl != nullptr); + ::io_uring_sqe __sqe_{}; + __sqe_.opcode = IORING_OP_TIMEOUT; + __sqe_.addr = bit_cast<__u64>(&__impl->__duration_); + __sqe_.len = 1; + __sqe = 
__sqe_; + } else { +#endif + __timerfd_impl* __impl = std::get_if<0>(&__impl_); + STDEXEC_ASSERT(__impl != nullptr); + ::io_uring_sqe __sqe_{}; + __sqe_.opcode = IORING_OP_READV; + __sqe_.fd = __impl->__timerfd_; + __sqe_.addr = bit_cast<__u64>(&__impl->__iov_); + __sqe_.len = 1; + __sqe = __sqe_; +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + } #endif } void complete(const ::io_uring_cqe& __cqe) noexcept { #ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - if (__cqe.res == -ETIME || __cqe.res == 0) { + if ( + (this->context().has_async_cancelation() && (__cqe.res == -ETIME || __cqe.res == 0)) + || __cqe.res == sizeof(std::uint64_t)) { #else if (__cqe.res == sizeof(std::uint64_t)) { #endif @@ -998,18 +1098,10 @@ namespace exec { __impl(__context& __context, std::chrono::nanoseconds __duration, _Receiver&& __receiver) : __stoppable_op_base<_Receiver>{__context, (_Receiver&&) __receiver} -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - , __duration_{__duration_to_timespec(__duration)} -#else - , __timerfd_{::timerfd_create(CLOCK_REALTIME, 0)} - , __duration_{__duration_to_timespec(__duration)} -#endif - { -#ifndef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - int __rc = ::timerfd_settime( - __timerfd_, TFD_TIMER_ABSTIME | TFD_TIMER_CANCEL_ON_SET, &__duration_, nullptr); - __throw_error_code_if(__rc < 0, errno); -#endif + , __impl_(__async_cancel_impl{__duration}) { + if (!__context.has_async_cancelation()) { + __impl_ = __timerfd_impl{__duration}; + } } }; From a44974fa192ae6cfcac0de67d98e5a32c21474ae Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Tue, 27 Jun 2023 09:51:20 +0200 Subject: [PATCH 7/9] Do not use IORING_OP_READ and do not reassign std::variant --- include/exec/linux/io_uring_context.hpp | 27 +++++++++---------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/include/exec/linux/io_uring_context.hpp b/include/exec/linux/io_uring_context.hpp index 7b90e882d..7f364f3ac 100644 --- a/include/exec/linux/io_uring_context.hpp +++ 
b/include/exec/linux/io_uring_context.hpp @@ -45,10 +45,6 @@ #endif #include -#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0) -#define STDEXEC_HAS_IORING_OP_READ -#endif - #include #include #include @@ -286,12 +282,8 @@ namespace exec { struct __wakeup_operation : __task { __context* __context_ = nullptr; int __eventfd_ = -1; -#ifdef STDEXEC_HAS_IORING_OP_READ - std::uint64_t __buffer_ = 0; -#else std::uint64_t __value_ = 0; ::iovec __buffer_ = {.iov_base = &__value_, .iov_len = sizeof(__value_)}; -#endif static bool __ready_(__task*) noexcept { return false; @@ -302,13 +294,8 @@ namespace exec { __entry = ::io_uring_sqe{}; __entry.fd = __self.__eventfd_; __entry.addr = bit_cast<__u64>(&__self.__buffer_); -#ifdef STDEXEC_HAS_IORING_OP_READ - __entry.opcode = IORING_OP_READ; - __entry.len = sizeof(__self.__buffer_); -#else __entry.opcode = IORING_OP_READV; __entry.len = 1; -#endif } static void __complete_(__task* __pointer, const ::io_uring_cqe& __entry) noexcept { @@ -1036,6 +1023,15 @@ namespace exec { class __impl : public __stoppable_op_base<_Receiver> { std::variant<__timerfd_impl, __async_cancel_impl> __impl_; + static std::variant<__timerfd_impl, __async_cancel_impl> + __make_impl(__context& __ctx, std::chrono::nanoseconds __duration) noexcept { + if (__ctx.has_async_cancelation()) { + return __async_cancel_impl{__duration}; + } else { + return __timerfd_impl{__duration}; + } + } + public: static constexpr std::false_type ready() noexcept { return {}; @@ -1098,10 +1094,7 @@ namespace exec { __impl(__context& __context, std::chrono::nanoseconds __duration, _Receiver&& __receiver) : __stoppable_op_base<_Receiver>{__context, (_Receiver&&) __receiver} - , __impl_(__async_cancel_impl{__duration}) { - if (!__context.has_async_cancelation()) { - __impl_ = __timerfd_impl{__duration}; - } + , __impl_(__make_impl(__context, __duration)) { } }; From b61b96f7cedec34c41bdb40b8953b759d687ad69 Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Wed, 28 Jun 2023 05:35:54 
+0200 Subject: [PATCH 8/9] Revert runtime detection disable tests on GPU --- .github/workflows/ci.gpu.yml | 1 + CMakeLists.txt | 2 + include/exec/linux/io_uring_context.hpp | 215 +++++++----------------- 3 files changed, 68 insertions(+), 150 deletions(-) diff --git a/.github/workflows/ci.gpu.yml b/.github/workflows/ci.gpu.yml index 2ad31d8ca..d6312c6c8 100644 --- a/.github/workflows/ci.gpu.yml +++ b/.github/workflows/ci.gpu.yml @@ -45,6 +45,7 @@ jobs: cd /workspaces/stdexec; # Configure cmake -S . -B build -GNinja \ + -DSTDEXEC_ENABLE_IO_URING_TESTS=OFF \ -DSTDEXEC_ENABLE_CUDA=ON \ -DCMAKE_CXX_COMPILER="$cxx" \ -DCMAKE_CUDA_COMPILER="$cxx" \ diff --git a/CMakeLists.txt b/CMakeLists.txt index da632896f..11d98fde0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -276,6 +276,8 @@ if (STDEXEC_ENABLE_TBB) ) endif () +option (STDEXEC_ENABLE_IO_URING_TESTS "Enable io_uring tests" ON) + option(STDEXEC_BUILD_EXAMPLES "Build stdexec examples" ON) option(STDEXEC_BUILD_TESTS "Build stdexec tests" ON) option(BUILD_TESTING "" ${STDEXEC_BUILD_TESTS}) diff --git a/include/exec/linux/io_uring_context.hpp b/include/exec/linux/io_uring_context.hpp index 7f364f3ac..b056c9683 100644 --- a/include/exec/linux/io_uring_context.hpp +++ b/include/exec/linux/io_uring_context.hpp @@ -40,10 +40,14 @@ #if LINUX_VERSION_CODE < KERNEL_VERSION(5, 5, 0) #warning "Your kernel is too old to support io_uring with cancellation support." 
+#include #else #define STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION #endif -#include + +#if LINUX_VERSION_CODE >= KERNEL_VERSION(5, 6, 0) +#define STDEXEC_HAS_IORING_OP_READ +#endif #include #include @@ -282,8 +286,12 @@ namespace exec { struct __wakeup_operation : __task { __context* __context_ = nullptr; int __eventfd_ = -1; +#ifdef STDEXEC_HAS_IORING_OP_READ + std::uint64_t __buffer_ = 0; +#else std::uint64_t __value_ = 0; ::iovec __buffer_ = {.iov_base = &__value_, .iov_len = sizeof(__value_)}; +#endif static bool __ready_(__task*) noexcept { return false; @@ -294,8 +302,13 @@ namespace exec { __entry = ::io_uring_sqe{}; __entry.fd = __self.__eventfd_; __entry.addr = bit_cast<__u64>(&__self.__buffer_); +#ifdef STDEXEC_HAS_IORING_OP_READ + __entry.opcode = IORING_OP_READ; + __entry.len = sizeof(__self.__buffer_); +#else __entry.opcode = IORING_OP_READV; __entry.len = 1; +#endif } static void __complete_(__task* __pointer, const ::io_uring_cqe& __entry) noexcept { @@ -314,34 +327,6 @@ namespace exec { void start() noexcept; }; -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - struct __probe_cancelation : __task { - __context* __context_ = nullptr; - - static bool __ready_(__task*) noexcept { - return false; - } - - static void __submit_(__task* __pointer, ::io_uring_sqe& __entry) noexcept { - // __probe_cancelation& __self = *static_cast<__probe_cancelation*>(__pointer); - __entry = ::io_uring_sqe{}; - __entry.addr = bit_cast<__u64>(__pointer); - __entry.opcode = IORING_OP_ASYNC_CANCEL; - } - - static void __complete_(__task* __pointer, const ::io_uring_cqe& __cqe) noexcept; - - void start() noexcept; - - static constexpr __task_vtable __vtable{&__ready_, &__submit_, &__complete_}; - - __probe_cancelation(__context* __ctx) - : __task{__vtable} - , __context_{__ctx} { - } - }; -#endif - class __scheduler; enum class until { @@ -355,16 +340,8 @@ namespace exec { : __context_base(std::max(__entries, 2u), __flags) , __completion_queue_{__completion_queue_region_ ? 
__completion_queue_region_ : __submission_queue_region_, __params_} , __submission_queue_{__submission_queue_region_, __submission_queue_entries_, __params_} - , __wakeup_operation_{this, __eventfd_} -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - , __probe_cancelation_operation_{this} -#endif - { + , __wakeup_operation_{this, __eventfd_} { __wakeup_operation_.start(); -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - __probe_cancelation_operation_.start(); - run_until_empty(); -#endif } void wakeup() { @@ -389,14 +366,6 @@ namespace exec { return __is_running_.load(std::memory_order_relaxed); } - bool has_async_cancelation() const noexcept { -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - return __has_async_cancelation_; -#else - return false; -#endif - } - /// @brief Breaks out of the run loop of the io context without stopping the context. void finish() { __break_loop_.store(true, std::memory_order_release); @@ -624,27 +593,8 @@ namespace exec { __task_queue __pending_{}; __atomic_task_queue __requests_{}; __wakeup_operation __wakeup_operation_; -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - friend struct __probe_cancelation; - __probe_cancelation __probe_cancelation_operation_; - bool __has_async_cancelation_ = false; -#endif }; -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - inline void - __probe_cancelation::__complete_(__task* __pointer, const ::io_uring_cqe& __cqe) noexcept { - __probe_cancelation& __self = *static_cast<__probe_cancelation*>(__pointer); - if (__cqe.res != -EINVAL) { - __self.__context_->__has_async_cancelation_ = true; - } - } - - inline void __probe_cancelation::start() noexcept { - __context_->submit(this); - } -#endif - inline void __wakeup_operation::start() noexcept { if (!__context_->__stop_source_->stop_requested()) { __context_->__pending_.push_front(this); @@ -783,16 +733,8 @@ namespace exec { #ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION if constexpr ( requires(_Base* __op, ::io_uring_sqe& __sqe) { 
__op->submit_stop(__sqe); }) { - if (__op_->context().has_async_cancelation()) { - __sqe = ::io_uring_sqe{ - .opcode = IORING_OP_ASYNC_CANCEL, // - .addr = bit_cast<__u64>(__op_->__parent_) // - }; - } else { - __op_->submit_stop(__sqe); - } + __op_->submit_stop(__sqe); } else { - STDEXEC_ASSERT(__op_->context().has_async_cancelation()); __sqe = ::io_uring_sqe{ .opcode = IORING_OP_ASYNC_CANCEL, // .addr = bit_cast<__u64>(__op_->__parent_) // @@ -967,7 +909,24 @@ namespace exec { struct __schedule_after_operation { using _Receiver = stdexec::__t<_ReceiverId>; - struct __timerfd_impl { + class __impl : public __stoppable_op_base<_Receiver> { +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + struct __kernel_timespec { + __s64 __tv_sec; + __s64 __tv_nsec; + }; + + __kernel_timespec __duration_; + + static constexpr __kernel_timespec + __duration_to_timespec(std::chrono::nanoseconds dur) noexcept { + auto secs = std::chrono::duration_cast(dur); + dur -= secs; + secs = std::max(secs, std::chrono::seconds{0}); + dur = std::clamp(dur, std::chrono::nanoseconds{0}, std::chrono::nanoseconds{999'999'999}); + return __kernel_timespec{secs.count(), dur.count()}; + } +#else safe_file_descriptor __timerfd_; ::itimerspec __duration_; std::uint64_t __n_expirations_{0}; @@ -988,98 +947,43 @@ namespace exec { 0 <= __timerspec.it_value.tv_nsec && __timerspec.it_value.tv_nsec < 1'000'000'000); return __timerspec; } - - __timerfd_impl(std::chrono::nanoseconds __duration) - : __timerfd_{::timerfd_create(CLOCK_REALTIME, 0)} - , __duration_{__duration_to_timespec(__duration)} { - int __rc = ::timerfd_settime( - __timerfd_, TFD_TIMER_ABSTIME | TFD_TIMER_CANCEL_ON_SET, &__duration_, nullptr); - __throw_error_code_if(__rc < 0, errno); - } - }; - - struct __async_cancel_impl { - struct __kernel_timespec { - __s64 __tv_sec; - __s64 __tv_nsec; - }; - - __kernel_timespec __duration_; - - static constexpr __kernel_timespec - __duration_to_timespec(std::chrono::nanoseconds dur) noexcept { - auto 
secs = std::chrono::duration_cast(dur); - dur -= secs; - secs = std::max(secs, std::chrono::seconds{0}); - dur = std::clamp(dur, std::chrono::nanoseconds{0}, std::chrono::nanoseconds{999'999'999}); - return __kernel_timespec{secs.count(), dur.count()}; - } - - __async_cancel_impl(std::chrono::nanoseconds __duration) - : __duration_{__duration_to_timespec(__duration)} { - } - }; - - class __impl : public __stoppable_op_base<_Receiver> { - std::variant<__timerfd_impl, __async_cancel_impl> __impl_; - - static std::variant<__timerfd_impl, __async_cancel_impl> - __make_impl(__context& __ctx, std::chrono::nanoseconds __duration) noexcept { - if (__ctx.has_async_cancelation()) { - return __async_cancel_impl{__duration}; - } else { - return __timerfd_impl{__duration}; - } - } +#endif public: static constexpr std::false_type ready() noexcept { return {}; } +#ifndef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION void submit_stop(::io_uring_sqe& __sqe) noexcept { - __timerfd_impl* __impl = std::get_if<0>(&__impl_); - STDEXEC_ASSERT(__impl != nullptr); - __impl->__duration_.it_value.tv_sec = 1; - __impl->__duration_.it_value.tv_nsec = 0; + __duration_.it_value.tv_sec = 1; + __duration_.it_value.tv_nsec = 0; ::timerfd_settime( - __impl->__timerfd_, - TFD_TIMER_ABSTIME | TFD_TIMER_CANCEL_ON_SET, - &__impl->__duration_, - nullptr); + __timerfd_, TFD_TIMER_ABSTIME | TFD_TIMER_CANCEL_ON_SET, &__duration_, nullptr); __sqe = ::io_uring_sqe{.opcode = IORING_OP_NOP}; } +#endif void submit(::io_uring_sqe& __sqe) noexcept { #ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - if (this->context().has_async_cancelation()) { - __async_cancel_impl* __impl = std::get_if<1>(&__impl_); - STDEXEC_ASSERT(__impl != nullptr); - ::io_uring_sqe __sqe_{}; - __sqe_.opcode = IORING_OP_TIMEOUT; - __sqe_.addr = bit_cast<__u64>(&__impl->__duration_); - __sqe_.len = 1; - __sqe = __sqe_; - } else { -#endif - __timerfd_impl* __impl = std::get_if<0>(&__impl_); - STDEXEC_ASSERT(__impl != nullptr); - ::io_uring_sqe 
__sqe_{}; - __sqe_.opcode = IORING_OP_READV; - __sqe_.fd = __impl->__timerfd_; - __sqe_.addr = bit_cast<__u64>(&__impl->__iov_); - __sqe_.len = 1; - __sqe = __sqe_; -#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - } + ::io_uring_sqe __sqe_{}; + __sqe_.opcode = IORING_OP_TIMEOUT; + __sqe_.addr = bit_cast<__u64>(&__duration_); + __sqe_.len = 1; + __sqe = __sqe_; +#else + ::io_uring_sqe __sqe_{}; + __sqe_.opcode = IORING_OP_READV; + __sqe_.fd = __timerfd_; + __sqe_.addr = bit_cast<__u64>(&__iov_); + __sqe_.len = 1; + __sqe = __sqe_; #endif } void complete(const ::io_uring_cqe& __cqe) noexcept { #ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION - if ( - (this->context().has_async_cancelation() && (__cqe.res == -ETIME || __cqe.res == 0)) - || __cqe.res == sizeof(std::uint64_t)) { + if (__cqe.res == -ETIME || __cqe.res == 0) { #else if (__cqe.res == sizeof(std::uint64_t)) { #endif @@ -1094,7 +998,18 @@ namespace exec { __impl(__context& __context, std::chrono::nanoseconds __duration, _Receiver&& __receiver) : __stoppable_op_base<_Receiver>{__context, (_Receiver&&) __receiver} - , __impl_(__make_impl(__context, __duration)) { +#ifdef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + , __duration_{__duration_to_timespec(__duration)} +#else + , __timerfd_{::timerfd_create(CLOCK_REALTIME, 0)} + , __duration_{__duration_to_timespec(__duration)} +#endif + { +#ifndef STDEXEC_HAS_IO_URING_ASYNC_CANCELLATION + int __rc = ::timerfd_settime( + __timerfd_, TFD_TIMER_ABSTIME | TFD_TIMER_CANCEL_ON_SET, &__duration_, nullptr); + __throw_error_code_if(__rc < 0, errno); +#endif } }; From dcd96cf3ca2df6a3141d4a155aa99400d9d806aa Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Wed, 28 Jun 2023 08:33:27 +0200 Subject: [PATCH 9/9] Don't compile the test if the option is disabled --- test/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d3821cc21..b2d2eeb37 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ 
-78,7 +78,7 @@ set(stdexec_test_sources exec/test_when_any.cpp exec/test_at_coroutine_exit.cpp exec/test_materialize.cpp - exec/test_io_uring_context.cpp + $<$<BOOL:${STDEXEC_ENABLE_IO_URING_TESTS}>:exec/test_io_uring_context.cpp> exec/test_trampoline_scheduler.cpp exec/test_sequence_senders.cpp exec/sequence/test_empty_sequence.cpp