Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 26 additions & 106 deletions include/exec/__detail/__system_context_replaceability_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,128 +18,48 @@
#define STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H

#include "../../stdexec/__detail/__execution_fwd.hpp"
#include "../../stdexec/__detail/__system_context_replaceability_api.hpp"

#include <cstddef>
#include <cstdint>
#include <exception>
#include <memory>
#include <optional>
#include <span>

struct __uuid {
std::uint64_t __parts1;
std::uint64_t __parts2;

friend auto operator==(__uuid, __uuid) noexcept -> bool = default;
};

namespace exec::system_context_replaceability {
using STDEXEC::system_context_replaceability::__parallel_scheduler_backend_factory;

/// Helper for the `__queryable_interface` concept.
template <__uuid X>
using __check_constexpr_uuid = void;

/// Concept for a queryable interface. Ensures that the interface has a `__interface_identifier` member.
template <typename _T>
concept __queryable_interface = requires() {
typename __check_constexpr_uuid<_T::__interface_identifier>;
};

/// The details for making `_T` a runtime property.
template <typename _T>
struct __runtime_property_helper {
/// Is `_T` a property?
static constexpr bool __is_property = false;
/// The unique identifier for the property.
static constexpr __uuid __property_identifier{0, 0};
};

/// `inplace_stope_token` is a runtime property.
template <>
struct __runtime_property_helper<STDEXEC::inplace_stop_token> {
static constexpr bool __is_property = true;
static constexpr __uuid __property_identifier{0x8779c09d8aa249df, 0x867db0e653202604};
};

/// Concept for a runtime property.
template <typename _T>
concept __runtime_property = __runtime_property_helper<_T>::__is_property;

struct parallel_scheduler_backend;
/// Interface for the parallel scheduler backend.
using parallel_scheduler_backend [[deprecated(
"Use STDEXEC::system_context_replaceability::parallel_scheduler_backend instead.")]] =
STDEXEC::system_context_replaceability::parallel_scheduler_backend;

/// Get the backend for the parallel scheduler.
/// Users might replace this function.
auto query_parallel_scheduler_backend() -> std::shared_ptr<parallel_scheduler_backend>;

/// The type of a factory that can create `parallel_scheduler_backend` instances.
/// Out of spec.
using __parallel_scheduler_backend_factory = std::shared_ptr<parallel_scheduler_backend> (*)();
[[deprecated(
"Use STDEXEC::system_context_replaceability::query_parallel_scheduler_backend instead.")]]
inline auto query_parallel_scheduler_backend()
-> std::shared_ptr<STDEXEC::system_context_replaceability::parallel_scheduler_backend> {
return STDEXEC::system_context_replaceability::query_parallel_scheduler_backend();
}

/// Set a factory for the parallel scheduler backend.
/// Can be used to replace the parallel scheduler at runtime.
/// Out of spec.
auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory)
-> __parallel_scheduler_backend_factory;
[[deprecated(
"Use STDEXEC::system_context_replaceability::set_parallel_scheduler_backend instead.")]]
inline auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory)
-> __parallel_scheduler_backend_factory {
return STDEXEC::system_context_replaceability::set_parallel_scheduler_backend(__new_factory);
}

/// Interface for completing a sender operation. Backend will call frontend though this interface
/// for completing the `schedule` and `schedule_bulk` operations.
struct receiver {
virtual ~receiver() = default;

protected:
virtual auto __query_env(__uuid, void*) noexcept -> bool = 0;

public:
/// Called when the system scheduler completes successfully.
virtual void set_value() noexcept = 0;
/// Called when the system scheduler completes with an error.
virtual void set_error(std::exception_ptr) noexcept = 0;
/// Called when the system scheduler was stopped.
virtual void set_stopped() noexcept = 0;

/// Query the receiver for a property of type `_P`.
template <typename _P>
auto try_query() noexcept -> std::optional<std::decay_t<_P>> {
if constexpr (__runtime_property<_P>) {
std::decay_t<_P> __p;
bool __success =
__query_env(__runtime_property_helper<std::decay_t<_P>>::__property_identifier, &__p);
return __success ? std::make_optional(std::move(__p)) : std::nullopt;
} else {
return std::nullopt;
}
}
};

/// Receiver for bulk sheduling operations.
struct bulk_item_receiver : receiver {
/// Called for each item of a bulk operation, possible on different threads.
virtual void execute(std::uint32_t, std::uint32_t) noexcept = 0;
};

