Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions include/exec/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,16 +672,17 @@ namespace experimental::execution
if constexpr (requires { __coro_.promise().stop_requested() ? 0 : 1; })
{
if (__coro_.promise().stop_requested())
return STDEXEC::__coroutine_destroy_and_continue(
__parent.promise().unhandled_stopped());
return __parent.unhandled_stopped();
}
return __coro_;
}

constexpr auto await_resume() -> _Ty
{
__context_.reset();
scope_guard __on_exit{[this]() noexcept { std::exchange(__coro_, {}).destroy(); }};
scope_guard __on_exit{
[this]() noexcept
{ STDEXEC::__coroutine_destroy_nothrow(std::exchange(__coro_, {})); }};

if (__coro_.promise().__data_.index() == 1)
std::rethrow_exception(std::move(__var::__get<1>(__coro_.promise().__data_)));
Expand Down
24 changes: 13 additions & 11 deletions include/stdexec/__detail/__as_awaitable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ namespace STDEXEC
// as normal.
if (__result_.__is_valueless())
{
return STDEXEC::__coroutine_destroy_and_continue(__continuation_.unhandled_stopped());
return __continuation_.unhandled_stopped();
}
else
{
Expand Down Expand Up @@ -317,7 +317,6 @@ namespace STDEXEC

STDEXEC_CONSTEXPR_CXX23 auto
await_suspend([[maybe_unused]] __std::coroutine_handle<> __continuation) noexcept
-> __std::coroutine_handle<>
{
STDEXEC_ASSERT(this->__continuation_.handle() == __continuation);

Expand All @@ -329,15 +328,18 @@ namespace STDEXEC
// proceed to resume().
// - If T2 hasn't run yet, it will see {} from its load in __done() and
// skip the spin entirely
std::thread::id const __old_id = this->__thread_id_.exchange(std::thread::id{},
__std::memory_order_release);
if (__old_id == std::thread::id{})
{
// The receiver already cleared __thread_id_, so it completed on the same
// thread. Resume the continuation directly.
return this->__get_continuation();
}
return __std::noop_coroutine();
bool const __done = //
this->__thread_id_.exchange(std::thread::id{}, __std::memory_order_release)
== std::thread::id{};

// If the receiver already cleared __thread_id_, it completed on the same thread.
// Resume the continuation directly.
# if !defined(STDEXEC_MSVC_CORO_DESTROY_BUG_WORKAROUND)
return __done ? this->__get_continuation() : __std::noop_coroutine();
# else
if (__done)
STDEXEC::__coroutine_resume_nothrow(this->__get_continuation());
# endif
}

private:
Expand Down
2 changes: 2 additions & 0 deletions include/stdexec/__detail/__config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ namespace STDEXEC::__std

#if STDEXEC_MSVC() && !STDEXEC_CLANG_CL() && STDEXEC_MSVC_VERSION >= 1950
# define STDEXEC_ATTR_WHICH_10(_ATTR) [[msvc::musttail]]
#elif STDEXEC_HAS_CPP_ATTRIBUTE(clang::musttail)
# define STDEXEC_ATTR_WHICH_10(_ATTR) [[clang::musttail]]
#elif STDEXEC_HAS_CPP_ATTRIBUTE(gnu::musttail)
# define STDEXEC_ATTR_WHICH_10(_ATTR) [[gnu::musttail]]
#else
Expand Down
56 changes: 12 additions & 44 deletions include/stdexec/coroutine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

#if !STDEXEC_NO_STDCPP_COROUTINES()

# if STDEXEC_MSVC() && STDEXEC_MSVC_VERSION < 1950
# define STDEXEC_MSVC_CORO_DESTROY_BUG_WORKAROUND
# endif

namespace STDEXEC
{
template <class _Tp, __one_of<_Tp, void> _Up>
Expand Down Expand Up @@ -179,7 +183,7 @@ namespace STDEXEC
sizeof(__synthetic_coro_frame));
} // namespace __detail

# if STDEXEC_MSVC() && STDEXEC_MSVC_VERSION < 1950
# if defined(STDEXEC_MSVC_CORO_DESTROY_BUG_WORKAROUND)
// MSVCBUG https://developercommunity.visualstudio.com/t/destroy-coroutine-from-final_suspend-r/10096047

// Prior to Visual Studio 17.9 (Feb, 2024), aka MSVC 19.39, MSVC incorrectly allocates
Expand All @@ -189,8 +193,8 @@ namespace STDEXEC
// implementation when NRVO is in play.

// This workaround delays the destruction of the suspended coroutine by wrapping the
// continuation in another "synthetic" coroutine the resumes the continuation and *then*
// destroys the suspended coroutine.
// continuation in another "synthetic" coroutine that resumes the continuation and
// *then* destroys the suspended coroutine.

// The wrapping coroutine frame is thread-local and reused within the thread for each
// destroy-and-continue sequence.
Expand All @@ -201,10 +205,10 @@ namespace STDEXEC
{
// Make a local copy of the promise to ensure we can safely destroy the suspended
// coroutine after resuming the continuation.
auto __promise = static_cast<__destroy_and_continue_frame*>(__address)->__promise_;
STDEXEC::__coroutine_resume_nothrow(__promise.__continue_);
STDEXEC_ATTRIBUTE(musttail)
return STDEXEC::__coroutine_destroy_nothrow(__promise.__destroy_.address());
auto& __self = *static_cast<__destroy_and_continue_frame*>(__address);
auto __destroy = __self.__promise_.__destroy_;
STDEXEC::__coroutine_resume_nothrow(__self.__promise_.__continue_);
STDEXEC::__coroutine_destroy_nothrow(__destroy);
}

struct __promise
Expand All @@ -220,29 +224,6 @@ namespace STDEXEC
{&__destroy_and_continue_frame::__resume},
{}};

