Skip to content

Commit

Permalink
Enable controlling amount of pending threads which must be available …
Browse files Browse the repository at this point in the history
…to allow thread stealing

- this value is exposed as hpx.thread_queue.min_tasks_to_steal_pending, default is zero
- flyby: renamed the same value for staged tasks to hpx.thread_queue.min_tasks_to_steal_staged

# Conflicts:
#	hpx/runtime/threads/detail/scheduling_loop.hpp
  • Loading branch information
hkaiser committed Feb 10, 2017
1 parent 7967d52 commit 47145dd
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 36 deletions.
15 changes: 11 additions & 4 deletions docs/manual/config_defaults.qbk
Expand Up @@ -175,7 +175,8 @@ by section basis below.
[teletype]
``
[hpx.thread_queue]
min_tasks_to_steal = ${HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL:10}
min_tasks_to_steal_pending = ${HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_PENDING:0}
min_tasks_to_steal_staged = ${HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_STAGED:10}
min_add_new_count = ${HPX_THREAD_QUEUE_MIN_ADD_NEW_COUNT:10}
max_add_new_count = ${HPX_THREAD_QUEUE_MAX_ADD_NEW_COUNT:10}
max_delete_count = ${HPX_THREAD_QUEUE_MAX_DELETE_COUNT:1000}
Expand All @@ -184,9 +185,15 @@ by section basis below.

