diff --git a/include/exec/task.hpp b/include/exec/task.hpp index e0a3f4af3..e94943203 100644 --- a/include/exec/task.hpp +++ b/include/exec/task.hpp @@ -672,8 +672,7 @@ namespace experimental::execution if constexpr (requires { __coro_.promise().stop_requested() ? 0 : 1; }) { if (__coro_.promise().stop_requested()) - return STDEXEC::__coroutine_destroy_and_continue( - __parent.promise().unhandled_stopped()); + return __parent.unhandled_stopped(); } return __coro_; } @@ -681,7 +680,9 @@ namespace experimental::execution constexpr auto await_resume() -> _Ty { __context_.reset(); - scope_guard __on_exit{[this]() noexcept { std::exchange(__coro_, {}).destroy(); }}; + scope_guard __on_exit{ + [this]() noexcept + { STDEXEC::__coroutine_destroy_nothrow(std::exchange(__coro_, {})); }}; if (__coro_.promise().__data_.index() == 1) std::rethrow_exception(std::move(__var::__get<1>(__coro_.promise().__data_))); diff --git a/include/stdexec/__detail/__as_awaitable.hpp b/include/stdexec/__detail/__as_awaitable.hpp index e489263c6..d82e140a2 100644 --- a/include/stdexec/__detail/__as_awaitable.hpp +++ b/include/stdexec/__detail/__as_awaitable.hpp @@ -135,7 +135,7 @@ namespace STDEXEC // as normal. if (__result_.__is_valueless()) { - return STDEXEC::__coroutine_destroy_and_continue(__continuation_.unhandled_stopped()); + return __continuation_.unhandled_stopped(); } else { @@ -317,7 +317,6 @@ namespace STDEXEC STDEXEC_CONSTEXPR_CXX23 auto await_suspend([[maybe_unused]] __std::coroutine_handle<> __continuation) noexcept - -> __std::coroutine_handle<> { STDEXEC_ASSERT(this->__continuation_.handle() == __continuation); @@ -329,15 +328,18 @@ namespace STDEXEC // proceed to resume(). // - If T2 hasn't run yet, it will see {} from its load in __done() and // skip the spin entirely - std::thread::id const __old_id = this->__thread_id_.exchange(std::thread::id{}, - __std::memory_order_release); - if (__old_id == std::thread::id{}) - { - // The receiver already cleared __thread_id_, so it completed on the same - // thread. Resume the continuation directly. - return this->__get_continuation(); - } - return __std::noop_coroutine(); + bool const __done = // + this->__thread_id_.exchange(std::thread::id{}, __std::memory_order_release) + == std::thread::id{}; + + // If the receiver already cleared __thread_id_, it completed on the same thread. + // Resume the continuation directly. +# if !defined(STDEXEC_MSVC_CORO_DESTROY_BUG_WORKAROUND) + return __done ? this->__get_continuation() : __std::noop_coroutine(); +# else + if (__done) + STDEXEC::__coroutine_resume_nothrow(this->__get_continuation()); +# endif } private: diff --git a/include/stdexec/__detail/__config.hpp b/include/stdexec/__detail/__config.hpp index ad3643cad..908d12655 100644 --- a/include/stdexec/__detail/__config.hpp +++ b/include/stdexec/__detail/__config.hpp @@ -379,6 +379,8 @@ namespace STDEXEC::__std #if STDEXEC_MSVC() && !STDEXEC_CLANG_CL() && STDEXEC_MSVC_VERSION >= 1950 # define STDEXEC_ATTR_WHICH_10(_ATTR) [[msvc::musttail]] +#elif STDEXEC_HAS_CPP_ATTRIBUTE(clang::musttail) +# define STDEXEC_ATTR_WHICH_10(_ATTR) [[clang::musttail]] #elif STDEXEC_HAS_CPP_ATTRIBUTE(gnu::musttail) # define STDEXEC_ATTR_WHICH_10(_ATTR) [[gnu::musttail]] #else diff --git a/include/stdexec/coroutine.hpp b/include/stdexec/coroutine.hpp index 06d9eda32..54706cd3c 100644 --- a/include/stdexec/coroutine.hpp +++ b/include/stdexec/coroutine.hpp @@ -24,6 +24,10 @@ #if !STDEXEC_NO_STDCPP_COROUTINES() +# if STDEXEC_MSVC() && STDEXEC_MSVC_VERSION < 1950 +# define STDEXEC_MSVC_CORO_DESTROY_BUG_WORKAROUND +# endif + namespace STDEXEC { template _Up> @@ -179,7 +183,7 @@ namespace STDEXEC sizeof(__synthetic_coro_frame)); } // namespace __detail -# if STDEXEC_MSVC() && STDEXEC_MSVC_VERSION < 1950 +# if defined(STDEXEC_MSVC_CORO_DESTROY_BUG_WORKAROUND) // MSVCBUG https://developercommunity.visualstudio.com/t/destroy-coroutine-from-final_suspend-r/10096047 // Prior to Visual Studio 17.9 (Feb, 2024), aka MSVC 19.39, MSVC incorrectly allocates @@ -189,8 +193,8 @@ namespace STDEXEC // implementation when NRVO is in play. // This workaround delays the destruction of the suspended coroutine by wrapping the - // continuation in another "synthetic" coroutine the resumes the continuation and *then* - // destroys the suspended coroutine. + // continuation in another "synthetic" coroutine that resumes the continuation and + // *then* destroys the suspended coroutine. // The wrapping coroutine frame is thread-local and reused within the thread for each // destroy-and-continue sequence. @@ -201,10 +205,10 @@ namespace STDEXEC { // Make a local copy of the promise to ensure we can safely destroy the suspended // coroutine after resuming the continuation. - auto __promise = static_cast<__destroy_and_continue_frame*>(__address)->__promise_; - STDEXEC::__coroutine_resume_nothrow(__promise.__continue_); - STDEXEC_ATTRIBUTE(musttail) - return STDEXEC::__coroutine_destroy_nothrow(__promise.__destroy_.address()); + auto& __self = *static_cast<__destroy_and_continue_frame*>(__address); + auto __destroy = __self.__promise_.__destroy_; + STDEXEC::__coroutine_resume_nothrow(__self.__promise_.__continue_); + STDEXEC::__coroutine_destroy_nothrow(__destroy); } struct __promise @@ -220,29 +224,6 @@ namespace STDEXEC {&__destroy_and_continue_frame::__resume}, {}}; - struct __symmetric_transfer_frame : __detail::__synthetic_coro_frame - { - static void __resume(void* __address) noexcept - { - // Make a local copy of the promise to ensure we can safely destroy the suspended - // coroutine after resuming the continuation. - auto __promise = static_cast<__symmetric_transfer_frame*>(__address)->__promise_; - STDEXEC_ATTRIBUTE(musttail) - return STDEXEC::__coroutine_resume_nothrow(__promise.__continue_.address()); - } - - struct __promise - { - __std::coroutine_handle<> __continue_{}; - } __promise_; - - static thread_local __symmetric_transfer_frame value; - }; - - inline thread_local __symmetric_transfer_frame __symmetric_transfer_frame::value{ - {&__symmetric_transfer_frame::__resume}, - {}}; - inline auto __coroutine_destroy_and_continue(__std::coroutine_handle<> __destroy, // __std::coroutine_handle<> __continue) noexcept // -> __std::coroutine_handle<> @@ -252,13 +233,6 @@ namespace STDEXEC return __std::coroutine_handle<>::from_address(&__destroy_and_continue_frame::value); } - inline auto __coroutine_destroy_and_continue(__std::coroutine_handle<> __continue) noexcept // - -> __std::coroutine_handle<> - { - __symmetric_transfer_frame::value.__promise_.__continue_ = __continue; - return __std::coroutine_handle<>::from_address(&__symmetric_transfer_frame::value); - } - # else STDEXEC_ATTRIBUTE(always_inline) @@ -270,14 +244,8 @@ namespace STDEXEC return __continue; } - STDEXEC_ATTRIBUTE(always_inline) - auto __coroutine_destroy_and_continue(__std::coroutine_handle<> __continue) noexcept // - -> __std::coroutine_handle<> - { - return __continue; - } +# endif // !defined(STDEXEC_MSVC_CORO_DESTROY_BUG_WORKAROUND) -# endif // STDEXEC_MSVC() && STDEXEC_MSVC_VERSION < 1950 } // namespace STDEXEC #endif // !STDEXEC_NO_STDCPP_COROUTINES() diff --git a/test/stdexec/types/test_task.cpp b/test/stdexec/types/test_task.cpp index 85c8aa64a..48accaa08 100644 --- a/test/stdexec/types/test_task.cpp +++ b/test/stdexec/types/test_task.cpp @@ -327,8 +327,6 @@ namespace })); } - // FUTURE TODO: add support so that `co_await sndr` can return a reference. - constinit int global_int = 0; constexpr auto wrap_ref = ex::then([](auto &i) noexcept { return std::ref(i); }); @@ -374,17 +372,28 @@ namespace struct operation { Receiver rcvr_; + bool complete_inline_ = true; void start() & noexcept { - ex::set_stopped(std::move(rcvr_)); + if (complete_inline_) + { + ex::set_stopped(std::move(rcvr_)); + } + else + { + std::thread([rcvr = std::move(rcvr_)]() mutable noexcept + { ex::set_stopped(std::move(rcvr)); }) + .detach(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } }; template auto connect(Receiver rcvr) && -> operation { - return {std::move(rcvr)}; + return {std::move(rcvr), complete_inline_}; } struct attrs @@ -402,6 +411,8 @@ namespace { return {}; } + + bool complete_inline_ = true; }; TEST_CASE("task co_awaiting inline|async_affine stopped sender does not deadlock", @@ -445,6 +456,39 @@ namespace static_assert(!ex::sender_to, sink>); static_assert(ex::sender_in, ex::__sync_wait::__env>); + auto await_stopped_sender(bool complete_inline) -> ex::task + { + co_await inline_affine_stopped_sender{complete_inline}; + } + + TEST_CASE("repro for NVIDIA/stdexec#2047", "[types][task]") + { + [[maybe_unused]] + // repeat this test 1000 times because it can expose race conditions + int i = GENERATE(repeat(1000, values({1}))); + auto pool = exec::static_thread_pool(1); + + auto scope = ex::counting_scope(); + ex::spawn(ex::starts_on(pool.get_scheduler(), await_stopped_sender(true)) + | ex::upon_error([](auto) noexcept { std::terminate(); }), + scope.get_token()); + ex::sync_wait(scope.join()); + } + + TEST_CASE("repro for NVIDIA/stdexec#2047 async completion from another thread", "[types][task]") + { + [[maybe_unused]] + // repeat this test 1000 times because it can expose race conditions + int i = GENERATE(repeat(1000, values({1}))); + auto pool = exec::static_thread_pool(1); + + auto scope = ex::counting_scope(); + ex::spawn(ex::starts_on(pool.get_scheduler(), await_stopped_sender(false)) + | ex::upon_error([](auto) noexcept { std::terminate(); }), + scope.get_token()); + ex::sync_wait(scope.join()); + } + // TODO: add tests for stop token support in task } // anonymous namespace