Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-cache locality endpoints during bootstrap #2533

Merged
merged 2 commits into from Mar 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions hpx/runtime/agas/addressing_service.hpp
Expand Up @@ -1405,6 +1405,9 @@ struct HPX_EXPORT addressing_service

/// Remove the given object from the table of migrated objects
void unmark_as_migrated(naming::gid_type const& gid);

/// Pre-cache locality endpoints in hosted locality namespace.
///
/// The position of an element in the given vector is used as the id of the
/// locality the endpoints belong to (element 0 describes locality 0, etc.).
void pre_cache_endpoints(std::vector<parcelset::endpoints_type> const&);
};

}}
Expand Down
32 changes: 23 additions & 9 deletions hpx/runtime/agas/big_boot_barrier.hpp
Expand Up @@ -30,18 +30,21 @@
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include <hpx/config/warnings_prefix.hpp>

namespace hpx { namespace agas
{

struct notification_header;

struct HPX_EXPORT big_boot_barrier
{
private:
private:
HPX_NON_COPYABLE(big_boot_barrier);

private:
private:
parcelset::parcelport* pp;
parcelset::endpoints_type const& endpoints;

Expand All @@ -54,17 +57,19 @@ struct HPX_EXPORT big_boot_barrier

boost::lockfree::queue<util::unique_function_nonser<void()>* > thunks;

std::vector<parcelset::endpoints_type> localities;

void spin();

void notify();

public:
public:
struct scoped_lock
{
private:
private:
big_boot_barrier& bbb;

public:
public:
scoped_lock(
big_boot_barrier& bbb_
)
Expand Down Expand Up @@ -101,8 +106,8 @@ struct HPX_EXPORT big_boot_barrier
, std::uint32_t target_locality_id
, parcelset::locality dest
, Action act
, Args &&... args
) { // {{{
, Args &&... args)
{ // {{{
HPX_ASSERT(pp);
naming::address addr(naming::get_gid_from_locality_id(target_locality_id));
parcelset::parcel p(
Expand All @@ -128,8 +133,8 @@ struct HPX_EXPORT big_boot_barrier
, std::uint32_t target_locality_id
, parcelset::locality const & dest
, Action act
, Args &&... args
) { // {{{
, Args &&... args)
{ // {{{
naming::address addr(naming::get_gid_from_locality_id(target_locality_id));

parcelset::put_parcel(
Expand All @@ -139,6 +144,12 @@ struct HPX_EXPORT big_boot_barrier
std::move(addr), act, std::forward<Args>(args)...);
} // }}}

/// Send the given notification header to \a dest using the
/// notify_worker_action. Before sending, the header's list of endpoints is
/// set to the locality endpoints collected by this barrier so far.
void apply_notification(
std::uint32_t source_locality_id
, std::uint32_t target_locality_id
, parcelset::locality const& dest
, notification_header&& hdr);

void wait_bootstrap();
void wait_hosted(std::string const& locality_name,
naming::address::address_type primary_ns_ptr,
Expand All @@ -157,6 +168,9 @@ struct HPX_EXPORT big_boot_barrier
++k;
}
}

/// Store \a endpoints as the endpoints of the locality with the id
/// \a locality_id in the list of localities kept by this barrier.
void add_locality_endpoints(std::uint32_t locality_id,
parcelset::endpoints_type const& endpoints);
};

HPX_EXPORT void create_big_boot_barrier(
Expand Down
37 changes: 17 additions & 20 deletions hpx/runtime/agas/locality_namespace.hpp
Expand Up @@ -30,43 +30,40 @@ namespace hpx { namespace agas
{
virtual ~locality_namespace();

virtual naming::address::address_type ptr() const=0;
virtual naming::address addr() const=0;
virtual naming::id_type gid() const=0;
virtual naming::address::address_type ptr() const = 0;
virtual naming::address addr() const = 0;
virtual naming::id_type gid() const = 0;

virtual std::uint32_t allocate(
parcelset::endpoints_type const& endpoints
, std::uint64_t count
, std::uint32_t num_threads
, naming::gid_type suggested_prefix
)=0;
) = 0;

virtual void free(naming::gid_type locality)=0;
virtual void free(naming::gid_type locality) = 0;

virtual std::vector<std::uint32_t> localities()=0;
virtual std::vector<std::uint32_t> localities() = 0;

virtual parcelset::endpoints_type resolve_locality(
naming::gid_type locality)=0;
naming::gid_type locality) = 0;

virtual std::uint32_t get_num_localities()=0;
virtual hpx::future<std::uint32_t> get_num_localities_async()=0;
virtual std::uint32_t get_num_localities() = 0;
virtual hpx::future<std::uint32_t> get_num_localities_async() = 0;

virtual std::vector<std::uint32_t> get_num_threads()=0;
virtual hpx::future<std::vector<std::uint32_t> > get_num_threads_async()=0;
virtual std::vector<std::uint32_t> get_num_threads() = 0;
virtual hpx::future<std::vector<std::uint32_t> > get_num_threads_async() = 0;

virtual std::uint32_t get_num_overall_threads()=0;
virtual hpx::future<std::uint32_t> get_num_overall_threads_async()=0;
virtual std::uint32_t get_num_overall_threads() = 0;
virtual hpx::future<std::uint32_t> get_num_overall_threads_async() = 0;

virtual naming::gid_type statistics_counter(std::string name)=0;
virtual naming::gid_type statistics_counter(std::string name) = 0;

virtual void register_counter_types()
{}
virtual void register_counter_types() {}

virtual void register_server_instance(std::uint32_t locality_id)
{}
virtual void register_server_instance(std::uint32_t locality_id) {}

virtual void unregister_server_instance(error_code& ec)
{}
virtual void unregister_server_instance(error_code& ec) {}
};
}}

