Skip to content

Commit

Permalink
Fixing #1263: apply_remote test takes too long
Browse files Browse the repository at this point in the history
  • Loading branch information
sithhell committed Sep 17, 2014
1 parent 677a35a commit 17dd2eb
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 44 deletions.
4 changes: 3 additions & 1 deletion hpx/runtime/components/server/runtime_support.hpp
Expand Up @@ -33,6 +33,7 @@
#include <hpx/runtime/actions/manage_object_action.hpp>
#include <hpx/performance_counters/counters.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/lcos/local/mutex.hpp>
#include <hpx/lcos/local/condition_variable.hpp>

#include <hpx/plugins/plugin_factory_base.hpp>
Expand Down Expand Up @@ -421,7 +422,8 @@ namespace hpx { namespace components { namespace server
bool dijkstra_color_; // false: white, true: black
boost::atomic<bool> shutdown_all_invoked_;

lcos::local::spinlock dijkstra_mtx_;
typedef boost::mutex dijkstra_mtx_type;
dijkstra_mtx_type dijkstra_mtx_;
lcos::local::condition_variable dijkstra_cond_;

component_map_mutex_type cm_mtx_;
Expand Down
91 changes: 55 additions & 36 deletions src/runtime/components/server/runtime_support_server.cpp
Expand Up @@ -600,7 +600,7 @@ namespace hpx { namespace components { namespace server
/// Marks this locality as "black" for Dijkstra's termination detection
/// (dijkstra_color_: false == white, true == black).
void runtime_support::dijkstra_make_black()
{
    // Rule 1: A machine sending a message makes itself black.
    //
    // dijkstra_color_ is shared state, so it is written only while holding
    // dijkstra_mtx_. The lock type is taken from dijkstra_mtx_type so that it
    // always matches the mutex member's declared type; exactly one lock
    // object is created for the scope of this function.
    dijkstra_mtx_type::scoped_lock l(dijkstra_mtx_);
    dijkstra_color_ = true;
}

Expand All @@ -627,7 +627,9 @@ namespace hpx { namespace components { namespace server

while (tm.get_thread_count() > 1)
{
this_thread::sleep_for(boost::posix_time::millisec(100));
// FIXME: this sleep_for causes the long shutdown times.
// Commenting it out resolves #1263.
//this_thread::sleep_for(boost::posix_time::millisec(100));
this_thread::yield();
}

Expand All @@ -637,7 +639,7 @@ namespace hpx { namespace components { namespace server
// Rule 2: When machine nr.i + 1 propagates the probe, it hands over a
// black token to machine nr.i if it is black itself, whereas while
// being white it leaves the color of the token unchanged.
lcos::local::spinlock::scoped_lock l(dijkstra_mtx_);
dijkstra_mtx_type::scoped_lock l(dijkstra_mtx_);
bool dijkstra_token = dijkstra_color_;

// Rule 5: Upon transmission of the token to machine nr.i, machine
Expand All @@ -658,25 +660,31 @@ namespace hpx { namespace components { namespace server
if (num_localities == 1)
return;

do {
// Rule 4: Machine nr.0 initiates a probe by making itself white
// and sending a white token to machine nr.N - 1.
{
lcos::local::spinlock::scoped_lock l(dijkstra_mtx_);
{
// Note: we protect the entire loop here since the stopping condition
// depends on the shared variable "dijkstra_color_".
// The lock must be properly released around any remote actions.
dijkstra_mtx_type::scoped_lock l(dijkstra_mtx_);
do {
// Rule 4: Machine nr.0 initiates a probe by making itself white
// and sending a white token to machine nr.N - 1.
dijkstra_color_ = false; // start off with white
}

dijkstra_termination_action act;
if (lcos::reduce(act, locality_ids, std_logical_or_type()).get())
{
lcos::local::spinlock::scoped_lock l(dijkstra_mtx_);
dijkstra_color_ = true; // unsuccessful termination
}

// Rule 3: After the completion of an unsuccessful probe, machine
// nr.0 initiates a next probe.
dijkstra_termination_action act;
bool termination_aborted = false;
{
util::scoped_unlock<dijkstra_mtx_type::scoped_lock> ul(l);
termination_aborted = lcos::reduce(act, locality_ids, std_logical_or_type()).get()
}
if (termination_aborted)
{
dijkstra_color_ = true; // unsuccessful termination
}

} while (dijkstra_color_);
// Rule 3: After the completion of an unsuccessful probe, machine
// nr.0 initiates a next probe.
} while (dijkstra_color_);
}
}
#else
void runtime_support::send_dijkstra_termination_token(
Expand All @@ -694,7 +702,9 @@ namespace hpx { namespace components { namespace server

while (tm.get_thread_count() > 1)
{
this_thread::sleep_for(boost::posix_time::millisec(100));
// FIXME: this sleep_for causes the long shutdown times.
// Commenting it out resolves #1263.
//this_thread::sleep_for(boost::posix_time::millisec(100));
this_thread::yield();
}

Expand All @@ -705,7 +715,7 @@ namespace hpx { namespace components { namespace server
// black token to machine nr.i if it is black itself, whereas while
// being white it leaves the color of the token unchanged.
{
lcos::local::spinlock::scoped_lock l(dijkstra_mtx_);
dijkstra_mtx_type::scoped_lock l(dijkstra_mtx_);
if (dijkstra_color_)
dijkstra_token = dijkstra_color_;

Expand All @@ -730,12 +740,13 @@ namespace hpx { namespace components { namespace server
agas_client.start_shutdown();

boost::uint32_t locality_id = get_locality_id();

if (initiating_locality_id == locality_id)
{
// we received the token after a full circle
if (dijkstra_token)
{
lcos::local::spinlock::scoped_lock l(dijkstra_mtx_);
dijkstra_mtx_type::scoped_lock l(dijkstra_mtx_);
dijkstra_color_ = true; // unsuccessful termination
}

Expand Down Expand Up @@ -766,25 +777,30 @@ namespace hpx { namespace components { namespace server
if (0 == target_id)
target_id = static_cast<boost::uint32_t>(num_localities);

do {
// Rule 4: Machine nr.0 initiates a probe by making itself white
// and sending a white token to machine nr.N - 1.
{
lcos::local::spinlock::scoped_lock l(dijkstra_mtx_);
{
// Note: we protect the entire loop here since the stopping condition
// depends on the shared variable "dijkstra_color_".
// The lock must be properly released around any remote actions.
dijkstra_mtx_type::scoped_lock l(dijkstra_mtx_);
do {
// Rule 4: Machine nr.0 initiates a probe by making itself white
// and sending a white token to machine nr.N - 1.
dijkstra_color_ = false; // start off with white
}

send_dijkstra_termination_token(target_id - 1,
initiating_locality_id, num_localities, false);
{
util::scoped_unlock<dijkstra_mtx_type::scoped_lock> ul(l);
send_dijkstra_termination_token(target_id - 1,
initiating_locality_id, num_localities, false);
}

// wait for token to come back to us
lcos::local::spinlock::scoped_lock l(dijkstra_mtx_);
dijkstra_cond_.wait(l);
// wait for token to come back to us
dijkstra_cond_.wait(l);

// Rule 3: After the completion of an unsuccessful probe, machine
// nr.0 initiates a next probe.
// Rule 3: After the completion of an unsuccessful probe, machine
// nr.0 initiates a next probe.

} while (dijkstra_color_);
} while (dijkstra_color_);
}
}
#endif

Expand All @@ -802,7 +818,10 @@ namespace hpx { namespace components { namespace server
// make sure shutdown_all is invoked only once
bool flag = false;
if (!shutdown_all_invoked_.compare_exchange_strong(flag, true))
{
BOOST_ASSERT(false);
return;
}

LRT_(info) << "runtime_support::shutdown_all: "
"initialiting application shutdown";
Expand Down
15 changes: 8 additions & 7 deletions tests/unit/lcos/apply_remote.cpp
Expand Up @@ -10,16 +10,15 @@
#include <hpx/util/lightweight_test.hpp>

///////////////////////////////////////////////////////////////////////////////
bool root_locality = false;
boost::int32_t final_result;
hpx::util::spinlock result_mutex;
hpx::lcos::local::condition_variable result_cv;

// Records an incoming result: keeps the largest value seen so far in the
// global final_result and then signals result_cv.
void receive_result(boost::int32_t i)
{
// Hold result_mutex while the shared final_result is inspected and updated.
hpx::util::spinlock::scoped_lock l(result_mutex);
// Only ever raise final_result; values that are not larger are ignored.
if (i > final_result)
final_result = i;
// Notify one waiter on result_cv; this happens while the lock is still held.
result_cv.notify_one();
}
// NOTE(review): presumably registers receive_result as an HPX plain action so
// it can be invoked via hpx::apply — confirm against the HPX macro docs.
HPX_PLAIN_ACTION(receive_result);

Expand Down Expand Up @@ -65,6 +64,7 @@ int hpx_main()
{
hpx::id_type here = hpx::find_here();
hpx::id_type there = here;
root_locality = true;
if (hpx::get_num_localities_sync() > 1)
{
std::vector<hpx::id_type> localities = hpx::find_remote_localities();
Expand Down Expand Up @@ -121,12 +121,8 @@ int hpx_main()
hpx::apply<call_action>(inc, here, 1);
}

hpx::util::spinlock::scoped_lock l(result_mutex);
result_cv.wait_for(result_mutex, boost::chrono::seconds(1),
hpx::util::bind(std::equal_to<boost::int32_t>(), boost::ref(final_result), 13));

HPX_TEST_EQ(final_result, 13);

// Let finalize wait for every "apply" to be finished
return hpx::finalize();
}

Expand All @@ -138,6 +134,11 @@ int main(int argc, char* argv[])
HPX_TEST_EQ_MSG(hpx::init(argc, argv), 0,
"HPX main exited with non-zero status");

// After hpx::init returns, all actions should have been executed.
// The final result is only accumulated on the root locality.
if(root_locality)
HPX_TEST_EQ(final_result, 13);

return hpx::util::report_errors();
}

0 comments on commit 17dd2eb

Please sign in to comment.