Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ include(CheckIncludeFileCXX)
if (CMAKE_SYSTEM_NAME STREQUAL "Darwin")
CHECK_INCLUDE_FILE_CXX("dispatch/dispatch.h" STDEXEC_FOUND_LIBDISPATCH)
option(STDEXEC_ENABLE_LIBDISPATCH "Enable the tests for the Grand Central Dispatch scheduler" ${STDEXEC_FOUND_LIBDISPATCH})
target_compile_definitions(stdexec INTERFACE STDEXEC_ENABLE_LIBDISPATCH)
endif()

option (STDEXEC_ENABLE_NUMA "Enable NUMA affinity for static_thread_pool" OFF)
Expand Down
119 changes: 79 additions & 40 deletions include/exec/__detail/__system_context_default_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
#include "__system_context_replaceability_api.hpp"
#include "stdexec/execution.hpp"
#include "exec/static_thread_pool.hpp"
#if STDEXEC_ENABLE_LIBDISPATCH
# include "exec/libdispatch_queue.hpp"
#endif

#include <thread>
#include <atomic>

namespace exec::__system_context_default_impl {
using namespace stdexec::tags;
using system_context_replaceability::receiver;
using system_context_replaceability::bulk_item_receiver;
using system_context_replaceability::storage;
using system_context_replaceability::system_scheduler;
using system_context_replaceability::__system_context_replaceability;

using __pool_scheduler_t = decltype(std::declval<exec::static_thread_pool>().get_scheduler());
using system_context_replaceability::__system_context_backend_factory;

/// Receiver that calls the callback when the operation completes.
template <class _Sender>
Expand All @@ -53,6 +57,8 @@ namespace exec::__system_context_default_impl {
---------------------
Total: 152; extra 24 bytes compared to internal operation state.

Using libdispatch backend, the operation sizes are 48 (down from 80) and 128 (down from 160).

[*] sizes taken on an Apple M2 Pro arm64 arch. They may differ on other architectures, or with different implementations.
*/

Expand Down Expand Up @@ -89,6 +95,12 @@ namespace exec::__system_context_default_impl {
__op->__destruct(); // destroys the operation, including `this`.
__r->set_stopped();
}

decltype(auto) get_env() const noexcept {
auto __o = __r_->try_query<stdexec::inplace_stop_token>();
stdexec::inplace_stop_token __st = __o ? *__o : stdexec::inplace_stop_token{};
return stdexec::prop{stdexec::get_stop_token, __st};
}
};

/// Ensure that `__storage` is aligned to `__alignment`. Shrinks the storage, if needed, to match desired alignment.
Expand Down Expand Up @@ -145,13 +157,16 @@ namespace exec::__system_context_default_impl {
}
};