Expand Down
15 changes: 15 additions & 0 deletions src/runtime/agas/addressing_service.cpp
Expand Up @@ -337,6 +337,21 @@ bool addressing_service::has_resolved_locality(
return resolved_localities_.find(gid) != resolved_localities_.end();
} // }}}

void addressing_service::pre_cache_endpoints(
    std::vector<parcelset::endpoints_type> const& endpoints)
{ // {{{
    // hold the lock for the whole update of the resolved localities
    std::unique_lock<mutex_type> l(resolved_localities_mtx_);

    // the position of an entry in the given vector is the id of the locality
    // the entry belongs to
    std::uint32_t id = 0;
    for (auto it = endpoints.begin(); it != endpoints.end(); ++it, ++id)
    {
        naming::gid_type const locality_gid =
            naming::get_gid_from_locality_id(id);

        resolved_localities_.insert(
            resolved_localities_type::value_type(locality_gid, *it));
    }
} // }}}

parcelset::endpoints_type const & addressing_service::resolve_locality(
naming::gid_type const & gid
, error_code& ec
Expand Down
120 changes: 69 additions & 51 deletions src/runtime/agas/big_boot_barrier.cpp
Expand Up @@ -262,16 +262,15 @@ struct registration_header

// TODO: pass head address as a GVA
registration_header(
parcelset::endpoints_type const& endpoints_
, std::uint64_t primary_ns_ptr_
, std::uint64_t symbol_ns_ptr_
, std::uint32_t cores_needed_
, std::uint32_t num_threads_
, std::string const& hostname_
, detail::unassigned_typename_sequence const& typenames_
, naming::gid_type prefix_ = naming::gid_type()
) :
endpoints(endpoints_)
parcelset::endpoints_type const& endpoints_
, std::uint64_t primary_ns_ptr_
, std::uint64_t symbol_ns_ptr_
, std::uint32_t cores_needed_
, std::uint32_t num_threads_
, std::string const& hostname_
, detail::unassigned_typename_sequence const& typenames_
, naming::gid_type prefix_ = naming::gid_type())
: endpoints(endpoints_)
, primary_ns_ptr(primary_ns_ptr_)
, symbol_ns_ptr(symbol_ns_ptr_)
, cores_needed(cores_needed_)
Expand Down Expand Up @@ -314,18 +313,17 @@ struct notification_header
{}

notification_header(
naming::gid_type const& prefix_
, parcelset::locality const & agas_locality_
, naming::address const& locality_ns_address_
, naming::address const& primary_ns_address_
, naming::address const& component_ns_address_
, naming::address const& symbol_ns_address_
, std::uint32_t num_localities_
, std::uint32_t used_cores_
, parcelset::endpoints_type const & agas_endpoints_
, detail::assigned_id_sequence const & ids_
) :
prefix(prefix_)
naming::gid_type const& prefix_
, parcelset::locality const & agas_locality_
, naming::address const& locality_ns_address_
, naming::address const& primary_ns_address_
, naming::address const& component_ns_address_
, naming::address const& symbol_ns_address_
, std::uint32_t num_localities_
, std::uint32_t used_cores_
, parcelset::endpoints_type const & agas_endpoints_
, detail::assigned_id_sequence const & ids_)
: prefix(prefix_)
, agas_locality(agas_locality_)
, locality_ns_address(locality_ns_address_)
, primary_ns_address(primary_ns_address_)
Expand All @@ -347,6 +345,7 @@ struct notification_header
std::uint32_t used_cores;
parcelset::endpoints_type agas_endpoints;
detail::assigned_id_sequence ids;
std::vector<parcelset::endpoints_type> endpoints;

#if defined(HPX_HAVE_SECURITY)
components::security::signed_certificate root_certificate;
Expand All @@ -365,6 +364,7 @@ struct notification_header
ar & used_cores;
ar & agas_endpoints;
ar & ids;
ar & endpoints;
#if defined(HPX_HAVE_SECURITY)
ar & root_certificate;
#endif
Expand Down Expand Up @@ -539,20 +539,18 @@ void register_worker(registration_header const& header)
return;
}

