Skip to content

Commit

Permalink
Merge pull request #1934 from STEllAR-GROUP/fixing_1523_2
Browse files Browse the repository at this point in the history
Fix remote async with deferred launch policy
  • Loading branch information
hkaiser committed Jan 10, 2016
2 parents 1b4fbdd + ae87567 commit 3206b35
Show file tree
Hide file tree
Showing 13 changed files with 516 additions and 535 deletions.
3 changes: 1 addition & 2 deletions hpx/lcos/base_lco.hpp
Expand Up @@ -23,7 +23,7 @@ namespace hpx { namespace lcos
/// implementing a simple set_event action
class HPX_API_EXPORT base_lco
{
protected:
public:
virtual void set_event () = 0;

virtual void set_exception (boost::exception_ptr const& e);
Expand All @@ -34,7 +34,6 @@ namespace hpx { namespace lcos
// noop by default
virtual void disconnect(naming::id_type const &);

public:
// components must contain a typedef for wrapping_type defining the
// managed_component type used to encapsulate instances of this
// component
Expand Down
188 changes: 89 additions & 99 deletions hpx/lcos/detail/async_implementations.hpp
Expand Up @@ -15,6 +15,8 @@
#include <hpx/lcos/packaged_action.hpp>
#include <hpx/util/move.hpp>

#include <boost/mpl/bool.hpp>

namespace hpx { namespace detail
{
/// \cond NOINTERNAL
Expand All @@ -41,31 +43,21 @@ namespace hpx { namespace detail
struct sync_local_invoke
{
template <typename ...Ts>
static lcos::future<Result> call(
naming::id_type const& id, naming::address && addr,
Ts &&... vs)
static lcos::future<Result>
call(naming::id_type const& id, naming::address && addr, Ts &&... vs)
{
lcos::packaged_action<Action, Result> p;
bool target_is_managed = false;
naming::id_type id1;

if (id.get_management_type() == naming::id_type::managed)
{
naming::id_type id1(id.get_gid(), naming::id_type::unmanaged);
if (addr)
{
p.apply(launch::sync, std::move(addr), id1,
std::forward<Ts>(vs)...);
}
else
{
p.apply(launch::sync, id1, std::forward<Ts>(vs)...);
}
id1 = naming::id_type(id.get_gid(), naming::id_type::unmanaged);
target_is_managed = true;
}
else
{
p.apply(launch::sync, id, std::forward<Ts>(vs)...);
}

lcos::packaged_action<Action, Result> p;
p.apply(std::move(addr), target_is_managed ? id1 : id,
std::forward<Ts>(vs)...);

// keep id alive, if needed - this allows to send the destination
// as an unmanaged id
Expand All @@ -85,54 +77,44 @@ namespace hpx { namespace detail
}
};

template <typename Action, typename R>
struct sync_local_invoke<Action, lcos::future<R> >
template <typename Action, typename Result>
struct sync_local_invoke<Action, lcos::future<Result> >
{
template <typename ...Ts>
HPX_FORCEINLINE static lcos::future<R> call(
boost::mpl::true_, naming::id_type const&,
naming::address && addr, Ts &&... vs)
HPX_FORCEINLINE static lcos::future<Result>
call(naming::id_type const&, naming::address && addr, Ts &&... vs)
{
HPX_ASSERT(!!addr);
HPX_ASSERT(traits::component_type_is_compatible<
typename Action::component_type>::call(addr));
typename Action::component_type
>::call(addr));

return Action::execute_function(addr.address_,
std::forward<Ts>(vs)...);
}
};

///////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////
template <typename Action, typename Result>
struct sync_local_invoke_cb
{
template <typename Callback, typename ...Ts>
static lcos::future<Result> call(
naming::id_type const& id, naming::address && addr,
Callback && cb, Ts &&... vs)
static lcos::future<Result>
call(naming::id_type const& id, naming::address && addr, Callback && cb,
Ts &&... vs)
{
lcos::packaged_action<Action, Result> p;
bool target_is_managed = false;
naming::id_type id1;

if (id.get_management_type() == naming::id_type::managed)
{
naming::id_type id1(id.get_gid(), naming::id_type::unmanaged);
if (addr)
{
p.apply_cb(launch::sync, std::move(addr), id1,
std::forward<Callback>(cb), std::forward<Ts>(vs)...);
}
else
{
p.apply_cb(launch::sync, id1, std::forward<Callback>(cb),
std::forward<Ts>(vs)...);
}
id1 = naming::id_type(id.get_gid(), naming::id_type::unmanaged);
target_is_managed = true;
}
else
{
p.apply_cb(launch::sync, id, std::forward<Callback>(cb),
std::forward<Ts>(vs)...);
}

lcos::packaged_action<Action, Result> p;
p.apply_cb(std::move(addr), target_is_managed ? id1 : id,
std::forward<Callback>(cb), std::forward<Ts>(vs)...);

// keep id alive, if needed - this allows to send the destination
// as an unmanaged id
Expand All @@ -152,19 +134,21 @@ namespace hpx { namespace detail
}
};

template <typename Action, typename R>
struct sync_local_invoke_cb<Action, lcos::future<R> >
template <typename Action, typename Result>
struct sync_local_invoke_cb<Action, lcos::future<Result> >
{
template <typename Callback, typename ...Ts>
HPX_FORCEINLINE static lcos::future<R> call(
boost::mpl::true_, naming::id_type const&,
naming::address && addr, Callback && cb, Ts&&... vs)
HPX_FORCEINLINE static lcos::future<Result>
call(naming::id_type const&, naming::address && addr, Callback && cb,
Ts &&... vs)
{
HPX_ASSERT(!!addr);
HPX_ASSERT(traits::component_type_is_compatible<
typename Action::component_type>::call(addr));
typename Action::component_type
>::call(addr));

lcos::future<R> f = Action::execute_function(addr.address_,
std::forward<Ts>(vs)...);
lcos::future<Result> f = Action::execute_function(
addr.address_, std::forward<Ts>(vs)...);

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
Expand Down Expand Up @@ -194,35 +178,39 @@ namespace hpx { namespace detail
call(id, std::move(addr), std::forward<Ts>(vs)...);
}

lcos::packaged_action<action_type, result_type> p;

bool target_is_managed = false;
naming::id_type id1;
future<result_type> f;

if (id.get_management_type() == naming::id_type::managed)
{
id1 = naming::id_type(id.get_gid(), naming::id_type::unmanaged);
target_is_managed = true;
}

if (policy == launch::sync || hpx::detail::has_async_policy(policy))
{
if (id.get_management_type() == naming::id_type::managed)
{
naming::id_type id1(id.get_gid(), naming::id_type::unmanaged);
if (addr)
{
p.apply(policy, std::move(addr), id1,
std::forward<Ts>(vs)...);
}
else
{
p.apply(policy, id1, std::forward<Ts>(vs)...);
}
target_is_managed = true;
}
else
{
p.apply(policy, id, std::forward<Ts>(vs)...);
}
lcos::packaged_action<action_type, result_type> p;
p.apply(std::move(addr), target_is_managed ? id1 : id,
std::forward<Ts>(vs)...);
f = p.get_future();
}
else if (policy == launch::deferred)
{
lcos::packaged_action<action_type, result_type> p;
p.apply_deferred(std::move(addr), target_is_managed ? id1 : id,
std::forward<Ts>(vs)...);
f = p.get_future();
}
else
{
HPX_THROW_EXCEPTION(bad_parameter,
"async_impl", "unknown launch policy");
return f;
}

// keep id alive, if needed - this allows to send the destination as an
// unmanaged id
future<result_type> f = p.get_future();

if (target_is_managed)
{
typedef typename traits::detail::shared_state_ptr_for<
Expand Down Expand Up @@ -260,37 +248,39 @@ namespace hpx { namespace detail
std::forward<Ts>(vs)...);
}

lcos::packaged_action<action_type, result_type> p;

future<result_type> f;
bool target_is_managed = false;
naming::id_type id1;

if (id.get_management_type() == naming::id_type::managed)
{
id1 = naming::id_type(id.get_gid(), naming::id_type::unmanaged);
target_is_managed = true;
}

if (policy == launch::sync || hpx::detail::has_async_policy(policy))
{
if (id.get_management_type() == naming::id_type::managed)
{
naming::id_type id1(id.get_gid(), naming::id_type::unmanaged);
if (addr)
{
p.apply_cb(policy, std::move(addr), id1,
std::forward<Callback>(cb), std::forward<Ts>(vs)...);
}
else
{
p.apply_cb(policy, id1, std::forward<Callback>(cb),
std::forward<Ts>(vs)...);
}
target_is_managed = true;
}
else
{
p.apply_cb(policy, id, std::forward<Callback>(cb),
std::forward<Ts>(vs)...);
}
lcos::packaged_action<action_type, result_type> p;
p.apply_cb(std::move(addr), target_is_managed ? id1 : id,
std::forward<Callback>(cb), std::forward<Ts>(vs)...);
f = p.get_future();
}
else if (policy == launch::deferred)
{
lcos::packaged_action<action_type, result_type> p;
p.apply_deferred_cb(std::move(addr), target_is_managed ? id1 : id,
std::forward<Callback>(cb), std::forward<Ts>(vs)...);
f = p.get_future();
}
else
{
HPX_THROW_EXCEPTION(bad_parameter,
"async_cb_impl", "unknown launch policy");
return f;
}

// keep id alive, if needed - this allows to send the destination
// as an unmanaged id
future<result_type> f = p.get_future();

if (target_is_managed)
{
typedef typename traits::detail::shared_state_ptr_for<
Expand Down
15 changes: 6 additions & 9 deletions hpx/lcos/detail/future_data.hpp
Expand Up @@ -689,12 +689,12 @@ namespace detail
protected:
typedef typename future_data<Result>::result_type result_type;

threads::thread_id_type get_id() const
threads::thread_id_type get_thread_id() const
{
boost::lock_guard<mutex_type> l(this->mtx_);
return id_;
}
void set_id(threads::thread_id_type id)
void set_thread_id(threads::thread_id_type id)
{
boost::lock_guard<mutex_type> l(this->mtx_);
id_ = id;
Expand Down Expand Up @@ -729,8 +729,7 @@ namespace detail
{
if (!started_test_and_set())
this->do_run();
else
this->future_data<Result>::wait(ec);
this->future_data<Result>::wait(ec);
}

virtual BOOST_SCOPED_ENUM(future_status)
Expand Down Expand Up @@ -817,11 +816,11 @@ namespace detail
reset_id(task_base& target)
: target_(target)
{
target.set_id(threads::get_self_id());
target.set_thread_id(threads::get_self_id());
}
~reset_id()
{
target_.set_id(threads::invalid_thread_id);
target_.set_thread_id(threads::invalid_thread_id);
}
task_base& target_;
};
Expand All @@ -838,13 +837,11 @@ namespace detail
template <typename T>
void set_data(T && result)
{
HPX_ASSERT(started_);
this->future_data<Result>::set_value(std::forward<T>(result));
this->future_data<Result>::set_data(std::forward<T>(result));
}

void set_exception(boost::exception_ptr const& e)
{
HPX_ASSERT(started_);
this->future_data<Result>::set_exception(e);
}

Expand Down

0 comments on commit 3206b35

Please sign in to comment.