Skip to content

Commit

Permalink
Adding a second round of termination detection after shutdown functions were executed.
Browse files Browse the repository at this point in the history
That should take care of the problem reported by #1264: Error "assertion '!m_fun' failed" randomly occurs when using TCP
  • Loading branch information
hkaiser committed Sep 17, 2014
1 parent c6e9225 commit 02d286d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 18 deletions.
4 changes: 1 addition & 3 deletions hpx/runtime/agas/big_boot_barrier.hpp
Expand Up @@ -98,9 +98,7 @@ struct HPX_EXPORT big_boot_barrier : boost::noncopyable
// no-op on non-bootstrap localities
void trigger();

void add_thunk(
HPX_STD_FUNCTION<void()>* f
)
void add_thunk(HPX_STD_FUNCTION<void()>* f)
{
thunks.push(f);
}
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/components/server/runtime_support.hpp
Expand Up @@ -403,7 +403,7 @@ namespace hpx { namespace components { namespace server
#endif

// the name says it all
void dijkstra_termination_detection(
std::size_t dijkstra_termination_detection(
std::vector<naming::id_type> const& locality_ids);

#if !defined(HPX_USE_FAST_DIJKSTRA_TERMINATION_DETECTION)
Expand Down
52 changes: 38 additions & 14 deletions src/runtime/components/server/runtime_support_server.cpp
Expand Up @@ -575,14 +575,14 @@ namespace hpx { namespace components { namespace server
///////////////////////////////////////////////////////////////////////////
// initiate system shutdown for all localities
void invoke_shutdown_functions(
std::vector<naming::id_type> const& prefixes, bool pre_shutdown)
std::vector<naming::id_type> const& localities, bool pre_shutdown)
{
#if !defined(HPX_GCC44_WORKAROUND)
call_shutdown_functions_action act;
lcos::broadcast(act, prefixes, pre_shutdown).get();
lcos::broadcast(act, localities, pre_shutdown).get();
#else
std::vector<lcos::future<void> > lazy_actions;
BOOST_FOREACH(naming::id_type const& id, prefixes)
BOOST_FOREACH(naming::id_type const& id, localities)
{
using components::stubs::runtime_support;
lazy_actions.push_back(
Expand Down Expand Up @@ -627,7 +627,7 @@ namespace hpx { namespace components { namespace server

while (tm.get_thread_count() > 1)
{
// FIXME: this sleep_for is causing the long shutdown times.
// FIXME: this sleep_for is causing very long shutdown times.
// By commenting it out, #1263 gets solved.
//this_thread::sleep_for(boost::posix_time::millisec(100));
this_thread::yield();
Expand All @@ -652,13 +652,15 @@ namespace hpx { namespace components { namespace server
}

// kick off termination detection
void runtime_support::dijkstra_termination_detection(
std::size_t runtime_support::dijkstra_termination_detection(
std::vector<naming::id_type> const& locality_ids)
{
boost::uint32_t num_localities =
static_cast<boost::uint32_t>(locality_ids.size());
if (num_localities == 1)
return;
return 0;

std::size_t count = 0; // keep track of number of trials

{
// Note: we protect the entire loop here since the stopping condition
Expand All @@ -674,17 +676,24 @@ namespace hpx { namespace components { namespace server
bool termination_aborted = false;
{
util::scoped_unlock<dijkstra_mtx_type::scoped_lock> ul(l);
termination_aborted = lcos::reduce(act, locality_ids, std_logical_or_type()).get()
termination_aborted = lcos::reduce(act,
locality_ids, std_logical_or_type()).get()
}

if (termination_aborted)
{
dijkstra_color_ = true; // unsuccessful termination
}

// Rule 3: After the completion of an unsuccessful probe, machine
// nr.0 initiates a next probe.

++count;

} while (dijkstra_color_);
}

return count;
}
#else
void runtime_support::send_dijkstra_termination_token(
Expand All @@ -702,7 +711,7 @@ namespace hpx { namespace components { namespace server

while (tm.get_thread_count() > 1)
{
// FIXME: this sleep_for is causing the long shutdown times.
// FIXME: this sleep_for is causing very long shutdown times.
// By commenting it out, #1263 gets solved.
//this_thread::sleep_for(boost::posix_time::millisec(100));
this_thread::yield();
Expand Down Expand Up @@ -762,13 +771,13 @@ namespace hpx { namespace components { namespace server
}

// kick off termination detection
void runtime_support::dijkstra_termination_detection(
std::size_t runtime_support::dijkstra_termination_detection(
std::vector<naming::id_type> const& locality_ids)
{
boost::uint32_t num_localities =
static_cast<boost::uint32_t>(locality_ids.size());
if (num_localities == 1)
return;
return 0;

boost::uint32_t initiating_locality_id = get_locality_id();

Expand All @@ -777,6 +786,8 @@ namespace hpx { namespace components { namespace server
if (0 == target_id)
target_id = static_cast<boost::uint32_t>(num_localities);

std::size_t count = 0; // keep track of number of trials

{
// Note: we protect the entire loop here since the stopping condition
// depends on the shared variable "dijkstra_color_"
Expand All @@ -799,8 +810,12 @@ namespace hpx { namespace components { namespace server
// Rule 3: After the completion of an unsuccessful probe, machine
// nr.0 initiates a next probe.

++count;

} while (dijkstra_color_);
}

return count;
}
#endif

Expand All @@ -823,7 +838,7 @@ namespace hpx { namespace components { namespace server
}

LRT_(info) << "runtime_support::shutdown_all: "
"initialiting application shutdown";
"initializing application shutdown";

applier::applier& appl = hpx::applier::get_applier();
naming::resolver_client& agas_client = appl.get_agas_client();
Expand All @@ -833,10 +848,11 @@ namespace hpx { namespace components { namespace server
stop_evaluating_counters();

std::vector<naming::id_type> locality_ids = find_all_localities();
dijkstra_termination_detection(locality_ids);
std::size_t count = dijkstra_termination_detection(locality_ids);

LRT_(info) << "runtime_support::shutdown_all: "
"passed termination detection";
"passed first termination detection (count: "
<< count << ").";

// execute registered shutdown functions on all localities
invoke_shutdown_functions(locality_ids, true);
Expand All @@ -845,11 +861,19 @@ namespace hpx { namespace components { namespace server
LRT_(info) << "runtime_support::shutdown_all: "
"invoked shutdown functions";

// Do a second round of termination detection to synchronize with all
// work which was triggered by the invocation of the shutdown
// functions.
count = dijkstra_termination_detection(locality_ids);

LRT_(info) << "runtime_support::shutdown_all: "
"passed second termination detection (count: "
<< count << ").";

// Shut down all localities except the local one, we can't use
// broadcast here as we have to handle the back parcel in a special
// way.
std::reverse(locality_ids.begin(), locality_ids.end());

boost::uint32_t locality_id = get_locality_id();
std::vector<lcos::future<void> > lazy_actions;

Expand Down

0 comments on commit 02d286d

Please sign in to comment.