Skip to content

Commit

Permalink
address feedback: refactored to use async_scope and take a pack of se…
Browse files Browse the repository at this point in the history
…nders
  • Loading branch information
kirkshoop committed May 25, 2024
1 parent a30ee87 commit 86a1738
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 46 deletions.
8 changes: 8 additions & 0 deletions include/exec/async_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ namespace exec {
struct __impl;
struct async_scope;

template<class _A>
concept __async_scope =
!std::is_copy_constructible_v<_A> && !std::is_move_constructible_v<_A> &&
!std::is_copy_assignable_v<_A> && !std::is_move_assignable_v<_A> &&
requires (_A& __a) {
{__a.nest(stdexec::just())} -> sender_of<stdexec::set_value_t()>;
};

struct __task : __immovable {
const __impl* __scope_;
void (*__notify_waiter)(__task*) noexcept;
Expand Down
128 changes: 82 additions & 46 deletions include/exec/start_now.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "stdexec/__detail/__meta.hpp"
#include "stdexec/__detail/__type_traits.hpp"

#include "async_scope.hpp"

namespace exec {
/////////////////////////////////////////////////////////////////////////////
// NOT TO SPEC: __start_now
Expand All @@ -37,7 +39,7 @@ namespace exec {
} // namespace

template <class _Ty>
using __ref_t = decltype(__ref(__declval<_Ty&>()));
using __ref_t = decltype(__ref(stdexec::__declval<_Ty&>()));

struct __joiner {
virtual ~__joiner() {}
Expand All @@ -46,27 +48,25 @@ namespace exec {

template <class _StgRef>
struct __receiver {
using receiver_concept = receiver_t;
using receiver_concept = stdexec::receiver_t;
using __t = __receiver;
using __id = __receiver;

using _Storage = __decay_t<__call_result_t<_StgRef>>;
using _Storage = stdexec::__decay_t<stdexec::__call_result_t<_StgRef>>;
using _Env = typename _Storage::__env_t;

_StgRef __stgref_;

template <class... _As>
void set_value(_As&&... __as) noexcept {
auto __joiner = __stgref_().__joiner_.exchange(nullptr);
if (__joiner) {__joiner->join();}
__stgref_().complete();
}

template <class _Error>
void set_error(_Error&& __err) noexcept = delete;

void set_stopped() noexcept {
auto __joiner = __stgref_().__joiner_.exchange(nullptr);
if (__joiner) {__joiner->join();}
__stgref_().complete();
}

// Forward all receiever queries.
Expand All @@ -75,107 +75,143 @@ namespace exec {
}
};

static inline const __joiner __empty_joiner_{};
inline const __joiner __empty_joiner_{};

template <class _StgRef, class _Receiver>
struct __operation : __joiner {
using __id = __operation;
using __t = __operation;

using _Storage = __decay_t<__call_result_t<_StgRef>>;

_StgRef __stgref_;
mutable _Receiver __rcvr_;

template<class _R>
__operation(_StgRef __stgref, _R&& __r) : __stgref_(__stgref), __rcvr_((_R&&)__r) {}

void join() const noexcept override {
set_value(std::move(__rcvr_));
stdexec::set_value(std::move(__rcvr_));
}

template <__decays_to<__operation> _Self>
STDEXEC_MEMFN_DECL(
auto start)(this _Self& __self) noexcept //
-> void {
void start() noexcept {
const __joiner* expected = &__empty_joiner_;
if (!__self.__stgref_().__joiner_.compare_exchange_strong(expected, &__self)) {
__self.join();
if (!__stgref_().__joiner_.compare_exchange_strong(expected, this)) {
join();
}
}
};

template <class _StgRef>
struct __sender {
using sender_concept = sender_t;
using sender_concept = stdexec::sender_t;
using __id = __sender;
using __t = __sender;

using _Storage = __decay_t<__call_result_t<_StgRef>>;

template <class _Env>
using __completions_t = completion_signatures<set_value_t(), set_stopped_t()>;
using __completions_t = stdexec::completion_signatures<stdexec::set_value_t(), stdexec::set_stopped_t()>;

_StgRef __stgref_;

template <__decays_to<__sender> _Self, class _Receiver>
requires receiver_of<_Receiver, __completions_t<env_of_t<_Receiver>>>
using connect_t = stdexec::connect_t;
template <stdexec::__decays_to<__sender> _Self, stdexec::receiver _Receiver>
requires stdexec::receiver_of<_Receiver, __completions_t>
STDEXEC_MEMFN_DECL(
auto connect)(this _Self&& __self, _Receiver __rcvr) //
noexcept(std::is_nothrow_move_constructible_v<_Receiver>)
-> __operation<_StgRef, std::remove_cvref_t<_Receiver>> {
return {static_cast<_Self&&>(__self).__stgref_, static_cast<_Receiver&&>(__rcvr)};
}

template <class _Env>
auto get_completion_signatures(_Env&&) -> __completions_t<_Env> {
auto get_completion_signatures(stdexec::__ignore = {}) -> __completions_t {
return {};
}
};

template <class _SenderId, class _EnvId>
template <class _EnvId, class _AsyncScopeId, class... _SenderId>
struct __storage {
using _Sender = stdexec::__t<_SenderId>;
using _Env = stdexec::__t<_EnvId>;
using _AsyncScope = stdexec::__t<_AsyncScopeId>;
using __receiver_t = __receiver<__ref_t<__storage>>;
using __sender_t = __sender<__ref_t<const __storage>>;
using __env_t = __env::__join_t<_Env, __env::__with<inplace_stop_token, get_stop_token_t>>;

static_assert(sender_to<_Sender, __receiver_t>, "The sender passed to start_now does not satisfy the constraints");
using __env_t = stdexec::__env::__join_t<stdexec::__env::__with<stdexec::inplace_stop_token, stdexec::get_stop_token_t>, _Env>;
template<class _S>
using __nested_t = decltype(stdexec::__declval<_AsyncScope&>().nest(stdexec::__declval<_S&&>()));

mutable std::atomic<const __joiner*> __joiner_{&__empty_joiner_};
mutable std::atomic<const __joiner*> __joiner_;
mutable std::atomic<int> __pending_;

STDEXEC_ATTRIBUTE((no_unique_address))
inplace_stop_source source;
stdexec::inplace_stop_source __source_;
STDEXEC_ATTRIBUTE((no_unique_address))
__env_t __env_;
STDEXEC_ATTRIBUTE((no_unique_address))
connect_result_t<_Sender, __receiver_t> __op_state_;
stdexec::__decayed_tuple<stdexec::connect_result_t<__nested_t<stdexec::__t<_SenderId>>, __receiver_t>...> __op_state_;

template<class _Sender>
auto __construct(_Sender&& __sndr) noexcept {
return [&, this](){return stdexec::connect(static_cast<_Sender&&>(__sndr), __receiver_t{__ref(*this)});};
}

__storage(_Sender&& __sndr, _Env __env)
: __env_(__env::__join(std::move(__env), __env::__with{source.get_token(), get_stop_token}))
, __op_state_(connect(static_cast<_Sender&&>(__sndr), __receiver_t{__ref(*this)})) {
start(__op_state_);
__storage(_Env __env, _AsyncScope& __scope, stdexec::__t<_SenderId>... __sndr)
: __joiner_(&__empty_joiner_)
, __pending_(0)
, __source_()
, __env_(stdexec::__env::__join(stdexec::__env::__with{__source_.get_token(), stdexec::get_stop_token}, std::move(__env)))
, __op_state_(stdexec::__conv{__construct(__scope.nest(__sndr))}...) {
__pending_ = sizeof...(_SenderId);
stdexec::__apply([](auto&... __op_state) noexcept { bool arr[]{(stdexec::start(__op_state), true)...}; (void)arr; }, __op_state_);
}

bool request_stop() noexcept {
return source.request_stop();
return __source_.request_stop();
}

inplace_stop_token get_token() const noexcept {
return source.get_token();
stdexec::inplace_stop_token get_token() const noexcept {
return __source_.get_token();
}

auto join() const noexcept -> __sender_t {
[[nodiscard]] auto async_wait() const noexcept -> __sender_t {
return __sender_t{__ref(*this)};
}

private:
friend struct __receiver<__ref_t<__storage>>;
void complete() noexcept {
if (--__pending_ == 0) {
auto __joiner = __joiner_.exchange(nullptr);
if (__joiner) {__joiner->join();}
}
}
};

template <class _Env, class _AsyncScope, class... _Sender>
using __storage_t = __storage<
stdexec::__id<std::remove_cvref_t<_Env>>,
stdexec::__id<std::remove_cvref_t<_AsyncScope>>,
stdexec::__id<std::remove_cvref_t<_Sender>>...>;

struct start_now_t {
template <sender _Sender, class _Env = __root_env_t>
__storage<__id<_Sender>, __id<_Env>> operator()(_Sender&& __sndr, _Env&& __env = {}) const noexcept(false) {
return __storage<__id<_Sender>, __id<_Env>>{
static_cast<_Sender&&>(__sndr), static_cast<_Env&&>(__env)};
template <stdexec::queryable _Env, exec::__scope::__async_scope _AsyncScope, stdexec::sender... _Sender>
requires (!exec::__scope::__async_scope<_Env>) && (!stdexec::sender<_Env>) && (!stdexec::sender<_AsyncScope>)
__storage_t<_Env, _AsyncScope, _Sender...> operator()(_Env&& __env, _AsyncScope& __scope, _Sender&&... __sndr) const
noexcept(
std::is_nothrow_move_constructible_v<_Env> &&
(std::is_nothrow_move_constructible_v<_Sender> && ... && true)) {
static_assert(stdexec::unstoppable_token<stdexec::stop_token_of_t<_Env>>, "start_now() requires that the given environment does not have a stoppable token");
static_assert((stdexec::__nofail_sender<_Sender> && ... && true), "start_now() requires that the given senders have no set_error(..) completions");
using __receiver_t = __receiver<__ref_t<__storage_t<_Env, _AsyncScope, _Sender...>>>;
static_assert((stdexec::sender_to<_Sender, __receiver_t> && ... && true), "The senders passed to start_now do not satisfy the constraints");
return __storage_t<_Env, _AsyncScope, _Sender...>{
static_cast<_Env&&>(__env), std::ref(__scope), static_cast<_Sender&&>(__sndr)...};
}
template <exec::__scope::__async_scope _AsyncScope, stdexec::sender... _Sender>
requires (!stdexec::sender<_AsyncScope>)
__storage_t<stdexec::__root_env_t, _AsyncScope, _Sender...> operator()(_AsyncScope& __scope, _Sender&&... __sndr) const
noexcept(
(std::is_nothrow_move_constructible_v<_Sender> && ... && true)) {
static_assert((stdexec::__nofail_sender<_Sender> && ... && true), "start_now() requires that the given senders have no set_error(..) completions");
using __receiver_t = __receiver<__ref_t<__storage_t<stdexec::__root_env_t, _AsyncScope, _Sender...>>>;
static_assert((stdexec::sender_to<_Sender, __receiver_t> && ... && true), "The senders passed to start_now do not satisfy the constraints");
return __storage_t<stdexec::__root_env_t, _AsyncScope, _Sender...>{
stdexec::__root_env_t{}, std::ref(__scope), static_cast<_Sender&&>(__sndr)...};
}
};
} // namespace __start_now_
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ set(stdexec_test_sources
exec/async_scope/test_dtor.cpp
exec/async_scope/test_spawn.cpp
exec/async_scope/test_spawn_future.cpp
exec/async_scope/test_start_now.cpp
exec/async_scope/test_empty.cpp
exec/async_scope/test_stop.cpp
exec/test_when_any.cpp
Expand Down

0 comments on commit 86a1738

Please sign in to comment.