Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions include/stdexec/__detail/__when_all.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,12 @@ namespace stdexec {
using __f = __meval<
__concat_completion_signatures,
__meval<__eptr_completion_if_t, __all_nothrow_decay_copyable_results<_Senders...>>,
completion_signatures<set_stopped_t()>,
__minvoke<__with_default<__qq<__set_values_sig_t>, completion_signatures<>>, _Senders...>,
__transform_completion_signatures<
__completion_signatures_of_t<_Senders, _Env...>,
__mconst<completion_signatures<>>::__f,
__set_error_t,
completion_signatures<>,
completion_signatures<set_stopped_t()>,
__concat_completion_signatures
>...
>;
Expand Down Expand Up @@ -191,7 +190,7 @@ namespace stdexec {

struct _INVALID_ARGUMENTS_TO_WHEN_ALL_ { };

template <class _ErrorsVariant, class _ValuesTuple, class _StopToken>
template <class _ErrorsVariant, class _ValuesTuple, class _StopToken, bool _SendsStopped>
struct __when_all_state {
using __stop_callback_t = stop_callback_for_t<_StopToken, __on_stop_request>;

Expand Down Expand Up @@ -222,7 +221,11 @@ namespace stdexec {
}
break;
case __stopped:
stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
if constexpr (_SendsStopped) {
stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
} else {
STDEXEC_UNREACHABLE();
}
break;
default:;
}
Expand All @@ -243,7 +246,11 @@ namespace stdexec {
using _Traits = __traits<_Env, _Child...>;
using _ErrorsVariant = _Traits::__errors_variant;
using _ValuesTuple = _Traits::__values_tuple;
using _State = __when_all_state<_ErrorsVariant, _ValuesTuple, stop_token_of_t<_Env>>;
using _State = __when_all_state<
_ErrorsVariant,
_ValuesTuple,
stop_token_of_t<_Env>,
(sends_stopped<_Child, _Env> || ...)>;
return _State{sizeof...(_Child)};
};
}
Expand Down Expand Up @@ -309,15 +316,9 @@ namespace stdexec {
// register stop callback:
__state.__on_stop_.emplace(
get_stop_token(stdexec::get_env(__rcvr)), __on_stop_request{__state.__stop_source_});
if (__state.__stop_source_.stop_requested()) {
// Stop has already been requested. Don't bother starting
// the child operations.
stdexec::set_stopped(static_cast<_Receiver&&>(__rcvr));
} else {
(stdexec::start(__child_ops), ...);
if constexpr (sizeof...(__child_ops) == 0) {
__state.__complete(__rcvr);
}
(stdexec::start(__child_ops), ...);
if constexpr (sizeof...(__child_ops) == 0) {
__state.__complete(__rcvr);
}
};

Expand Down
4 changes: 2 additions & 2 deletions test/exec/test_fork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace {
STATIC_REQUIRE(
set_equivalent<
completion_signatures_of_t<decltype(sndr), env<>>,
completion_signatures<set_value_t(), set_error_t(std::exception_ptr), set_stopped_t()>
completion_signatures<set_value_t(), set_error_t(std::exception_ptr)>
>);
}

Expand Down Expand Up @@ -65,7 +65,7 @@ namespace {
STATIC_REQUIRE(
set_equivalent<
completion_signatures_of_t<decltype(sndr), env<>>,
completion_signatures<set_value_t(int, int), set_error_t(std::exception_ptr), set_stopped_t()>
completion_signatures<set_value_t(int, int), set_error_t(std::exception_ptr)>
>);

auto [i1, i2] = sync_wait(sndr).value();
Expand Down
25 changes: 21 additions & 4 deletions test/stdexec/algos/adaptors/test_when_all.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,14 @@ namespace {
ex::when_all(ex::just(13), ex::just_error(std::exception_ptr{}), ex::just_stopped()));
}

TEST_CASE("when_all has the sends_stopped == true", "[adaptors][when_all]") {
check_sends_stopped<true>(ex::when_all(ex::just(13)));
check_sends_stopped<true>(ex::when_all(ex::just_error(-1)));
TEST_CASE(
"when_all has sends_stopped == true if and only if at least one child sends stopped",
"[adaptors][when_all]") {
check_sends_stopped<false>(ex::when_all(ex::just(13)));
check_sends_stopped<false>(ex::when_all(ex::just_error(-1)));
check_sends_stopped<true>(ex::when_all(ex::just_stopped()));

check_sends_stopped<true>(ex::when_all(ex::just(3), ex::just(0.14)));
check_sends_stopped<false>(ex::when_all(ex::just(3), ex::just(0.14)));
check_sends_stopped<true>(ex::when_all(ex::just(3), ex::just_error(-1), ex::just_stopped()));
}

Expand Down Expand Up @@ -381,4 +383,19 @@ namespace {
wait_for_value(std::move(snd), std::string{"hello world"});
}
}

TEST_CASE("when_all defers stop handling to its children", "[adaptors][when_all]") {
ex::inplace_stop_source source;
source.request_stop();
auto snd = ex::when_all(ex::just(), ex::just());
static_assert(set_equivalent<
ex::completion_signatures_of_t<decltype(snd), ex::env<>>,
ex::completion_signatures<ex::set_value_t()>>);
auto env = ex::prop(ex::get_stop_token, source.get_token());
static_assert(set_equivalent<
ex::completion_signatures_of_t<decltype(snd), decltype(env)>,
ex::completion_signatures<ex::set_value_t()>>);
auto op = ex::connect(snd, expect_void_receiver{});
ex::start(op);
}
} // namespace
Loading