Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
324921f
Minor fixes
lucteo Jan 5, 2025
24f0567
Use generic functions for replaceability.
lucteo Jan 5, 2025
ff27546
Add support for runtime environment to be passed to the backend.
lucteo Jan 5, 2025
cb412ab
Allow inplace_stop_token to be passed to system context backend.
lucteo Jan 5, 2025
7ef6c8d
Implement replaceability with a Phoenix singleton.
lucteo Jan 6, 2025
d174c7e
Use libdispatch backend if available.
lucteo Jan 7, 2025
263c854
Improve handling of type-erased receiver environment.
lucteo Jan 11, 2025
d7498f8
Merge branch 'main' into syssched-R6-work
ericniebler Jan 25, 2025
19c916f
Fix include problem.
lucteo Mar 3, 2025
0242420
Merge remote-tracking branch 'upstream/main' into syssched-R6-work
lucteo Mar 3, 2025
391d2fd
Rename the backend `system_scheduler` into `parallel_scheduler_backend`.
lucteo Mar 3, 2025
33411c4
Replace `query_system_context` template with `query_parallel_schedule…
lucteo Mar 3, 2025
7e6f9dc
Rename `get_system_scheduler` -> `get_parallel_scheduler`.
lucteo Mar 3, 2025
8a113ed
Various small fixes to align with LEWG feedback.
lucteo Mar 3, 2025
b793a00
Use a spin lock instead of a mutex in `__instance_data`
lucteo Mar 11, 2025
1ca1f3f
Merge remote-tracking branch 'upstream/main' into syssched-R6-work
lucteo Mar 18, 2025
5c7a4a3
Merge branch 'syssched-R6-work' into parallel-scheduler
lucteo Mar 21, 2025
5b0fb9f
Merge branch 'main-before-reformat' into parallel-scheduler
lucteo Mar 21, 2025
4a6e213
Merge remote-tracking branch 'upstream/main' into parallel-scheduler
lucteo Mar 21, 2025
e3df79c
Apply clang-tidy. Add `get_system_scheduler` as deprecated.
lucteo Mar 22, 2025
9a75926
Merge branch 'main' into parallel-scheduler
lucteo Mar 22, 2025
160695b
Merge remote-tracking branch 'origin/main' into parallel-scheduler
ericniebler Mar 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 39 additions & 35 deletions include/exec/__detail/__system_context_default_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ namespace exec::__system_context_default_impl {
using namespace stdexec::tags;
using system_context_replaceability::receiver;
using system_context_replaceability::bulk_item_receiver;
using system_context_replaceability::storage;
using system_context_replaceability::system_scheduler;
using system_context_replaceability::__system_context_backend_factory;
using system_context_replaceability::parallel_scheduler_backend;
using system_context_replaceability::__parallel_scheduler_backend_factory;

/// Receiver that calls the callback when the operation completes.
template <class _Sender>
Expand Down Expand Up @@ -95,25 +94,27 @@ namespace exec::__system_context_default_impl {
__r->set_stopped();
}

decltype(auto) get_env() const noexcept {
[[nodiscard]]
auto get_env() const noexcept -> decltype(auto) {
auto __o = __r_->try_query<stdexec::inplace_stop_token>();
stdexec::inplace_stop_token __st = __o ? *__o : stdexec::inplace_stop_token{};
return stdexec::prop{stdexec::get_stop_token, __st};
}
};

/// Ensure that `__storage` is aligned to `__alignment`. Shrinks the storage, if needed, to match desired alignment.
inline auto __ensure_alignment(storage __storage, size_t __alignment) noexcept -> storage {
auto __pn = reinterpret_cast<uintptr_t>(__storage.__data);
inline auto __ensure_alignment(std::span<std::byte> __storage, size_t __alignment) noexcept
-> std::span<std::byte> {
auto __pn = reinterpret_cast<uintptr_t>(__storage.data());
if (__pn % __alignment == 0) {
return __storage;
} else if (__storage.__size < __alignment) {
return {nullptr, 0};
} else if (__storage.size() < __alignment) {
return {};
} else {
auto __new_pn = (__pn + __alignment - 1) & ~(__alignment - 1);
return {
reinterpret_cast<void*>(__new_pn),
static_cast<uint32_t>(__storage.__size - (__new_pn - __pn))};
reinterpret_cast<std::byte*>(__new_pn),
static_cast<size_t>(__storage.size() - (__new_pn - __pn))};
}
}

Expand All @@ -125,13 +126,14 @@ namespace exec::__system_context_default_impl {
bool __on_heap_;

/// Try to construct the operation in the preallocated memory if it fits, otherwise allocate a new operation.
static auto __construct_maybe_alloc(storage __storage, receiver* __completion, _Sender __sndr)
-> __operation* {
static auto
__construct_maybe_alloc(std::span<std::byte> __storage, receiver* __completion, _Sender __sndr)
-> __operation* {
__storage = __ensure_alignment(__storage, alignof(__operation));
if (__storage.__data == nullptr || __storage.__size < sizeof(__operation)) {
if (__storage.data() == nullptr || __storage.size() < sizeof(__operation)) {
return new __operation(std::move(__sndr), __completion, true);
} else {
return new (__storage.__data) __operation(std::move(__sndr), __completion, false);
return new (__storage.data()) __operation(std::move(__sndr), __completion, false);
}
}

Expand All @@ -157,8 +159,8 @@ namespace exec::__system_context_default_impl {
};

template <typename _BaseSchedulerContext>
struct __system_scheduler_generic_impl : system_scheduler {
__system_scheduler_generic_impl()
struct __generic_impl : parallel_scheduler_backend {
__generic_impl()
: __pool_scheduler_(__pool_.get_scheduler()) {
}
private:
Expand All @@ -173,7 +175,7 @@ namespace exec::__system_context_default_impl {
bulk_item_receiver* __r_;

void operator()(unsigned long __idx) const noexcept {
__r_->start(static_cast<uint32_t>(__idx));
__r_->execute(static_cast<uint32_t>(__idx));
}
};

Expand All @@ -186,28 +188,29 @@ namespace exec::__system_context_default_impl {
std::declval<__bulk_functor>()))>;

public:
void schedule(storage __storage, receiver* __r) noexcept override {
void schedule(std::span<std::byte> __storage, receiver& __r) noexcept override {
try {
auto __sndr = stdexec::schedule(__pool_scheduler_);
auto __os =
__schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr));
__schedule_operation_t::__construct_maybe_alloc(__storage, &__r, std::move(__sndr));
__os->start();
} catch (std::exception& __e) {
__r->set_error(std::current_exception());
__r.set_error(std::current_exception());
}
}

void
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept
override {
void bulk_schedule(
uint32_t __size,
std::span<std::byte> __storage,
bulk_item_receiver& __r) noexcept override {
try {
auto __sndr =
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__r});
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{&__r});
auto __os =
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr));
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, &__r, std::move(__sndr));
__os->start();
} catch (std::exception& __e) {
__r->set_error(std::current_exception());
__r.set_error(std::current_exception());
}
}
};
Expand Down Expand Up @@ -237,8 +240,8 @@ namespace exec::__system_context_default_impl {
}

/// Set `__new_factory` as the new factory for `_Interface` and return the old one.
auto
__set_backend_factory(__system_context_backend_factory<_Interface> __new_factory) -> __system_context_backend_factory<_Interface> {
auto __set_backend_factory(__parallel_scheduler_backend_factory __new_factory)
-> __parallel_scheduler_backend_factory {
// Replace the factory, keeping track of the old one.
auto __old_factory = __factory_.exchange(__new_factory);
// Create a new instance with the new factory.
Expand All @@ -255,10 +258,10 @@ namespace exec::__system_context_default_impl {
private:
std::atomic<bool> __instance_locked_{false};
std::shared_ptr<_Interface> __instance_{nullptr};
std::atomic<__system_context_backend_factory<_Interface>> __factory_{__default_factory};
std::atomic<__parallel_scheduler_backend_factory> __factory_{__default_factory};

/// The default factory returns an instance of `_Impl`.
static std::shared_ptr<_Interface> __default_factory() {
static auto __default_factory() -> std::shared_ptr<_Interface> {
return std::make_shared<_Impl>();
}

Expand All @@ -267,19 +270,20 @@ namespace exec::__system_context_default_impl {
// Spin until we acquire the lock.
}
}

void __release_instance_lock() {
__instance_locked_.store(false, std::memory_order_release);
}
};

#if STDEXEC_ENABLE_LIBDISPATCH
using __system_scheduler_impl = __system_scheduler_generic_impl<exec::libdispatch_queue>;
using __parallel_scheduler_backend_impl = __generic_impl<exec::libdispatch_queue>;
#else
using __system_scheduler_impl = __system_scheduler_generic_impl<exec::static_thread_pool>;
using __parallel_scheduler_backend_impl = __generic_impl<exec::static_thread_pool>;
#endif

/// The singleton to hold the `system_scheduler` instance.
inline constinit __instance_data<system_scheduler, __system_scheduler_impl>
__system_scheduler_singleton{};
/// The singleton to hold the `parallel_scheduler_backend` instance.
inline constinit __instance_data<parallel_scheduler_backend, __parallel_scheduler_backend_impl>
__parallel_scheduler_backend_singleton{};

} // namespace exec::__system_context_default_impl
37 changes: 12 additions & 25 deletions include/exec/__detail/__system_context_default_impl_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,21 @@
#define __STDEXEC_SYSTEM_CONTEXT_API extern STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak))

namespace exec::system_context_replaceability {
/// The default implementation of the `query_system_context` function template.
template <__queryable_interface _Interface>
__STDEXEC_SYSTEM_CONTEXT_API auto query_system_context() -> std::shared_ptr<_Interface> {
return {};
}

/// The default specialization of `query_system_context` for `system_scheduler`.
template <>
auto query_system_context<system_scheduler>() -> std::shared_ptr<system_scheduler> {
return __system_context_default_impl::__system_scheduler_singleton.__get_current_instance();
}

/// The default implementation of the `set_system_context_backend_factory` function template.
template <__queryable_interface _Interface>
__STDEXEC_SYSTEM_CONTEXT_API auto
set_system_context_backend_factory(__system_context_backend_factory<_Interface> __new_factory)
-> __system_context_backend_factory<_Interface> {
return nullptr;
/// Get the backend for the parallel scheduler.
/// Users might replace this function.
auto query_parallel_scheduler_backend() -> std::shared_ptr<parallel_scheduler_backend> {
return __system_context_default_impl::__parallel_scheduler_backend_singleton
.__get_current_instance();
}

/// The default specialization of `set_system_context_backend_factory` for `system_scheduler`.
template <>
auto set_system_context_backend_factory<system_scheduler>(
__system_context_backend_factory<system_scheduler> __new_factory)
-> __system_context_backend_factory<system_scheduler> {
return __system_context_default_impl::__system_scheduler_singleton.__set_backend_factory(
__new_factory);
/// Set a factory for the parallel scheduler backend.
/// Can be used to replace the parallel scheduler at runtime.
/// Out of spec.
auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory)
-> __parallel_scheduler_backend_factory {
return __system_context_default_impl::__parallel_scheduler_backend_singleton
.__set_backend_factory(__new_factory);
}


} // namespace exec::system_context_replaceability
53 changes: 26 additions & 27 deletions include/exec/__detail/__system_context_replaceability_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include "stdexec/__detail/__execution_fwd.hpp"

#include <cstdint>
#include <cstddef>
#include <exception>
#include <optional>
#include <memory>
#include <span>

struct __uuid {
std::uint64_t __parts1;
Expand Down Expand Up @@ -62,19 +64,21 @@ namespace exec::system_context_replaceability {
template <typename _T>
concept __runtime_property = __runtime_property_helper<_T>::__is_property;

/// Query the system context for an interface of type `_Interface`.
template <__queryable_interface _Interface>
extern auto query_system_context() -> std::shared_ptr<_Interface>;
struct parallel_scheduler_backend;

/// The type of a factory that can create interfaces of type `_Interface`.
template <__queryable_interface _Interface>
using __system_context_backend_factory = std::shared_ptr<_Interface> (*)();
/// Get the backend for the parallel scheduler.
/// Users might replace this function.
auto query_parallel_scheduler_backend() -> std::shared_ptr<parallel_scheduler_backend>;

/// Sets the factory that creates the system context backend for an interface of type `_Interface`.
template <__queryable_interface _Interface>
extern auto
set_system_context_backend_factory(__system_context_backend_factory<_Interface> __new_factory)
-> __system_context_backend_factory<_Interface>;
/// The type of a factory that can create `parallel_scheduler_backend` instances.
/// Out of spec.
using __parallel_scheduler_backend_factory = std::shared_ptr<parallel_scheduler_backend> (*)();

/// Set a factory for the parallel scheduler backend.
/// Can be used to replace the parallel scheduler at runtime.
/// Out of spec.
auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory)
-> __parallel_scheduler_backend_factory;

/// Interface for completing a sender operation. Backend will call frontend though this interface
/// for completing the `schedule` and `schedule_bulk` operations.
Expand Down Expand Up @@ -109,29 +113,24 @@ namespace exec::system_context_replaceability {
/// Receiver for bulk sheduling operations.
struct bulk_item_receiver : receiver {
/// Called for each item of a bulk operation, possible on different threads.
virtual void start(std::uint32_t) noexcept = 0;
};

/// Describes a storage space.
/// Used to pass preallocated storage from the frontend to the backend.
struct storage {
void* __data;
std::uint32_t __size;
virtual void execute(std::uint32_t) noexcept = 0;
};

/// Interface for the system scheduler
struct system_scheduler {
/// Interface for the parallel scheduler backend.
struct parallel_scheduler_backend {
static constexpr __uuid __interface_identifier{0x5ee9202498c4bd4f, 0xa1df2508ffcd9d7e};

virtual ~system_scheduler() = default;
virtual ~parallel_scheduler_backend() = default;

/// Schedule work on system scheduler, calling `__r` when done and using `__s` for preallocated
/// Schedule work on parallel scheduler, calling `__r` when done and using `__s` for preallocated
/// memory.
virtual void schedule(storage __s, receiver* __r) noexcept = 0;
/// Schedule bulk work of size `__n` on system scheduler, calling `__r` for each item and then
virtual void schedule(std::span<std::byte> __s, receiver& __r) noexcept = 0;
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for each item and then
/// when done, and using `__s` for preallocated memory.
virtual void
bulk_schedule(std::uint32_t __n, storage __s, bulk_item_receiver* __r) noexcept = 0;
virtual void bulk_schedule(
std::uint32_t __n,
std::span<std::byte> __s,
bulk_item_receiver& __r) noexcept = 0;
};

} // namespace exec::system_context_replaceability
Expand Down
Loading