struct __symmetric_transfer_frame : __detail::__synthetic_coro_frame
{
static void __resume(void* __address) noexcept
{
// Make a local copy of the promise to ensure we can safely destroy the suspended
// coroutine after resuming the continuation.
auto __promise = static_cast<__symmetric_transfer_frame*>(__address)->__promise_;
STDEXEC_ATTRIBUTE(musttail)
return STDEXEC::__coroutine_resume_nothrow(__promise.__continue_.address());
}

struct __promise
{
__std::coroutine_handle<> __continue_{};
} __promise_;

static thread_local __symmetric_transfer_frame value;
};

inline thread_local __symmetric_transfer_frame __symmetric_transfer_frame::value{
{&__symmetric_transfer_frame::__resume},
{}};

inline auto __coroutine_destroy_and_continue(__std::coroutine_handle<> __destroy, //
__std::coroutine_handle<> __continue) noexcept //
-> __std::coroutine_handle<>
Expand All @@ -252,13 +233,6 @@ namespace STDEXEC
return __std::coroutine_handle<>::from_address(&__destroy_and_continue_frame::value);
}

inline auto __coroutine_destroy_and_continue(__std::coroutine_handle<> __continue) noexcept //
-> __std::coroutine_handle<>
{
__symmetric_transfer_frame::value.__promise_.__continue_ = __continue;
return __std::coroutine_handle<>::from_address(&__symmetric_transfer_frame::value);
}

# else

STDEXEC_ATTRIBUTE(always_inline)
Expand All @@ -270,14 +244,8 @@ namespace STDEXEC
return __continue;
}

STDEXEC_ATTRIBUTE(always_inline)
auto __coroutine_destroy_and_continue(__std::coroutine_handle<> __continue) noexcept //
-> __std::coroutine_handle<>
{
return __continue;
}
# endif // !defined(STDEXEC_MSVC_CORO_DESTROY_BUG_WORKAROUND)

# endif // STDEXEC_MSVC() && STDEXEC_MSVC_VERSION < 1950
} // namespace STDEXEC

#endif // !STDEXEC_NO_STDCPP_COROUTINES()
52 changes: 48 additions & 4 deletions test/stdexec/types/test_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,6 @@ namespace
}));
}

// FUTURE TODO: add support so that `co_await sndr` can return a reference.

constinit int global_int = 0;

constexpr auto wrap_ref = ex::then([](auto &i) noexcept { return std::ref(i); });
Expand Down Expand Up @@ -374,17 +372,28 @@ namespace
struct operation
{
Receiver rcvr_;
bool complete_inline_ = true;

void start() & noexcept
{
ex::set_stopped(std::move(rcvr_));
if (complete_inline_)
{
ex::set_stopped(std::move(rcvr_));
}
else
{
std::thread([rcvr = std::move(rcvr_)]() mutable noexcept
{ ex::set_stopped(std::move(rcvr)); })
.detach();
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
};

template <class Receiver>
auto connect(Receiver rcvr) && -> operation<Receiver>
{
return {std::move(rcvr)};
return {std::move(rcvr), complete_inline_};
}

struct attrs
Expand All @@ -402,6 +411,8 @@ namespace
{
return {};
}

bool complete_inline_ = true;
};

TEST_CASE("task co_awaiting inline|async_affine stopped sender does not deadlock",
Expand Down Expand Up @@ -445,6 +456,39 @@ namespace
static_assert(!ex::sender_to<ex::task<void>, sink>);
static_assert(ex::sender_in<ex::task<void>, ex::__sync_wait::__env>);

auto await_stopped_sender(bool complete_inline) -> ex::task<void>
{
co_await inline_affine_stopped_sender{complete_inline};
}

TEST_CASE("repro for NVIDIA/stdexec#2047", "[types][task]")
{
[[maybe_unused]]
// repeat this test 1000 times because it can expose race conditions
int i = GENERATE(repeat(1000, values({1})));
auto pool = exec::static_thread_pool(1);

auto scope = ex::counting_scope();
ex::spawn(ex::starts_on(pool.get_scheduler(), await_stopped_sender(true))
| ex::upon_error([](auto) noexcept { std::terminate(); }),
scope.get_token());
ex::sync_wait(scope.join());
}

TEST_CASE("repro for NVIDIA/stdexec#2047 async completion from another thread", "[types][task]")
{
[[maybe_unused]]
// repeat this test 1000 times because it can expose race conditions
int i = GENERATE(repeat(1000, values({1})));
auto pool = exec::static_thread_pool(1);

auto scope = ex::counting_scope();
ex::spawn(ex::starts_on(pool.get_scheduler(), await_stopped_sender(false))
| ex::upon_error([](auto) noexcept { std::terminate(); }),
scope.get_token());
ex::sync_wait(scope.join());
}

// TODO: add tests for stop token support in task

} // anonymous namespace
Expand Down
Loading