Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 117 additions & 86 deletions include/asioexec/completion_token.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,7 @@ namespace asioexec {
::STDEXEC::set_stopped_t()
>;

template <typename, typename>
class completion_handler;

template <typename Signatures, typename Receiver>
template <typename Mutex, typename Signatures, typename Receiver>
struct operation_state_base {
class frame_;

Expand All @@ -153,57 +150,71 @@ namespace asioexec {

Receiver r_;
asio_impl::cancellation_signal signal_;
std::recursive_mutex m_;
[[no_unique_address]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in operation states, this should be STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS

Mutex m_;
frame_* frames_{nullptr};
std::exception_ptr ex_;
bool abandoned_{false};

class frame_ {
operation_state_base& self_;
std::unique_lock<std::recursive_mutex> l_;
operation_state_base* self_;
frame_* prev_;
public:
explicit frame_(operation_state_base& self) noexcept
: self_(self)
, l_(self.m_)
: self_([&]() noexcept {
self.m_.lock();
return &self;
}())
Comment on lines +164 to +167
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lambdas are expensive at compile time. this could simply be:

Suggested change
: self_([&]() noexcept {
self.m_.lock();
return &self;
}())
: self_((self.m_.lock(), &self))

or if you find that distasteful, you could use a delegating constructor:

  explicit frame_(operation_state_base& self) noexcept
    : frame_(std::unique_lock(self.m_), self) {
  }
private:
  explicit frame_(std::unique_lock<Mutex> lk, operation_state_base& self) noexcept
    : self_(&self) {
    lk.release();
  }

, prev_(self.frames_) {
self.frames_ = this;
}

constexpr frame_(frame_&& other) noexcept
: self_(std::exchange(other.self_, nullptr))
, prev_(std::exchange(other.prev_, nullptr)) {
}

frame_(const frame_&) = delete;

~frame_() noexcept {
if (l_) {
STDEXEC_ASSERT(self_.frames_ == this);
self_.frames_ = prev_;
if (!self_.frames_ && self_.abandoned_) {
if (self_) {
std::unique_lock l(self_->m_, std::adopt_lock);
STDEXEC_ASSERT(self_->frames_ == this);
self_->frames_ = prev_;
if (!self_->frames_ && self_->abandoned_) {
// We are the last frame and the handler is gone so it's up to us to
// finalize the operation
l_.unlock();
self_.callback_.reset();
if (self_.ex_) {
::STDEXEC::set_error(static_cast<Receiver&&>(self_.r_), std::move(self_.ex_));
l.unlock();
self_->callback_.reset();
if (self_->ex_) {
::STDEXEC::set_error(static_cast<Receiver&&>(self_->r_), std::move(self_->ex_));
} else {
::STDEXEC::set_stopped(static_cast<Receiver&&>(self_.r_));
::STDEXEC::set_stopped(static_cast<Receiver&&>(self_->r_));
}
}
}
}

explicit operator bool() const noexcept {
return bool(l_);
return bool(self_);
}

void release() noexcept {
auto ptr = this;
do {
STDEXEC_ASSERT(ptr->l_);
STDEXEC_ASSERT(self_.frames_ == ptr);
ptr = ptr->prev_;
self_.frames_->l_.unlock();
self_.frames_->prev_ = nullptr;
self_.frames_ = ptr;
} while (ptr);
auto&& self = *self_;
STDEXEC_ASSERT(this == self.frames_);
for (;;) {
STDEXEC_ASSERT(self.frames_);
STDEXEC_ASSERT(self.frames_->self_ == &self);
self.frames_->self_ = nullptr;
const auto prev = self.frames_->prev_;
self.frames_->prev_ = nullptr;
self.frames_ = prev;
self.m_.unlock();
if (!prev) {
break;
}
Comment on lines +209 to +215
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use std::exchange?

Suggested change
const auto prev = self.frames_->prev_;
self.frames_->prev_ = nullptr;
self.frames_ = prev;
self.m_.unlock();
if (!prev) {
break;
}
self.frames_ = std::exchange(self.frames_->prev_, nullptr);
self.m_.unlock();
if (!self.frames_) {
break;
}

}
STDEXEC_ASSERT(!self_);
}
};

Expand Down Expand Up @@ -238,11 +249,12 @@ namespace asioexec {
callback_;
};

template <typename Signatures, typename Receiver>
template <typename Mutex, typename Signatures, typename Receiver>
class completion_handler {
operation_state_base<Signatures, Receiver>* self_;
using operation_state_type_ = operation_state_base<Mutex, Signatures, Receiver>;
operation_state_type_* self_;
public:
explicit completion_handler(operation_state_base<Signatures, Receiver>& self) noexcept
explicit completion_handler(operation_state_type_& self) noexcept
: self_(&self) {
}

Expand All @@ -257,7 +269,7 @@ namespace asioexec {
// When this goes out of scope it might send set stopped or set error, or
// it might defer that to the executor frames above us on the call stack
// (if any)
const typename operation_state_base<Signatures, Receiver>::frame_ frame(*self_);
const typename operation_state_type_::frame_ frame(*self_);
self_->abandoned_ = true;
}
}
Expand Down Expand Up @@ -296,15 +308,20 @@ namespace asioexec {
return self_->signal_.slot();
}

operation_state_base<Signatures, Receiver>& state() const noexcept {
operation_state_type_& state() const noexcept {
STDEXEC_ASSERT(self_);
return *self_;
}
};

template <typename Signatures, typename Receiver, typename Initiation, typename Args>
class operation_state : operation_state_base<Signatures, Receiver> {
using base_ = operation_state_base<Signatures, Receiver>;
template <
typename Mutex,
typename Signatures,
typename Receiver,
typename Initiation,
typename Args>
class operation_state : operation_state_base<Mutex, Signatures, Receiver> {
using base_ = operation_state_base<Mutex, Signatures, Receiver>;
Initiation init_;
Args args_;
public:
Expand All @@ -326,7 +343,7 @@ namespace asioexec {
[&](auto&&... args) {
std::invoke(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

STDEXEC::__invoke compiles about ~2x faster than libstdc++'s std::invoke.

static_cast<Initiation&&>(init_),
completion_handler<Signatures, Receiver>(*this),
completion_handler<Mutex, Signatures, Receiver>(*this),
static_cast<decltype(args)&&>(args)...);
},
std::move(args_));
Expand All @@ -349,9 +366,12 @@ namespace asioexec {
}
};

template <typename Signatures, typename Initiation, typename... Args>
template <typename Mutex, typename Signatures, typename Initiation, typename... Args>
class sender {
using args_type_ = std::tuple<std::decay_t<Args>...>;
template <typename Receiver>
using operation_state_type_ =
operation_state<Mutex, Signatures, std::remove_cvref_t<Receiver>, Initiation, args_type_>;
public:
using sender_concept = ::STDEXEC::sender_t;

Expand All @@ -375,32 +395,25 @@ namespace asioexec {
template <typename Receiver>
requires ::STDEXEC::receiver_of<
std::remove_cvref_t<Receiver>,
::STDEXEC::completion_signatures_of_t<const sender&, ::STDEXEC::env_of_t<Receiver>>
>
constexpr auto connect(Receiver&& receiver) const & noexcept(
std::is_nothrow_constructible_v<
operation_state<Signatures, std::remove_cvref_t<Receiver>, Initiation, args_type_>,
Receiver,
const Initiation&,
const args_type_&
>) {
return operation_state<Signatures, std::remove_cvref_t<Receiver>, Initiation, args_type_>(
static_cast<Receiver&&>(receiver), init_, args_);
::STDEXEC::completion_signatures_of_t<const sender&, ::STDEXEC::env_of_t<Receiver>>>
constexpr auto connect(Receiver&& receiver) const & noexcept(std::is_nothrow_constructible_v<
operation_state_type_<Receiver>,
Receiver,
const Initiation&,
const args_type_&>) {
return operation_state_type_<Receiver>(static_cast<Receiver&&>(receiver), init_, args_);
}

template <typename Receiver>
requires ::STDEXEC::receiver_of<
std::remove_cvref_t<Receiver>,
::STDEXEC::completion_signatures_of_t<sender, ::STDEXEC::env_of_t<Receiver>>
>
constexpr auto connect(Receiver&& receiver) && noexcept(
std::is_nothrow_constructible_v<
operation_state<Signatures, std::remove_cvref_t<Receiver>, Initiation, args_type_>,
Receiver,
Initiation,
args_type_
>) {
return operation_state<Signatures, std::remove_cvref_t<Receiver>, Initiation, args_type_>(
::STDEXEC::completion_signatures_of_t<sender, ::STDEXEC::env_of_t<Receiver>>>
constexpr auto connect(Receiver&& receiver) && noexcept(std::is_nothrow_constructible_v<
operation_state_type_<Receiver>,
Receiver,
Initiation,
args_type_>) {
return operation_state_type_<Receiver>(
static_cast<Receiver&&>(receiver),
static_cast<Initiation&&>(init_),
static_cast<args_type_&&>(args_));
Expand All @@ -410,9 +423,10 @@ namespace asioexec {
args_type_ args_;
};

template <typename Signatures, typename Receiver, typename Executor>
template <typename Mutex, typename Signatures, typename Receiver, typename Executor>
class executor {
operation_state_base<Signatures, Receiver>& self_;
using operation_state_type_ = operation_state_base<Mutex, Signatures, Receiver>;
operation_state_type_& self_;
Executor ex_;

template <typename F>
Expand All @@ -422,9 +436,7 @@ namespace asioexec {
};
}
public:
constexpr explicit executor(
operation_state_base<Signatures, Receiver>& self,
const Executor& ex) noexcept
constexpr explicit executor(operation_state_type_& self, const Executor& ex) noexcept
: self_(self)
, ex_(ex) {
}
Expand All @@ -443,7 +455,7 @@ namespace asioexec {
}
constexpr decltype(auto) prefer(Args&&... args) const noexcept {
const auto ex = asio_impl::prefer(ex_, static_cast<Args&&>(args)...);
return executor<Signatures, Receiver, std::remove_cvref_t<decltype(ex)>>(self_, ex);
return executor<Mutex, Signatures, Receiver, std::remove_cvref_t<decltype(ex)>>(self_, ex);
}

template <typename... Args>
Expand All @@ -452,7 +464,7 @@ namespace asioexec {
}
constexpr decltype(auto) require(Args&&... args) const noexcept {
const auto ex = asio_impl::require(ex_, static_cast<Args&&>(args)...);
return executor<Signatures, Receiver, std::remove_cvref_t<decltype(ex)>>(self_, ex);
return executor<Mutex, Signatures, Receiver, std::remove_cvref_t<decltype(ex)>>(self_, ex);
}

template <typename T>
Expand Down Expand Up @@ -503,59 +515,78 @@ namespace asioexec {
bool operator!=(const executor& rhs) const = default;
};

template <typename Mutex>
struct t {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please give this a more descriptive name

static constexpr auto as_default_on = ::asioexec::as_default_on<t>;
template <typename IoObject>
using as_default_on_t = ::asioexec::as_default_on_t<t, IoObject>;
};

struct null_basic_lockable {
constexpr void lock() noexcept {
}
constexpr void unlock() noexcept {
}
};

} // namespace detail::completion_token

struct completion_token_t {
static constexpr auto as_default_on = asioexec::as_default_on<completion_token_t>;
template <typename IoObject>
using as_default_on_t = asioexec::as_default_on_t<completion_token_t, IoObject>;
};
using completion_token_t = detail::completion_token::t<std::recursive_mutex>;

inline const completion_token_t completion_token{};

using thread_unsafe_completion_token_t =
detail::completion_token::t<detail::completion_token::null_basic_lockable>;

inline const thread_unsafe_completion_token_t thread_unsafe_completion_token{};

} // namespace asioexec

namespace ASIOEXEC_ASIO_NAMESPACE {

template <typename... Signatures>
struct async_result<::asioexec::completion_token_t, Signatures...> {
template <typename Mutex, typename... Signatures>
struct async_result<::asioexec::detail::completion_token::t<Mutex>, Signatures...> {
template <typename Initiation, typename... Args>
requires(std::is_constructible_v<std::decay_t<Args>, Args> && ...)
static constexpr auto
initiate(Initiation&& i, const ::asioexec::completion_token_t&, Args&&... args) {
static constexpr auto initiate(
Initiation&& i,
const ::asioexec::detail::completion_token::t<Mutex>&,
Args&&... args) {
return ::asioexec::detail::completion_token::sender<
Mutex,
::asioexec::detail::completion_token::completion_signatures<Signatures...>,
std::remove_cvref_t<Initiation>,
Args...
>(static_cast<Initiation&&>(i), static_cast<Args&&>(args)...);
Args...>(static_cast<Initiation&&>(i), static_cast<Args&&>(args)...);
}
};

template <typename Signatures, typename Receiver, typename Executor>
template <typename Mutex, typename Signatures, typename Receiver, typename Executor>
struct associated_executor<
::asioexec::detail::completion_token::completion_handler<Signatures, Receiver>,
Executor
> {
using type = ::asioexec::detail::completion_token::executor<Signatures, Receiver, Executor>;
::asioexec::detail::completion_token::completion_handler<Mutex, Signatures, Receiver>,
Executor> {
using type =
::asioexec::detail::completion_token::executor<Mutex, Signatures, Receiver, Executor>;

static type get(
const ::asioexec::detail::completion_token::completion_handler<Signatures, Receiver>& h,
const ::asioexec::detail::completion_token::completion_handler<Mutex, Signatures, Receiver>&
h,
const Executor& ex) noexcept {
return type(h.state(), ex);
}
};

template <typename Signatures, typename Receiver, typename Allocator>
template <typename Mutex, typename Signatures, typename Receiver, typename Allocator>
requires requires(const Receiver& r) { ::STDEXEC::get_allocator(::STDEXEC::get_env(r)); }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
requires requires(const Receiver& r) { ::STDEXEC::get_allocator(::STDEXEC::get_env(r)); }
requires STDEXEC::__callable<STDEXEC::get_allocator_t, STDEXEC::env_of_t<Receiver>>

struct associated_allocator<
::asioexec::detail::completion_token::completion_handler<Signatures, Receiver>,
::asioexec::detail::completion_token::completion_handler<Mutex, Signatures, Receiver>,
Allocator
> {
using type = std::remove_cvref_t<decltype(::STDEXEC::get_allocator(
::STDEXEC::get_env(std::declval<const Receiver&>())))>;

static type get(
const ::asioexec::detail::completion_token::completion_handler<Signatures, Receiver>& h,
const ::asioexec::detail::completion_token::completion_handler<Mutex, Signatures, Receiver>&
h,
const Allocator&) noexcept {
return ::STDEXEC::get_allocator(::STDEXEC::get_env(h.state().r_));
}
Expand Down
Loading
Loading