/// Interface for the parallel scheduler backend.
struct parallel_scheduler_backend {
static constexpr __uuid __interface_identifier{0x5ee9202498c4bd4f, 0xa1df2508ffcd9d7e};

virtual ~parallel_scheduler_backend() = default;

/// Schedule work on parallel scheduler, calling `__r` when done and using `__s` for preallocated
/// memory.
virtual void schedule(std::span<std::byte> __s, receiver& __r) noexcept = 0;
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for different
/// subranges of [0, __n), and using `__s` for preallocated memory.
virtual void schedule_bulk_chunked(
std::uint32_t __n,
std::span<std::byte> __s,
bulk_item_receiver& __r) noexcept = 0;
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for each item, and
/// using `__s` for preallocated memory.
virtual void schedule_bulk_unchunked(
std::uint32_t __n,
std::span<std::byte> __s,
bulk_item_receiver& __r) noexcept = 0;
};

using receiver
[[deprecated("Use STDEXEC::system_context_replaceability::receiver_proxy instead.")]] =
STDEXEC::system_context_replaceability::receiver_proxy;

/// Receiver for bulk scheduling operations.
using bulk_item_receiver [[deprecated(
"Use STDEXEC::system_context_replaceability::bulk_item_receiver_proxy instead.")]] =
STDEXEC::system_context_replaceability::bulk_item_receiver_proxy;
} // namespace exec::system_context_replaceability

#endif
100 changes: 63 additions & 37 deletions include/exec/system_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
*/
#pragma once

#include <utility>

#include "../stdexec/execution.hpp"
#include "__detail/__system_context_replaceability_api.hpp"

#include <optional>
#include <utility>

#ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE
# define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE 72
#endif
Expand All @@ -43,48 +44,61 @@ namespace exec {
namespace detail {
/// Allows a frontend receiver of type `_Rcvr` to be passed to the backend.
template <class _Rcvr>
struct __receiver_adapter : system_context_replaceability::receiver {
struct __receiver_adapter : STDEXEC::system_context_replaceability::receiver_proxy {
explicit __receiver_adapter(_Rcvr&& __rcvr)
: __rcvr_{std::forward<_Rcvr>(__rcvr)} {
}

auto __query_env(__uuid __id, void* __dest) noexcept -> bool override {
using system_context_replaceability::__runtime_property_helper;
using __StopToken = decltype(STDEXEC::get_stop_token(STDEXEC::get_env(__rcvr_)));
if constexpr (std::is_same_v<STDEXEC::inplace_stop_token, __StopToken>) {
if (__id == __runtime_property_helper<STDEXEC::inplace_stop_token>::__property_identifier) {
*static_cast<STDEXEC::inplace_stop_token*>(__dest) = STDEXEC::get_stop_token(
STDEXEC::get_env(__rcvr_));
return true;
}
}
return false;
: __rcvr_{static_cast<_Rcvr&&>(__rcvr)} {
}

void set_value() noexcept override {
STDEXEC::set_value(std::forward<_Rcvr>(__rcvr_));
}

void set_error(std::exception_ptr __ex) noexcept override {
void set_error(std::exception_ptr&& __ex) noexcept override {
STDEXEC::set_error(std::forward<_Rcvr>(__rcvr_), std::move(__ex));
}

void set_stopped() noexcept override {
STDEXEC::set_stopped(std::forward<_Rcvr>(__rcvr_));
}

protected:
void __query_env(
STDEXEC::__type_index __query_type,
STDEXEC::__type_index __value_type,
void* __dest) const noexcept override {
if (__query_type == STDEXEC::__mtypeid<STDEXEC::get_stop_token_t>) {
__query(STDEXEC::get_stop_token, __value_type, __dest);
}
}

private:
void __query(STDEXEC::get_stop_token_t, STDEXEC::__type_index __value_type, void* __dest)
const noexcept {
using __stop_token_t = STDEXEC::stop_token_of_t<STDEXEC::env_of_t<_Rcvr>>;
if constexpr (std::is_same_v<STDEXEC::inplace_stop_token, __stop_token_t>) {
if (__value_type == STDEXEC::__mtypeid<STDEXEC::inplace_stop_token>) {
using __dest_t = std::optional<STDEXEC::inplace_stop_token>;
*static_cast<__dest_t*>(__dest) = STDEXEC::get_stop_token(STDEXEC::get_env(__rcvr_));
}
}
}

public:
STDEXEC_ATTRIBUTE(no_unique_address)
_Rcvr __rcvr_;
};

/// The type large enough to store the data produced by a sender.
/// BUGBUG: this seems wrong. i think this should be a variant of tuples of possible
/// results.
template <class _Sender>
using __sender_data_t = decltype(STDEXEC::sync_wait(std::declval<_Sender>()).value());

} // namespace detail

class parallel_scheduler;
class __parallel_sender;

template <bool, STDEXEC::sender _S, std::integral _Size, class _Fn, bool>
class __parallel_bulk_sender;

Expand All @@ -106,7 +120,7 @@ namespace exec {

namespace detail {
using __backend_ptr =
std::shared_ptr<system_context_replaceability::parallel_scheduler_backend>;
std::shared_ptr<STDEXEC::system_context_replaceability::parallel_scheduler_backend>;

template <class T>
auto __make_parallel_scheduler_from(T, __backend_ptr) noexcept;
Expand Down Expand Up @@ -199,7 +213,7 @@ namespace exec {
auto& __scheduler_impl = __preallocated_.__as<__backend_ptr>();
auto __impl = std::move(__scheduler_impl);
std::destroy_at(&__scheduler_impl);
__impl->schedule(__preallocated_.__as_storage(), __rcvr_);
__impl->schedule(__rcvr_, __preallocated_.__as_storage());
}

/// Object that receives completion from the work described by the sender.
Expand Down Expand Up @@ -312,7 +326,8 @@ namespace exec {
/// This represents the base class that abstracts the storage of the values sent by the previous sender.
/// Derived class will properly implement the receiver methods.
template <class _Previous>
struct __forward_args_receiver : system_context_replaceability::bulk_item_receiver {
struct __forward_args_receiver
: STDEXEC::system_context_replaceability::bulk_item_receiver_proxy {
using __storage_t = detail::__sender_data_t<_Previous>;

/// Storage for the arguments received from the previous sender.
Expand All @@ -329,24 +344,11 @@ namespace exec {
/// Stores `__as` in the base class storage, with the right types.
explicit __typed_forward_args_receiver(_As&&... __as) {
static_assert(sizeof(std::tuple<_As...>) <= sizeof(__base_t::__arguments_data_));
// BUGBUG: this seems wrong. we are not ever destroying this tuple.
new (__base_t::__arguments_data_)
std::tuple<STDEXEC::__decay_t<_As>...>{std::move(__as)...};
}

auto __query_env(__uuid __id, void* __dest) noexcept -> bool override {
auto __state = reinterpret_cast<_BulkState*>(this);
using system_context_replaceability::__runtime_property_helper;
using __StopToken = decltype(STDEXEC::get_stop_token(STDEXEC::get_env(__state->__rcvr_)));
if constexpr (std::is_same_v<STDEXEC::inplace_stop_token, __StopToken>) {
if (__id == __runtime_property_helper<STDEXEC::inplace_stop_token>::__property_identifier) {
*static_cast<STDEXEC::inplace_stop_token*>(__dest) = STDEXEC::get_stop_token(
STDEXEC::get_env(__state->__rcvr_));
return true;
}
}
return false;
}

/// Calls `set_value()` on the final receiver of the bulk operation, using the values from the previous sender.
void set_value() noexcept override {
auto __state = reinterpret_cast<_BulkState*>(this);
Expand Down Expand Up @@ -396,6 +398,30 @@ namespace exec {
*reinterpret_cast<std::tuple<_As...>*>(__base_t::__arguments_data_));
}
}

protected:
void __query_env(
STDEXEC::__type_index __query_type,
STDEXEC::__type_index __value_type,
void* __dest) const noexcept override {
if (__query_type == STDEXEC::__mtypeid<STDEXEC::get_stop_token_t>) {
__query(STDEXEC::get_stop_token, __value_type, __dest);
}
}

private:
void __query(STDEXEC::get_stop_token_t, STDEXEC::__type_index __value_type, void* __dest)
const noexcept {
auto __state = reinterpret_cast<const _BulkState*>(this);
using __stop_token_t = STDEXEC::stop_token_of_t<STDEXEC::env_of_t<__rcvr_t>>;
if constexpr (std::is_same_v<STDEXEC::inplace_stop_token, __stop_token_t>) {
using __dest_t = std::optional<STDEXEC::inplace_stop_token>;
if (__value_type == STDEXEC::__mtypeid<STDEXEC::inplace_stop_token>) {
*static_cast<__dest_t*>(__dest) = STDEXEC::get_stop_token(
STDEXEC::get_env(__state->__rcvr_));
}
}
}
};

/// The state needed to execute the bulk sender created from system context, minus the preallocates space.
Expand Down Expand Up @@ -645,7 +671,7 @@ namespace exec {
};

inline auto get_parallel_scheduler() -> parallel_scheduler {
auto __impl = system_context_replaceability::query_parallel_scheduler_backend();
auto __impl = STDEXEC::system_context_replaceability::query_parallel_scheduler_backend();
if (!__impl) {
STDEXEC_THROW(std::runtime_error{"No system context implementation found"});
}
Expand Down Expand Up @@ -728,5 +754,5 @@ namespace exec {

#if defined(STDEXEC_SYSTEM_CONTEXT_HEADER_ONLY)
# define STDEXEC_SYSTEM_CONTEXT_INLINE inline
# include "__detail/__system_context_default_impl_entry.hpp"
# include "../stdexec/__detail/__system_context_default_impl_entry.hpp"
#endif
Loading
Loading