struct __system_scheduler_impl : system_scheduler {
__system_scheduler_impl()
template <typename _BaseSchedulerContext>
struct __system_scheduler_generic_impl : system_scheduler {
__system_scheduler_generic_impl()
: __pool_scheduler_(__pool_.get_scheduler()) {
}
private:
using __pool_scheduler_t = decltype(std::declval<_BaseSchedulerContext>().get_scheduler());

/// The underlying thread pool.
exec::static_thread_pool __pool_;
_BaseSchedulerContext __pool_;
__pool_scheduler_t __pool_scheduler_;

//! Functor called by the `bulk` operation; sends a `start` signal to the frontend.
Expand Down Expand Up @@ -184,7 +199,8 @@ namespace exec::__system_context_default_impl {
}

void
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept override {
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept
override {
try {
auto __sndr =
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__r});
Expand All @@ -197,51 +213,74 @@ namespace exec::__system_context_default_impl {
}
};

/// Keeps track of the object implementing the system context interfaces.
struct __instance_holder {
/// Keeps track of the backends for the system context interfaces.
template <typename _Interface, typename _Impl>
struct __instance_data {
/// Gets the current instance; if there is no instance, uses the current factory to create one.
std::shared_ptr<_Interface> __get_current_instance() {
// If we have a valid instance, return it.
__acquire_instance_lock();
auto __r = __instance_;
__release_instance_lock();
if (__r) {
return __r;
}

/// Get the only instance of this class.
static __instance_holder& __singleton() {
static __instance_holder __this_instance_;
return __this_instance_;
}
// Otherwise, create a new instance using the factory.
// Note: we are lazy-loading the instance to avoid creating it if it is not needed.
auto __new_instance = __factory_.load(std::memory_order_relaxed)();

/// Get the currently selected system context object.
system_scheduler* __get_current_instance() const noexcept {
return __current_instance_;
// Store the newly created instance.
__acquire_instance_lock();
__instance_ = __new_instance;
__release_instance_lock();
return __new_instance;
}

/// Allows changing the currently selected system context object; used for testing.
void __set_current_instance(system_scheduler* __instance) noexcept {
__current_instance_ = __instance;
/// Set `__new_factory` as the new factory for `_Interface` and return the old one.
__system_context_backend_factory<_Interface>
__set_backend_factory(__system_context_backend_factory<_Interface> __new_factory) {
// Replace the factory, keeping track of the old one.
auto __old_factory = __factory_.exchange(__new_factory);
// Create a new instance with the new factory.
auto __new_instance = __new_factory();
// Replace the current instance with the new one.
__acquire_instance_lock();
auto __old_instance = std::exchange(__instance_, __new_instance);
__release_instance_lock();
// Make sure to delete the old instance after releasing the lock.
__old_instance.reset();
return __old_factory;
}

private:
__instance_holder() {
static __system_scheduler_impl __default_instance_;
__current_instance_ = &__default_instance_;
}
std::atomic<bool> __instance_locked_{false};
std::shared_ptr<_Interface> __instance_{nullptr};
std::atomic<__system_context_backend_factory<_Interface>> __factory_{__default_factory};

system_scheduler* __current_instance_;
};
/// The default factory returns an instance of `_Impl`.
static std::shared_ptr<_Interface> __default_factory() {
return std::make_shared<_Impl>();
}

struct __system_context_replaceability_impl : __system_context_replaceability {
//! Globally replaces the system scheduler backend.
//! This needs to be called within `main()` and before the system scheduler is accessed.
void __set_system_scheduler(system_scheduler* __backend) noexcept override {
__instance_holder::__singleton().__set_current_instance(__backend);
void __acquire_instance_lock() {
while (__instance_locked_.exchange(true, std::memory_order_acquire)) {
// Spin until we acquire the lock.
}
}
void __release_instance_lock() {
__instance_locked_.store(false, std::memory_order_release);
}
};

inline void* __default_query_system_context_interface(const __uuid& __id) noexcept {
if (__id == system_scheduler::__interface_identifier) {
return __instance_holder::__singleton().__get_current_instance();
} else if (__id == __system_context_replaceability::__interface_identifier) {
static __system_context_replaceability_impl __impl;
return &__impl;
}
#if STDEXEC_ENABLE_LIBDISPATCH
using __system_scheduler_impl = __system_scheduler_generic_impl<exec::libdispatch_queue>;
#else
using __system_scheduler_impl = __system_scheduler_generic_impl<exec::static_thread_pool>;
#endif

return nullptr;
}
/// The singleton to hold the `system_scheduler` instance.
inline constinit __instance_data<system_scheduler, __system_scheduler_impl>
__system_scheduler_singleton{};

} // namespace exec::__system_context_default_impl
39 changes: 31 additions & 8 deletions include/exec/__detail/__system_context_default_impl_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,36 @@

#include "__system_context_default_impl.hpp" // IWYU pragma: keep

STDEXEC_PRAGMA_PUSH()
STDEXEC_PRAGMA_IGNORE_GNU("-Wattributes") // warning: inline function '[...]' declared weak
#define __STDEXEC_SYSTEM_CONTEXT_API extern STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak))

/// Gets the default system context implementation.
extern STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak)) void*
__query_system_context_interface(const __uuid& __id) noexcept {
return exec::__system_context_default_impl::__default_query_system_context_interface(__id);
}
namespace exec::system_context_replaceability {
/// The default implementation of the `query_system_context` function template.
template <__queryable_interface _Interface>
__STDEXEC_SYSTEM_CONTEXT_API std::shared_ptr<_Interface> query_system_context() {
return {};
}

STDEXEC_PRAGMA_POP()
/// The default specialization of `query_system_context` for `system_scheduler`.
template <>
std::shared_ptr<system_scheduler> query_system_context<system_scheduler>() {
return __system_context_default_impl::__system_scheduler_singleton.__get_current_instance();
}

/// The default implementation of the `set_system_context_backend_factory` function template.
template <__queryable_interface _Interface>
__STDEXEC_SYSTEM_CONTEXT_API __system_context_backend_factory<_Interface>
set_system_context_backend_factory(__system_context_backend_factory<_Interface> __new_factory) {
return nullptr;
}

/// The default specialization of `set_system_context_backend_factory` for `system_scheduler`.
template <>
__system_context_backend_factory<system_scheduler>
set_system_context_backend_factory<system_scheduler>(
__system_context_backend_factory<system_scheduler> __new_factory) {
return __system_context_default_impl::__system_scheduler_singleton.__set_backend_factory(
__new_factory);
}


} // namespace exec::system_context_replaceability
72 changes: 53 additions & 19 deletions include/exec/__detail/__system_context_replaceability_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@
#ifndef STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H
#define STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H

#include "stdexec/__detail/__execution_fwd.hpp"

#include <cstdint>
#include <exception>
#include <optional>
#include <memory>

struct __uuid {
std::uint64_t __parts1;
Expand All @@ -27,39 +31,78 @@ struct __uuid {
friend bool operator==(__uuid, __uuid) noexcept = default;
};

/// Implementation-defined mechanism of querying a system context interface identified by `__id`.
extern void* __query_system_context_interface(const __uuid& __id) noexcept;

namespace exec::system_context_replaceability {

//! Helper for the `__queryable_interface` concept.
/// Helper for the `__queryable_interface` concept.
template <__uuid X>
using __check_constexpr_uuid = void;

//! Concept for a queryable interface. Ensures that the interface has a `__interface_identifier`
//! member.
/// Concept for a queryable interface. Ensures that the interface has a `__interface_identifier` member.
template <typename _T>
concept __queryable_interface =
requires() { typename __check_constexpr_uuid<_T::__interface_identifier>; };

/// The details for making `_T` a runtime property.
template <typename _T>
struct __runtime_property_helper {
/// Is `_T` a property?
static constexpr bool __is_property = false;
/// The unique identifier for the property.
static constexpr __uuid __property_identifier{0, 0};
};

/// `inplace_stope_token` is a runtime property.
template <>
struct __runtime_property_helper<stdexec::inplace_stop_token> {
static constexpr bool __is_property = true;
static constexpr __uuid __property_identifier{0x8779c09d8aa249df, 0x867db0e653202604};
};

/// Concept for a runtime property.
template <typename _T>
concept __runtime_property = __runtime_property_helper<_T>::__is_property;

/// Query the system context for an interface of type `_Interface`.
template <__queryable_interface _Interface>
inline _Interface* query_system_context() {
return static_cast<_Interface*>(
__query_system_context_interface(_Interface::__interface_identifier));
}
extern std::shared_ptr<_Interface> query_system_context();

/// The type of a factory that can create interfaces of type `_Interface`.
template <__queryable_interface _Interface>
using __system_context_backend_factory = std::shared_ptr<_Interface> (*)();

/// Sets the factory that creates the system context backend for an interface of type `_Interface`.
template <__queryable_interface _Interface>
extern __system_context_backend_factory<_Interface>
set_system_context_backend_factory(__system_context_backend_factory<_Interface> __new_factory);

/// Interface for completing a sender operation. Backend will call frontend though this interface
/// for completing the `schedule` and `schedule_bulk` operations.
struct receiver {
virtual ~receiver() = default;

protected:
virtual bool __query_env(__uuid, void*) noexcept = 0;

public:
/// Called when the system scheduler completes successfully.
virtual void set_value() noexcept = 0;
/// Called when the system scheduler completes with an error.
virtual void set_error(std::exception_ptr) noexcept = 0;
/// Called when the system scheduler was stopped.
virtual void set_stopped() noexcept = 0;

/// Query the receiver for a property of type `_P`.
template <typename _P>
std::optional<std::decay_t<_P>> try_query() noexcept {
if constexpr (__runtime_property<_P>) {
std::decay_t<_P> __p;
bool __success =
__query_env(__runtime_property_helper<std::decay_t<_P>>::__property_identifier, &__p);
return __success ? std::make_optional(std::move(__p)) : std::nullopt;
} else {
return std::nullopt;
}
}
};

/// Receiver for bulk sheduling operations.
Expand Down Expand Up @@ -90,15 +133,6 @@ namespace exec::system_context_replaceability {
bulk_schedule(std::uint32_t __n, storage __s, bulk_item_receiver* __r) noexcept = 0;
};

/// Implementation-defined mechanism for replacing the system scheduler backend at run-time.
struct __system_context_replaceability {
static constexpr __uuid __interface_identifier{0xc008a3be3bb9284b, 0xb98edb3a740ee02c};

/// Globally replaces the system scheduler backend.
/// This needs to be called within `main()` and before the system scheduler is accessed.
virtual void __set_system_scheduler(system_scheduler*) noexcept = 0;
};

} // namespace exec::system_context_replaceability

#endif
Loading
Loading