naming::gid_type primary_ns_gid(
primary_namespace::get_service_instance(prefix));
naming::address primary_ns_address(prefix
, components::get_component_type<agas::server::primary_namespace>()
, header.primary_ns_ptr);
// naming::gid_type primary_ns_gid(
// primary_namespace::get_service_instance(prefix));
// naming::address primary_ns_address(prefix
// , components::get_component_type<agas::server::primary_namespace>()
// , header.primary_ns_ptr);
// agas_client.bind_local(primary_ns_gid, primary_ns_address);

naming::gid_type symbol_ns_gid(
symbol_namespace::get_service_instance(prefix));

naming::address symbol_ns_address(prefix
, components::get_component_type<agas::server::symbol_namespace>()
, header.symbol_ns_ptr);

// naming::gid_type symbol_ns_gid(
// symbol_namespace::get_service_instance(prefix));
// naming::address symbol_ns_address(prefix
// , components::get_component_type<agas::server::symbol_namespace>()
// , header.symbol_ns_ptr);
// agas_client.bind_local(symbol_ns_gid, symbol_ns_address);

naming::address locality_addr(hpx::get_locality(),
Expand Down Expand Up @@ -617,6 +615,10 @@ void register_worker(registration_header const& header)
}
}

// collect endpoints from all registering localities
bbb.add_locality_endpoints(naming::get_locality_id_from_gid(prefix),
header.endpoints);

// TODO: Handle cases where localities try to connect to AGAS while it's
// shutting down.
if (agas_client.get_status() != state_starting)
Expand Down Expand Up @@ -645,23 +647,15 @@ void register_worker(registration_header const& header)
, std::move(hdr));
#else
// delay the final response until the runtime system is up and running
void (big_boot_barrier::*f)(
std::uint32_t,
std::uint32_t,
parcelset::locality,
notify_worker_action,
notification_header&&)
= &big_boot_barrier::apply<notify_worker_action, notification_header&&>;
util::unique_function_nonser<void()>* thunk
= new util::unique_function_nonser<void()>(
util::bind(
util::one_shot(f)
, std::ref(get_big_boot_barrier())
, 0
, naming::get_locality_id_from_gid(prefix)
, dest
, notify_worker_action()
, std::move(hdr)));
util::unique_function_nonser<void()>* thunk =
new util::unique_function_nonser<void()>(
util::bind(
util::one_shot(&big_boot_barrier::apply_notification)
, std::ref(get_big_boot_barrier())
, 0
, naming::get_locality_id_from_gid(prefix)
, dest
, std::move(hdr)));
get_big_boot_barrier().add_thunk(thunk);
#endif
}
Expand Down Expand Up @@ -737,6 +731,9 @@ void notify_worker(notification_header const& header)
cfg.set_first_used_core(header.used_cores);
rt.assign_cores();

// pre-cache all known locality endpoints in local AGAS
agas_client.pre_cache_endpoints(header.endpoints);

#if defined(HPX_HAVE_SECURITY)
// initialize certificate store
rt.store_root_certificate(header.root_certificate);
Expand Down Expand Up @@ -857,6 +854,24 @@ void notify_worker_security(notification_header_security const& header)
// }}}
#endif

void big_boot_barrier::apply_notification(
std::uint32_t source_locality_id
, std::uint32_t target_locality_id
, parcelset::locality const& dest
, notification_header && hdr)
{
hdr.endpoints = localities;
apply(source_locality_id, target_locality_id, dest,
notify_worker_action(), std::move(hdr));
}

// Store the given endpoints as the endpoints of the locality with the given
// id. The list of localities is indexed by locality id and is only ever
// grown by this function; slots of ids which have not been added yet hold
// default constructed endpoints.
void big_boot_barrier::add_locality_endpoints(std::uint32_t locality_id,
    parcelset::endpoints_type const& endpoints)
{
    // widen before adding one, so the required size cannot wrap around in
    // 32 bit arithmetic
    std::size_t const required_size =
        static_cast<std::size_t>(locality_id) + 1;

    // Grow only: resizing unconditionally would shrink the vector whenever an
    // id smaller than the largest one seen so far is added, which silently
    // drops the endpoints of all localities with larger ids.
    if (localities.size() < required_size)
        localities.resize(required_size);

    localities[locality_id] = endpoints;
}

///////////////////////////////////////////////////////////////////////////////
void big_boot_barrier::spin()
{
Expand Down Expand Up @@ -908,6 +923,9 @@ void big_boot_barrier::wait_bootstrap()
{ // {{{
// this function may be called only if this barrier runs in bootstrap mode
HPX_ASSERT(service_mode_bootstrap == service_type);

// store endpoints of root locality for later (the root locality is recorded
// using the locality id 0)
add_locality_endpoints(0, get_runtime().endpoints());

// the root just waits until all localities have connected
spin();
} // }}}
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/parcelset/parcelhandler.cpp
Expand Up @@ -151,12 +151,12 @@ namespace hpx { namespace parcelset

for (pports_type::value_type& pp : pports_)
{
pp.second->set_applier(applier);
if(pp.second != get_bootstrap_parcelport())
{
if(pp.first > 0)
pp.second->run(false);
}
pp.second->set_applier(applier);
}
}

Expand Down