diff --git a/CMakeLists.txt b/CMakeLists.txt index 9ff54d0cc..a47b66424 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -414,7 +414,7 @@ set_target_properties(system_context PROPERTIES CXX_STANDARD_REQUIRED ON CXX_EXTENSIONS OFF) target_compile_options(system_context PUBLIC - $<$:/Zc:__cplusplus /Zc:preprocessor> + $<$:/Zc:__cplusplus /Zc:preprocessor /Zc:externConstexpr> ) add_library(STDEXEC::system_context ALIAS system_context) target_link_libraries(system_context PUBLIC stdexec) diff --git a/include/exec/__detail/__system_context_replaceability_api.hpp b/include/exec/__detail/__system_context_replaceability_api.hpp index 752c45501..77b8072a6 100644 --- a/include/exec/__detail/__system_context_replaceability_api.hpp +++ b/include/exec/__detail/__system_context_replaceability_api.hpp @@ -24,7 +24,7 @@ #include namespace exec::system_context_replaceability { - using STDEXEC::system_context_replaceability::__parallel_scheduler_backend_factory; + using STDEXEC::system_context_replaceability::__parallel_scheduler_backend_factory_t; /// Interface for the parallel scheduler backend. using parallel_scheduler_backend [[deprecated( @@ -40,15 +40,20 @@ namespace exec::system_context_replaceability { return STDEXEC::system_context_replaceability::query_parallel_scheduler_backend(); } + STDEXEC_PRAGMA_PUSH() + STDEXEC_PRAGMA_IGNORE_GNU("-Wdeprecated-declarations") + STDEXEC_PRAGMA_IGNORE_MSVC(4996) // warning C4996: 'function': was declared deprecated + STDEXEC_PRAGMA_IGNORE_EDG(deprecated_entity) /// Set a factory for the parallel scheduler backend. /// Can be used to replace the parallel scheduler at runtime. /// Out of spec. [[deprecated( "Use STDEXEC::system_context_replaceability::set_parallel_scheduler_backend instead.")]] - inline auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory) - -> __parallel_scheduler_backend_factory { + inline auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory_t __new_factory) + -> __parallel_scheduler_backend_factory_t { return STDEXEC::system_context_replaceability::set_parallel_scheduler_backend(__new_factory); } + STDEXEC_PRAGMA_POP() /// Interface for completing a sender operation. Backend will call frontend though this interface /// for completing the `schedule` and `schedule_bulk` operations. diff --git a/include/exec/system_context.hpp b/include/exec/system_context.hpp index 8708ea30f..d3b609d7f 100644 --- a/include/exec/system_context.hpp +++ b/include/exec/system_context.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Lee Howes, Lucian Radu Teodorescu + * Copyright (c) 2026 NVIDIA Corporation * * Licensed under the Apache License Version 2.0 with LLVM Exceptions * (the "License"); you may not use this file except in compliance with @@ -15,744 +15,25 @@ */ #pragma once -#include "../stdexec/execution.hpp" -#include "__detail/__system_context_replaceability_api.hpp" +#include "../stdexec/__detail/__execution_fwd.hpp" -#include -#include +#include "../stdexec/__detail/__parallel_scheduler.hpp" // IWYU pragma: export +#include "__detail/__system_context_replaceability_api.hpp" // IWYU pragma: export -#ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE -# define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE 72 +#if STDEXEC_MSVC() +# pragma message( \ + "WARNING: The header is deprecated. Please include instead.") +#else +# warning \ + "The header is deprecated. Please include instead." #endif -#ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN -# define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN 8 -#endif -#ifndef STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE -# define STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE 152 -#endif -#ifndef STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN -# define STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN 8 -#endif - -// TODO: make these configurable by providing policy to the system context namespace exec { - struct CANNOT_DISPATCH_THE_BULK_ALGORITHM_TO_THE_PARALLEL_SCHEDULER; - struct BECAUSE_THERE_IS_NO_PARALLEL_SCHEDULER_IN_THE_ENVIRONMENT; - struct ADD_A_CONTINUES_ON_TRANSITION_TO_THE_PARALLEL_SCHEDULER_BEFORE_THE_BULK_ALGORITHM; - - namespace detail { - /// Allows a frontend receiver of type `_Rcvr` to be passed to the backend. - template - struct __receiver_adapter : STDEXEC::system_context_replaceability::receiver_proxy { - explicit __receiver_adapter(_Rcvr&& __rcvr) - : __rcvr_{static_cast<_Rcvr&&>(__rcvr)} { - } - - void set_value() noexcept override { - STDEXEC::set_value(std::forward<_Rcvr>(__rcvr_)); - } - - void set_error(std::exception_ptr&& __ex) noexcept override { - STDEXEC::set_error(std::forward<_Rcvr>(__rcvr_), std::move(__ex)); - } - - void set_stopped() noexcept override { - STDEXEC::set_stopped(std::forward<_Rcvr>(__rcvr_)); - } - - protected: - void __query_env( - STDEXEC::__type_index __query_type, - STDEXEC::__type_index __value_type, - void* __dest) const noexcept override { - if (__query_type == STDEXEC::__mtypeid) { - __query(STDEXEC::get_stop_token, __value_type, __dest); - } - } - - private: - void __query(STDEXEC::get_stop_token_t, STDEXEC::__type_index __value_type, void* __dest) - const noexcept { - using __stop_token_t = STDEXEC::stop_token_of_t>; - if constexpr (std::is_same_v) { - if (__value_type == STDEXEC::__mtypeid) { - using __dest_t = std::optional; - *static_cast<__dest_t*>(__dest) = STDEXEC::get_stop_token(STDEXEC::get_env(__rcvr_)); - } - } - } - - public: - STDEXEC_ATTRIBUTE(no_unique_address) - _Rcvr __rcvr_; - }; - - /// The type large enough to store the data produced by a sender. - /// BUGBUG: this seems wrong. i think this should be a variant of tuples of possible - /// results. - template - using __sender_data_t = decltype(STDEXEC::sync_wait(std::declval<_Sender>()).value()); - - } // namespace detail - - class parallel_scheduler; - class __parallel_sender; - - template - class __parallel_bulk_sender; - - /// Returns a scheduler that can add work to the underlying execution context. - auto get_parallel_scheduler() -> parallel_scheduler; - - /// Concept that matches `bulk_chunked` and `bulk_unchunked` senders. - template - concept __bulk_chunked_or_unchunked = - STDEXEC::sender_expr_for<_Sender, STDEXEC::bulk_chunked_t> - || STDEXEC::sender_expr_for<_Sender, STDEXEC::bulk_unchunked_t>; - - /// The execution domain of the parallel_scheduler, used for the purposes of customizing - /// sender algorithms such as `bulk_chunked` and `bulk_unchunked`. - struct __parallel_scheduler_domain : STDEXEC::default_domain { - template <__bulk_chunked_or_unchunked _Sender, class _Env> - auto transform_sender(STDEXEC::set_value_t, _Sender&& __sndr, const _Env& __env) const noexcept; - }; - - namespace detail { - using __backend_ptr = - std::shared_ptr; - - template - auto __make_parallel_scheduler_from(T, __backend_ptr) noexcept; - - /// Describes the environment of this sender. - struct __parallel_scheduler_env { - /// Returns the system scheduler as the completion scheduler for `set_value_t`. - template _Tag> - [[nodiscard]] - auto query(STDEXEC::get_completion_scheduler_t<_Tag>) const noexcept { - return detail::__make_parallel_scheduler_from(_Tag(), __scheduler_); - } - - /// The underlying implementation of the scheduler we are using. - __backend_ptr __scheduler_; - }; - - template - struct __aligned_storage { - alignas(_Align) unsigned char __data_[_Size]; - - auto __as_storage() noexcept -> std::span { - return {reinterpret_cast(__data_), _Size}; - } - - template - auto __as() noexcept -> _T& { - static_assert(alignof(_T) <= _Align); - return *reinterpret_cast<_T*>(__data_); - } - - auto __as_ptr() noexcept -> void* { - return __data_; - } - }; - - /* - Storage needed for a frontend operation-state: - - schedule: - - __receiver_adapter::__vtable -- 8 - - __receiver_adapter::__rcvr_ (Rcvr) -- assuming 0 - - __system_op::__preallocated_ (__preallocated) -- 72 - --------------------- - Total: 80; extra 8 bytes compared to backend needs. - - for bulk: - - __bulk_state_base::__fun_ (_Fn) -- 0 (assuming empty function) - - __bulk_state_base::__rcvr_ (_Rcvr) -- 0 (assuming empty receiver) - - __forward_args_receiver::__vtable -- 8 - - __forward_args_receiver::__arguments_data_ (array of bytes) -- 8 (depending on previous sender) - - __bulk_state_base::__prepare_storage_for_backend (fun ptr) -- 8 - - __bulk_state_base::__size_ (_Size) -- 4 - - __bulk_state::__preallocated_ (__preallocated_) -- 152 - - __previous_operation_state_ (__inner_op_state) -- 104 - - __bulk_intermediate_receiver::__state_ (__state_&) -- 8 - - __bulk_intermediate_receiver::__scheduler_ (parallel_scheduler*) -- 8 - --------------------- - Total: 176; extra 24 bytes compared to backend needs. - - [*] sizes taken on an Apple M2 Pro arm64 arch. They may differ on other architectures, or with different implementations. - */ - - /// The operation state used to execute the work described by this sender. - template - struct __system_op { - /// Constructs `this` from `__rcvr` and `__scheduler_impl`. - __system_op(_Rcvr&& __rcvr, __backend_ptr __scheduler_impl) - : __rcvr_{std::forward<_Rcvr>(__rcvr)} { - // Before the operation starts, we store the scheduelr implementation in __preallocated_. - // After the operation starts, we don't need this pointer anymore, and the storage can be used by the backend - auto* __p = &__preallocated_.__as<__backend_ptr>(); - std::construct_at(__p, std::move(__scheduler_impl)); - } - - ~__system_op() = default; - - __system_op(const __system_op&) = delete; - __system_op(__system_op&&) = delete; - auto operator=(const __system_op&) -> __system_op& = delete; - auto operator=(__system_op&&) -> __system_op& = delete; - - /// Starts the work stored in `this`. - void start() & noexcept { - auto st = STDEXEC::get_stop_token(STDEXEC::get_env(__rcvr_.__rcvr_)); - if (st.stop_requested()) { - STDEXEC::set_stopped(__rcvr_); - return; - } - auto& __scheduler_impl = __preallocated_.__as<__backend_ptr>(); - auto __impl = std::move(__scheduler_impl); - std::destroy_at(&__scheduler_impl); - __impl->schedule(__rcvr_, __preallocated_.__as_storage()); - } - - /// Object that receives completion from the work described by the sender. - __receiver_adapter<_Rcvr> __rcvr_; - - /// Preallocated space for storing the operation state on the implementation size. - /// We also store here the backend interface for the scheduler before we actually start the operation. - __aligned_storage< - STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE, - STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN - > - __preallocated_; - }; - } // namespace detail - - /// The sender used to schedule new work in the system context. - class __parallel_sender { - public: - /// Marks this type as being a sender; not to spec. - using sender_concept = STDEXEC::sender_t; - /// Declares the completion signals sent by `this`. - using completion_signatures = STDEXEC::completion_signatures< - STDEXEC::set_value_t(), - STDEXEC::set_stopped_t(), - STDEXEC::set_error_t(std::exception_ptr) - >; - - /// Implementation detail. Constructs the sender to wrap `__impl`. - explicit __parallel_sender(detail::__backend_ptr __impl) - : __scheduler_{std::move(__impl)} { - } - - /// Gets the environment of this sender. - [[nodiscard]] - auto get_env() const noexcept -> detail::__parallel_scheduler_env { - return {__scheduler_}; - } - - /// Value completion happens on the parallel scheduler. - [[nodiscard]] - auto query(STDEXEC::get_completion_scheduler_t) const noexcept - -> parallel_scheduler; - - /// Connects `__self` to `__rcvr`, returning the operation state containing the work to be done. - template - auto connect(_Rcvr __rcvr) && noexcept(STDEXEC::__nothrow_move_constructible<_Rcvr>) - -> detail::__system_op<__parallel_sender, _Rcvr> { - return {std::move(__rcvr), std::move(__scheduler_)}; - } - - template - auto connect(_Rcvr __rcvr) & noexcept(STDEXEC::__nothrow_move_constructible<_Rcvr>) - -> detail::__system_op<__parallel_sender, _Rcvr> { - return {std::move(__rcvr), __scheduler_}; - } - - private: - /// The underlying implementation of the system scheduler. - detail::__backend_ptr __scheduler_; - }; - - /// A scheduler that can add work to the system context. - class parallel_scheduler { - public: - parallel_scheduler() = delete; - - /// Returns `true` iff `*this` refers to the same scheduler as the argument. - auto operator==(const parallel_scheduler&) const noexcept -> bool = default; - - /// Implementation detail. Constructs the scheduler to wrap `__impl`. - explicit parallel_scheduler(detail::__backend_ptr&& __impl) - : __impl_(__impl) { - } - - /// Returns the forward progress guarantee of `this`. - [[nodiscard]] - auto query(STDEXEC::get_forward_progress_guarantee_t) const noexcept - -> STDEXEC::forward_progress_guarantee; - - /// Returns the execution domain of `this`. - [[nodiscard]] - auto query(STDEXEC::get_domain_t) const noexcept -> __parallel_scheduler_domain { - return {}; - } - - /// Schedules new work, returning the sender that signals the start of the work. - [[nodiscard]] - auto schedule() const noexcept -> __parallel_sender { - return __parallel_sender{__impl_}; - } - - private: - template - friend class __parallel_bulk_sender; - - /// The underlying implementation of the scheduler. - detail::__backend_ptr __impl_; - }; - - ////////////////////////////////////////////////////////////////////////////////////////////////// - // bulk - - namespace detail { - template - auto __make_parallel_scheduler_from(T, __backend_ptr __impl) noexcept { - return parallel_scheduler{std::move(__impl)}; - } - - /// Helper that knows how to store the values sent by `_Previous` and pass them to bulk item calls or to the completion signal. - /// This represents the base class that abstracts the storage of the values sent by the previous sender. - /// Derived class will properly implement the receiver methods. - template - struct __forward_args_receiver - : STDEXEC::system_context_replaceability::bulk_item_receiver_proxy { - using __storage_t = detail::__sender_data_t<_Previous>; - - /// Storage for the arguments received from the previous sender. - alignas(__storage_t) unsigned char __arguments_data_[sizeof(__storage_t)]; - }; + using parallel_scheduler + [[deprecated("Please use stdexec::parallel_scheduler instead")]] = STDEXEC::parallel_scheduler; - /// Derived class that properly forwards the arguments received from `_Previous` to the receiver methods. - /// Uses the storage defined in the base class. No extra data is added here. - template - struct __typed_forward_args_receiver : __forward_args_receiver<_Previous> { - using __base_t = __forward_args_receiver<_Previous>; - using __rcvr_t = _BulkState::__rcvr_t; - - /// Stores `__as` in the base class storage, with the right types. - explicit __typed_forward_args_receiver(_As&&... __as) { - static_assert(sizeof(std::tuple<_As...>) <= sizeof(__base_t::__arguments_data_)); - // BUGBUG: this seems wrong. we are not ever destroying this tuple. - new (__base_t::__arguments_data_) - std::tuple...>{std::move(__as)...}; - } - - /// Calls `set_value()` on the final receiver of the bulk operation, using the values from the previous sender. - void set_value() noexcept override { - auto __state = reinterpret_cast<_BulkState*>(this); - std::apply( - [&](auto&&... __args) { - STDEXEC::set_value( - std::forward<__rcvr_t>(__state->__rcvr_), std::forward<_As>(__args)...); - }, - *reinterpret_cast*>(__base_t::__arguments_data_)); - } - - /// Calls `set_error()` on the final receiver of the bulk operation, passing `__ex`. - void set_error(std::exception_ptr __ex) noexcept override { - auto __state = reinterpret_cast<_BulkState*>(this); - STDEXEC::set_error(std::forward<__rcvr_t>(__state->__rcvr_), std::move(__ex)); - } - - /// Calls `set_stopped()` on the final receiver of the bulk operation. - void set_stopped() noexcept override { - auto __state = reinterpret_cast<_BulkState*>(this); - STDEXEC::set_stopped(std::forward<__rcvr_t>(__state->__rcvr_)); - } - - /// Calls the bulk functor passing `__index` and the values from the previous sender. - void execute(uint32_t __begin, uint32_t __end) noexcept override { - auto __state = reinterpret_cast<_BulkState*>(this); - if constexpr (_BulkState::__is_unchunked) { - (void) __end; // not used - // If we are not parallelizing, we need to run all the iterations sequentially. - uint32_t __increments = 1; - if constexpr (!_BulkState::__parallelize) { - __increments = static_cast(__state->__size_); - } - for (uint32_t __i = __begin; __i < __begin + __increments; __i++) { - std::apply( - [&](auto&&... __args) { __state->__fun_(__i, __args...); }, - *reinterpret_cast*>(__base_t::__arguments_data_)); - } - } else { - // If we are not parallelizing, we need to pass the entire range to the functor. - if constexpr (!_BulkState::__parallelize) { - __begin = 0; - __end = static_cast(__state->__size_); - } - std::apply( - [&](auto&&... __args) { __state->__fun_(__begin, __end, __args...); }, - *reinterpret_cast*>(__base_t::__arguments_data_)); - } - } - - protected: - void __query_env( - STDEXEC::__type_index __query_type, - STDEXEC::__type_index __value_type, - void* __dest) const noexcept override { - if (__query_type == STDEXEC::__mtypeid) { - __query(STDEXEC::get_stop_token, __value_type, __dest); - } - } - - private: - void __query(STDEXEC::get_stop_token_t, STDEXEC::__type_index __value_type, void* __dest) - const noexcept { - auto __state = reinterpret_cast(this); - using __stop_token_t = STDEXEC::stop_token_of_t>; - if constexpr (std::is_same_v) { - using __dest_t = std::optional; - if (__value_type == STDEXEC::__mtypeid) { - *static_cast<__dest_t*>(__dest) = STDEXEC::get_stop_token( - STDEXEC::get_env(__state->__rcvr_)); - } - } - } - }; - - /// The state needed to execute the bulk sender created from system context, minus the preallocates space. - /// The preallocated space is obtained by calling the `__prepare_storage_for_backend` function pointer. - template < - STDEXEC::sender _Previous, - std::integral _Size, - class _Fn, - class _Rcvr, - bool _IsUnchunked, - bool _Parallelize - > - struct __bulk_state_base { - using __rcvr_t = _Rcvr; - using __forward_args_helper_t = __forward_args_receiver<_Previous>; - static constexpr bool __is_unchunked = _IsUnchunked; - static constexpr bool __parallelize = _Parallelize; - - /// Storage for the arguments and the helper needed to pass the arguments from the previous bulk sender to the bulk functor and receiver. - /// Needs to be the first member, to easier the convertion between `__forward_args_helper_` and `this`. - alignas(__forward_args_helper_t) unsigned char __forward_args_helper_[sizeof( - __forward_args_helper_t)]{}; - - /// The function to be executed to perform the bulk work. - STDEXEC_ATTRIBUTE(no_unique_address) - _Fn __fun_; - /// The receiver object that receives completion from the work described by the sender. - STDEXEC_ATTRIBUTE(no_unique_address) - _Rcvr __rcvr_; - - /// Function that prepares the preallocated storage for calling the backend. - std::span (*__prepare_storage_for_backend)(__bulk_state_base*){nullptr}; - /// The size of the bulk operation. - _Size __size_; - - __bulk_state_base(_Fn&& __fun, _Rcvr&& __rcvr, _Size __size) - : __fun_{std::move(__fun)} - , __rcvr_{std::move(__rcvr)} - , __size_{__size} { - } - }; - - /// Receiver that is used in "bulk" to connect to the input sender of the bulk operation. - template - struct __bulk_intermediate_receiver { - /// Declare that this is a `receiver`. - using receiver_concept = STDEXEC::receiver_t; - - /// Object that holds the relevant data for the entire bulk operation. - _BulkState& __state_; - /// The underlying implementation of the scheduler we are using. - __backend_ptr __scheduler_{nullptr}; - - template - void set_value(_As&&... __as) noexcept { - auto st = STDEXEC::get_stop_token(STDEXEC::get_env(__state_.__rcvr_)); - if (st.stop_requested()) { - STDEXEC::set_stopped(__state_.__rcvr_); - return; - } - - // Store the input data in the shared state. - using __typed_forward_args_receiver_t = - __typed_forward_args_receiver<_Previous, _BulkState, _As...>; - auto __r = new (&__state_.__forward_args_helper_) - __typed_forward_args_receiver_t(std::forward<_As>(__as)...); - - auto __scheduler = __scheduler_; - auto __size = static_cast(__state_.__size_); - - auto __storage = __state_.__prepare_storage_for_backend(&__state_); - // This might destroy the `this` object. - - // Schedule the bulk work on the system scheduler. - // This will invoke `execute` on our receiver multiple times, and then a completion signal (e.g., `set_value`). - if constexpr (_BulkState::__is_unchunked) { - __scheduler - ->schedule_bulk_unchunked(_BulkState::__parallelize ? __size : 1, __storage, *__r); - } else { - __scheduler - ->schedule_bulk_chunked(_BulkState::__parallelize ? __size : 1, __storage, *__r); - } - } - - /// Invoked when the previous sender completes with "stopped" to stop the entire work. - void set_stopped() noexcept { - STDEXEC::set_stopped(std::move(__state_.__rcvr_)); - } - - /// Invoked when the previous sender completes with error to forward the error to the connected receiver. - template - void set_error(__E __e) noexcept { - STDEXEC::set_error(std::move(__state_.__rcvr_), std::move(__e)); - } - - /// Gets the environment of this receiver; returns the environment of the connected receiver. - [[nodiscard]] - auto get_env() const noexcept -> decltype(auto) { - return STDEXEC::get_env(__state_.__rcvr_); - } - }; - - /// The operation state object for the system bulk sender. - template < - bool _IsUnchunked, - STDEXEC::sender _Previous, - std::integral _Size, - class _Fn, - class _Rcvr, - bool _Parallelize - > - struct __system_bulk_op - : __bulk_state_base<_Previous, _Size, _Fn, _Rcvr, _IsUnchunked, _Parallelize> { - - /// The type that holds the state of the bulk operation. - using __bulk_state_base_t = - __bulk_state_base<_Previous, _Size, _Fn, _Rcvr, _IsUnchunked, _Parallelize>; - - /// The type of the receiver that will be connected to the previous sender. - using __intermediate_receiver_t = - __bulk_intermediate_receiver<__bulk_state_base_t, _Previous>; - - /// The type of inner operation state, which is the result of connecting the previous sender to the bulk intermediate receiver. - using __inner_op_state = STDEXEC::connect_result_t<_Previous, __intermediate_receiver_t>; - - static constexpr size_t _PreallocatedSize = - (std::max) (size_t(STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE), sizeof(__inner_op_state)); - static constexpr size_t _PreallocatedAlign = - (std::max) (size_t(STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN), - alignof(__inner_op_state)); - - /// Preallocated space for storing the inner operation state, and then storage space for the backend call. - __aligned_storage<_PreallocatedSize, _PreallocatedAlign> __preallocated_; - - /// Destroys the inner operation state object, and returns the preallocated storage for it to be used by the backend. - static auto - __prepare_storage_for_backend_impl(__bulk_state_base_t* __base) -> std::span { - auto* __self = static_cast<__system_bulk_op*>(__base); - // We don't need anymore the storage for the previous operation state. - __self->__preallocated_.template __as<__inner_op_state>().~__inner_op_state(); - // Reuse the preallocated storage for the backend. - return __self->__preallocated_.__as_storage(); - } - - /// Constructs `this` from `__snd` and `__rcvr`, using the object returned by `__initFunc` to start the operation. - /// - /// Using a functor to initialize the operation state allows the use of `this` to get the - /// underlying implementation object. - /// - /// `_Snd` is a `__parallel_bulk_sender`. - template - __system_bulk_op(_Snd&& __snd, _Rcvr&& __rcvr, _InitF&& __initFunc) - : __bulk_state_base_t{std::move(__snd.__fun_), std::move(__rcvr), __snd.__size_} { - // Write the function that prepares the storage for the backend. - __bulk_state_base_t::__prepare_storage_for_backend = - &__system_bulk_op::__prepare_storage_for_backend_impl; - - // Start using the preallocated buffer to store the inner operation state. - new (__preallocated_.__as_ptr()) __inner_op_state(__initFunc(*this)); - } - - __system_bulk_op(const __system_bulk_op&) = delete; - __system_bulk_op(__system_bulk_op&&) = delete; - auto operator=(const __system_bulk_op&) -> __system_bulk_op& = delete; - auto operator=(__system_bulk_op&&) -> __system_bulk_op& = delete; - - /// Starts the work stored in `*this`. - void start() & noexcept { - // Start previous operation state. - // Bulk operation will be started when the previous sender completes. - STDEXEC::start(__preallocated_.template __as<__inner_op_state>()); - } - }; - } // namespace detail - - /// The sender used to schedule bulk work in the system context. - template < - bool _IsUnchunked, - STDEXEC::sender _Previous, - std::integral _Size, - class _Fn, - bool _Parallelize - > - class __parallel_bulk_sender { - /// Meta-function that returns the completion signatures of `this`. - template - using __completions_t = STDEXEC::transform_completion_signatures< - STDEXEC::__completion_signatures_of_t, _Env...>, - STDEXEC::completion_signatures - >; - - template - friend struct detail::__system_bulk_op; - - public: - /// Marks this type as being a sender - using sender_concept = STDEXEC::sender_t; - - /// Constructs `this`. - __parallel_bulk_sender( - parallel_scheduler __sched, - _Previous __previous, - _Size __size, - _Fn&& __fun) - : __scheduler_{__sched.__impl_} - , __previous_{std::move(__previous)} - , __size_{std::move(__size)} - , __fun_{std::move(__fun)} { - } - - /// Gets the environment of this sender. - [[nodiscard]] - auto get_env() const noexcept -> detail::__parallel_scheduler_env { - return {__scheduler_}; - } - - /// Connects `__self` to `__rcvr`, returning the operation state containing the work to be done. - template - auto connect(_Rcvr __rcvr) && noexcept(STDEXEC::__nothrow_move_constructible<_Rcvr>) - -> detail::__system_bulk_op<_IsUnchunked, _Previous, _Size, _Fn, _Rcvr, _Parallelize> { - using __res_t = - detail::__system_bulk_op<_IsUnchunked, _Previous, _Size, _Fn, _Rcvr, _Parallelize>; - using __receiver_t = __res_t::__intermediate_receiver_t; - return {std::move(*this), std::move(__rcvr), [this](auto& __op) { - // Connect bulk input receiver with the previous operation and store in the operating state. - return STDEXEC::connect( - std::move(this->__previous_), __receiver_t{__op, std::move(this->__scheduler_)}); - }}; - } - - /// Gets the completion signatures for this sender. - template _Self, class... _Env> - static consteval auto get_completion_signatures() -> __completions_t<_Self, _Env...> { - return {}; - } - - private: - /// The underlying implementation of the scheduler we are using. - detail::__backend_ptr __scheduler_{nullptr}; - /// The previous sender, the one that produces the input value for the bulk function. - _Previous __previous_; - /// The size of the bulk operation. - _Size __size_; - /// The function to be executed to perform the bulk work. - STDEXEC_ATTRIBUTE(no_unique_address) - _Fn __fun_; - }; - - inline auto get_parallel_scheduler() -> parallel_scheduler { - auto __impl = STDEXEC::system_context_replaceability::query_parallel_scheduler_backend(); - if (!__impl) { - STDEXEC_THROW(std::runtime_error{"No system context implementation found"}); - } - return parallel_scheduler{std::move(__impl)}; - } - - [[deprecated("get_system_scheduler has been renamed get_parallel_scheduler")]] - inline auto get_system_scheduler() -> parallel_scheduler { - return get_parallel_scheduler(); - } - - inline auto __parallel_sender::query(STDEXEC::get_completion_scheduler_t) - const noexcept -> parallel_scheduler { - return detail::__make_parallel_scheduler_from(STDEXEC::set_value_t{}, __scheduler_); - } - - inline auto parallel_scheduler::query(STDEXEC::get_forward_progress_guarantee_t) const noexcept - -> STDEXEC::forward_progress_guarantee { - return STDEXEC::forward_progress_guarantee::parallel; - } - - struct __transform_parallel_bulk_sender { - template - auto - operator()(STDEXEC::bulk_chunked_t, _Data&& __data, _Previous&& __previous) const noexcept { - auto [__pol, __shape, __fn] = static_cast<_Data&&>(__data); - using __policy_t = std::remove_cvref_t; - constexpr bool __parallelize = std::same_as<__policy_t, STDEXEC::parallel_policy> - || std::same_as<__policy_t, STDEXEC::parallel_unsequenced_policy>; - return __parallel_bulk_sender< - false, - _Previous, - decltype(__shape), - decltype(__fn), - __parallelize - >{__sched_, static_cast<_Previous&&>(__previous), __shape, std::move(__fn)}; - } - - template - auto - operator()(STDEXEC::bulk_unchunked_t, _Data&& __data, _Previous&& __previous) const noexcept { - auto [__pol, __shape, __fn] = static_cast<_Data&&>(__data); - using __policy_t = std::remove_cvref_t; - constexpr bool __parallelize = std::same_as<__policy_t, STDEXEC::parallel_policy> - || std::same_as<__policy_t, STDEXEC::parallel_unsequenced_policy>; - return __parallel_bulk_sender< - true, - _Previous, - decltype(__shape), - decltype(__fn), - __parallelize - >{__sched_, static_cast<_Previous&&>(__previous), __shape, std::move(__fn)}; - } - - parallel_scheduler __sched_; - }; - - template <__bulk_chunked_or_unchunked _Sender, class _Env> - auto __parallel_scheduler_domain::transform_sender( - STDEXEC::set_value_t, - _Sender&& __sndr, - const _Env& __env) const noexcept { - if constexpr (STDEXEC::__completes_on<_Sender, parallel_scheduler, _Env>) { - auto __sched = STDEXEC::get_scheduler(__env); - return STDEXEC::__apply( - __transform_parallel_bulk_sender{__sched}, static_cast<_Sender&&>(__sndr)); - } else { - return STDEXEC::__not_a_sender< - STDEXEC::_WHAT_(CANNOT_DISPATCH_THE_BULK_ALGORITHM_TO_THE_PARALLEL_SCHEDULER), - STDEXEC::_WHY_(BECAUSE_THERE_IS_NO_PARALLEL_SCHEDULER_IN_THE_ENVIRONMENT), - STDEXEC::_WHERE_(STDEXEC::_IN_ALGORITHM_, STDEXEC::tag_of_t<_Sender>), - STDEXEC::_TO_FIX_THIS_ERROR_( - ADD_A_CONTINUES_ON_TRANSITION_TO_THE_PARALLEL_SCHEDULER_BEFORE_THE_BULK_ALGORITHM), - STDEXEC::_WITH_PRETTY_SENDER_<_Sender>, - STDEXEC::_WITH_ENVIRONMENT_(_Env) - >(); - } + [[deprecated("Please use stdexec::get_parallel_scheduler instead")]] + inline auto get_parallel_scheduler() noexcept -> STDEXEC::parallel_scheduler { + return STDEXEC::get_parallel_scheduler(); } } // namespace exec - -#if defined(STDEXEC_SYSTEM_CONTEXT_HEADER_ONLY) -# define STDEXEC_SYSTEM_CONTEXT_INLINE inline -# include "../stdexec/__detail/__system_context_default_impl_entry.hpp" -#endif diff --git a/include/stdexec/__detail/__bulk.hpp b/include/stdexec/__detail/__bulk.hpp index 1307521d4..d1d3b9509 100644 --- a/include/stdexec/__detail/__bulk.hpp +++ b/include/stdexec/__detail/__bulk.hpp @@ -16,12 +16,12 @@ #pragma once #include "__execution_fwd.hpp" -#include "__execution_legacy.hpp" // include these after __execution_fwd.hpp #include "__basic_sender.hpp" #include "__completion_signatures_of.hpp" #include "__diagnostics.hpp" +#include "__execution_legacy.hpp" // IWYU pragma: export #include "__meta.hpp" #include "__sender_adaptor_closure.hpp" #include "__senders.hpp" // IWYU pragma: keep for __well_formed_sender diff --git a/include/stdexec/__detail/__parallel_scheduler.hpp b/include/stdexec/__detail/__parallel_scheduler.hpp new file mode 100644 index 000000000..c2814c61c --- /dev/null +++ b/include/stdexec/__detail/__parallel_scheduler.hpp @@ -0,0 +1,754 @@ +/* + * Copyright (c) 2023 Lee Howes, Lucian Radu Teodorescu + * Copyright (c) 2026 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "__execution_fwd.hpp" + +#include "__bulk.hpp" +#include "__domain.hpp" +#include "__schedulers.hpp" +#include "__sender_introspection.hpp" +#include "__senders.hpp" +#include "__system_context_replaceability_api.hpp" +#include "__transform_completion_signatures.hpp" +#include "__transform_sender.hpp" + +#include +#include + +// TODO: make these configurable by providing policy to the system context +#ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE +# define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE 72 +#endif +#ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN +# define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN 8 +#endif +#ifndef STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE +# define STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE 152 +#endif +#ifndef STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN +# define STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN 8 +#endif + +namespace STDEXEC { + struct CANNOT_DISPATCH_THE_BULK_ALGORITHM_TO_THE_PARALLEL_SCHEDULER; + struct BECAUSE_THERE_IS_NO_PARALLEL_SCHEDULER_IN_THE_ENVIRONMENT; + struct ADD_A_CONTINUES_ON_TRANSITION_TO_THE_PARALLEL_SCHEDULER_BEFORE_THE_BULK_ALGORITHM; + + namespace __detail { + /// Allows a frontend receiver of type `_Rcvr` to be passed to the backend. + template + struct __receiver_adapter : system_context_replaceability::receiver_proxy { + explicit __receiver_adapter(_Rcvr&& __rcvr) + : __rcvr_{static_cast<_Rcvr&&>(__rcvr)} { + } + + void set_value() noexcept override { + STDEXEC::set_value(std::forward<_Rcvr>(__rcvr_)); + } + + void set_error(std::exception_ptr&& __ex) noexcept override { + STDEXEC::set_error(std::forward<_Rcvr>(__rcvr_), std::move(__ex)); + } + + void set_stopped() noexcept override { + STDEXEC::set_stopped(std::forward<_Rcvr>(__rcvr_)); + } + + protected: + void __query_env(__type_index __query_type, __type_index __value_type, void* __dest) + const noexcept override { + if (__query_type == __mtypeid) { + __query(get_stop_token, __value_type, __dest); + } + } + + private: + void __query(get_stop_token_t, __type_index __value_type, void* __dest) const noexcept { + using __stop_token_t = stop_token_of_t>; + if constexpr (std::is_same_v) { + if (__value_type == __mtypeid) { + using __dest_t = std::optional; + *static_cast<__dest_t*>(__dest) = get_stop_token(STDEXEC::get_env(__rcvr_)); + } + } + } + + public: + STDEXEC_ATTRIBUTE(no_unique_address) + _Rcvr __rcvr_; + }; + + /// The type large enough to store the data produced by a sender. + /// BUGBUG: this seems wrong. i think this should be a variant of tuples of possible + /// results. + template + using __sender_data_t = decltype(STDEXEC::sync_wait(std::declval<_Sender>()).value()); + + } // namespace __detail + + class parallel_scheduler; + class __parallel_sender; + + template + class __parallel_bulk_sender; + + /// Returns a scheduler that can add work to the underlying execution context. + auto get_parallel_scheduler() -> parallel_scheduler; + + /// Concept that matches `bulk_chunked` and `bulk_unchunked` senders. + template + concept __bulk_chunked_or_unchunked = sender_expr_for<_Sender, bulk_chunked_t> + || sender_expr_for<_Sender, bulk_unchunked_t>; + + /// The execution domain of the parallel_scheduler, used for the purposes of customizing + /// sender algorithms such as `bulk_chunked` and `bulk_unchunked`. + struct __parallel_scheduler_domain : default_domain { + template <__bulk_chunked_or_unchunked _Sender, class _Env> + auto transform_sender(set_value_t, _Sender&& __sndr, const _Env& __env) const noexcept; + }; + + namespace __detail { + using __backend_ptr_t = + std::shared_ptr; + + template + auto __make_parallel_scheduler_from(_SetTag, __backend_ptr_t) noexcept -> parallel_scheduler; + + /// Describes the environment of this sender. + struct __parallel_scheduler_env { + /// Returns the system scheduler as the completion scheduler for `set_value_t`. + template <__one_of _Tag> + [[nodiscard]] + auto query(get_completion_scheduler_t<_Tag>) const noexcept { + return __detail::__make_parallel_scheduler_from(_Tag(), __sched_); + } + + /// The underlying implementation of the scheduler we are using. + __backend_ptr_t __sched_; + }; + + template + struct __aligned_storage { + alignas(_Align) unsigned char __data_[_Size]; + + auto __as_storage() noexcept -> std::span { + return {reinterpret_cast(__data_), _Size}; + } + + template + auto __as() noexcept -> _T& { + static_assert(alignof(_T) <= _Align); + return *reinterpret_cast<_T*>(__data_); + } + + auto __as_ptr() noexcept -> void* { + return __data_; + } + }; + + /* + Storage needed for a frontend operation-state: + + schedule: + - __receiver_adapter::__vtable -- 8 + - __receiver_adapter::__rcvr_ (Rcvr) -- assuming 0 + - __system_op::__preallocated_ (__preallocated) -- 72 + --------------------- + Total: 80; extra 8 bytes compared to backend needs. + + for bulk: + - __bulk_state_base::__fun_ (_Fn) -- 0 (assuming empty function) + - __bulk_state_base::__rcvr_ (_Rcvr) -- 0 (assuming empty receiver) + - __forward_args_receiver::__vtable -- 8 + - __forward_args_receiver::__arguments_data_ (array of bytes) -- 8 (depending on previous sender) + - __bulk_state_base::__prepare_storage_for_backend (fun ptr) -- 8 + - __bulk_state_base::__size_ (_Size) -- 4 + - __bulk_state::__preallocated_ (__preallocated_) -- 152 + - __previous_operation_state_ (__inner_op_state) -- 104 + - __bulk_intermediate_receiver::__state_ (__state_&) -- 8 + - __bulk_intermediate_receiver::__sched_ (parallel_scheduler*) -- 8 + --------------------- + Total: 176; extra 24 bytes compared to backend needs. + + [*] sizes taken on an Apple M2 Pro arm64 arch. They may differ on other architectures, or with different implementations. + */ + + /// The operation state used to execute the work described by this sender. + template + struct __system_op { + /// Constructs `this` from `__rcvr` and `__sched_impl`. + __system_op(_Rcvr&& __rcvr, __backend_ptr_t __sched_impl) + : __rcvr_{std::forward<_Rcvr>(__rcvr)} { + /// Before the operation starts, we store the scheduler implementation in + /// __preallocated_. After the operation starts, we don't need this pointer + /// anymore, and the storage can be used by the backend + auto* __p = &__preallocated_.__as<__backend_ptr_t>(); + std::construct_at(__p, std::move(__sched_impl)); + } + + ~__system_op() = default; + + __system_op(const __system_op&) = delete; + __system_op(__system_op&&) = delete; + auto operator=(const __system_op&) -> __system_op& = delete; + auto operator=(__system_op&&) -> __system_op& = delete; + + /// Starts the work stored in `this`. + void start() & noexcept { + auto st = get_stop_token(STDEXEC::get_env(__rcvr_.__rcvr_)); + if (st.stop_requested()) { + STDEXEC::set_stopped(__rcvr_); + return; + } + auto& __sched_impl = __preallocated_.__as<__backend_ptr_t>(); + auto __impl = std::move(__sched_impl); + std::destroy_at(&__sched_impl); + __impl->schedule(__rcvr_, __preallocated_.__as_storage()); + } + + /// Object that receives completion from the work described by the sender. + __receiver_adapter<_Rcvr> __rcvr_; + + /// Preallocated space for storing the operation state on the implementation size. + /// We also store here the backend interface for the scheduler before we actually + /// start the operation. + __aligned_storage< + STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE, + STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN + > + __preallocated_; + }; + } // namespace __detail + + /// The sender used to schedule new work in the system context. + class __parallel_sender { + public: + /// Marks this type as being a sender + using sender_concept = sender_t; + + /// Implementation __detail. Constructs the sender to wrap `__impl`. + explicit __parallel_sender(__detail::__backend_ptr_t __impl) + : __sched_{std::move(__impl)} { + } + + /// Declares the completion signals sent by `this`. + template + static consteval auto get_completion_signatures() noexcept { + return completion_signatures< + set_value_t(), + set_stopped_t(), + set_error_t(std::exception_ptr) + >(); + } + + /// Gets the environment of this sender. + [[nodiscard]] + auto get_env() const noexcept -> __detail::__parallel_scheduler_env { + return {__sched_}; + } + + /// Value completion happens on the parallel scheduler. + [[nodiscard]] + auto query(get_completion_scheduler_t) const noexcept -> parallel_scheduler; + + /// Connects `__self` to `__rcvr`, returning the operation state containing the work to be done. + template + auto connect(_Rcvr __rcvr) && noexcept(__nothrow_move_constructible<_Rcvr>) + -> __detail::__system_op<__parallel_sender, _Rcvr> { + return {std::move(__rcvr), std::move(__sched_)}; + } + + template + auto connect(_Rcvr __rcvr) & noexcept(__nothrow_move_constructible<_Rcvr>) + -> __detail::__system_op<__parallel_sender, _Rcvr> { + return {std::move(__rcvr), __sched_}; + } + + private: + /// The underlying implementation of the system scheduler. + __detail::__backend_ptr_t __sched_; + }; + + /// A scheduler that can add work to the system context. + class parallel_scheduler { + public: + parallel_scheduler() = delete; + + /// Returns `true` iff `*this` refers to the same scheduler as the argument. + auto operator==(const parallel_scheduler&) const noexcept -> bool = default; + + /// Returns the forward progress guarantee of `this`. + [[nodiscard]] + auto query(get_forward_progress_guarantee_t) const noexcept -> forward_progress_guarantee; + + /// Returns the execution domain of `this`. + [[nodiscard]] + auto query(get_domain_t) const noexcept -> __parallel_scheduler_domain { + return {}; + } + + /// Schedules new work, returning the sender that signals the start of the work. + [[nodiscard]] + auto schedule() const noexcept -> __parallel_sender { + return __parallel_sender{__impl_}; + } + + private: + template + friend class __parallel_bulk_sender; + + template + friend auto __detail::__make_parallel_scheduler_from(_Tag, __detail::__backend_ptr_t) noexcept + -> parallel_scheduler; + + /// Implementation __detail. Constructs the scheduler to wrap `__impl`. + explicit parallel_scheduler(__detail::__backend_ptr_t&& __impl) + : __impl_(__impl) { + } + + /// The underlying implementation of the scheduler. + __detail::__backend_ptr_t __impl_; + }; + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // bulk + + namespace __detail { + template + auto __make_parallel_scheduler_from(T, __backend_ptr_t __impl) noexcept -> parallel_scheduler { + return parallel_scheduler{std::move(__impl)}; + } + + /// Helper that knows how to store the values sent by `_Previous` and pass them to bulk item calls or to the completion signal. + /// This represents the base class that abstracts the storage of the values sent by the previous sender. + /// Derived class will properly implement the receiver methods. + template + struct __forward_args_receiver : system_context_replaceability::bulk_item_receiver_proxy { + using __storage_t = __detail::__sender_data_t<_Previous>; + + /// Storage for the arguments received from the previous sender. + alignas(__storage_t) unsigned char __arguments_data_[sizeof(__storage_t)]; + }; + + /// Derived class that properly forwards the arguments received from `_Previous` to the receiver methods. + /// Uses the storage defined in the base class. No extra data is added here. + template + struct __typed_forward_args_receiver : __forward_args_receiver<_Previous> { + using __base_t = __forward_args_receiver<_Previous>; + using __rcvr_t = _BulkState::__rcvr_t; + + /// Stores `__as` in the base class storage, with the right types. + explicit __typed_forward_args_receiver(_As&&... __as) { + static_assert(sizeof(std::tuple<_As...>) <= sizeof(__base_t::__arguments_data_)); + // BUGBUG: this seems wrong. we are not ever destroying this tuple. + new (__base_t::__arguments_data_) std::tuple<__decay_t<_As>...>{std::move(__as)...}; + } + + /// Calls `set_value()` on the final receiver of the bulk operation, using the values from the previous sender. + void set_value() noexcept override { + auto __state = reinterpret_cast<_BulkState*>(this); + std::apply( + [&](auto&&... __args) { + STDEXEC::set_value( + std::forward<__rcvr_t>(__state->__rcvr_), std::forward<_As>(__args)...); + }, + *reinterpret_cast*>(__base_t::__arguments_data_)); + } + + /// Calls `set_error()` on the final receiver of the bulk operation, passing `__ex`. + void set_error(std::exception_ptr __ex) noexcept override { + auto __state = reinterpret_cast<_BulkState*>(this); + STDEXEC::set_error(std::forward<__rcvr_t>(__state->__rcvr_), std::move(__ex)); + } + + /// Calls `set_stopped()` on the final receiver of the bulk operation. + void set_stopped() noexcept override { + auto __state = reinterpret_cast<_BulkState*>(this); + STDEXEC::set_stopped(std::forward<__rcvr_t>(__state->__rcvr_)); + } + + /// Calls the bulk functor passing `__index` and the values from the previous sender. + void execute(uint32_t __begin, uint32_t __end) noexcept override { + auto __state = reinterpret_cast<_BulkState*>(this); + if constexpr (_BulkState::__is_unchunked) { + (void) __end; // not used + // If we are not parallelizing, we need to run all the iterations sequentially. + uint32_t __increments = 1; + if constexpr (!_BulkState::__parallelize) { + __increments = static_cast(__state->__size_); + } + for (uint32_t __i = __begin; __i < __begin + __increments; __i++) { + std::apply( + [&](auto&&... __args) { __state->__fun_(__i, __args...); }, + *reinterpret_cast*>(__base_t::__arguments_data_)); + } + } else { + // If we are not parallelizing, we need to pass the entire range to the functor. + if constexpr (!_BulkState::__parallelize) { + __begin = 0; + __end = static_cast(__state->__size_); + } + std::apply( + [&](auto&&... __args) { __state->__fun_(__begin, __end, __args...); }, + *reinterpret_cast*>(__base_t::__arguments_data_)); + } + } + + protected: + void __query_env(__type_index __query_type, __type_index __value_type, void* __dest) + const noexcept override { + if (__query_type == __mtypeid) { + __query(get_stop_token, __value_type, __dest); + } + } + + private: + void __query(get_stop_token_t, __type_index __value_type, void* __dest) const noexcept { + auto __state = reinterpret_cast(this); + using __stop_token_t = stop_token_of_t>; + if constexpr (std::is_same_v) { + using __dest_t = std::optional; + if (__value_type == __mtypeid) { + *static_cast<__dest_t*>(__dest) = get_stop_token(STDEXEC::get_env(__state->__rcvr_)); + } + } + } + }; + + /// The state needed to execute the bulk sender created from system context, minus the preallocates space. + /// The preallocated space is obtained by calling the `__prepare_storage_for_backend` function pointer. + template < + sender _Previous, + std::integral _Size, + class _Fn, + class _Rcvr, + bool _IsUnchunked, + bool _Parallelize + > + struct __bulk_state_base { + using __rcvr_t = _Rcvr; + using __forward_args_helper_t = __forward_args_receiver<_Previous>; + static constexpr bool __is_unchunked = _IsUnchunked; + static constexpr bool __parallelize = _Parallelize; + + /// Storage for the arguments and the helper needed to pass the arguments from the previous bulk sender to the bulk functor and receiver. + /// Needs to be the first member, to easier the convertion between `__forward_args_helper_` and `this`. + alignas(__forward_args_helper_t) unsigned char __forward_args_helper_[sizeof( + __forward_args_helper_t)]{}; + + /// The function to be executed to perform the bulk work. + STDEXEC_ATTRIBUTE(no_unique_address) + _Fn __fun_; + /// The receiver object that receives completion from the work described by the sender. + STDEXEC_ATTRIBUTE(no_unique_address) + _Rcvr __rcvr_; + + /// Function that prepares the preallocated storage for calling the backend. + std::span (*__prepare_storage_for_backend)(__bulk_state_base*){nullptr}; + /// The size of the bulk operation. + _Size __size_; + + __bulk_state_base(_Fn&& __fun, _Rcvr&& __rcvr, _Size __size) + : __fun_{std::move(__fun)} + , __rcvr_{std::move(__rcvr)} + , __size_{__size} { + } + }; + + /// Receiver that is used in "bulk" to connect to the input sender of the bulk operation. + template + struct __bulk_intermediate_receiver { + /// Declare that this is a `receiver`. + using receiver_concept = receiver_t; + + /// Object that holds the relevant data for the entire bulk operation. + _BulkState& __state_; + /// The underlying implementation of the scheduler we are using. + __backend_ptr_t __sched_{nullptr}; + + template + void set_value(_As&&... __as) noexcept { + auto st = get_stop_token(STDEXEC::get_env(__state_.__rcvr_)); + if (st.stop_requested()) { + STDEXEC::set_stopped(__state_.__rcvr_); + return; + } + + // Store the input data in the shared state. + using __typed_forward_args_receiver_t = + __typed_forward_args_receiver<_Previous, _BulkState, _As...>; + auto __r = new (&__state_.__forward_args_helper_) + __typed_forward_args_receiver_t(std::forward<_As>(__as)...); + + auto __scheduler = __sched_; + auto __size = static_cast(__state_.__size_); + + auto __storage = __state_.__prepare_storage_for_backend(&__state_); + // This might destroy the `this` object. + + // Schedule the bulk work on the system scheduler. + // This will invoke `execute` on our receiver multiple times, and then a completion signal (e.g., `set_value`). + if constexpr (_BulkState::__is_unchunked) { + __scheduler + ->schedule_bulk_unchunked(_BulkState::__parallelize ? __size : 1, __storage, *__r); + } else { + __scheduler + ->schedule_bulk_chunked(_BulkState::__parallelize ? __size : 1, __storage, *__r); + } + } + + /// Invoked when the previous sender completes with "stopped" to stop the entire work. + void set_stopped() noexcept { + STDEXEC::set_stopped(std::move(__state_.__rcvr_)); + } + + /// Invoked when the previous sender completes with error to forward the error to the connected receiver. + template + void set_error(__E __e) noexcept { + STDEXEC::set_error(std::move(__state_.__rcvr_), std::move(__e)); + } + + /// Gets the environment of this receiver; returns the environment of the connected receiver. + [[nodiscard]] + auto get_env() const noexcept -> decltype(auto) { + return STDEXEC::get_env(__state_.__rcvr_); + } + }; + + /// The operation state object for the system bulk sender. + template < + bool _IsUnchunked, + sender _Previous, + std::integral _Size, + class _Fn, + class _Rcvr, + bool _Parallelize + > + struct __system_bulk_op + : __bulk_state_base<_Previous, _Size, _Fn, _Rcvr, _IsUnchunked, _Parallelize> { + + /// The type that holds the state of the bulk operation. + using __bulk_state_base_t = + __bulk_state_base<_Previous, _Size, _Fn, _Rcvr, _IsUnchunked, _Parallelize>; + + /// The type of the receiver that will be connected to the previous sender. + using __intermediate_receiver_t = + __bulk_intermediate_receiver<__bulk_state_base_t, _Previous>; + + /// The type of inner operation state, which is the result of connecting the previous sender to the bulk intermediate receiver. + using __inner_op_state = connect_result_t<_Previous, __intermediate_receiver_t>; + + static constexpr size_t _PreallocatedSize = + (std::max) (size_t(STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE), sizeof(__inner_op_state)); + static constexpr size_t _PreallocatedAlign = + (std::max) (size_t(STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN), + alignof(__inner_op_state)); + + /// Preallocated space for storing the inner operation state, and then storage space for the backend call. + __aligned_storage<_PreallocatedSize, _PreallocatedAlign> __preallocated_; + + /// Destroys the inner operation state object, and returns the preallocated storage for it to be used by the backend. + static auto + __prepare_storage_for_backend_impl(__bulk_state_base_t* __base) -> std::span { + auto* __self = static_cast<__system_bulk_op*>(__base); + // We don't need anymore the storage for the previous operation state. + __self->__preallocated_.template __as<__inner_op_state>().~__inner_op_state(); + // Reuse the preallocated storage for the backend. + return __self->__preallocated_.__as_storage(); + } + + /// Constructs `this` from `__snd` and `__rcvr`, using the object returned by `__initFunc` to start the operation. + /// + /// Using a functor to initialize the operation state allows the use of `this` to get the + /// underlying implementation object. + /// + /// `_Snd` is a `__parallel_bulk_sender`. + template + __system_bulk_op(_Snd&& __snd, _Rcvr&& __rcvr, _InitF&& __initFunc) + : __bulk_state_base_t{std::move(__snd.__fun_), std::move(__rcvr), __snd.__size_} { + // Write the function that prepares the storage for the backend. + __bulk_state_base_t::__prepare_storage_for_backend = + &__system_bulk_op::__prepare_storage_for_backend_impl; + + // Start using the preallocated buffer to store the inner operation state. + new (__preallocated_.__as_ptr()) __inner_op_state(__initFunc(*this)); + } + + __system_bulk_op(const __system_bulk_op&) = delete; + __system_bulk_op(__system_bulk_op&&) = delete; + auto operator=(const __system_bulk_op&) -> __system_bulk_op& = delete; + auto operator=(__system_bulk_op&&) -> __system_bulk_op& = delete; + + /// Starts the work stored in `*this`. + void start() & noexcept { + // Start previous operation state. + // Bulk operation will be started when the previous sender completes. + STDEXEC::start(__preallocated_.template __as<__inner_op_state>()); + } + }; + } // namespace __detail + + /// The sender used to schedule bulk work in the system context. + template + class __parallel_bulk_sender { + /// Meta-function that returns the completion signatures of `this`. + template + using __completions_t = transform_completion_signatures< + __completion_signatures_of_t<__copy_cvref_t<_Self, _Previous>, _Env...>, + completion_signatures + >; + + template + friend struct __detail::__system_bulk_op; + + public: + /// Marks this type as being a sender + using sender_concept = sender_t; + + /// Constructs `this`. + __parallel_bulk_sender( + parallel_scheduler __sched, + _Previous __previous, + _Size __size, + _Fn&& __fun) + : __sched_{__sched.__impl_} + , __previous_{std::move(__previous)} + , __size_{std::move(__size)} + , __fun_{std::move(__fun)} { + } + + /// Gets the environment of this sender. + [[nodiscard]] + auto get_env() const noexcept -> __detail::__parallel_scheduler_env { + return {__sched_}; + } + + /// Connects `__self` to `__rcvr`, returning the operation state containing the work to be done. + template + auto connect(_Rcvr __rcvr) && noexcept(__nothrow_move_constructible<_Rcvr>) + -> __detail::__system_bulk_op<_IsUnchunked, _Previous, _Size, _Fn, _Rcvr, _Parallelize> { + using __res_t = + __detail::__system_bulk_op<_IsUnchunked, _Previous, _Size, _Fn, _Rcvr, _Parallelize>; + using __receiver_t = __res_t::__intermediate_receiver_t; + return {std::move(*this), std::move(__rcvr), [this](auto& __op) { + // Connect bulk input receiver with the previous operation and store in the operating state. + return STDEXEC::connect( + std::move(this->__previous_), __receiver_t{__op, std::move(this->__sched_)}); + }}; + } + + /// Gets the completion signatures for this sender. + template <__decays_to<__parallel_bulk_sender> _Self, class... _Env> + static consteval auto get_completion_signatures() -> __completions_t<_Self, _Env...> { + return {}; + } + + private: + /// The underlying implementation of the scheduler we are using. + __detail::__backend_ptr_t __sched_{nullptr}; + /// The previous sender, the one that produces the input value for the bulk function. + _Previous __previous_; + /// The size of the bulk operation. + _Size __size_; + /// The function to be executed to perform the bulk work. + STDEXEC_ATTRIBUTE(no_unique_address) + _Fn __fun_; + }; + + inline auto get_parallel_scheduler() -> parallel_scheduler { + auto __impl = system_context_replaceability::query_parallel_scheduler_backend(); + if (!__impl) { + STDEXEC_THROW(std::runtime_error{"No system context implementation found"}); + } + return __detail::__make_parallel_scheduler_from(set_value, std::move(__impl)); + } + + [[deprecated("get_system_scheduler has been renamed get_parallel_scheduler")]] + inline auto get_system_scheduler() -> parallel_scheduler { + return get_parallel_scheduler(); + } + + inline auto __parallel_sender::query(get_completion_scheduler_t) const noexcept + -> parallel_scheduler { + return __detail::__make_parallel_scheduler_from(set_value_t{}, __sched_); + } + + inline auto parallel_scheduler::query(get_forward_progress_guarantee_t) const noexcept + -> forward_progress_guarantee { + return forward_progress_guarantee::parallel; + } + + struct __transform_parallel_bulk_sender { + template + auto operator()(bulk_chunked_t, _Data&& __data, _Previous&& __previous) const noexcept { + auto [__pol, __shape, __fn] = static_cast<_Data&&>(__data); + using __policy_t = std::remove_cvref_t; + constexpr bool __parallelize = std::same_as<__policy_t, parallel_policy> + || std::same_as<__policy_t, parallel_unsequenced_policy>; + return __parallel_bulk_sender< + false, + _Previous, + decltype(__shape), + decltype(__fn), + __parallelize + >{__sched_, static_cast<_Previous&&>(__previous), __shape, std::move(__fn)}; + } + + template + auto operator()(bulk_unchunked_t, _Data&& __data, _Previous&& __previous) const noexcept { + auto [__pol, __shape, __fn] = static_cast<_Data&&>(__data); + using __policy_t = std::remove_cvref_t; + constexpr bool __parallelize = std::same_as<__policy_t, parallel_policy> + || std::same_as<__policy_t, parallel_unsequenced_policy>; + return __parallel_bulk_sender< + true, + _Previous, + decltype(__shape), + decltype(__fn), + __parallelize + >{__sched_, static_cast<_Previous&&>(__previous), __shape, std::move(__fn)}; + } + + parallel_scheduler __sched_; + }; + + template <__bulk_chunked_or_unchunked _Sender, class _Env> + auto + __parallel_scheduler_domain::transform_sender(set_value_t, _Sender&& __sndr, const _Env& __env) + const noexcept { + if constexpr (__completes_on<_Sender, parallel_scheduler, _Env>) { + auto __sched = get_scheduler(__env); + return __apply(__transform_parallel_bulk_sender{__sched}, static_cast<_Sender&&>(__sndr)); + } else { + return __not_a_sender< + _WHAT_(CANNOT_DISPATCH_THE_BULK_ALGORITHM_TO_THE_PARALLEL_SCHEDULER), + _WHY_(BECAUSE_THERE_IS_NO_PARALLEL_SCHEDULER_IN_THE_ENVIRONMENT), + _WHERE_(_IN_ALGORITHM_, tag_of_t<_Sender>), + _TO_FIX_THIS_ERROR_( + ADD_A_CONTINUES_ON_TRANSITION_TO_THE_PARALLEL_SCHEDULER_BEFORE_THE_BULK_ALGORITHM), + _WITH_PRETTY_SENDER_<_Sender>, + _WITH_ENVIRONMENT_(_Env) + >(); + } + } +} // namespace STDEXEC + +#if defined(STDEXEC_SYSTEM_CONTEXT_HEADER_ONLY) +# define STDEXEC_SYSTEM_CONTEXT_INLINE inline +# include "__system_context_default_impl_entry.hpp" +#endif diff --git a/include/stdexec/__detail/__parallel_scheduler_backend.hpp b/include/stdexec/__detail/__parallel_scheduler_backend.hpp index be13155bb..0480e0ca0 100644 --- a/include/stdexec/__detail/__parallel_scheduler_backend.hpp +++ b/include/stdexec/__detail/__parallel_scheduler_backend.hpp @@ -20,7 +20,6 @@ #include "__execution_fwd.hpp" // include these after __execution_fwd.hpp -// #include "any_allocator.cuh" #include "../functional.hpp" // IWYU pragma: keep for __with_default #include "../stop_token.hpp" // IWYU pragma: keep for get_stop_token_t #include "__any_allocator.hpp" @@ -93,13 +92,15 @@ namespace STDEXEC { /// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for different /// subranges of [0, __n), and using `__s` for preallocated memory. - virtual void - schedule_bulk_chunked(size_t, bulk_item_receiver_proxy&, std::span) noexcept = 0; + virtual void schedule_bulk_chunked( + std::size_t, + bulk_item_receiver_proxy&, + std::span) noexcept = 0; /// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for each item, and /// using `__s` for preallocated memory. virtual void schedule_bulk_unchunked( - size_t, + std::size_t, bulk_item_receiver_proxy&, std::span) noexcept = 0; }; diff --git a/include/stdexec/__detail/__system_context_default_impl.hpp b/include/stdexec/__detail/__system_context_default_impl.hpp index 659d8d37f..4c3e1c439 100644 --- a/include/stdexec/__detail/__system_context_default_impl.hpp +++ b/include/stdexec/__detail/__system_context_default_impl.hpp @@ -27,7 +27,7 @@ #endif namespace STDEXEC::__system_context_default_impl { - using system_context_replaceability::__parallel_scheduler_backend_factory; + using system_context_replaceability::__parallel_scheduler_backend_factory_t; /// Receiver that calls the callback when the operation completes. template @@ -321,7 +321,7 @@ namespace STDEXEC::__system_context_default_impl { // Otherwise, create a new instance using the factory. // Note: we are lazy-loading the instance to avoid creating it if it is not needed. - auto __new_instance = __factory_.load(STDEXEC::__std::memory_order_relaxed)(); + auto __new_instance = __factory_.load(__std::memory_order_relaxed)(); // Store the newly created instance. __acquire_instance_lock(); @@ -331,8 +331,8 @@ namespace STDEXEC::__system_context_default_impl { } /// Set `__new_factory` as the new factory for `_Interface` and return the old one. - auto __set_backend_factory(__parallel_scheduler_backend_factory __new_factory) - -> __parallel_scheduler_backend_factory { + auto __set_backend_factory(__parallel_scheduler_backend_factory_t __new_factory) + -> __parallel_scheduler_backend_factory_t { // Replace the factory, keeping track of the old one. auto __old_factory = __factory_.exchange(__new_factory); // Create a new instance with the new factory. @@ -347,9 +347,9 @@ namespace STDEXEC::__system_context_default_impl { } private: - STDEXEC::__std::atomic __instance_locked_{false}; + __std::atomic __instance_locked_{false}; std::shared_ptr<_Interface> __instance_{nullptr}; - STDEXEC::__std::atomic<__parallel_scheduler_backend_factory> __factory_{__default_factory}; + __std::atomic<__parallel_scheduler_backend_factory_t> __factory_{__default_factory}; /// The default factory returns an instance of `_Impl`. static auto __default_factory() -> std::shared_ptr<_Interface> { @@ -357,13 +357,13 @@ namespace STDEXEC::__system_context_default_impl { } void __acquire_instance_lock() { - while (__instance_locked_.exchange(true, STDEXEC::__std::memory_order_acquire)) { + while (__instance_locked_.exchange(true, __std::memory_order_acquire)) { // Spin until we acquire the lock. } } void __release_instance_lock() { - __instance_locked_.store(false, STDEXEC::__std::memory_order_release); + __instance_locked_.store(false, __std::memory_order_release); } }; diff --git a/include/stdexec/__detail/__system_context_default_impl_entry.hpp b/include/stdexec/__detail/__system_context_default_impl_entry.hpp index aea9c2dbb..8fe6a7c3e 100644 --- a/include/stdexec/__detail/__system_context_default_impl_entry.hpp +++ b/include/stdexec/__detail/__system_context_default_impl_entry.hpp @@ -16,11 +16,11 @@ */ #pragma once -// This file assumes STDEXEC_SYSTEM_CONTEXT_INLINE is defined before including it. But clang-tidy -// and doxygen don't know that, so we need to include the header that defines it when clang-tidy and -// doxygen are invoked. +// This file assumes STDEXEC_SYSTEM_CONTEXT_INLINE is defined before including it. But +// clang-tidy and doxygen don't know that, so we need to include the header that defines +// it when clang-tidy and doxygen are invoked. #if defined(STDEXEC_CLANG_TIDY_INVOKED) || defined(STDEXEC_DOXYGEN_INVOKED) -# include "../../exec/system_context.hpp" // IWYU pragma: keep +# include "__parallel_scheduler.hpp" // IWYU pragma: keep #endif #if !defined(STDEXEC_SYSTEM_CONTEXT_INLINE) @@ -29,24 +29,23 @@ #include "__system_context_default_impl.hpp" // IWYU pragma: keep -#define __STDEXEC_SYSTEM_CONTEXT_API extern STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE(weak) - namespace STDEXEC::system_context_replaceability { /// Get the backend for the parallel scheduler. /// Users might replace this function. - auto query_parallel_scheduler_backend() -> std::shared_ptr { - return __system_context_default_impl::__parallel_scheduler_backend_singleton + STDEXEC_SYSTEM_CONTEXT_INLINE auto + query_parallel_scheduler_backend() -> std::shared_ptr { + return STDEXEC::__system_context_default_impl::__parallel_scheduler_backend_singleton .__get_current_instance(); } /// Set a factory for the parallel scheduler backend. /// Can be used to replace the parallel scheduler at runtime. - /// Out of spec. - auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory) - -> __parallel_scheduler_backend_factory { - return __system_context_default_impl::__parallel_scheduler_backend_singleton + /// NOT TO SPEC + extern STDEXEC_SYSTEM_CONTEXT_INLINE // + auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory_t __new_factory) + -> __parallel_scheduler_backend_factory_t { + return STDEXEC::__system_context_default_impl::__parallel_scheduler_backend_singleton .__set_backend_factory(__new_factory); } - } // namespace STDEXEC::system_context_replaceability diff --git a/include/stdexec/__detail/__system_context_replaceability_api.hpp b/include/stdexec/__detail/__system_context_replaceability_api.hpp index 9bba61ca8..acfe4366c 100644 --- a/include/stdexec/__detail/__system_context_replaceability_api.hpp +++ b/include/stdexec/__detail/__system_context_replaceability_api.hpp @@ -22,17 +22,22 @@ #include namespace STDEXEC::system_context_replaceability { - /// The type of a factory that can create `parallel_scheduler_backend` instances. - /// TODO(ericniebler): NOT TO SPEC. - using __parallel_scheduler_backend_factory = std::shared_ptr (*)(); - /// Get the backend for the parallel scheduler. /// Users might replace this function. + STDEXEC_ATTRIBUTE(weak) auto query_parallel_scheduler_backend() -> std::shared_ptr; + /// The type of a factory that can create `parallel_scheduler_backend` instances. + /// NOT TO SPEC + using __parallel_scheduler_backend_factory_t = std::shared_ptr (*)(); + /// Set a factory for the parallel scheduler backend. /// Can be used to replace the parallel scheduler at runtime. - /// TODO(ericniebler): NOT TO SPEC. - auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory) - -> __parallel_scheduler_backend_factory; + /// NOT TO SPEC + [[deprecated( + "Replacing the parallel scheduler backend at runtime is not recommended and may lead to " + "unexpected behavior. Use weak linking to replace the parallel scheduler at compile time " + "instead.")]] + auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory_t __new_factory) + -> __parallel_scheduler_backend_factory_t; } // namespace STDEXEC::system_context_replaceability diff --git a/include/stdexec/__detail/__task_scheduler.hpp b/include/stdexec/__detail/__task_scheduler.hpp index ea56a1183..e68d537ea 100644 --- a/include/stdexec/__detail/__task_scheduler.hpp +++ b/include/stdexec/__detail/__task_scheduler.hpp @@ -25,7 +25,6 @@ #include "__env.hpp" #include "__meta.hpp" #include "__parallel_scheduler_backend.hpp" -#include "__queries.hpp" #include "__schedulers.hpp" #include "__transform_completion_signatures.hpp" #include "__typeinfo.hpp" @@ -42,23 +41,23 @@ namespace STDEXEC { class task_scheduler; struct task_scheduler_domain; - namespace __detail { + namespace __task { // The concrete type-erased sender returned by task_scheduler::schedule() - struct __task_sender; + struct __sender; template - struct __task_bulk_sender; + struct __bulk_sender; template - struct __task_bulk_state; + struct __bulk_state; template - struct __task_bulk_receiver; + struct __bulk_receiver; struct __task_scheduler_backend : system_context_replaceability::parallel_scheduler_backend { [[nodiscard]] - virtual auto - query(get_forward_progress_guarantee_t) const noexcept -> forward_progress_guarantee = 0; + virtual auto query(get_forward_progress_guarantee_t) const noexcept // + -> forward_progress_guarantee = 0; virtual auto __equal_to(const void* __other, __type_index __type) -> bool = 0; }; @@ -66,7 +65,7 @@ namespace STDEXEC { template concept __non_task_scheduler = __not_same_as && scheduler<_Sch>; - } // namespace __detail + } // namespace __task struct _CANNOT_DISPATCH_BULK_ALGORITHM_TO_TASK_SCHEDULER_BECAUSE_THERE_IS_NO_TASK_SCHEDULER_IN_THE_ENVIRONMENT; struct _ADD_A_CONTINUES_ON_TRANSITION_TO_THE_TASK_SCHEDULER_BEFORE_THE_BULK_ALGORITHM; @@ -95,7 +94,7 @@ namespace STDEXEC { >{}; } else { auto __sch = get_completion_scheduler(get_env(__sndr), __env); - return __detail::__task_bulk_sender<_Sndr>{static_cast<_Sndr&&>(__sndr), std::move(__sch)}; + return __task::__bulk_sender<_Sndr>{static_cast<_Sndr&&>(__sndr), std::move(__sch)}; } } }; @@ -114,20 +113,19 @@ namespace STDEXEC { public: using scheduler_concept = scheduler_t; - template > - requires __detail::__non_task_scheduler<_Sch> + template <__task::__non_task_scheduler _Sch, class _Alloc = std::allocator> explicit task_scheduler(_Sch __sch, _Alloc __alloc = {}) : __backend_( std::allocate_shared<__backend_for<_Sch, _Alloc>>(__alloc, std::move(__sch), __alloc)) { } [[nodiscard]] - auto schedule() const noexcept -> __detail::__task_sender; + auto schedule() const noexcept -> __task::__sender; [[nodiscard]] bool operator==(const task_scheduler& __rhs) const noexcept = default; - template <__detail::__non_task_scheduler _Sch> + template <__task::__non_task_scheduler _Sch> [[nodiscard]] auto operator==(const _Sch& __other) const noexcept -> bool { return __backend_->__equal_to(std::addressof(__other), __mtypeid<_Sch>); @@ -150,13 +148,13 @@ namespace STDEXEC { private: template - friend struct __detail::__task_bulk_sender; - friend struct __detail::__task_sender; + friend struct __task::__bulk_sender; + friend struct __task::__sender; - __detail::__backend_ptr_t __backend_; + __task::__backend_ptr_t __backend_; }; - namespace __detail { + namespace __task { //! @brief A type-erased opstate returned when connecting the result of //! task_scheduler::schedule() to a receiver. template @@ -185,7 +183,7 @@ namespace STDEXEC { }; //! @brief A type-erased sender returned by task_scheduler::schedule(). - struct __task_sender { + struct __sender { using sender_concept = sender_t; using __completions_t = completion_signatures< set_value_t(), // @@ -193,7 +191,7 @@ namespace STDEXEC { set_stopped_t() >; - explicit __task_sender(task_scheduler __sch) + explicit __sender(task_scheduler __sch) : __attrs_{std::move(__sch)} { } @@ -223,7 +221,7 @@ namespace STDEXEC { //! task_scheduler. Its set_value member stores the predecessor's values in the bulk //! operation state and then starts the bulk operation. template - struct __task_bulk_receiver { + struct __bulk_receiver { using receiver_concept = receiver_t; template @@ -261,7 +259,7 @@ namespace STDEXEC { return STDEXEC::get_env(__state_->__rcvr_); } - __task_bulk_state<_BulkTag, _Policy, _Fn, _Rcvr, _Values>* __state_; + __bulk_state<_BulkTag, _Policy, _Fn, _Rcvr, _Values>* __state_; }; //! Returns a visitor (callable) used to invoke the bulk (unchunked) function with the @@ -340,13 +338,13 @@ namespace STDEXEC { //! complete, set_value is called, which forwards the predecessor's values to the //! downstream receiver. template - struct __task_bulk_state + struct __bulk_state : __detail::__receiver_proxy_base< _Rcvr, system_context_replaceability::bulk_item_receiver_proxy > { - explicit __task_bulk_state(_Rcvr __rcvr, size_t __shape, _Fn __fn, __backend_ptr_t __backend) - : __task_bulk_state::__receiver_proxy_base(std::move(__rcvr)) + explicit __bulk_state(_Rcvr __rcvr, size_t __shape, _Fn __fn, __backend_ptr_t __backend) + : __bulk_state::__receiver_proxy_base(std::move(__rcvr)) , __fn_(std::move(__fn)) , __shape_(__shape) , __backend_(std::move(__backend)) { @@ -375,7 +373,7 @@ namespace STDEXEC { std::same_as<__policy_t, STDEXEC::parallel_policy> || std::same_as<__policy_t, STDEXEC::parallel_unsequenced_policy>; __visit( - __detail::__get_execute_bulk_fn<__parallelize>( + __task::__get_execute_bulk_fn<__parallelize>( _BulkTag(), __fn_, __shape_, __begin, __end), __values_); } @@ -386,7 +384,7 @@ namespace STDEXEC { private: template - friend struct __task_bulk_receiver; + friend struct __bulk_receiver; _Fn __fn_; size_t __shape_; @@ -398,10 +396,10 @@ namespace STDEXEC { //////////////////////////////////////////////////////////////////////////////////// // Operation state for task scheduler bulk operations template - struct __task_bulk_opstate { + struct __bulk_opstate { using operation_state_concept = operation_state_t; - explicit __task_bulk_opstate( + explicit __bulk_opstate( _Sndr&& __sndr, size_t __shape, _Fn __fn, @@ -422,18 +420,18 @@ namespace STDEXEC { __decayed_tuple, __mbind_front_q<__variant, __monostate>::__f >; - using __rcvr_t = __task_bulk_receiver<_BulkTag, _Policy, _Fn, _Rcvr, __values_t>; + using __rcvr_t = __bulk_receiver<_BulkTag, _Policy, _Fn, _Rcvr, __values_t>; using __opstate1_t = connect_result_t<_Sndr, __rcvr_t>; - __task_bulk_state<_BulkTag, _Policy, _Fn, _Rcvr, __values_t> __state_; + __bulk_state<_BulkTag, _Policy, _Fn, _Rcvr, __values_t> __state_; __opstate1_t __opstate1_; }; template - struct __task_bulk_sender { + struct __bulk_sender { using sender_concept = sender_t; - explicit __task_bulk_sender(_Sndr __sndr, task_scheduler __sch) + explicit __bulk_sender(_Sndr __sndr, task_scheduler __sch) : __sndr_(std::move(__sndr)) , __attrs_{std::move(__sch)} { } @@ -442,7 +440,7 @@ namespace STDEXEC { auto connect(_Rcvr __rcvr) && { auto& [__tag, __data, __child] = __sndr_; auto& [__pol, __shape, __fn] = __data; - return __task_bulk_opstate< + return __bulk_opstate< decltype(__tag), decltype(__pol), decltype(__child), @@ -455,8 +453,7 @@ namespace STDEXEC { std::move(__attrs_.__sched_.__backend_)}; } - template - requires __same_as<_Self, __task_bulk_sender> // accept only rvalues. + template <__same_as<__bulk_sender> _Self, class _Env> // accept only rvalues [[nodiscard]] static consteval auto get_completion_signatures() { // This calls get_completion_signatures on the wrapped bulk_[un]chunked sender. We @@ -559,16 +556,16 @@ namespace STDEXEC { >; __child_opstate_t __opstate_; }; - } // namespace __detail + } // namespace __task [[nodiscard]] - inline auto task_scheduler::schedule() const noexcept -> __detail::__task_sender { - return __detail::__task_sender{*this}; + inline auto task_scheduler::schedule() const noexcept -> __task::__sender { + return __task::__sender{*this}; } template class task_scheduler::__backend_for - : public __detail::__task_scheduler_backend + : public __task::__task_scheduler_backend , _Alloc { template friend struct __detail::__proxy_receiver; @@ -582,7 +579,7 @@ namespace STDEXEC { using __opstate_t = connect_result_t<_Sndr, __detail::__proxy_receiver<_RcvrProxy>>; const bool __in_situ = __storage.size() >= sizeof(__opstate_t); _Alloc& __alloc = *this; - auto& __opstate = __detail::__emplace_into<__detail::__opstate<_Alloc, _Sndr>>( + auto& __opstate = __task::__emplace_into<__task::__opstate<_Alloc, _Sndr>>( __storage, __alloc, __alloc, static_cast<_Sndr&&>(__sndr), __rcvr_proxy, __in_situ); STDEXEC::start(__opstate); } @@ -608,7 +605,7 @@ namespace STDEXEC { system_context_replaceability::bulk_item_receiver_proxy& __rcvr_proxy, std::span __storage) noexcept final { auto __sndr = STDEXEC::bulk_chunked( - STDEXEC::schedule(__sch_), par, __size, __detail::__bulk_chunked_fn{__rcvr_proxy}); + STDEXEC::schedule(__sch_), par, __size, __task::__bulk_chunked_fn{__rcvr_proxy}); __schedule(__rcvr_proxy, std::move(__sndr), __storage); } @@ -617,7 +614,7 @@ namespace STDEXEC { system_context_replaceability::bulk_item_receiver_proxy& __rcvr_proxy, std::span __storage) noexcept final { auto __sndr = STDEXEC::bulk_unchunked( - STDEXEC::schedule(__sch_), par, __size, __detail::__bulk_unchunked_fn{__rcvr_proxy}); + STDEXEC::schedule(__sch_), par, __size, __task::__bulk_unchunked_fn{__rcvr_proxy}); __schedule(__rcvr_proxy, std::move(__sndr), __storage); } diff --git a/include/stdexec/execution.hpp b/include/stdexec/execution.hpp index b11f7d575..5d4ea7aba 100644 --- a/include/stdexec/execution.hpp +++ b/include/stdexec/execution.hpp @@ -41,6 +41,7 @@ #include "__detail/__meta.hpp" #include "__detail/__on.hpp" #include "__detail/__operation_states.hpp" +#include "__detail/__parallel_scheduler.hpp" #include "__detail/__read_env.hpp" #include "__detail/__receiver_adaptor.hpp" #include "__detail/__receivers.hpp" diff --git a/include/tbbexec/tbb_thread_pool.hpp b/include/tbbexec/tbb_thread_pool.hpp index 37010f1da..fb47f6d9b 100644 --- a/include/tbbexec/tbb_thread_pool.hpp +++ b/include/tbbexec/tbb_thread_pool.hpp @@ -18,7 +18,13 @@ #include -#warning Deprecated header file, please include the header file instead and use the execpools::tbb_thread_pool class that is identical as tbbexec::thread_pool class. +#if STDEXEC_MSVC() +# pragma message( \ + "WARNING: Deprecated header file, please include the header file instead and use the execpools::tbb_thread_pool class that is identical as tbbexec::thread_pool class.") +#else +# warning \ + "Deprecated header file, please include the header file instead and use the execpools::tbb_thread_pool class that is identical as tbbexec::thread_pool class." +#endif namespace tbbexec { using tbb_thread_pool = execpools::tbb_thread_pool; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d5bc40c94..2069f0542 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -93,6 +93,18 @@ target_link_libraries(test.stdexec PRIVATE common_test_settings) +add_executable(test.system_context_replaceability + test_main.cpp + stdexec/schedulers/test_system_context_replaceability.cpp) +target_link_libraries(test.system_context_replaceability + PUBLIC + STDEXEC::stdexec + STDEXEC::system_context + stdexec_executable_flags + Catch2::Catch2 + PRIVATE + common_test_settings) + add_executable(test.scratch test_main.cpp test_scratch.cpp) target_link_libraries(test.scratch PUBLIC @@ -111,6 +123,9 @@ include(${icm_SOURCE_DIR}/icm_build_failure_testing.cmake) catch_discover_tests(test.stdexec) catch_discover_tests(test.scratch) +if(NOT STDEXEC_ENABLE_CUDA) + catch_discover_tests(test.system_context_replaceability) +endif() add_subdirectory(exec) diff --git a/test/exec/CMakeLists.txt b/test/exec/CMakeLists.txt index 16746de74..a9f14de74 100644 --- a/test/exec/CMakeLists.txt +++ b/test/exec/CMakeLists.txt @@ -64,7 +64,6 @@ set(exec_test_sources $<$:../execpools/test_tbb_thread_pool.cpp> $<$:../execpools/test_taskflow_thread_pool.cpp> $<$:../execpools/test_asio_thread_pool.cpp> - test_system_context.cpp $<$:test_libdispatch.cpp> test_unless_stop_requested.cpp ) @@ -81,21 +80,8 @@ target_link_libraries(test.exec PRIVATE common_test_settings) -add_executable(test.system_context_replaceability ../test_main.cpp test_system_context_replaceability.cpp) -target_link_libraries(test.system_context_replaceability - PUBLIC - STDEXEC::stdexec - STDEXEC::system_context - stdexec_executable_flags - Catch2::Catch2 - PRIVATE - common_test_settings) - # Discover the Catch2 test built by the application catch_discover_tests(test.exec) -if(NOT STDEXEC_ENABLE_CUDA) - catch_discover_tests(test.system_context_replaceability) -endif() # Test which parses a source file for an expected error message icm_add_build_failure_test( diff --git a/test/exec/test_system_context.cpp b/test/stdexec/schedulers/test_parallel_scheduler.cpp similarity index 78% rename from test/exec/test_system_context.cpp rename to test/stdexec/schedulers/test_parallel_scheduler.cpp index f0467dcab..db1ead33a 100644 --- a/test/exec/test_system_context.cpp +++ b/test/stdexec/schedulers/test_parallel_scheduler.cpp @@ -31,41 +31,41 @@ namespace ex = STDEXEC; namespace scr = ex::system_context_replaceability; -TEST_CASE("system_context can return a scheduler", "[types][system_scheduler]") { - auto sched = exec::get_parallel_scheduler(); +TEST_CASE("system_context can return a scheduler", "[scheduler][parallel_scheduler]") { + auto sched = STDEXEC::get_parallel_scheduler(); STATIC_REQUIRE(ex::scheduler); } -TEST_CASE("system scheduler is not default constructible", "[types][system_scheduler]") { - auto sched = exec::get_parallel_scheduler(); +TEST_CASE("system scheduler is not default constructible", "[scheduler][parallel_scheduler]") { + auto sched = STDEXEC::get_parallel_scheduler(); using sched_t = decltype(sched); STATIC_REQUIRE(!std::is_default_constructible_v); STATIC_REQUIRE(std::is_destructible_v); } -TEST_CASE("system scheduler is copyable and movable", "[types][system_scheduler]") { - auto sched = exec::get_parallel_scheduler(); +TEST_CASE("system scheduler is copyable and movable", "[scheduler][parallel_scheduler]") { + auto sched = STDEXEC::get_parallel_scheduler(); using sched_t = decltype(sched); STATIC_REQUIRE(std::is_copy_constructible_v); STATIC_REQUIRE(std::is_move_constructible_v); } -TEST_CASE("a copied scheduler is equal to the original", "[types][system_scheduler]") { - auto sched1 = exec::get_parallel_scheduler(); +TEST_CASE("a copied scheduler is equal to the original", "[scheduler][parallel_scheduler]") { + auto sched1 = STDEXEC::get_parallel_scheduler(); auto sched2 = sched1; REQUIRE(sched1 == sched2); } TEST_CASE( "two schedulers obtained from get_parallel_scheduler() are equal", - "[types][system_scheduler]") { - auto sched1 = exec::get_parallel_scheduler(); - auto sched2 = exec::get_parallel_scheduler(); + "[scheduler][parallel_scheduler]") { + auto sched1 = STDEXEC::get_parallel_scheduler(); + auto sched2 = STDEXEC::get_parallel_scheduler(); REQUIRE(sched1 == sched2); } -TEST_CASE("system scheduler can produce a sender", "[types][system_scheduler]") { - auto snd = ex::schedule(exec::get_parallel_scheduler()); +TEST_CASE("system scheduler can produce a sender", "[scheduler][parallel_scheduler]") { + auto snd = ex::schedule(STDEXEC::get_parallel_scheduler()); using sender_t = decltype(snd); STATIC_REQUIRE(ex::sender); @@ -73,16 +73,16 @@ TEST_CASE("system scheduler can produce a sender", "[types][system_scheduler]") STATIC_REQUIRE(ex::sender_of); } -TEST_CASE("trivial schedule task on system context", "[types][system_scheduler]") { - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); +TEST_CASE("trivial schedule task on system context", "[scheduler][parallel_scheduler]") { + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); ex::sync_wait(ex::schedule(sched)); } -TEST_CASE("simple schedule task on system context", "[types][system_scheduler]") { +TEST_CASE("simple schedule task on system context", "[scheduler][parallel_scheduler]") { std::thread::id this_id = std::this_thread::get_id(); std::thread::id pool_id{}; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto snd = ex::then(ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); }); @@ -93,21 +93,21 @@ TEST_CASE("simple schedule task on system context", "[types][system_scheduler]") (void) snd; } -TEST_CASE("simple schedule forward progress guarantee", "[types][system_scheduler]") { - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); +TEST_CASE("simple schedule forward progress guarantee", "[scheduler][parallel_scheduler]") { + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); REQUIRE(ex::get_forward_progress_guarantee(sched) == ex::forward_progress_guarantee::parallel); } -TEST_CASE("get_completion_scheduler", "[types][system_scheduler]") { - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); +TEST_CASE("get_completion_scheduler", "[scheduler][parallel_scheduler]") { + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); REQUIRE(ex::get_completion_scheduler(ex::get_env(ex::schedule(sched))) == sched); } -TEST_CASE("simple chain task on system context", "[types][system_scheduler]") { +TEST_CASE("simple chain task on system context", "[scheduler][parallel_scheduler]") { std::thread::id this_id = std::this_thread::get_id(); std::thread::id pool_id{}; std::thread::id pool_id2{}; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto snd = ex::then(ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); }); auto snd2 = ex::then(std::move(snd), [&] { pool_id2 = std::this_thread::get_id(); }); @@ -121,8 +121,8 @@ TEST_CASE("simple chain task on system context", "[types][system_scheduler]") { (void) snd2; } -TEST_CASE("checks stop_token before starting the work", "[types][system_scheduler]") { - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); +TEST_CASE("checks stop_token before starting the work", "[scheduler][parallel_scheduler]") { + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); exec::async_scope scope; scope.request_stop(); @@ -141,11 +141,11 @@ TEST_CASE("checks stop_token before starting the work", "[types][system_schedule REQUIRE_FALSE(called); } -TEST_CASE("simple bulk task on system context", "[types][system_scheduler]") { +TEST_CASE("simple bulk task on system context", "[scheduler][parallel_scheduler]") { std::thread::id this_id = std::this_thread::get_id(); constexpr size_t num_tasks = 16; std::thread::id pool_ids[num_tasks]; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto bulk_snd = ex::bulk(ex::schedule(sched), ex::par, num_tasks, [&](size_t id) { pool_ids[id] = std::this_thread::get_id(); @@ -159,13 +159,13 @@ TEST_CASE("simple bulk task on system context", "[types][system_scheduler]") { } } -TEST_CASE("simple bulk chaining on system context", "[types][system_scheduler]") { +TEST_CASE("simple bulk chaining on system context", "[scheduler][parallel_scheduler]") { std::thread::id this_id = std::this_thread::get_id(); constexpr size_t num_tasks = 16; std::thread::id pool_id{}; std::thread::id propagated_pool_ids[num_tasks]; std::thread::id pool_ids[num_tasks]; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto snd = ex::then(ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); @@ -194,11 +194,11 @@ TEST_CASE("simple bulk chaining on system context", "[types][system_scheduler]") CHECK(std::get<0>(res.value()) == pool_id); } -TEST_CASE("simple bulk_chunked task on system context", "[types][system_scheduler]") { +TEST_CASE("simple bulk_chunked task on system context", "[scheduler][parallel_scheduler]") { std::thread::id this_id = std::this_thread::get_id(); constexpr unsigned long num_tasks = 16; std::thread::id pool_ids[num_tasks]; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto bulk_snd = ex::bulk_chunked( ex::schedule(sched), ex::par, num_tasks, [&](unsigned long b, unsigned long e) { @@ -214,11 +214,11 @@ TEST_CASE("simple bulk_chunked task on system context", "[types][system_schedule } } -TEST_CASE("simple bulk_unchunked task on system context", "[types][system_scheduler]") { +TEST_CASE("simple bulk_unchunked task on system context", "[scheduler][parallel_scheduler]") { std::thread::id this_id = std::this_thread::get_id(); constexpr size_t num_tasks = 16; std::thread::id pool_ids[num_tasks]; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto bulk_snd = ex::bulk_unchunked(ex::schedule(sched), ex::par, num_tasks, [&](size_t id) { pool_ids[id] = std::this_thread::get_id(); @@ -234,11 +234,11 @@ TEST_CASE("simple bulk_unchunked task on system context", "[types][system_schedu TEST_CASE( "bulk_unchunked with seq will run everything on one thread", - "[types][system_scheduler]") { + "[scheduler][parallel_scheduler]") { std::thread::id this_id = std::this_thread::get_id(); constexpr size_t num_tasks = 16; std::thread::id pool_ids[num_tasks]; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto bulk_snd = ex::bulk_unchunked(ex::schedule(sched), ex::seq, num_tasks, [&](size_t id) { pool_ids[id] = std::this_thread::get_id(); @@ -254,10 +254,10 @@ TEST_CASE( } } -TEST_CASE("bulk_chunked on parallel_scheduler performs chunking", "[types][system_scheduler]") { +TEST_CASE("bulk_chunked on parallel_scheduler performs chunking", "[scheduler][parallel_scheduler]") { std::atomic has_chunking = false; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::par, 10'000, [&](int b, int e) { if (e - b > 1) { has_chunking = true; @@ -270,11 +270,11 @@ TEST_CASE("bulk_chunked on parallel_scheduler performs chunking", "[types][syste TEST_CASE( "bulk_chunked on parallel_scheduler covers the entire range", - "[types][system_scheduler]") { + "[scheduler][parallel_scheduler]") { constexpr size_t num_tasks = 200; bool covered[num_tasks]; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::par, num_tasks, [&](size_t b, size_t e) { for (auto i = b; i < e; ++i) { @@ -290,11 +290,11 @@ TEST_CASE( TEST_CASE( "bulk_chunked with seq on parallel_scheduler doesn't do chunking", - "[types][system_scheduler]") { + "[scheduler][parallel_scheduler]") { constexpr size_t num_tasks = 200; std::atomic execution_count = 0; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto bulk_snd = ex::bulk_chunked(ex::schedule(sched), ex::seq, num_tasks, [&](size_t b, size_t e) { REQUIRE(b == 0); @@ -352,14 +352,14 @@ struct my_inline_scheduler_backend_impl : scr::parallel_scheduler_backend { TEST_CASE( "can change the implementation of system context at runtime", - "[types][system_scheduler]") { + "[scheduler][parallel_scheduler]") { static auto my_scheduler_backend = std::make_shared(); auto old_factory = scr::set_parallel_scheduler_backend( []() -> std::shared_ptr { return my_scheduler_backend; }); std::thread::id this_id = std::this_thread::get_id(); std::thread::id pool_id{}; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto snd = ex::then(ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); }); @@ -375,7 +375,7 @@ TEST_CASE( TEST_CASE( "can change the implementation of system context at runtime, with an inline scheduler", - "[types][system_scheduler]") { + "[scheduler][parallel_scheduler]") { auto old_factory = scr::set_parallel_scheduler_backend( []() -> std::shared_ptr { return std::make_shared(); @@ -383,7 +383,7 @@ TEST_CASE( std::thread::id this_id = std::this_thread::get_id(); std::thread::id pool_id{}; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto snd = ex::then(ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); }); @@ -394,7 +394,7 @@ TEST_CASE( (void) scr::set_parallel_scheduler_backend(old_factory); } -TEST_CASE("empty environment always returns nullopt for any query", "[types][system_scheduler]") { +TEST_CASE("empty environment always returns nullopt for any query", "[scheduler][parallel_scheduler]") { struct my_receiver : scr::receiver_proxy { void __query_env(ex::__type_index, ex::__type_index, void*) const noexcept override { } @@ -416,7 +416,7 @@ TEST_CASE("empty environment always returns nullopt for any query", "[types][sys REQUIRE(rcvr.try_query>(ex::get_allocator) == std::nullopt); } -TEST_CASE("environment with a stop token can expose its stop token", "[types][system_scheduler]") { +TEST_CASE("environment with a stop token can expose its stop token", "[scheduler][parallel_scheduler]") { struct my_receiver : ex::system_context_replaceability::receiver_proxy { void set_value() noexcept override { } diff --git a/test/exec/test_system_context_replaceability.cpp b/test/stdexec/schedulers/test_system_context_replaceability.cpp similarity index 86% rename from test/exec/test_system_context_replaceability.cpp rename to test/stdexec/schedulers/test_system_context_replaceability.cpp index 1428a0da6..ee56b0e43 100644 --- a/test/exec/test_system_context_replaceability.cpp +++ b/test/stdexec/schedulers/test_system_context_replaceability.cpp @@ -14,11 +14,16 @@ * limitations under the License. */ -#include +#include #include -#include #include +#include + +#if defined(STDEXEC_SYSTEM_CONTEXT_HEADER_ONLY) +# error This should be testing replacement of the system context with weak linking. +#endif + namespace ex = STDEXEC; namespace scr = ex::system_context_replaceability; @@ -41,7 +46,7 @@ namespace { } // namespace namespace STDEXEC::system_context_replaceability { - // Should replace the function defined in __system_context_default_impl.hpp + // Should replace the function defined in __system_context_default_impl_entry.hpp auto query_parallel_scheduler_backend() -> std::shared_ptr { return std::make_shared(); @@ -50,10 +55,10 @@ namespace STDEXEC::system_context_replaceability { TEST_CASE( "Check that we are using a replaced system context (with weak linking)", - "[system_scheduler][replaceability]") { + "[scheduler][parallel_scheduler][replaceability]") { std::thread::id this_id = std::this_thread::get_id(); std::thread::id pool_id{}; - exec::parallel_scheduler sched = exec::get_parallel_scheduler(); + STDEXEC::parallel_scheduler sched = STDEXEC::get_parallel_scheduler(); auto snd = ex::then(ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); });