Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Towards safer migration #1378

Merged
merged 12 commits into from Feb 25, 2015
Merged
2 changes: 2 additions & 0 deletions docs/CMakeLists.txt
Expand Up @@ -50,6 +50,8 @@ set(doxygen_dependencies
"${hpx_SOURCE_DIR}/hpx/error.hpp"
"${hpx_SOURCE_DIR}/hpx/exception.hpp"
"${hpx_SOURCE_DIR}/hpx/exception_list.hpp"
"${hpx_SOURCE_DIR}/hpx/components/component_storage/migrate_from_storage.hpp"
"${hpx_SOURCE_DIR}/hpx/components/component_storage/migrate_to_storage.hpp"
"${hpx_SOURCE_DIR}/hpx/parallel/execution_policy.hpp"
"${hpx_SOURCE_DIR}/hpx/parallel/algorithm.hpp"
"${hpx_SOURCE_DIR}/hpx/parallel/task_region.hpp"
Expand Down
8 changes: 7 additions & 1 deletion docs/hpx.idx
Expand Up @@ -46,6 +46,12 @@ find_ids_from_basename "" "hpx\.find_ids_from_basename.*"
register_id_with_basename "" "hpx\.register_id_with_basename.*"
unregister_id_with_basename "" "hpx\.unregister_id_with_basename.*"

# hpx/components/component_storage/migrate_from_storage.hpp
migrate_from_storage "" "hpx\.components\.migrate_from_s.*"

# hpx/components/component_storage/migrate_to_storage.hpp
migrate_to_storage "" "hpx\.components\.migrate_to_s.*"

# hpx/parallel/execution_policy.hpp
parallel::execution_policy "execution_policy" "hpx\.parallel\.v1\.execution_policy.*"
parallel::parallel_execution_policy "parallel_execution_policy" "hpx\.parallel\.v1\.parallel_exec.*"
Expand Down Expand Up @@ -175,7 +181,7 @@ new_colocated "" "hpx\.components\.new_colocated.*"
copy "" "hpx\.components\.copy.*"

# hpx/runtime/components/migrate_component.hpp
migrate "" "hpx\.components\.migrate.*"
migrate "" "hpx\.components\.migrate_id.*"

# hpx/exception.hpp
HPX_THROW_EXCEPTION "" "HPX_THROW_EXCEPTION"
Expand Down
18 changes: 10 additions & 8 deletions docs/manual/existing_performance_counters.qbk
Expand Up @@ -24,7 +24,8 @@ system and application performance.
`iterate_names`, `iterate_types`, `localities`,
`num_localities`, `num_localities_type`, `num_threads`, `resolve`,
`resolve_gid`, `resolve_id`, `resolve_locality`, `resolved_localities`,
`route`, `unbind`, `unbind_gid`, `unbind_name`.
`route`, `unbind`, `unbind_gid`, `unbind_name`
`begin_migration`, `end_migration`.
]
[`<agas_instance>/total`

Expand Down Expand Up @@ -67,7 +68,8 @@ system and application performance.
`iterate_names`, `iterate_types`, `localities`,
`num_localities`, `num_localities_type`, `num_threads`, `resolve`,
`resolve_gid`, `resolve_id`, `resolve_locality`, `resolved_localities`,
`route`, `unbind`, `unbind_gid`, `unbind_name`.
`route`, `unbind`, `unbind_gid`, `unbind_name`,
`begin_migration`, `end_migration`.
]
[`<agas_instance>/total`

Expand Down Expand Up @@ -552,10 +554,10 @@ system and application performance.
instance name is `total` the counter returns the average time spent on
overhead while executing one __hpx__-thread for all worker threads (cores)
on that locality. If the instance name is `worker-thread#*` the counter
will return the average time spent on overhead executing
one __hpx__-thread for all worker threads separately.
This counter is available only if the configuration time constants
`HPX_THREAD_MAINTAIN_CUMULATIVE_COUNTS` (default: ON) and
will return the average time spent on overhead executing
one __hpx__-thread for all worker threads separately.
This counter is available only if the configuration time constants
`HPX_THREAD_MAINTAIN_CUMULATIVE_COUNTS` (default: ON) and
`HPX_THREAD_MAINTAIN_IDLE_RATES` are set to `ON` (default: OFF).]
]
[ [`/threads/count/cumulative-phases`]
Expand Down Expand Up @@ -640,8 +642,8 @@ system and application performance.
phase (invocation) for all worker threads (cores) on that locality.
If the instance name is `worker-thread#*` the counter will return
the average time spent on overhead executing one __hpx__-thread phase
for all worker threads separately. This counter is available only
if the configuration time constants
for all worker threads separately. This counter is available only
if the configuration time constants
`HPX_THREAD_MAINTAIN_CUMULATIVE_COUNTS` (default: ON) and
`HPX_THREAD_MAINTAIN_IDLE_RATES` are set to `ON` (default: OFF).]
]
Expand Down
31 changes: 7 additions & 24 deletions hpx/components/component_storage/migrate_from_storage.hpp
Expand Up @@ -3,6 +3,8 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file migrate_from_storage.hpp

