Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partially reverting #3126 #3138

Merged
merged 1 commit into from Feb 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 11 additions & 7 deletions hpx/runtime/actions/action_invoke_no_more_than.hpp
Expand Up @@ -15,7 +15,7 @@
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/traits/is_future.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/static.hpp>

#include <memory>
Expand Down Expand Up @@ -83,22 +83,24 @@ namespace hpx { namespace actions { namespace detail
// If the action returns something which is not a future, we inject
// a semaphore into the call graph.
static threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type f)
{
typedef typename construct_semaphore_type::semaphore_type
semaphore_type;

signal_on_exit<semaphore_type> on_exit(
construct_semaphore_type::get_sem());
return f();
return f(state);
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::false_type)
{
return util::deferred_call(
&action_decorate_function::thread_function,
return util::bind(
util::one_shot(&action_decorate_function::thread_function),
util::placeholders::_1,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand All @@ -107,18 +109,20 @@ namespace hpx { namespace actions { namespace detail
// If the action returns a future we wait on the semaphore as well,
// however it will be signaled once the future gets ready only.
static threads::thread_result_type thread_function_future(
threads::thread_state_ex_enum state,
threads::thread_function_type f)
{
construct_semaphore_type::get_sem().wait();
return f();
return f(state);
}

template <typename F>
static threads::thread_function_type
call(naming::address::address_type lva, F && f, std::true_type)
{
return util::deferred_call(
&action_decorate_function::thread_function_future,
return util::bind(
util::one_shot(&action_decorate_function::thread_function_future),
util::placeholders::_1,
traits::action_decorate_function<action_wrapper>::call(
lva, std::forward<F>(f))
);
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/actions/basic_action.hpp
Expand Up @@ -97,7 +97,7 @@ namespace hpx { namespace actions
{}

HPX_FORCEINLINE threads::thread_result_type
operator()()
operator()(threads::thread_state_ex_enum)
{
LTM_(debug)
<< "Executing " << Action::get_action_name(lva_)
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/components/server/executor_component.hpp
Expand Up @@ -46,7 +46,7 @@ namespace hpx { namespace components
// executor
static void execute(hpx::threads::thread_function_type const& f)
{
f();
f(hpx::threads::wait_signaled);
}

/// This is the default hook implementation for schedule_thread which
Expand Down
19 changes: 12 additions & 7 deletions hpx/runtime/components/server/locking_hook.hpp
Expand Up @@ -11,8 +11,8 @@
#include <hpx/runtime/get_lva.hpp>
#include <hpx/runtime/threads/coroutines/coroutine.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/bind_front.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/register_locks.hpp>
#include <hpx/util/unlock_guard.hpp>

Expand Down Expand Up @@ -55,15 +55,17 @@ namespace hpx { namespace components
static threads::thread_function_type
decorate_action(naming::address::address_type lva, F && f)
{
return util::deferred_call(&locking_hook::thread_function,
return util::bind(
util::one_shot(&locking_hook::thread_function),
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)));
}

protected:
typedef util::function_nonser<
void(threads::thread_result_type)
threads::thread_arg_type(threads::thread_result_type)
> yield_decorator_type;

struct decorate_wrapper
Expand All @@ -85,7 +87,7 @@ namespace hpx { namespace components
// Execute the wrapped action. This locks the mutex ensuring a thread
// safe action invocation.
threads::thread_result_type thread_function(
threads::thread_function_type f)
threads::thread_arg_type state, threads::thread_function_type f)
{
threads::thread_result_type result(threads::unknown,
threads::invalid_thread_id);
Expand All @@ -107,7 +109,7 @@ namespace hpx { namespace components
decorate_wrapper yield_decorator(
util::bind_front(&locking_hook::yield_function, this));

result = f();
result = f(state);

(void)yield_decorator; // silence gcc warnings
}
Expand All @@ -131,21 +133,24 @@ namespace hpx { namespace components

// The yield decorator unlocks the mutex and calls the system yield
// which gives up control back to the thread manager.
void yield_function(threads::thread_result_type state)
threads::thread_arg_type yield_function(threads::thread_result_type state)
{
// We un-decorate the yield function as the lock handling may
// suspend, which causes an infinite recursion otherwise.
undecorate_wrapper yield_decorator;
threads::thread_arg_type result = threads::wait_unknown;

{
util::unlock_guard<mutex_type> ul(mtx_);
threads::get_self().yield_impl(state);
result = threads::get_self().yield_impl(state);
}

// Re-enable ignoring the lock on the mutex above (this
// information is lost in the lock tracking tables once a mutex is
// unlocked).
util::ignore_lock(&mtx_);

return result;
}

private:
Expand Down
9 changes: 6 additions & 3 deletions hpx/runtime/components/server/migration_support.hpp
Expand Up @@ -16,7 +16,7 @@
#include <hpx/runtime/threads_fwd.hpp>
#include <hpx/traits/action_decorate_function.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/bind.hpp>

#include <cstdint>
#include <mutex>
Expand Down Expand Up @@ -167,8 +167,10 @@ namespace hpx { namespace components
// Make sure we pin the component at construction of the bound object
// which will also unpin it once the thread runs to completion (the
// bound object goes out of scope).
return util::deferred_call(&migration_support::thread_function,
return util::bind(
util::one_shot(&migration_support::thread_function),
get_lva<this_component_type>::call(lva),
util::placeholders::_1,
traits::action_decorate_function<base_type>::call(
lva, std::forward<F>(f)),
components::pinned_ptr::create<this_component_type>(lva));
Expand All @@ -191,10 +193,11 @@ namespace hpx { namespace components
// Execute the wrapped action. This function is bound in decorate_action
// above. The bound object performs the pinning/unpinning.
threads::thread_result_type thread_function(
threads::thread_state_ex_enum state,
threads::thread_function_type && f,
components::pinned_ptr)
{
return f();
return f(state);
}

private:
Expand Down
7 changes: 5 additions & 2 deletions hpx/runtime/threads/coroutines/coroutine.hpp
Expand Up @@ -57,8 +57,9 @@ namespace hpx { namespace threads { namespace coroutines
typedef impl_type::thread_id_type thread_id_type;

typedef impl_type::result_type result_type;
typedef impl_type::arg_type arg_type;

typedef util::unique_function_nonser<result_type()> functor_type;
typedef util::unique_function_nonser<result_type(arg_type)> functor_type;

coroutine(functor_type&& f,
thread_id_type id,
Expand Down Expand Up @@ -107,10 +108,12 @@ namespace hpx { namespace threads { namespace coroutines
impl_.rebind(std::move(f), id);
}

HPX_FORCEINLINE result_type operator()()
HPX_FORCEINLINE result_type operator()(arg_type arg = arg_type())
{
HPX_ASSERT(impl_.is_ready());

impl_.bind_args(&arg);

impl_.invoke();

return impl_.result();
Expand Down
17 changes: 15 additions & 2 deletions hpx/runtime/threads/coroutines/detail/coroutine_impl.hpp
Expand Up @@ -63,13 +63,15 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
typedef context_base::thread_id_type thread_id_type;

typedef std::pair<thread_state_enum, thread_id_type> result_type;
typedef thread_state_ex_enum arg_type;

typedef util::unique_function_nonser<result_type()> functor_type;
typedef util::unique_function_nonser<result_type(arg_type)> functor_type;

coroutine_impl(functor_type&& f, thread_id_type id,
std::ptrdiff_t stack_size)
: context_base(*this, stack_size, id)
, m_result(terminated, invalid_thread_id)
, m_result(unknown, invalid_thread_id)
, m_arg(nullptr)
, m_fun(std::move(f))
{}

Expand All @@ -89,6 +91,16 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
{
return m_result;
}
arg_type * args()
{
HPX_ASSERT(m_arg);
return m_arg;
};

void bind_args(arg_type* arg)
{
m_arg = arg;
}

#if defined(HPX_HAVE_THREAD_PHASE_INFORMATION)
std::size_t get_thread_phase() const
Expand All @@ -114,6 +126,7 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail

private:
result_type m_result;
arg_type* m_arg;

functor_type m_fun;
};
Expand Down
11 changes: 7 additions & 4 deletions hpx/runtime/threads/coroutines/detail/coroutine_self.hpp
Expand Up @@ -75,18 +75,19 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
typedef impl_type::thread_id_type thread_id_type;

typedef impl_type::result_type result_type;
typedef impl_type::arg_type arg_type;

typedef util::function_nonser<void(result_type)>
typedef util::function_nonser<arg_type(result_type)>
yield_decorator_type;

void yield(result_type arg)
arg_type yield(result_type arg = result_type())
{
!yield_decorator_.empty() ?
return !yield_decorator_.empty() ?
yield_decorator_(std::move(arg)) :
yield_impl(std::move(arg));
}

void yield_impl(result_type arg)
arg_type yield_impl(result_type arg)
{
HPX_ASSERT(m_pimpl);

Expand All @@ -96,6 +97,8 @@ namespace hpx { namespace threads { namespace coroutines { namespace detail
reset_self_on_exit on_exit(this);
this->m_pimpl->yield();
}

return *m_pimpl->args();
}

template <typename F>
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -295,7 +295,7 @@ namespace hpx { namespace threads { namespace detail
thread_id_type background_thread;
background_running.reset(new bool(true));
thread_init_data background_init(
[&, background_running]() -> thread_result_type
[&, background_running](thread_state_ex_enum) -> thread_result_type
{
while(*background_running)
{
Expand Down
3 changes: 1 addition & 2 deletions hpx/runtime/threads/detail/set_thread_state.hpp
Expand Up @@ -321,9 +321,8 @@ namespace hpx { namespace threads { namespace detail
// this waits for the thread to be reactivated when the timer fired
// if it returns signaled the timer has been canceled, otherwise
// the timer fired and the wake_timer_thread above has been executed
get_self().yield(thread_result_type(suspended, invalid_thread_id));
thread_state_ex_enum statex =
self_id->set_state_ex(thread_state_ex_enum::wait_signaled);
get_self().yield(thread_result_type(suspended, invalid_thread_id));

if (wait_timeout != statex) //-V601
{
Expand Down
5 changes: 3 additions & 2 deletions hpx/runtime/threads/thread_data.hpp
Expand Up @@ -241,7 +241,7 @@ namespace hpx { namespace threads
thread_state(new_state, state_ex, tag));
}

public:
private:
/// The set_state function changes the extended state of this
/// thread instance.
///
Expand Down Expand Up @@ -269,6 +269,7 @@ namespace hpx { namespace threads
}
}

public:
/// Return the id of the component this thread is running in
naming::address_type get_component_id() const
{
Expand Down Expand Up @@ -516,7 +517,7 @@ namespace hpx { namespace threads
coroutine_type::result_type operator()()
{
HPX_ASSERT(this == coroutine_.get_thread_id().get());
return coroutine_();
return coroutine_(set_state_ex(wait_signaled));
}

thread_id_type get_thread_id() const
Expand Down
3 changes: 2 additions & 1 deletion hpx/runtime/threads/thread_data_fwd.hpp
Expand Up @@ -48,7 +48,8 @@ namespace hpx { namespace threads
typedef std::pair<thread_state_enum, thread_id_type> thread_result_type;
typedef thread_state_ex_enum thread_arg_type;

typedef util::unique_function_nonser<thread_result_type()> thread_function_type;
typedef thread_result_type thread_function_sig(thread_arg_type);
typedef util::unique_function_nonser<thread_function_sig> thread_function_type;
/// \endcond

///////////////////////////////////////////////////////////////////////
Expand Down
26 changes: 23 additions & 3 deletions hpx/runtime/threads/thread_helpers.hpp
Expand Up @@ -796,7 +796,27 @@ namespace hpx { namespace applier
{
F f;

inline threads::thread_result_type operator()()
inline threads::thread_result_type operator()(threads::thread_arg_type)
{
// execute the actual thread function
f(threads::wait_signaled);

// Verify that there are no more registered locks for this
// OS-thread. This will throw if there are still any locks
// held.
util::force_error_on_lock();

return threads::thread_result_type(threads::terminated,
threads::invalid_thread_id);
}
};

template <typename F>
struct thread_function_nullary
{
F f;

inline threads::thread_result_type operator()(threads::thread_arg_type)
{
// execute the actual thread function
f();
Expand Down Expand Up @@ -855,7 +875,7 @@ namespace hpx { namespace applier
error_code& ec = throws)
{
threads::thread_function_type thread_func(
detail::thread_function<typename std::decay<F>::type>{
detail::thread_function_nullary<typename std::decay<F>::type>{
std::forward<F>(func)});
return register_thread_plain(std::move(thread_func),
description, initial_state, run_now, priority, os_thread, stacksize,
Expand Down Expand Up @@ -988,7 +1008,7 @@ namespace hpx { namespace applier
error_code& ec = throws)
{
threads::thread_function_type thread_func(
detail::thread_function<typename std::decay<F>::type>{
detail::thread_function_nullary<typename std::decay<F>::type>{
std::forward<F>(func)});
return register_work_plain(std::move(thread_func),
description, 0, initial_state, priority, os_thread, stacksize,
Expand Down