Skip to content

Commit

Permalink
wait_or_add_new returning thread_id_type
Browse files Browse the repository at this point in the history
Instead of pushing the newly available work to the pending queues,
wait_or_add_new now returns a newly created thread_data pointer to
be used in the scheduling loop.
  • Loading branch information
sithhell authored and Thomas Heller committed Feb 15, 2017
1 parent a04bc94 commit beb649e
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 184 deletions.
24 changes: 16 additions & 8 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -454,8 +454,8 @@ namespace hpx { namespace threads { namespace detail
{
if (HPX_LIKELY(next_thrd == nullptr)) {
// schedule other work
scheduler.SchedulingPolicy::wait_or_add_new(
num_thread, running, idle_loop_count);
next_thrd = scheduler.SchedulingPolicy::wait_or_add_new(
num_thread, running, idle_loop_count).get();
}

// schedule this thread again, make sure it ends up at
Expand Down Expand Up @@ -515,7 +515,13 @@ namespace hpx { namespace threads { namespace detail
//
// REVIEW: Passing a specific target thread may set off
// the round robin queuing.
scheduler.SchedulingPolicy::schedule_thread(thrd, num_thread);
if (HPX_LIKELY(next_thrd == nullptr)) {
next_thrd = thrd;
}
else
{
scheduler.SchedulingPolicy::schedule_thread(thrd, num_thread);
}
}

// Remove the mapping from thread_map_ if HPX thread is depleted
Expand All @@ -533,13 +539,16 @@ namespace hpx { namespace threads { namespace detail

// if nothing else has to be done either wait or terminate
else {
++idle_loop_count;
HPX_ASSERT(next_thrd == nullptr);

if (scheduler.SchedulingPolicy::wait_or_add_new(
num_thread, running, idle_loop_count))
next_thrd = scheduler.SchedulingPolicy::wait_or_add_new(num_thread,
running, idle_loop_count).get();

if (next_thrd == nullptr)
{
++idle_loop_count;
// clean up terminated threads one more time before existing
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
if (!running && scheduler.SchedulingPolicy::cleanup_terminated(true))
{
// if this is an inner scheduler, exit immediately
if (!(scheduler.get_scheduler_mode() & policies::delay_exit))
Expand All @@ -553,7 +562,6 @@ namespace hpx { namespace threads { namespace detail
idle_loop_count = 0;
may_exit = true;
}
}

// do background work in parcel layer and in agas
if (HPX_UNLIKELY(background_thread))
Expand Down
5 changes: 2 additions & 3 deletions hpx/runtime/threads/policies/hierarchy_scheduler.hpp
Expand Up @@ -757,7 +757,7 @@ namespace hpx { namespace threads { namespace policies
/// manager to allow for maintenance tasks to be executed in the
/// scheduler. Returns true if the OS thread calling this function
/// has to be terminated (i.e. no more work has to be done).
bool wait_or_add_new(std::size_t num_thread, bool running,
thread_id_type wait_or_add_new(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count)
{
HPX_ASSERT(tree.size());
Expand All @@ -770,8 +770,7 @@ namespace hpx { namespace threads { namespace policies
transfer_tasks(num_thread/d, num_thread, 1);
}

bool result = tq->wait_or_add_new(running, idle_loop_count, added);
return result && 0 == added;
return tq->wait_or_add_new(running, idle_loop_count);
}

///////////////////////////////////////////////////////////////////////
Expand Down
51 changes: 23 additions & 28 deletions hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp
Expand Up @@ -881,11 +881,10 @@ namespace hpx { namespace threads { namespace policies
/// manager to allow for maintenance tasks to be executed in the
/// scheduler. Returns true if the OS thread calling this function
/// has to be terminated (i.e. no more work has to be done).
virtual bool wait_or_add_new(std::size_t num_thread, bool running,
virtual thread_id_type wait_or_add_new(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count)
{
std::size_t added = 0;
bool result = true;
thread_id_type next_thrd = nullptr;

std::size_t high_priority_queues = high_priority_queues_.size();
thread_queue_type* this_high_priority_queue = nullptr;
Expand All @@ -894,15 +893,14 @@ namespace hpx { namespace threads { namespace policies
if (num_thread < high_priority_queues)
{
this_high_priority_queue = high_priority_queues_[num_thread];
result = this_high_priority_queue->wait_or_add_new(running,
idle_loop_count, added)
&& result;
if (0 != added) return result;
next_thrd = this_high_priority_queue->wait_or_add_new(running,
idle_loop_count);
if (next_thrd) return next_thrd;
}

result = this_queue->wait_or_add_new(
running, idle_loop_count, added) && result;
if (0 != added) return result;
next_thrd = this_queue->wait_or_add_new(
running, idle_loop_count);
if (next_thrd) return next_thrd;

for (std::size_t idx: victim_threads_[num_thread])
{
Expand All @@ -912,27 +910,25 @@ namespace hpx { namespace threads { namespace policies
num_thread < high_priority_queues)
{
thread_queue_type* q = high_priority_queues_[idx];
result = this_high_priority_queue->
wait_or_add_new(running, idle_loop_count,
added, q)
&& result;
next_thrd = this_high_priority_queue->
wait_or_add_new(running, idle_loop_count, q);

if (0 != added)
if (next_thrd)
{
q->increment_num_stolen_from_staged(added);
q->increment_num_stolen_from_staged(1);
this_high_priority_queue->
increment_num_stolen_to_staged(added);
return result;
increment_num_stolen_to_staged(1);
return next_thrd;
}
}

result = this_queue->wait_or_add_new(running,
idle_loop_count, added, queues_[idx]) && result;
if (0 != added)
next_thrd = this_queue->wait_or_add_new(running,
idle_loop_count, queues_[idx]);
if (next_thrd)
{
queues_[idx]->increment_num_stolen_from_staged(added);
this_queue->increment_num_stolen_to_staged(added);
return result;
queues_[idx]->increment_num_stolen_from_staged(1);
this_queue->increment_num_stolen_to_staged(1);
return next_thrd;
}
}

Expand Down Expand Up @@ -963,11 +959,10 @@ namespace hpx { namespace threads { namespace policies
}
#endif

result = low_priority_queue_.wait_or_add_new(running,
idle_loop_count, added) && result;
if (0 != added) return result;
next_thrd = low_priority_queue_.wait_or_add_new(running,
idle_loop_count);

return result;
return next_thrd;
}

///////////////////////////////////////////////////////////////////////
Expand Down
49 changes: 24 additions & 25 deletions hpx/runtime/threads/policies/local_queue_scheduler.hpp
Expand Up @@ -612,18 +612,17 @@ namespace hpx { namespace threads { namespace policies
/// manager to allow for maintenance tasks to be executed in the
/// scheduler. Returns true if the OS thread calling this function
/// has to be terminated (i.e. no more work has to be done).
virtual bool wait_or_add_new(std::size_t num_thread, bool running,
virtual thread_id_type wait_or_add_new(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count)
{
std::size_t queues_size = queues_.size();
HPX_ASSERT(num_thread < queues_.size());

std::size_t added = 0;
bool result = true;
thread_id_type next_thrd = nullptr;

result = queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, added) && result;
if (0 != added) return result;
next_thrd = queues_[num_thread]->wait_or_add_new(running,
idle_loop_count);
if (next_thrd) return next_thrd;

if (numa_sensitive_ != 0) // limited or no stealing across domains
{
Expand All @@ -646,13 +645,13 @@ namespace hpx { namespace threads { namespace policies
if (!test(numa_domain_mask, get_pu_num(idx))) //-V600
continue;

result = queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, added, queues_[idx]) && result;
if (0 != added)
next_thrd = queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, queues_[idx]);
if (next_thrd)
{
queues_[idx]->increment_num_stolen_from_staged(added);
queues_[num_thread]->increment_num_stolen_to_staged(added);
return result;
queues_[idx]->increment_num_stolen_from_staged(1);
queues_[num_thread]->increment_num_stolen_to_staged(1);
return next_thrd;
}
}
}
Expand All @@ -673,13 +672,13 @@ namespace hpx { namespace threads { namespace policies
if (!test(numa_domain_mask, get_pu_num(idx))) //-V600
continue;

result = queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, added, queues_[idx]) && result;
if (0 != added)
next_thrd = queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, queues_[idx]);
if (next_thrd)
{
queues_[idx]->increment_num_stolen_from_staged(added);
queues_[num_thread]->increment_num_stolen_to_staged(added);
return result;
queues_[idx]->increment_num_stolen_from_staged(1);
queues_[num_thread]->increment_num_stolen_to_staged(1);
return next_thrd;
}
}
}
Expand All @@ -695,13 +694,13 @@ namespace hpx { namespace threads { namespace policies

HPX_ASSERT(idx != num_thread);

result = queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, added, queues_[idx]) && result;
if (0 != added)
next_thrd = queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, queues_[idx]);
if (next_thrd)
{
queues_[idx]->increment_num_stolen_from_staged(added);
queues_[num_thread]->increment_num_stolen_to_staged(added);
return result;
queues_[idx]->increment_num_stolen_from_staged(1);
queues_[num_thread]->increment_num_stolen_to_staged(1);
return next_thrd;
}
}
}
Expand Down Expand Up @@ -733,7 +732,7 @@ namespace hpx { namespace threads { namespace policies
}
#endif

return result;
return nullptr;
}

///////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/policies/scheduler_base.hpp
Expand Up @@ -285,7 +285,7 @@ namespace hpx { namespace threads { namespace policies
virtual bool destroy_thread(threads::thread_data* thrd,
std::int64_t& busy_count) = 0;

virtual bool wait_or_add_new(std::size_t num_thread, bool running,
virtual thread_id_type wait_or_add_new(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count) = 0;

virtual void on_start_thread(std::size_t num_thread) = 0;
Expand Down
27 changes: 13 additions & 14 deletions hpx/runtime/threads/policies/static_priority_queue_scheduler.hpp
Expand Up @@ -107,24 +107,27 @@ namespace hpx { namespace threads { namespace policies
/// manager to allow for maintenance tasks to be executed in the
/// scheduler. Returns true if the OS thread calling this function
/// has to be terminated (i.e. no more work has to be done).
bool wait_or_add_new(std::size_t num_thread, bool running,
thread_id_type wait_or_add_new(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count)
{
HPX_ASSERT(num_thread < this->queues_.size());

std::size_t added = 0;
bool result = true;
thread_id_type next_thrd = nullptr;

if (num_thread < this->high_priority_queues_.size())
{
result = this->high_priority_queues_[num_thread]->
wait_or_add_new(running, idle_loop_count, added) && result;
if (0 != added) return result;
next_thrd = this->high_priority_queues_[num_thread]->
wait_or_add_new(running, idle_loop_count);
if (next_thrd) return next_thrd;
}

result = this->queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, added) && result;
if (0 != added) return result;
next_thrd = this->queues_[num_thread]->wait_or_add_new(running,
idle_loop_count);
if (next_thrd) return next_thrd;

next_thrd = this->low_priority_queue_.wait_or_add_new(running,
idle_loop_count);
if (next_thrd) return next_thrd;

#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
// no new work is available, are we deadlocked?
Expand Down Expand Up @@ -154,11 +157,7 @@ namespace hpx { namespace threads { namespace policies
}
#endif

result = this->low_priority_queue_.wait_or_add_new(running,
idle_loop_count, added) && result;
if (0 != added) return result;

return result;
return next_thrd;
}
};
}}}
Expand Down
13 changes: 6 additions & 7 deletions hpx/runtime/threads/policies/static_queue_scheduler.hpp
Expand Up @@ -100,18 +100,17 @@ namespace hpx { namespace threads { namespace policies
/// manager to allow for maintenance tasks to be executed in the
/// scheduler. Returns true if the OS thread calling this function
/// has to be terminated (i.e. no more work has to be done).
virtual bool wait_or_add_new(std::size_t num_thread, bool running,
virtual thread_id_type wait_or_add_new(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count)
{
std::size_t queues_size = this->queues_.size();
HPX_ASSERT(num_thread < queues_size);

std::size_t added = 0;
bool result = true;
thread_id_type next_thrd;

result = this->queues_[num_thread]->wait_or_add_new(running,
idle_loop_count, added) && result;
if (0 != added) return result;
next_thrd = this->queues_[num_thread]->wait_or_add_new(running,
idle_loop_count);
if (next_thrd) return next_thrd;

#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION
// no new work is available, are we deadlocked?
Expand Down Expand Up @@ -141,7 +140,7 @@ namespace hpx { namespace threads { namespace policies
}
#endif

return result;
return nullptr;
}
};
}}}
Expand Down

0 comments on commit beb649e

Please sign in to comment.