#if !defined(HPX_MIGRATE_FROM_STORAGE_FEB_09_2015_0329PM)
#define HPX_MIGRATE_FROM_STORAGE_FEB_09_2015_0329PM

Expand All @@ -21,13 +23,11 @@ namespace hpx { namespace components
///
/// The function \a migrate_from_storage<Component> will migrate the
/// component referenced by \a to_resurrect from the storage facility
/// specified with \a source_storage. It returns a future referring to
/// the migrated component instance. The component instance is resurrected
/// on the locality specified by \a target_locality.
/// specified where the object is currently stored on. It returns a future
/// referring to the migrated component instance. The component instance
/// is resurrected on the locality specified by \a target_locality.
///
/// \param to_resurrect [in] The global id of the component to migrate.
/// \param source_storage [in] The id of the storage facility to migrate
/// this object from.
/// \param target [in] The optional locality to resurrect the
/// object on. By default the object is resurrected
/// on the locality it was located on last.
Expand All @@ -47,28 +47,11 @@ namespace hpx { namespace components
>::type
#endif
migrate_from_storage(naming::id_type const& to_resurrect,
naming::id_type const& source_storage,
naming::id_type const& target = naming::invalid_id)
{
typedef server::migrate_from_storage_here_action<Component>
typedef server::trigger_migrate_from_storage_here_action<Component>
action_type;
return async_colocated<action_type>(source_storage, source_storage,
to_resurrect, target);
}

template <typename Component>
#if defined(DOXYGEN)
future<naming::id_type>
#else
inline typename std::enable_if<
traits::is_component<Component>::value, future<naming::id_type>
>::type
#endif
migrate_from_storage(naming::id_type const& to_resurrect,
hpx::components::component_storage const& source_storage,
naming::id_type const& target = naming::invalid_id)
{
return migrate_from_storage<Component>(source_storage.get_gid(),
return async<action_type>(naming::get_locality_from_id(to_resurrect),
to_resurrect, target);
}
}}
Expand Down
8 changes: 5 additions & 3 deletions hpx/components/component_storage/migrate_to_storage.hpp
Expand Up @@ -3,6 +3,8 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

/// \file migrate_to_storage.hpp

#if !defined(HPX_MIGRATE_TO_STORAGE_FEB_04_2015_1245PM)
#define HPX_MIGRATE_TO_STORAGE_FEB_04_2015_1245PM

Expand Down Expand Up @@ -44,10 +46,10 @@ namespace hpx { namespace components
migrate_to_storage(naming::id_type const& to_migrate,
naming::id_type const& target_storage)
{
typedef server::migrate_to_storage_here_action<Component>
typedef server::trigger_migrate_to_storage_here_action<Component>
action_type;
return async_colocated<action_type>(to_migrate, to_migrate,
target_storage);
return async<action_type>(naming::get_locality_from_id(to_migrate),
to_migrate, target_storage);
}

/// Migrate the given component to the specified target storage
Expand Down
115 changes: 86 additions & 29 deletions hpx/components/component_storage/server/migrate_from_storage.hpp
Expand Up @@ -17,12 +17,46 @@

namespace hpx { namespace components { namespace server
{
///////////////////////////////////////////////////////////////////////////
// Migrate given component from the specified storage component
//
// Object migration is performed from the storage in several steps:
//
// 1) The migration is triggered by invoking the
// trigger_migrate_from_storage_here_action on the locality which is
// responsible for managing the address resolution for the object which
// has to be migrated.
// 2) The trigger_migrate_from_storage_here_action performs 3 steps:
// a) Invoke agas::begin_migration, which marks the global id in AGAS,
// deferring all address resolution requests until end_migration is
// called.
// b) Invoke the actual migration operation (see step 3)
// c) Invoke end_migration, which un-marks the global id and releases
// all pending address resolution requests. Those requests now return
// the new object location.
// 3) The actual migration (component_storage::migrate_from_here_action)
// is executed on the storage facility where the object is currently
// stored. This involves several steps as well:
// a) Retrieve the byte stream representing the object from the storage
// b) Deserialize the byte stream to re-create the object. The newly
// recreated object is pinned immediately. The object is unpinned by
// the deleter associated with the shared pointer.
// c) Invoke the action runtime_support::migrate_component on the
// locality where the object has to be moved to. This passes
// along the shared pointer to the object and recreates the object
// on the target locality and updates the association of the object's
// global id with the new local virtual address in AGAS.
// d) Mark the old object (through the original shared pointer) as
// migrated which will delete it once the shared pointer goes out of
// scope.
//
namespace detail
{
///////////////////////////////////////////////////////////////////////
template <typename Component>
future<naming::id_type> migrate_from_storage_here_id(
naming::id_type const& target_locality,
boost::shared_ptr<Component> ptr,
boost::shared_ptr<Component> const& ptr,
naming::id_type const& to_resurrect)
{
// and resurrect it on the specified locality
Expand All @@ -36,18 +70,20 @@ namespace hpx { namespace components { namespace server

template <typename Component>
future<naming::id_type> migrate_from_storage_here_address(
future<naming::address> && f, boost::shared_ptr<Component> ptr,
naming::address const& addr,
boost::shared_ptr<Component> const& ptr,
naming::id_type const& to_resurrect)
{
naming::id_type id(f.get().locality_, id_type::unmanaged);
naming::id_type id(addr.locality_, id_type::unmanaged);
return migrate_from_storage_here_id(id, ptr, to_resurrect);
}

// convert the extracted data into a living component instance
template <typename Component>
future<naming::id_type> migrate_from_storage_here(
future<std::vector<char> > f,
future<std::vector<char> > && f,
naming::id_type const& to_resurrect,
naming::address const& addr,
naming::id_type const& target_locality)
{
// recreate the object
Expand All @@ -63,18 +99,12 @@ namespace hpx { namespace components { namespace server
// make sure the migration code works properly
ptr->pin();

// if target locality is not specified, ask AGAS for the address
// the object was living on before
// if target locality is not specified, use the address of the last
// locality where the object was living before
if (target_locality == naming::invalid_id)
{
// FIXME: This could be less efficient than possible if the
// responsible AGAS locality is not the same as
// the target locality where the object should be
// resurrected.
return agas::resolve(to_resurrect).then(
util::bind(
&migrate_from_storage_here_address<Component>,
util::placeholders::_1, ptr, to_resurrect));
return migrate_from_storage_here_address<Component>(addr, ptr,
to_resurrect);
}

// otherwise directly refer to the locality where the object should
Expand All @@ -85,43 +115,70 @@ namespace hpx { namespace components { namespace server
}

///////////////////////////////////////////////////////////////////////////
// This is executed on the locality responsible for managing the address
// resolution for the given object.
template <typename Component>
future<naming::id_type> migrate_from_storage_here(
future<naming::id_type> trigger_migrate_from_storage_here(
naming::id_type const& to_resurrect,
naming::id_type const& source_storage,
naming::id_type const& target_locality)
{
if (!Component::supports_migration())
{
HPX_THROW_EXCEPTION(invalid_status,
"hpx::components::server::migrate_from_storage_here",
"hpx::components::server::trigger_migrate_from_storage_here",
"attempting to migrate an instance of a component which "
"does not support migration");
return make_ready_future(naming::invalid_id);
}

// retrieve the data from the given storage
typedef typename server::component_storage::migrate_from_here_action
action_type;
return async<action_type>(source_storage, to_resurrect.get_gid())
.then(util::bind(
&detail::migrate_from_storage_here<Component>,
util::placeholders::_1, to_resurrect, target_locality));
if (naming::get_locality_id_from_id(to_resurrect) != get_locality_id())
{
HPX_THROW_EXCEPTION(invalid_status,
"hpx::components::server::trigger_migrate_from_storage_here",
"this function has to be executed on the locality responsible "
"for managing the address of the given object");
return make_ready_future(naming::invalid_id);
}

return agas::begin_migration(to_resurrect, target_locality)
.then(
[to_resurrect, target_locality](
future<std::pair<naming::id_type, naming::address> > && f)
-> future<naming::id_type>
{
// rethrow errors
std::pair<naming::id_type, naming::address> r = f.get();

// retrieve the data from the given storage
typedef typename server::component_storage::migrate_from_here_action
action_type;
return async<action_type>(r.first, to_resurrect.get_gid())
.then(util::bind(
&detail::migrate_from_storage_here<Component>,
util::placeholders::_1, to_resurrect,
r.second, target_locality));
})
.then(
[to_resurrect](future<naming::id_type> && f) -> naming::id_type
{
agas::end_migration(to_resurrect).get();
return f.get();
});
}

template <typename Component>
struct migrate_from_storage_here_action
struct trigger_migrate_from_storage_here_action
: ::hpx::actions::action<
future<naming::id_type> (*)(naming::id_type const&,
naming::id_type const&, naming::id_type const&)
, &migrate_from_storage_here<Component>
, migrate_from_storage_here_action<Component> >
naming::id_type const&)
, &trigger_migrate_from_storage_here<Component>
, trigger_migrate_from_storage_here_action<Component> >
{};
}}}

HPX_REGISTER_PLAIN_ACTION_TEMPLATE(
(template <typename Component>),
(hpx::components::server::migrate_from_storage_here_action<Component>))
(hpx::components::server::trigger_migrate_from_storage_here_action<Component>))

#endif

Expand Down