Skip to content

Commit

Permalink
Merge pull request #2518 from STEllAR-GROUP/fixing_2516
Browse files Browse the repository at this point in the history
Fixing parcel scheduling
  • Loading branch information
hkaiser committed Feb 26, 2017
2 parents d3a04b8 + 153bf4f commit b97ae1c
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 15 deletions.
1 change: 1 addition & 0 deletions hpx/lcos/barrier.hpp
Expand Up @@ -96,6 +96,7 @@ namespace hpx { namespace lcos {
// Resets this barrier instance.
void release();

// Detaches this barrier instance from its node. If a node is attached,
// its base name is unregistered when the runtime is still up and running
// (and the node is rank 0 or num_ >= cut_off_); the node is then dropped.
void detach();

// Get the instance of the global barrier
static barrier& get_global_barrier();
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/actions/base_action.hpp
Expand Up @@ -125,7 +125,7 @@ namespace hpx { namespace actions

virtual void load_schedule(serialization::input_archive& ar,
naming::gid_type&& target, naming::address_type lva,
std::size_t num_thread) = 0;
std::size_t num_thread, bool& deferred_schedule) = 0;

#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
/// The function \a get_action_name_itt returns the name of this action
Expand Down
17 changes: 15 additions & 2 deletions hpx/runtime/actions/transfer_action.hpp
Expand Up @@ -101,7 +101,7 @@ namespace hpx { namespace actions

void load_schedule(serialization::input_archive& ar,
naming::gid_type&& target, naming::address_type lva,
std::size_t num_thread);
std::size_t num_thread, bool& deferred_schedule);
};
/// \endcond

Expand Down Expand Up @@ -209,10 +209,23 @@ namespace hpx { namespace actions
// Deserializes this action from the given archive and, unless scheduling
// is deferred, schedules it for execution.
//
// \param ar                 input archive the action is loaded from
// \param target             destination gid, forwarded to schedule_thread
// \param lva                local virtual address, forwarded to
//                           schedule_thread
// \param num_thread         forwarded to schedule_thread
// \param deferred_schedule  in/out flag: if true on entry and this is a
//                           direct action, nothing is scheduled and the flag
//                           stays true; if true on entry and this is not a
//                           direct action, the flag is reset to false and the
//                           action is scheduled; if false on entry the action
//                           is always scheduled
template <typename Action>
void transfer_action<Action>::load_schedule(serialization::input_archive& ar,
naming::gid_type&& target, naming::address_type lva,
std::size_t num_thread)
std::size_t num_thread, bool& deferred_schedule)
{
// First deserialize the action from the archive, then schedule it
load(ar);

if (deferred_schedule)
{
// If this is a direct action and a deferred schedule was requested (that
// is, we are not the last parcel), return immediately without scheduling;
// deferred_schedule stays true so the caller can tell nothing was scheduled
if (base_type::direct_execution::value)
return;

// If this is not a direct action, it is scheduled right away below, so we
// can safely set deferred_schedule to false
deferred_schedule = false;
}

schedule_thread(std::move(target), lva, num_thread);
}
}}
Expand Down
17 changes: 15 additions & 2 deletions hpx/runtime/actions/transfer_continuation_action.hpp
Expand Up @@ -96,7 +96,7 @@ namespace hpx { namespace actions

void load_schedule(serialization::input_archive& ar,
naming::gid_type&& target, naming::address_type lva,
std::size_t num_thread);
std::size_t num_thread, bool& deferred_schedule);

private:
continuation_type cont_;
Expand Down Expand Up @@ -210,10 +210,23 @@ namespace hpx { namespace actions
void transfer_continuation_action<Action>::load_schedule(
serialization::input_archive& ar,
naming::gid_type&& target, naming::address_type lva,
std::size_t num_thread)
std::size_t num_thread, bool& deferred_schedule)
{
// Deserializes this action from the archive and, unless scheduling is
// deferred, schedules it. deferred_schedule is an in/out flag: it is left
// true only when it was true on entry and this is a direct action (in which
// case nothing is scheduled here); otherwise it ends up false and the action
// is scheduled.

// First deserialize the action from the archive, then schedule it
load(ar);

if (deferred_schedule)
{
// If this is a direct action and a deferred schedule was requested (that
// is, we are not the last parcel), return immediately without scheduling;
// deferred_schedule stays true so the caller can tell nothing was scheduled
if (base_type::direct_execution::value)
return;

// If this is not a direct action, it is scheduled right away below, so we
// can safely set deferred_schedule to false
deferred_schedule = false;
}

schedule_thread(std::move(target), lva, num_thread);
}
}}
Expand Down
40 changes: 39 additions & 1 deletion hpx/runtime/parcelset/decode_parcels.hpp
Expand Up @@ -112,21 +112,32 @@ namespace hpx { namespace parcelset
buffer.data_point_;

{
std::vector<parcel> deferred_parcels;
// De-serialize the parcel data
serialization::input_archive archive(buffer.data_,
inbound_data_size, &chunks);

if(parcel_count == 0)
{
archive >> parcel_count; //-V128
if (parcel_count > 1)
deferred_parcels.reserve(parcel_count);
}
for(std::size_t i = 0; i != parcel_count; ++i)
{
bool deferred_schedule = true;
if (i == parcel_count - 1) deferred_schedule = false;

#if defined(HPX_HAVE_PARCELPORT_ACTION_COUNTERS)
std::size_t archive_pos = archive.current_pos();
std::int64_t serialize_time = timer.elapsed_nanoseconds();
#endif
// de-serialize parcel and add it to incoming parcel queue
parcel p;
bool migrated = p.load_schedule(archive, num_thread);
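// deferred_schedule will be reset to false if it was previously
// set to true and the action to be scheduled is not direct (such
// actions are scheduled right away). It stays true for direct
// actions, which are collected in deferred_parcels below.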
bool migrated = p.load_schedule(archive, num_thread,
deferred_schedule);

std::int64_t add_parcel_time = timer.elapsed_nanoseconds();

Expand Down Expand Up @@ -166,6 +177,8 @@ namespace hpx { namespace parcelset
&detail::parcel_route_handler,
threads::thread_priority_normal);
}
else if (deferred_schedule)
deferred_parcels.push_back(std::move(p));

// be sure not to measure add_parcel as serialization time
overall_add_parcel_time += timer.elapsed_nanoseconds() -
Expand All @@ -175,6 +188,31 @@ namespace hpx { namespace parcelset
// complete received data with parcel count
data.num_parcels_ = parcel_count;
data.raw_bytes_ = archive.bytes_read();

for (std::size_t i = 0; i != deferred_parcels.size(); ++i)
{
// If we are the last deferred parcel, we don't need to spin
// a new thread...
if (i == deferred_parcels.size() - 1)
{
deferred_parcels[i].schedule_action(num_thread);
}
// ... otherwise, schedule the parcel on a new thread.
else
{
hpx::applier::register_thread_nullary(
util::bind(
util::one_shot(
[num_thread](parcel&& p)
{
p.schedule_action(num_thread);
}
), std::move(deferred_parcels[i])),
"schedule_parcel",
threads::pending, true, threads::thread_priority_critical,
num_thread, threads::thread_stacksize_default);
}
}
}

// store the time required for serialization
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/parcelset/parcel.hpp
Expand Up @@ -159,11 +159,11 @@ namespace hpx { namespace parcelset

std::size_t & size();

void schedule_action();
void schedule_action(std::size_t num_thread = std::size_t(-1));

// returns true if parcel was migrated, false if scheduled locally
bool load_schedule(serialization::input_archive & ar,
std::size_t num_thread);
std::size_t num_thread, bool& deferred_schedule);

// generate unique parcel id
static naming::gid_type generate_unique_id(
Expand Down
16 changes: 16 additions & 0 deletions src/lcos/barrier.cpp
Expand Up @@ -115,6 +115,22 @@ namespace hpx { namespace lcos {
}
}

// Detach this barrier from its node.
//
// Does nothing if no node is attached. Otherwise the base name is
// unregistered -- but only while a runtime exists, the thread manager is in
// state_running and no shutdown is in progress, and only if
// num_ >= cut_off_ or this is rank 0. Afterwards the node is dropped
// unconditionally.
void barrier::detach()
{
    if (!node_)
        return;

    // Only attempt to unregister while the runtime is up and running.
    bool const runtime_is_live = hpx::get_runtime_ptr() != nullptr &&
        hpx::threads::threadmanager_is(state_running) &&
        !hpx::is_stopped_or_shutting_down();

    if (runtime_is_live &&
        ((*node_)->num_ >= (*node_)->cut_off_ || (*node_)->rank_ == 0))
    {
        hpx::unregister_with_basename((*node_)->base_name_, (*node_)->rank_);
    }

    node_.reset();
}

barrier barrier::create_global_barrier()
{
runtime& rt = get_runtime();
Expand Down
8 changes: 7 additions & 1 deletion src/runtime/agas/big_boot_barrier.cpp
Expand Up @@ -992,12 +992,18 @@ void big_boot_barrier::notify()
runtime& rt = get_runtime();
naming::resolver_client& agas_client = rt.get_agas_client();

bool notify = false;
{
std::lock_guard<boost::mutex> lk(mtx, std::adopt_lock);
if (agas_client.get_status() == state_starting)
{
--connected;
if (connected == 0)
notify = true;
}
}
cond.notify_all();
if (notify)
cond.notify_all();
}

// This is triggered in runtime_impl::start, after the early action handler
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/components/server/runtime_support_server.cpp
Expand Up @@ -1355,7 +1355,7 @@ namespace hpx { namespace components { namespace server
rt.report_error(boost::current_exception());
}
}
lcos::barrier::get_global_barrier().release();
lcos::barrier::get_global_barrier().detach();
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/runtime/parcelset/parcel.cpp
Expand Up @@ -421,7 +421,7 @@ namespace hpx { namespace parcelset
}

bool parcel::load_schedule(serialization::input_archive & ar,
std::size_t num_thread)
std::size_t num_thread, bool& deferred_schedule)
{
load_data(ar);

Expand All @@ -440,7 +440,8 @@ namespace hpx { namespace parcelset
}

// continuation support, this is handled in the transfer action
action_->load_schedule(ar, std::move(data_.dest_), lva, num_thread);
action_->load_schedule(ar, std::move(data_.dest_), lva, num_thread,
deferred_schedule);

#if HPX_HAVE_ITTNOTIFY != 0 && !defined(HPX_HAVE_APEX)
static util::itt::event parcel_recv("recv_parcel");
Expand All @@ -457,7 +458,7 @@ namespace hpx { namespace parcelset
return false;
}

void parcel::schedule_action()
void parcel::schedule_action(std::size_t num_thread)
{
// make sure this parcel destination matches the proper locality
HPX_ASSERT(destination_locality() == data_.addr_.locality_);
Expand All @@ -479,7 +480,7 @@ namespace hpx { namespace parcelset

// dispatch action, register work item either with or without
// continuation support, this is handled in the transfer action
action_->schedule_thread(std::move(data_.dest_), lva, std::size_t(-1));
action_->schedule_thread(std::move(data_.dest_), lva, num_thread);
}

void parcel::load_data(serialization::input_archive & ar)
Expand Down
2 changes: 1 addition & 1 deletion src/runtime_impl.cpp
Expand Up @@ -537,7 +537,7 @@ namespace hpx {
std::lock_guard<boost::mutex> l(mtx_);
exception_ = e;
}
lcos::barrier::get_global_barrier().release();
lcos::barrier::get_global_barrier().detach();

// initiate stopping the runtime system
runtime_support_->notify_waiting_main();
Expand Down

0 comments on commit b97ae1c

Please sign in to comment.