Skip to content

Commit

Permalink
Making sure resolve_localities does not hang during normal operation.
Browse files Browse the repository at this point in the history
- This fixes #1804: register_with_basename causes hangs
  • Loading branch information
hkaiser committed Oct 20, 2015
1 parent 3ee6cd1 commit 538136b
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 6 deletions.
4 changes: 4 additions & 0 deletions hpx/runtime/agas/addressing_service.hpp
Expand Up @@ -324,6 +324,10 @@ struct HPX_EXPORT addressing_service : boost::noncopyable
, error_code& ec = throws
);

/// \brief Check whether a locality is already present in the local table
///        of resolved localities.
///
/// \param gid [in] The global id of the locality to look up.
///
/// \returns \a true if an entry for \a gid exists in the table of resolved
///          localities, \a false otherwise.
bool has_resolved_locality(
naming::gid_type const & gid
);

/// \brief Remove a locality from the runtime.
bool unregister_locality(
naming::gid_type const & gid
Expand Down
12 changes: 9 additions & 3 deletions src/runtime/agas/addressing_service.cpp
Expand Up @@ -538,6 +538,14 @@ void addressing_service::register_console(parcelset::endpoints_type const & eps)
HPX_ASSERT(res.second);
}

// Report whether an entry for the given locality gid is already stored in
// resolved_localities_. The lookup is done while holding
// resolved_localities_mtx_.
bool addressing_service::has_resolved_locality(
    naming::gid_type const & gid
    )
{ // {{{
    boost::unique_lock<mutex_type> lock(resolved_localities_mtx_);

    bool const already_known =
        resolved_localities_.find(gid) != resolved_localities_.end();
    return already_known;
} // }}}

