Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleaning up coroutine implementation #3126

Merged
merged 1 commit into from Feb 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 7 additions & 11 deletions hpx/runtime/actions/action_invoke_no_more_than.hpp
Expand Up @@ -15,7 +15,7 @@
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/traits/is_future.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/static.hpp>

#include <memory>
Expand Down Expand Up @@ -83,24 +83,22 @@ namespace hpx { namespace actions { namespace detail
// If the action returns something which is not a future, we inject
// a semaphore into the call graph.
static threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type f)
{
typedef typename construct_semaphore_type::semaphore_type
semaphore_type;

signal_on_exit<semaphore_type> on_exit(
construct_semaphore_type::get_sem());
return f(state);
return f();
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::false_type)
{
return util::bind(
util::one_shot(&action_decorate_function::thread_function),
util::placeholders::_1,
return util::deferred_call(
&action_decorate_function::thread_function,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand All @@ -109,20 +107,18 @@ namespace hpx { namespace actions { namespace detail
// If the action returns a future we wait on the semaphore as well,
// however it will be signaled once the future gets ready only.
static threads::thread_result_type thread_function_future(
threads::thread_state_ex_enum state,
threads::thread_function_type f)
{
construct_semaphore_type::get_sem().wait();
return f(state);
return f();
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::true_type)
{
return util::bind(
util::one_shot(&action_decorate_function::thread_function_future),
util::placeholders::_1,
return util::deferred_call(
&action_decorate_function::thread_function_future,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/actions/basic_action.hpp
Expand Up @@ -97,7 +97,7 @@ namespace hpx { namespace actions
{}

HPX_FORCEINLINE threads::thread_result_type
operator()(threads::thread_state_ex_enum)
operator()()
{
LTM_(debug)
<< "Executing " << Action::get_action_name(lva_)
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/components/server/executor_component.hpp
Expand Up @@ -46,7 +46,7 @@ namespace hpx { namespace components
// executor
static void execute(hpx::threads::thread_function_type const& f)
{
f(hpx::threads::wait_signaled);
f();
}

/// This is the default hook implementation for schedule_thread which
Expand Down
19 changes: 7 additions & 12 deletions hpx/runtime/components/server/locking_hook.hpp
Expand Up @@ -11,8 +11,8 @@
#include <hpx/runtime/get_lva.hpp>
#include <hpx/runtime/threads/coroutines/coroutine.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/bind_front.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/register_locks.hpp>
#include <hpx/util/unlock_guard.hpp>

Expand Down Expand Up @@ -55,17 +55,15 @@ namespace hpx { namespace components
static threads::thread_function_type
decorate_action(naming::address::address_type lva, F && f)
{
return util::bind(
util::one_shot(&locking_hook::thread_function),
return util::deferred_call(&locking_hook::thread_function,
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)));
}

protected:
typedef util::function_nonser<
threads::thread_arg_type(threads::thread_result_type)
void(threads::thread_result_type)
> yield_decorator_type;

struct decorate_wrapper
Expand All @@ -87,7 +85,7 @@ namespace hpx { namespace components
// Execute the wrapped action. This locks the mutex ensuring a thread
// safe action invocation.
threads::thread_result_type thread_function(
threads::thread_arg_type state, threads::thread_function_type f)
threads::thread_function_type f)
{
threads::thread_result_type result(threads::unknown,
threads::invalid_thread_id);
Expand All @@ -109,7 +107,7 @@ namespace hpx { namespace components
decorate_wrapper yield_decorator(
util::bind_front(&locking_hook::yield_function, this));

result = f(state);
result = f();

(void)yield_decorator; // silence gcc warnings
}
Expand All @@ -133,24 +131,21 @@ namespace hpx { namespace components

// The yield decorator unlocks the mutex and calls the system yield
// which gives up control back to the thread manager.
threads::thread_arg_type yield_function(threads::thread_result_type state)
void yield_function(threads::thread_result_type state)
{
// We un-decorate the yield function as the lock handling may
// suspend, which causes an infinite recursion otherwise.
undecorate_wrapper yield_decorator;
threads::thread_arg_type result = threads::wait_unknown;

{
util::unlock_guard<mutex_type> ul(mtx_);
result = threads::get_self().yield_impl(state);
threads::get_self().yield_impl(state);
}

// Re-enable ignoring the lock on the mutex above (this
// information is lost in the lock tracking tables once a mutex is
// unlocked).
util::ignore_lock(&mtx_);

return result;
}

private:
Expand Down
9 changes: 3 additions & 6 deletions hpx/runtime/components/server/migration_support.hpp
Expand Up @@ -16,7 +16,7 @@
#include <hpx/runtime/threads_fwd.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/deferred_call.hpp>

#include <cstdint>
#include <mutex>
Expand Down Expand Up @@ -167,10 +167,8 @@ namespace hpx { namespace components
// Make sure we pin the component at construction of the bound object
// which will also unpin it once the thread runs to completion (the
// bound object goes out of scope).
return util::bind(
util::one_shot(&migration_support::thread_function),
return util::deferred_call(&migration_support::thread_function,
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)),
components::pinned_ptr::create<this_component_type>(lva));
Expand All @@ -193,11 +191,10 @@ namespace hpx { namespace components
// Execute the wrapped action. This function is bound in decorate_action
// above. The bound object performs the pinning/unpinning.
threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type && f,
components::pinned_ptr)
{
return f(state);
return f();
}

private:
Expand Down
126 changes: 21 additions & 105 deletions hpx/runtime/threads/coroutines/coroutine.hpp
Expand Up @@ -54,168 +54,84 @@ namespace hpx { namespace threads { namespace coroutines
friend struct detail::coroutine_accessor;

typedef detail::coroutine_impl impl_type;
typedef impl_type::pointer impl_ptr;
typedef impl_type::thread_id_type thread_id_type;

typedef impl_type::result_type result_type;
typedef impl_type::arg_type arg_type;

typedef util::unique_function_nonser<result_type(arg_type)> functor_type;

coroutine() : m_pimpl(nullptr) {}
typedef util::unique_function_nonser<result_type()> functor_type;

coroutine(functor_type&& f,
thread_id_type id,
std::ptrdiff_t stack_size = detail::default_stack_size)
: m_pimpl(impl_type::create(
std::move(f), id, stack_size))
{
HPX_ASSERT(m_pimpl->is_ready());
}

coroutine(coroutine && src)
: m_pimpl(std::move(src.m_pimpl))
: impl_(std::move(f), id, stack_size)
{
src.m_pimpl = nullptr;
HPX_ASSERT(impl_.is_ready());
}

coroutine& operator=(coroutine && src)
{
coroutine(std::move(src)).swap(*this);
return *this;
}

coroutine& swap(coroutine& rhs)
{
std::swap(m_pimpl, rhs.m_pimpl);
return *this;
}

friend void swap(coroutine& lhs, coroutine& rhs)
{
lhs.swap(rhs);
}
coroutine(coroutine const& src) = delete;
coroutine& operator=(coroutine const& src) = delete;
coroutine(coroutine && src) = delete;
coroutine& operator=(coroutine && src) = delete;

thread_id_type get_thread_id() const
{
return m_pimpl->get_thread_id();
return impl_.get_thread_id();
}

#if defined(HPX_HAVE_THREAD_PHASE_INFORMATION)
std::size_t get_thread_phase() const
{
return m_pimpl->get_thread_phase();
return impl_.get_thread_phase();
}
#endif

std::size_t get_thread_data() const
{
return m_pimpl.get() ? m_pimpl->get_thread_data() : 0;
return impl_.get_thread_data();
}

std::size_t set_thread_data(std::size_t data)
{
return m_pimpl.get() ? m_pimpl->set_thread_data(data) : 0;
return impl_.set_thread_data(data);
}

#if defined(HPX_HAVE_APEX)
void** get_apex_data() const
{
return m_pimpl.get() ? m_pimpl->get_apex_data() : 0ull;
return impl_.get_apex_data();
}
#endif

void rebind(functor_type&& f, thread_id_type id)
{
HPX_ASSERT(exited());
impl_type::rebind(m_pimpl.get(), std::move(f), id);
impl_.rebind(std::move(f), id);
}

HPX_FORCEINLINE result_type operator()(arg_type arg = arg_type())
HPX_FORCEINLINE result_type operator()()
{
HPX_ASSERT(m_pimpl);
HPX_ASSERT(m_pimpl->is_ready());

result_type* ptr = nullptr;
m_pimpl->bind_args(&arg);
m_pimpl->bind_result_pointer(&ptr);

m_pimpl->invoke();
HPX_ASSERT(impl_.is_ready());

return std::move(*m_pimpl->result());
}
impl_.invoke();

explicit operator bool() const
{
return good();
}

bool operator==(const coroutine& rhs) const
{
return m_pimpl == rhs.m_pimpl;
}

void exit()
{
HPX_ASSERT(m_pimpl);
m_pimpl->exit();
}

bool waiting() const
{
HPX_ASSERT(m_pimpl);
return m_pimpl->waiting();
}

bool pending() const
{
HPX_ASSERT(m_pimpl);
return m_pimpl->pending() != 0;
}

bool exited() const
{
HPX_ASSERT(m_pimpl);
return m_pimpl->exited();
return impl_.result();
}

bool is_ready() const
{
HPX_ASSERT(m_pimpl);
return m_pimpl->is_ready();
}

bool empty() const
{
return m_pimpl == nullptr;
return impl_.is_ready();
}

std::ptrdiff_t get_available_stack_space()
{
#if defined(HPX_HAVE_THREADS_GET_STACK_POINTER)
return m_pimpl->get_available_stack_space();
return impl_.get_available_stack_space();
#else
return (std::numeric_limits<std::ptrdiff_t>::max)();
#endif
}

protected:
bool good() const
{
return !empty() && !exited() && !waiting();
}

impl_ptr m_pimpl;

std::uint64_t count() const
{
return m_pimpl->count();
}

impl_ptr get_impl()
{
return m_pimpl;
}
private:
impl_type impl_;
};
}}}

Expand Down