Skip to content

Commit

Permalink
Cleaning up coroutine implementation
Browse files Browse the repository at this point in the history
 - Removing dynamic memory allocation
 - Removing intrusive ptr
 - Removing unused functions
 - Simplifying thread functions to be nullary
 - Using thread_data directly to get/set the state_ex value
  • Loading branch information
Thomas Heller committed Jan 24, 2018
1 parent 4ba45e7 commit 7354aea
Show file tree
Hide file tree
Showing 45 changed files with 194 additions and 910 deletions.
8 changes: 2 additions & 6 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -867,11 +867,9 @@ namespace detail
}

protected:
static threads::thread_result_type run_impl(future_base_type this_)
static void run_impl(future_base_type this_)
{
this_->do_run();
return threads::thread_result_type(
threads::terminated, threads::invalid_thread_id);
}

public:
Expand Down Expand Up @@ -943,12 +941,10 @@ namespace detail
};

protected:
static threads::thread_result_type run_impl(future_base_type this_)
static void run_impl(future_base_type this_)
{
reset_id r(*this_);
this_->do_run();
return threads::thread_result_type(
threads::terminated, threads::invalid_thread_id);
}

public:
Expand Down
14 changes: 5 additions & 9 deletions hpx/lcos/local/packaged_continuation.hpp
Expand Up @@ -271,7 +271,7 @@ namespace hpx { namespace lcos { namespace detail
}

protected:
threads::thread_result_type
void
async_impl(
typename traits::detail::shared_state_ptr_for<
Future
Expand All @@ -281,11 +281,9 @@ namespace hpx { namespace lcos { namespace detail

Future future = traits::future_access<Future>::create(std::move(f));
invoke_continuation(f_, std::move(future), *this);
return threads::thread_result_type(threads::terminated,
threads::invalid_thread_id);
}

threads::thread_result_type
void
async_exec_impl(
typename traits::detail::shared_state_ptr_for<
Future
Expand All @@ -299,8 +297,6 @@ namespace hpx { namespace lcos { namespace detail

Future future = traits::future_access<Future>::create(std::move(f));
invoke_continuation(f_, std::move(future), *this, is_void());
return threads::thread_result_type(threads::terminated,
threads::invalid_thread_id);
}

public:
Expand All @@ -324,7 +320,7 @@ namespace hpx { namespace lcos { namespace detail
}

boost::intrusive_ptr<continuation> this_(this);
threads::thread_result_type (continuation::*async_impl_ptr)(
void (continuation::*async_impl_ptr)(
typename traits::detail::shared_state_ptr_for<Future>::type &&
) = &continuation::async_impl;

Expand Down Expand Up @@ -371,7 +367,7 @@ namespace hpx { namespace lcos { namespace detail
}

boost::intrusive_ptr<continuation> this_(this);
threads::thread_result_type (continuation::*async_impl_ptr)(
void (continuation::*async_impl_ptr)(
typename traits::detail::shared_state_ptr_for<Future>::type &&
) = &continuation::async_impl;

Expand Down Expand Up @@ -414,7 +410,7 @@ namespace hpx { namespace lcos { namespace detail
}

boost::intrusive_ptr<continuation> this_(this);
threads::thread_result_type (continuation::*async_exec_impl_ptr)(
void (continuation::*async_exec_impl_ptr)(
typename traits::detail::shared_state_ptr_for<Future>::type &&
) = &continuation::async_exec_impl;

Expand Down
22 changes: 9 additions & 13 deletions hpx/runtime/actions/action_invoke_no_more_than.hpp
Expand Up @@ -15,7 +15,7 @@
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/traits/is_future.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/static.hpp>

#include <memory>
Expand Down Expand Up @@ -82,47 +82,43 @@ namespace hpx { namespace actions { namespace detail

// If the action returns something which is not a future, we inject
// a semaphore into the call graph.
static threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
static void thread_function(
threads::thread_function_type f)
{
typedef typename construct_semaphore_type::semaphore_type
semaphore_type;

signal_on_exit<semaphore_type> on_exit(
construct_semaphore_type::get_sem());
return f(state);
f();
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::false_type)
{
return util::bind(
util::one_shot(&action_decorate_function::thread_function),
util::placeholders::_1,
return util::deferred_call(
&action_decorate_function::thread_function,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
}

// If the action returns a future we wait on the semaphore as well,
// however it will be signaled once the future gets ready only.
static threads::thread_result_type thread_function_future(
threads::thread_state_ex_enum state,
static void thread_function_future(
threads::thread_function_type f)
{
construct_semaphore_type::get_sem().wait();
return f(state);
return f();
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::true_type)
{
return util::bind(
util::one_shot(&action_decorate_function::thread_function_future),
util::placeholders::_1,
return util::deferred_call(
&action_decorate_function::thread_function_future,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand Down
10 changes: 3 additions & 7 deletions hpx/runtime/actions/basic_action.hpp
Expand Up @@ -96,16 +96,14 @@ namespace hpx { namespace actions
, f_(std::move(other.f_))
{}

HPX_FORCEINLINE threads::thread_result_type
operator()(threads::thread_state_ex_enum)
HPX_FORCEINLINE void
operator()()
{
LTM_(debug)
<< "Executing " << Action::get_action_name(lva_)
<< " with continuation(" << cont_.get_id() << ")";

actions::trigger(std::move(cont_), f_);
return threads::thread_result_type(threads::terminated,
threads::invalid_thread_id);
}

private:
Expand Down Expand Up @@ -237,7 +235,7 @@ namespace hpx { namespace actions
{}

template <typename ...Ts>
HPX_FORCEINLINE threads::thread_result_type
HPX_FORCEINLINE void
operator()(naming::address::address_type lva,
naming::address::component_type comptype, Ts&&... vs) const
{
Expand Down Expand Up @@ -272,8 +270,6 @@ namespace hpx { namespace actions
// OS-thread. This will throw if there are still any locks
// held.
util::force_error_on_lock();
return threads::thread_result_type(threads::terminated,
threads::invalid_thread_id);
}

// This holds the target alive, if necessary.
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/components/server/executor_component.hpp
Expand Up @@ -46,7 +46,7 @@ namespace hpx { namespace components
// executor
static void execute(hpx::threads::thread_function_type const& f)
{
f(hpx::threads::wait_signaled);
f();
}

/// This is the default hook implementation for schedule_thread which
Expand Down
25 changes: 7 additions & 18 deletions hpx/runtime/components/server/locking_hook.hpp
Expand Up @@ -11,8 +11,8 @@
#include <hpx/runtime/get_lva.hpp>
#include <hpx/runtime/threads/coroutines/coroutine.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/bind_front.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/register_locks.hpp>
#include <hpx/util/unlock_guard.hpp>

Expand Down Expand Up @@ -55,17 +55,15 @@ namespace hpx { namespace components
static threads::thread_function_type
decorate_action(naming::address::address_type lva, F && f)
{
return util::bind(
util::one_shot(&locking_hook::thread_function),
return util::deferred_call(&locking_hook::thread_function,
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)));
}

protected:
typedef util::function_nonser<
threads::thread_arg_type(threads::thread_result_type)
void(threads::thread_result_type)
> yield_decorator_type;

struct decorate_wrapper
Expand All @@ -86,12 +84,8 @@ namespace hpx { namespace components

// Execute the wrapped action. This locks the mutex ensuring a thread
// safe action invocation.
threads::thread_result_type thread_function(
threads::thread_arg_type state, threads::thread_function_type f)
void thread_function(threads::thread_function_type f)
{
threads::thread_result_type result(threads::unknown,
threads::invalid_thread_id);

// now lock the mutex and execute the action
std::unique_lock<mutex_type> l(mtx_);

Expand All @@ -109,12 +103,10 @@ namespace hpx { namespace components
decorate_wrapper yield_decorator(
util::bind_front(&locking_hook::yield_function, this));

result = f(state);
f();

(void)yield_decorator; // silence gcc warnings
}

return result;
}

struct undecorate_wrapper
Expand All @@ -133,24 +125,21 @@ namespace hpx { namespace components

// The yield decorator unlocks the mutex and calls the system yield
// which gives up control back to the thread manager.
threads::thread_arg_type yield_function(threads::thread_result_type state)
void yield_function(threads::thread_result_type state)
{
// We un-decorate the yield function as the lock handling may
// suspend, which causes an infinite recursion otherwise.
undecorate_wrapper yield_decorator;
threads::thread_arg_type result = threads::wait_unknown;

{
util::unlock_guard<mutex_type> ul(mtx_);
result = threads::get_self().yield_impl(state);
threads::get_self().yield_impl(state);
}

// Re-enable ignoring the lock on the mutex above (this
// information is lost in the lock tracking tables once a mutex is
// unlocked).
util::ignore_lock(&mtx_);

return result;
}

private:
Expand Down
12 changes: 4 additions & 8 deletions hpx/runtime/components/server/migration_support.hpp
Expand Up @@ -16,7 +16,7 @@
#include <hpx/runtime/threads_fwd.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/deferred_call.hpp>

#include <cstdint>
#include <mutex>
Expand Down Expand Up @@ -167,10 +167,8 @@ namespace hpx { namespace components
// Make sure we pin the component at construction of the bound object
// which will also unpin it once the thread runs to completion (the
// bound object goes out of scope).
return util::bind(
util::one_shot(&migration_support::thread_function),
return util::deferred_call(&migration_support::thread_function,
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)),
components::pinned_ptr::create<this_component_type>(lva));
Expand All @@ -192,12 +190,10 @@ namespace hpx { namespace components
protected:
// Execute the wrapped action. This function is bound in decorate_action
// above. The bound object performs the pinning/unpinning.
threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type && f,
void thread_function(threads::thread_function_type && f,
components::pinned_ptr)
{
return f(state);
f();
}

private:
Expand Down

0 comments on commit 7354aea

Please sign in to comment.