parcelset::endpoints_type const & addressing_service::resolve_locality(
naming::gid_type const & gid
, error_code& ec
Expand Down Expand Up @@ -578,9 +586,7 @@ parcelset::endpoints_type const & addressing_service::resolve_locality(
if (0 == threads::get_self_ptr())
{
// this should happen only during bootstrap
// FIXME: Disabled this assert cause it fires.
// It should not, but doesn't do any harm
//HPX_ASSERT(hpx::is_starting());
HPX_ASSERT(hpx::is_starting());

while(!endpoints_future.is_ready())
/**/;
Expand Down
29 changes: 26 additions & 3 deletions src/runtime/parcelset/parcelhandler.cpp
Expand Up @@ -13,6 +13,7 @@
#include <hpx/util/io_service_pool.hpp>
#include <hpx/util/safe_lexical_cast.hpp>
#include <hpx/util/runtime_configuration.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/runtime/naming/resolver_client.hpp>
#include <hpx/runtime/parcelset/parcelhandler.hpp>
#include <hpx/runtime/parcelset/static_parcelports.hpp>
Expand Down Expand Up @@ -365,12 +366,34 @@ namespace hpx { namespace parcelset
{
HPX_ASSERT(resolver_);

// properly initialize parcel
init_parcel(p);

naming::id_type const* ids = p.destinations();
naming::address* addrs = p.addrs();

// During bootstrap this is handled separately (see
// addressing_service::resolve_locality).
if (0 == hpx::threads::get_self_ptr() && !hpx::is_starting())
{
HPX_ASSERT(resolver_);
if (!resolver_->has_resolved_locality(ids[0].get_gid()))
{
// reschedule request as an HPX thread to avoid hangs
void (parcelhandler::*put_parcel_ptr) (
parcel p, write_handler_type f
) = &parcelhandler::put_parcel;

threads::register_thread_nullary(
util::bind(
util::one_shot(put_parcel_ptr), this,
std::move(p), std::move(f)),
"parcelhandler::put_parcel", threads::pending, true,
threads::thread_priority_boost);
return;
}
}

// properly initialize parcel
init_parcel(p);

bool resolved_locally = true;

#if !defined(HPX_SUPPORT_MULTIPLE_PARCEL_DESTINATIONS)
Expand Down
3 changes: 3 additions & 0 deletions tests/regressions/agas/CMakeLists.txt
Expand Up @@ -8,8 +8,11 @@ set(tests
duplicate_id_registration_1596
pass_by_value_id_type_action
send_gid_keep_component_1624
register_with_basename_1804
)

set(register_with_basename_1804_PARAMETERS LOCALITIES 4)

foreach(test ${tests})
set(sources
${test}.cpp)
Expand Down
98 changes: 98 additions & 0 deletions tests/regressions/agas/register_with_basename_1804.cpp
@@ -0,0 +1,98 @@
// Copyright (c) 2015 Andreas Schaefer
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// This illustrates the issue as reported by #1804: register_with_basename
// causes hangs

#include <hpx/hpx.hpp>
#include <hpx/hpx_init.hpp>

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include <boost/shared_ptr.hpp>

// Convert an integer to its decimal string representation.
//
// Implemented with std::to_string instead of a std::stringstream: this
// avoids constructing a stream for every call, does not depend on the
// global locale (no digit grouping can sneak into the generated names),
// and only needs <string>.
//
// i: the value to convert (negative values get a leading '-').
// Returns the base-10 text for i.
static std::string itoa(int i)
{
    return std::to_string(i);
}

// Minimal component used by this regression test. It derives from
// hpx::components::simple_component_base and offers a single action.
struct test_server
  : hpx::components::simple_component_base<test_server>
{
    test_server() {}

    // Returns whatever hpx::find_here() yields at the place this member
    // function is executed.
    hpx::id_type call() const
    {
        hpx::id_type here = hpx::find_here();
        return here;
    }

    // Declares call_action as the action type wrapping call().
    HPX_DEFINE_COMPONENT_ACTION(test_server, call, call_action);
};

// Wrap test_server into a simple_component and register that component
// type with HPX under the name test_server.
typedef hpx::components::simple_component<test_server> server_type;
HPX_REGISTER_COMPONENT(server_type, test_server);

// File-level alias for the action declared inside test_server, followed by
// its registration with HPX.
typedef test_server::call_action call_action;
HPX_REGISTER_ACTION(call_action);

// Build the symbolic name used for the link from 'source' to 'target':
//   <basename>/PatchLink/<source>-<target>
std::string genName(int source, int target)
{
    std::string name = "/0/HPXSimulatorUpdateGroupSdfafafasdasd";

    name += "/PatchLink/";
    name += itoa(source);
    name += "-";
    name += itoa(target);

    return name;
}

void testBar()
{
int rank = hpx::get_locality_id();

std::vector<hpx::id_type> boundingBoxReceivers;
std::vector<hpx::id_type> boundingBoxAccepters;
for (int i = 0; i < 4; ++i) {
if (i == rank)
continue;

std::string name = genName(i, rank);
std::cout << "registration: " << name << "\n";

hpx::id_type id = hpx::new_<test_server>(hpx::find_here()).get();
hpx::register_with_basename(name, id, 0).get();
boundingBoxReceivers.push_back(id);
}

for (int i = 0; i < 4; ++i) {
if (i == rank)
continue;

std::string name = genName(rank, i);
std::cout << "lookup: " << name << "\n";
std::vector<hpx::future<hpx::id_type> > ids =
hpx::find_all_from_basename(name, 1);
boundingBoxAccepters.push_back(std::move(ids[0].get()));
}

std::cout << "all done " << rank << "\n";
}

// HPX entry point: run the test body, then call hpx::finalize() and hand
// its result back as the return value. The arguments are not used.
int hpx_main(int argc, char **argv)
{
    testBar();

    int const status = hpx::finalize();
    return status;
}

// Program entry point: hands control to hpx::init together with one extra
// configuration entry.
int main(int argc, char **argv)
{
    // We want HPX to run hpx_main() on all localities to avoid the
    // initial overhead caused by broadcasting the work from one to
    // all other localities:
    std::vector<std::string> config;
    config.push_back("hpx.run_hpx_main!=1");

    return hpx::init(argc, argv, config);
}

0 comments on commit 538136b

Please sign in to comment.