Skip to content

Commit

Permalink
Futures attempt to execute threads directly if those have not started…
Browse files Browse the repository at this point in the history
… executing

- adding new API function hpx::threads::get_outer_self_id
  • Loading branch information
hkaiser committed Dec 8, 2022
1 parent 4341d92 commit 26b44d8
Show file tree
Hide file tree
Showing 52 changed files with 1,326 additions and 400 deletions.
1 change: 1 addition & 0 deletions .jenkins/cscs/env-clang-13.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS=ON"
configure_extra_options+=" -DHPX_WITH_COMPILER_WARNINGS_AS_ERRORS=ON"
configure_extra_options+=" -DHPX_WITH_SPINLOCK_DEADLOCK_DETECTION=ON"
configure_extra_options+=" -DHPX_WITH_UNITY_BUILD=ON"
configure_extra_options+=" -DHPX_COROUTINES_WITH_THREAD_SCHEDULE_HINT_RUNS_AS_CHILD=ON"

# enable extra counters to verify everything compiles
configure_extra_options+=" -DHPX_WITH_BACKGROUND_THREAD_COUNTERS=ON"
Expand Down
2 changes: 2 additions & 0 deletions .jenkins/lsu/env-gcc-9.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI_BACKEND=ibv"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

configure_extra_options+=" -DHPX_COROUTINES_WITH_THREAD_SCHEDULE_HINT_RUNS_AS_CHILD=ON"
7 changes: 4 additions & 3 deletions components/iostreams/src/server/output_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ namespace hpx { namespace iostreams { namespace server {
{ // {{{
// Perform the IO in another OS thread.
detail::buffer in(buf_in);
hpx::get_thread_pool("io_pool")->get_io_service().post(hpx::bind_front(
&output_stream::call_write_sync, this, locality_id, count,
std::ref(in), threads::thread_id_ref_type(threads::get_self_id())));
hpx::get_thread_pool("io_pool")->get_io_service().post(
util::bind_front(&output_stream::call_write_sync, this, locality_id,
count, std::ref(in),
threads::thread_id_ref_type(threads::get_outer_self_id())));

// Sleep until the worker thread wakes us up.
this_thread::suspend(threads::thread_schedule_state::suspended,
Expand Down
1 change: 0 additions & 1 deletion examples/quickstart/1d_wave_equation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ double wave(std::uint64_t t, std::uint64_t x)
}
else
{
double u_tminus_x = async<wave_action>(here, t - 2, x).get();
std::lock_guard<hpx::mutex> l(u[t][x].mtx);
u[t][x].u_value =
calculate_u_tplus_x(u_t_xplus, u_t_x, u_t_xminus, u_tminus_x);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ namespace hpx { namespace execution { namespace experimental {
{
} with_priority{};

template <>
struct is_scheduling_property<with_priority_t> : std::true_type
{
};

inline constexpr struct get_priority_t final
: hpx::functional::detail::tag_fallback<get_priority_t>
{
Expand All @@ -87,6 +92,11 @@ namespace hpx { namespace execution { namespace experimental {
{
} with_stacksize{};

template <>
struct is_scheduling_property<with_stacksize_t> : std::true_type
{
};

inline constexpr struct get_stacksize_t final
: hpx::functional::detail::tag_fallback<get_stacksize_t>
{
Expand All @@ -111,6 +121,11 @@ namespace hpx { namespace execution { namespace experimental {
{
} with_hint{};

template <>
struct is_scheduling_property<with_hint_t> : std::true_type
{
};

inline constexpr struct get_hint_t final
: hpx::functional::detail::tag_fallback<get_hint_t>
{
Expand All @@ -135,6 +150,11 @@ namespace hpx { namespace execution { namespace experimental {
{
} with_annotation{};

template <>
struct is_scheduling_property<with_annotation_t> : std::true_type
{
};

inline constexpr struct get_annotation_t final
: hpx::functional::detail::tag_fallback<get_annotation_t>
{
Expand Down
20 changes: 19 additions & 1 deletion libs/core/coroutines/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2020 The STE||AR-Group
# Copyright (c) 2019-2021 The STE||AR-Group
#
# SPDX-License-Identifier: BSL-1.0
# Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand All @@ -14,6 +14,23 @@ hpx_option(
MODULE COROUTINES
)

hpx_option(
HPX_COROUTINES_WITH_THREAD_SCHEDULE_HINT_RUNS_AS_CHILD
BOOL
"Futures attempt to run associated threads directly if those have not been started (default: OFF)"
OFF
CATEGORY "Thread Manager"
ADVANCED
MODULE COROUTINES
)

if(HPX_COROUTINES_WITH_THREAD_SCHEDULE_HINT_RUNS_AS_CHILD)
hpx_add_config_define_namespace(
DEFINE HPX_COROUTINES_HAVE_THREAD_SCHEDULE_HINT_RUNS_AS_CHILD
NAMESPACE COROUTINES
)
endif()

set(coroutines_headers
hpx/coroutines/coroutine.hpp
hpx/coroutines/coroutine_fwd.hpp
Expand All @@ -29,6 +46,7 @@ set(coroutines_headers
hpx/coroutines/detail/coroutine_impl.hpp
hpx/coroutines/detail/coroutine_self.hpp
hpx/coroutines/detail/coroutine_stackful_self.hpp
hpx/coroutines/detail/coroutine_stackful_self_direct.hpp
hpx/coroutines/detail/coroutine_stackless_self.hpp
hpx/coroutines/detail/get_stack_pointer.hpp
hpx/coroutines/detail/posix_utility.hpp
Expand Down
6 changes: 6 additions & 0 deletions libs/core/coroutines/include/hpx/coroutines/coroutine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ namespace hpx { namespace threads { namespace coroutines {
return impl_.result();
}

HPX_FORCEINLINE result_type invoke_directly(arg_type arg = arg_type())
{
HPX_ASSERT(impl_.is_ready());
return impl_.invoke_directly(arg);
}

bool is_ready() const
{
return impl_.is_ready();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,11 @@ namespace hpx { namespace threads { namespace coroutines {
#endif
}

void reset_stack()
void reset_stack(bool direct_execution)
{
if (direct_execution)
return;

if (ctx_)
{
#if defined(HPX_USE_POSIX_STACK_UTILITIES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,11 @@ namespace hpx::threads::coroutines::detail::lx {
return m_stack_size;
}

void reset_stack()
void reset_stack(bool direct_execution)
{
if (direct_execution)
return;

HPX_ASSERT(m_stack);
if (posix::reset_stack(
m_stack, static_cast<std::size_t>(m_stack_size)))
Expand All @@ -384,7 +387,11 @@ namespace hpx::threads::coroutines::detail::lx {

void rebind_stack()
{
HPX_ASSERT(m_stack);
// directly executed coroutine, no need to allocate
// a stack
if (m_stack == nullptr)
return;

#if defined(HPX_HAVE_COROUTINE_COUNTERS)
increment_stack_recycle_count();
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,11 @@ namespace hpx { namespace threads { namespace coroutines {
#endif
}

void reset_stack()
void reset_stack(bool direct_execution)
{
if (direct_execution)
return;

if (m_stack)
{
if (posix::reset_stack(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ namespace hpx { namespace threads { namespace coroutines {
return stacksize_;
}

constexpr void reset_stack() noexcept {}
constexpr void reset_stack(bool) noexcept {}

void rebind_stack() noexcept
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail {
// execute the coroutine using normal context switching
HPX_CORE_EXPORT void operator()() noexcept;

// execute the coroutine function directly in the context of the calling
// thread
HPX_CORE_EXPORT result_type invoke_directly(arg_type arg);

public:
void bind_result(result_type res)
{
Expand Down Expand Up @@ -116,16 +120,18 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail {
this->super_type::init();
}

void reset()
void reset(bool direct_execution)
{
// First reset the function and arguments
m_result =
result_type(thread_schedule_state::unknown, invalid_thread_id);
m_arg = nullptr;
m_fun.reset();

// Then reset the id and stack as they may be used by the
// destructors of the thread function above
this->super_type::reset();
this->reset_stack();
this->reset_stack(direct_execution);
}

void rebind(functor_type&& f, thread_id_type id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail {

virtual thread_id_type get_thread_id() const = 0;

virtual thread_id_type get_outer_thread_id() const
{
return get_thread_id();
}

virtual std::size_t get_thread_phase() const = 0;

virtual std::ptrdiff_t get_available_stack_space() = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail {
{
public:
explicit coroutine_stackful_self(
impl_type* pimpl, coroutine_self* next_self = nullptr)
coroutine_impl* pimpl, coroutine_self* next_self = nullptr)
: coroutine_self(next_self)
, pimpl_(pimpl)
{
Expand Down Expand Up @@ -133,6 +133,7 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail {
{
return pimpl_;
}

coroutine_impl* pimpl_;
};
}}}} // namespace hpx::threads::coroutines::detail
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2019-2021 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>
#include <hpx/assert.hpp>
#include <hpx/coroutines/detail/coroutine_accessor.hpp>
#include <hpx/coroutines/detail/coroutine_impl.hpp>
#include <hpx/coroutines/detail/coroutine_stackful_self.hpp>
#include <hpx/coroutines/thread_enums.hpp>
#include <hpx/coroutines/thread_id_type.hpp>

#include <cstddef>
#include <limits>
#include <utility>

namespace hpx { namespace threads { namespace coroutines { namespace detail {

class coroutine_stackful_self_direct : public coroutine_stackful_self
{
public:
explicit coroutine_stackful_self_direct(
coroutine_impl* pimpl, coroutine_self* next_self)
: coroutine_stackful_self(pimpl, next_self)
, next_self_(next_self)
{
HPX_ASSERT(next_self_);
}

// direct execution of a thread needs to use the executing context for
// yielding
arg_type yield_impl(result_type arg) override
{
return next_self_->yield_impl(arg);
}

thread_id_type get_outer_thread_id() const override
{
return next_self_->get_outer_thread_id();
}

#if defined(HPX_HAVE_THREAD_PHASE_INFORMATION)
std::size_t get_thread_phase() const override
{
return next_self_->get_thread_phase();
}
#endif

#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
// return the executing thread's available stack space
std::ptrdiff_t get_available_stack_space() override
{
return next_self_->get_available_stack_space();
}
#endif

// return the executing thread's recursion count
std::size_t& get_continuation_recursion_count() override
{
return next_self_->get_continuation_recursion_count();
}

private:
// if we chain direct calls the executing thread needs to be inherited
// down
coroutine_impl* get_impl() override
{
return coroutine_accessor::get_impl(*next_self_);
}

coroutine_self* next_self_;
};
}}}} // namespace hpx::threads::coroutines::detail
Loading

0 comments on commit 26b44d8

Please sign in to comment.