[table:ini_hpx_thread_queue
[[Property] [Description]]
[[`hpx.thread_queue.min_tasks_to_steal`]
[The value of this property defines the number __hpx__ tasks have to be
available before neighboring cores are allowed to steal work.]]
[[`hpx.thread_queue.min_tasks_to_steal_pending`]
[The value of this property defines the number of pending __hpx__ threads
which have to be available before neighboring cores are allowed to steal
work. The default is to allow stealing always.]]
[[`hpx.thread_queue.min_tasks_to_steal_staged`]
[The value of this property defines the number of staged __hpx__ tasks have
which to be available before neighboring cores are allowed to steal work.
The default is to allow stealing only if there are more tan 10 tasks
available.]]
[[`hpx.thread_queue.min_add_new_count`]
[The value of this property defines the minimal number tasks to be
converted into __hpx__ threads whenever the thread queues for a core have
Expand Down
14 changes: 8 additions & 6 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -305,9 +305,12 @@ namespace hpx { namespace threads { namespace detail
while (true) {
// Get the next HPX thread from the queue
thrd = next_thrd;
bool running =
this_state.load(boost::memory_order_relaxed) < state_stopping;

if (HPX_LIKELY(thrd ||
scheduler.SchedulingPolicy::get_next_thread(num_thread,
idle_loop_count, thrd)))
scheduler.SchedulingPolicy::get_next_thread(
num_thread, running, idle_loop_count, thrd)))
{
tfunc_time_wrapper tfunc_time_collector(idle_rate);

Expand Down Expand Up @@ -415,8 +418,7 @@ namespace hpx { namespace threads { namespace detail
if (HPX_LIKELY(next_thrd == nullptr)) {
// schedule other work
scheduler.SchedulingPolicy::wait_or_add_new(
num_thread, this_state.load() < state_stopping,
idle_loop_count);
num_thread, running, idle_loop_count);
}

// schedule this thread again, make sure it ends up at
Expand Down Expand Up @@ -459,8 +461,8 @@ namespace hpx { namespace threads { namespace detail
else {
++idle_loop_count;

if (scheduler.SchedulingPolicy::wait_or_add_new(num_thread,
this_state.load() < state_stopping, idle_loop_count))
if (scheduler.SchedulingPolicy::wait_or_add_new(
num_thread, running, idle_loop_count))
{
// clean up terminated threads one more time before existing
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/policies/hierarchy_scheduler.hpp
Expand Up @@ -650,7 +650,7 @@ namespace hpx { namespace threads { namespace policies

/// Return the next thread to be executed, return false if none is
/// available
bool get_next_thread(std::size_t num_thread,
bool get_next_thread(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count, threads::thread_data*& thrd)
{
HPX_ASSERT(tree.size());
Expand Down
Expand Up @@ -490,7 +490,7 @@ namespace hpx { namespace threads { namespace policies

/// Return the next thread to be executed, return false if none is
/// available
virtual bool get_next_thread(std::size_t num_thread,
virtual bool get_next_thread(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count, threads::thread_data*& thrd)
{
std::size_t queues_size = queues_.size();
Expand All @@ -503,7 +503,8 @@ namespace hpx { namespace threads { namespace policies
if (num_thread < high_priority_queues)
{
this_high_priority_queue = high_priority_queues_[num_thread];
bool result = this_high_priority_queue->get_next_thread(thrd);
bool result =
this_high_priority_queue->get_next_thread(thrd);

this_high_priority_queue->increment_num_pending_accesses();
if (result)
Expand Down Expand Up @@ -535,7 +536,7 @@ namespace hpx { namespace threads { namespace policies
num_thread < high_priority_queues)
{
thread_queue_type* q = high_priority_queues_[idx];
if (q->get_next_thread(thrd))
if (q->get_next_thread(thrd, running))
{
q->increment_num_stolen_from_pending();
this_high_priority_queue->
Expand All @@ -544,7 +545,7 @@ namespace hpx { namespace threads { namespace policies
}
}

if (queues_[idx]->get_next_thread(thrd))
if (queues_[idx]->get_next_thread(thrd, running))
{
queues_[idx]->increment_num_stolen_from_pending();
this_queue->increment_num_stolen_to_pending();
Expand Down
8 changes: 4 additions & 4 deletions hpx/runtime/threads/policies/local_queue_scheduler.hpp
Expand Up @@ -319,7 +319,7 @@ namespace hpx { namespace threads { namespace policies

/// Return the next thread to be executed, return false if none is
/// available
virtual bool get_next_thread(std::size_t num_thread,
virtual bool get_next_thread(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count, threads::thread_data*& thrd)
{
std::size_t queues_size = queues_.size();
Expand Down Expand Up @@ -368,7 +368,7 @@ namespace hpx { namespace threads { namespace policies
continue;

thread_queue_type* q = queues_[idx];
if (q->get_next_thread(thrd))
if (q->get_next_thread(thrd, running))
{
q->increment_num_stolen_from_pending();
queues_[num_thread]->increment_num_stolen_to_pending();
Expand Down Expand Up @@ -397,7 +397,7 @@ namespace hpx { namespace threads { namespace policies
continue;

thread_queue_type* q = queues_[idx];
if (q->get_next_thread(thrd))
if (q->get_next_thread(thrd, running))
{
q->increment_num_stolen_from_pending();
queues_[num_thread]->increment_num_stolen_to_pending();
Expand All @@ -418,7 +418,7 @@ namespace hpx { namespace threads { namespace policies
HPX_ASSERT(idx != num_thread);

thread_queue_type* q = queues_[idx];
if (q->get_next_thread(thrd))
if (q->get_next_thread(thrd, running))
{
q->increment_num_stolen_from_pending();
queues_[num_thread]->increment_num_stolen_to_pending();
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/policies/scheduler_base.hpp
Expand Up @@ -272,7 +272,7 @@ namespace hpx { namespace threads { namespace policies
thread_state_enum initial_state, bool run_now, error_code& ec,
std::size_t num_thread) = 0;

virtual bool get_next_thread(std::size_t num_thread,
virtual bool get_next_thread(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count, threads::thread_data*& thrd) = 0;

virtual void schedule_thread(threads::thread_data* thrd,
Expand Down
Expand Up @@ -65,7 +65,7 @@ namespace hpx { namespace threads { namespace policies

/// Return the next thread to be executed, return false if non is
/// available
bool get_next_thread(std::size_t num_thread,
bool get_next_thread(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count, threads::thread_data*& thrd)
{
std::size_t queues_size = this->queues_.size();
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/policies/static_queue_scheduler.hpp
Expand Up @@ -74,7 +74,7 @@ namespace hpx { namespace threads { namespace policies

/// Return the next thread to be executed, return false if none is
/// available
virtual bool get_next_thread(std::size_t num_thread,
virtual bool get_next_thread(std::size_t num_thread, bool,
std::int64_t& idle_loop_count, threads::thread_data*& thrd)
{
typedef typename base_type::thread_queue_type thread_queue_type;
Expand Down
39 changes: 28 additions & 11 deletions hpx/runtime/threads/policies/thread_queue.hpp
Expand Up @@ -77,12 +77,20 @@ namespace hpx { namespace threads { namespace policies

namespace detail
{
inline int get_min_tasks_to_steal()
inline int get_min_tasks_to_steal_pending()
{
static int min_tasks_to_steal =
static int min_tasks_to_steal_pending =
boost::lexical_cast<int>(hpx::get_config_entry(
"hpx.thread_queue.min_tasks_to_steal", "10"));
return min_tasks_to_steal;
"hpx.thread_queue.min_tasks_to_steal_pending", "0"));
return min_tasks_to_steal_pending;
}

inline int get_min_tasks_to_steal_staged()
{
static int min_tasks_to_steal_staged =
boost::lexical_cast<int>(hpx::get_config_entry(
"hpx.thread_queue.min_tasks_to_steal_staged", "10"));
return min_tasks_to_steal_staged;
}

inline int get_min_add_new_count()
Expand Down Expand Up @@ -153,7 +161,8 @@ namespace hpx { namespace threads { namespace policies
typedef Mutex mutex_type;

// don't steal if less than this amount of tasks are left
int const min_tasks_to_steal;
int const min_tasks_to_steal_pending;
int const min_tasks_to_steal_staged;

// create at least this amount of threads from tasks
int const min_add_new_count;
Expand Down Expand Up @@ -523,7 +532,8 @@ namespace hpx { namespace threads { namespace policies

thread_queue(std::size_t queue_num = std::size_t(-1),
std::size_t max_count = max_thread_count)
: min_tasks_to_steal(detail::get_min_tasks_to_steal()),
: min_tasks_to_steal_pending(detail::get_min_tasks_to_steal_pending()),
min_tasks_to_steal_staged(detail::get_min_tasks_to_steal_staged()),
min_add_new_count(detail::get_min_add_new_count()),
max_add_new_count(detail::get_max_add_new_count()),
max_delete_count(detail::get_max_delete_count()),
Expand Down Expand Up @@ -812,11 +822,19 @@ namespace hpx { namespace threads { namespace policies
/// Return the next thread to be executed, return false if non is
/// available
bool get_next_thread(threads::thread_data*& thrd,
bool steal = false) HPX_HOT
bool allow_stealing = false, bool steal = false) HPX_HOT
{
std::int64_t work_items_count =
work_items_count_.load(boost::memory_order_relaxed);

if (allow_stealing && min_tasks_to_steal_pending > work_items_count)
{
return false;
}

#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
thread_description* tdesc;
if (work_items_.pop(tdesc, steal))
if (0 != work_items_count && work_items_.pop(tdesc, steal))
{
--work_items_count_;

Expand All @@ -832,8 +850,7 @@ namespace hpx { namespace threads { namespace policies
return true;
}
#else
if (0 != work_items_count_.load(boost::memory_order_relaxed) &&
work_items_.pop(thrd, steal))
if (0 != work_items_count && work_items_.pop(thrd, steal))
{
--work_items_count_;
return true;
Expand Down Expand Up @@ -983,7 +1000,7 @@ namespace hpx { namespace threads { namespace policies
{
// don't try to steal if there are only a few tasks left on
// this queue
if (running && min_tasks_to_steal >=
if (running && min_tasks_to_steal_staged >
addfrom->new_tasks_count_.load(boost::memory_order_relaxed))
{
return false;
Expand Down
4 changes: 2 additions & 2 deletions hpx/runtime/threads/policies/throttle_queue_scheduler.hpp
Expand Up @@ -122,7 +122,7 @@ namespace hpx { namespace threads { namespace policies

/// Return the next thread to be executed, return false if none is
/// available
virtual bool get_next_thread(std::size_t num_thread,
virtual bool get_next_thread(std::size_t num_thread, bool running,
std::int64_t& idle_loop_count, threads::thread_data*& thrd)
{
bool ret = throttle(num_thread, apex_current_threads_ <
Expand All @@ -131,7 +131,7 @@ namespace hpx { namespace threads { namespace policies

// grab work if available
return this->base_type::get_next_thread(
num_thread, idle_loop_count, thrd);
num_thread, running, idle_loop_count, thrd);
}

protected:
Expand Down
3 changes: 2 additions & 1 deletion src/util/runtime_configuration.cpp
Expand Up @@ -219,7 +219,8 @@ namespace hpx { namespace util
BOOST_PP_STRINGIZE(HPX_NUM_TIMER_POOL_SIZE) "}",

"[hpx.thread_queue]",
"min_tasks_to_steal = ${HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL:10}",
"min_tasks_to_steal_pending = ${HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_PENDING:0}",
"min_tasks_to_steal_staged = ${HPX_THREAD_QUEUE_MIN_TASKS_TO_STEAL_STAGED:10}",
"min_add_new_count = ${HPX_THREAD_QUEUE_MIN_ADD_NEW_COUNT:10}",
"max_add_new_count = ${HPX_THREAD_QUEUE_MAX_ADD_NEW_COUNT:10}",
"max_delete_count = ${HPX_THREAD_QUEUE_MAX_DELETE_COUNT:1000}",
Expand Down

0 comments on commit 47145dd

Please sign in to comment.