Skip to content

Commit

Permalink
Merge pull request #1983 from STEllAR-GROUP/migration
Browse files Browse the repository at this point in the history
Avoid invoking migration table look up for all objects
  • Loading branch information
sithhell committed Feb 11, 2016
2 parents e7ec0d2 + bd2f399 commit 0cae2f6
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 56 deletions.
Expand Up @@ -12,6 +12,7 @@
#include <hpx/include/runtime.hpp>
#include <hpx/include/serialization.hpp>
#include <hpx/include/util.hpp>
#include <hpx/include/traits.hpp>

#include <hpx/components/component_storage/export_definitions.hpp>
#include <hpx/components/component_storage/server/component_storage.hpp>
Expand Down Expand Up @@ -122,7 +123,7 @@ namespace hpx { namespace components { namespace server
naming::id_type const& to_resurrect,
naming::id_type const& target_locality)
{
if (!Component::supports_migration())
if (!traits::component_supports_migration<Component>::call())
{
HPX_THROW_EXCEPTION(invalid_status,
"hpx::components::server::trigger_migrate_from_storage_here",
Expand Down
Expand Up @@ -122,7 +122,7 @@ namespace hpx { namespace components { namespace server
naming::address const& addr,
naming::id_type const& target_storage)
{
if (!Component::supports_migration())
if (!traits::component_supports_migration<Component>::call())
{
HPX_THROW_EXCEPTION(invalid_status,
"hpx::components::server::migrate_to_storage_here",
Expand Down Expand Up @@ -157,7 +157,7 @@ namespace hpx { namespace components { namespace server
naming::id_type const& to_migrate,
naming::id_type const& target_storage)
{
if (!Component::supports_migration())
if (!traits::component_supports_migration<Component>::call())
{
HPX_THROW_EXCEPTION(invalid_status,
"hpx::components::server::trigger_migrate_to_storage_here",
Expand Down
38 changes: 30 additions & 8 deletions hpx/lcos/detail/async_implementations.hpp
Expand Up @@ -8,6 +8,7 @@

#include <hpx/config.hpp>
#include <hpx/traits/future_access.hpp>
#include <hpx/traits/component_supports_migration.hpp>
#include <hpx/traits/action_was_object_migrated.hpp>
#include <hpx/runtime/naming/address.hpp>
#include <hpx/runtime/naming/id_type.hpp>
Expand Down Expand Up @@ -171,15 +172,24 @@ namespace hpx { namespace detail
typedef typename traits::promise_local_result<
typename action_type::remote_result_type
>::type result_type;
typedef typename action_type::component_type component_type;

std::pair<bool, components::pinned_ptr> r;

naming::address addr;
if (agas::is_local_address_cached(id, addr))
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (policy == launch::sync && !r.first)
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (policy == launch::sync && !r.first)
{
return hpx::detail::sync_local_invoke<action_type, result_type>::
call(id, std::move(addr), std::forward<Ts>(vs)...);
}
}
else if (policy == launch::sync)
{
return hpx::detail::sync_local_invoke<action_type, result_type>::
call(id, std::move(addr), std::forward<Ts>(vs)...);
Expand Down Expand Up @@ -247,18 +257,30 @@ namespace hpx { namespace detail
typedef typename traits::promise_local_result<
typename action_type::remote_result_type
>::type result_type;
typedef typename action_type::component_type component_type;

std::pair<bool, components::pinned_ptr> r;

naming::address addr;
if (agas::is_local_address_cached(id, addr))
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (policy == launch::sync && !r.first)
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (policy == launch::sync && !r.first)
{
return hpx::detail::sync_local_invoke_cb<
action_type, result_type
>::call(id, std::move(addr), std::forward<Callback>(cb),
std::forward<Ts>(vs)...);
}
}
else if (policy == launch::sync)
{
return hpx::detail::sync_local_invoke_cb<action_type, result_type>::
call(id, std::move(addr), std::forward<Callback>(cb),
return hpx::detail::sync_local_invoke_cb<
action_type, result_type
>::call(id, std::move(addr), std::forward<Callback>(cb),
std::forward<Ts>(vs)...);
}
}
Expand Down
83 changes: 67 additions & 16 deletions hpx/lcos/packaged_action.hpp
Expand Up @@ -16,6 +16,7 @@
#include <hpx/util/protect.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/traits/component_type_is_compatible.hpp>
#include <hpx/traits/component_supports_migration.hpp>
#include <hpx/traits/action_was_object_migrated.hpp>

#include <boost/mpl/bool.hpp>
Expand Down Expand Up @@ -361,12 +362,23 @@ namespace hpx { namespace lcos
naming::address addr;
if (agas::is_local_address_cached(id, addr))
{
typedef typename Action::component_type component_type;
HPX_ASSERT(traits::component_type_is_compatible<
typename Action::component_type>::call(addr));
component_type>::call(addr));

r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
// local, direct execution
(*this->impl_)->set_data(action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
return;
}
}
else
{
// local, direct execution
(*this->impl_)->set_data(action_type::execute_function(
Expand All @@ -388,12 +400,23 @@ namespace hpx { namespace lcos

if (addr.locality_ == hpx::get_locality())
{
typedef typename Action::component_type component_type;
HPX_ASSERT(traits::component_type_is_compatible<
typename Action::component_type>::call(addr));
component_type>::call(addr));

r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
// local, direct execution
(*this->impl_)->set_data(action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));
return;
}
}
else
{
// local, direct execution
(*this->impl_)->set_data(action_type::execute_function(
Expand All @@ -416,12 +439,26 @@ namespace hpx { namespace lcos
naming::address addr;
if (agas::is_local_address_cached(id, addr))
{
typedef typename Action::component_type component_type;
HPX_ASSERT(traits::component_type_is_compatible<
typename Action::component_type>::call(addr));
component_type>::call(addr));

r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
// local, direct execution
(*this->impl_)->set_data(action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
return;
}
}
else
{
// local, direct execution
(*this->impl_)->set_data(action_type::execute_function(
Expand All @@ -446,12 +483,26 @@ namespace hpx { namespace lcos

if (addr.locality_ == hpx::get_locality())
{
typedef typename Action::component_type component_type;
HPX_ASSERT(traits::component_type_is_compatible<
typename Action::component_type>::call(addr));
component_type>::call(addr));

r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
// local, direct execution
(*this->impl_)->set_data(action_type::execute_function(
addr.address_, std::forward<Ts>(vs)...));

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
return;
}
}
else
{
// local, direct execution
(*this->impl_)->set_data(action_type::execute_function(
Expand Down
71 changes: 59 additions & 12 deletions hpx/runtime/applier/detail/apply_implementations.hpp
Expand Up @@ -12,6 +12,7 @@
#include <hpx/runtime/naming/address.hpp>
#include <hpx/runtime/naming/id_type.hpp>
#include <hpx/traits/is_continuation.hpp>
#include <hpx/traits/component_supports_migration.hpp>
#include <hpx/traits/action_was_object_migrated.hpp>
#include <hpx/util/move.hpp>

Expand Down Expand Up @@ -40,9 +41,19 @@ namespace hpx { namespace detail
naming::address addr;
if (agas::is_local_address_cached(id, addr))
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
typedef typename Action::component_type component_type;
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
return applier::detail::apply_l_p<Action>(
std::forward<Continuation>(c), id, std::move(addr),
priority, std::forward<Ts>(vs)...);
}
}
else
{
return applier::detail::apply_l_p<Action>(
std::forward<Continuation>(c), id, std::move(addr),
Expand Down Expand Up @@ -73,9 +84,18 @@ namespace hpx { namespace detail
naming::address addr;
if (agas::is_local_address_cached(id, addr))
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
typedef typename Action::component_type component_type;
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
return applier::detail::apply_l_p<Action>(
id, std::move(addr), priority, std::forward<Ts>(vs)...);
}
}
else
{
return applier::detail::apply_l_p<Action>(
id, std::move(addr), priority, std::forward<Ts>(vs)...);
Expand Down Expand Up @@ -108,8 +128,23 @@ namespace hpx { namespace detail
naming::address addr;
if (agas::is_local_address_cached(id, addr))
{
r = traits::action_was_object_migrated<Action>::call(id, addr.address_);
if (!r.first)
typedef typename Action::component_type component_type;
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
bool result = applier::detail::apply_l_p<Action>(
std::forward<Continuation>(c), id, std::move(addr),
priority, std::forward<Ts>(vs)...);

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
return result;
}
}
else
{
bool result = applier::detail::apply_l_p<Action>(
std::forward<Continuation>(c), id, std::move(addr),
Expand Down Expand Up @@ -139,16 +174,28 @@ namespace hpx { namespace detail
return false;
}


std::pair<bool, components::pinned_ptr> r;

// Determine whether the id is local or remote
naming::address addr;
if (agas::is_local_address_cached(id, addr))
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
typedef typename Action::component_type component_type;
if (traits::component_supports_migration<component_type>::call())
{
r = traits::action_was_object_migrated<Action>::call(
id, addr.address_);
if (!r.first)
{
bool result = applier::detail::apply_l_p<Action>(
id, std::move(addr), priority, std::forward<Ts>(vs)...);

// invoke callback
cb(boost::system::error_code(), parcelset::parcel());
return result;
}
}
else
{
bool result = applier::detail::apply_l_p<Action>(
id, std::move(addr), priority, std::forward<Ts>(vs)...);
Expand Down
3 changes: 0 additions & 3 deletions hpx/runtime/components/server/fixed_component_base.hpp
Expand Up @@ -180,9 +180,6 @@ class fixed_component_base : public traits::detail::fixed_component_tag
}
#endif

// This component type does not support migration.
static HPX_CONSTEXPR bool supports_migration() { return false; }

// Pinning functionality
void pin() {}
void unpin() {}
Expand Down
7 changes: 4 additions & 3 deletions hpx/runtime/components/server/migrate_component.hpp
Expand Up @@ -8,6 +8,7 @@

#include <hpx/config.hpp>
#include <hpx/traits/is_component.hpp>
#include <hpx/traits/component_supports_migration.hpp>
#include <hpx/runtime/actions/plain_action.hpp>
#include <hpx/runtime/naming/name.hpp>
#include <hpx/runtime/get_ptr.hpp>
Expand Down Expand Up @@ -167,7 +168,7 @@ namespace hpx { namespace components { namespace server
return make_ready_future(to_migrate);
}

if (!Component::supports_migration())
if (!traits::component_supports_migration<Component>::call())
{
return hpx::make_exceptional_future<hpx::id_type>(
HPX_GET_EXCEPTION(invalid_status,
Expand Down Expand Up @@ -203,7 +204,7 @@ namespace hpx { namespace components { namespace server
future<id_type> trigger_migrate_component(
id_type const& to_migrate, DistPolicy const& policy)
{
if (!Component::supports_migration())
if (!traits::component_supports_migration<Component>::call())
{
return hpx::make_exceptional_future<id_type>(
HPX_GET_EXCEPTION(invalid_status,
Expand Down Expand Up @@ -261,7 +262,7 @@ namespace hpx { namespace components { namespace server
future<id_type> perform_migrate_component(
id_type const& to_migrate, DistPolicy const& policy)
{
if (!Component::supports_migration())
if (!traits::component_supports_migration<Component>::call())
{
return hpx::make_exceptional_future<id_type>(
HPX_GET_EXCEPTION(invalid_status,
Expand Down
3 changes: 0 additions & 3 deletions hpx/runtime/components/server/simple_component_base.hpp
Expand Up @@ -232,9 +232,6 @@ namespace hpx { namespace components
}
#endif

// This component type does not support migration.
static HPX_CONSTEXPR bool supports_migration() { return false; }

// Pinning functionality
void pin() {}
void unpin() {}
Expand Down

0 comments on commit 0cae2f6

Please sign in to comment.