Skip to content

Commit

Permalink
Remove wait_or_add_new and run_now from shared priority queue scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
biddisco committed Feb 1, 2019
1 parent a347532 commit 4789991
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 148 deletions.
78 changes: 4 additions & 74 deletions examples/resource_partitioner/shared_priority_queue_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ namespace example {
// create a new thread and schedule it if the initial state
// is equal to pending
void create_thread(thread_init_data& data, thread_id_type* thrd,
thread_state_enum initial_state, bool run_now, error_code& ec) override
thread_state_enum initial_state, error_code& ec) override
{
// safety check that task was created by this thread/scheduler
HPX_ASSERT(data.scheduler_base == this);
Expand Down Expand Up @@ -790,7 +790,7 @@ namespace example {

hp_queues_[domain_num].queues_[hp_lookup_[
q_index % hp_queues_[domain_num].num_cores]]->
create_thread(data, thrd, initial_state, run_now, ec);
create_thread(data, thrd, initial_state, ec);

LOG_CUSTOM_MSG("create_thread thread_priority_high "
<< "queue " << decnumber(q_index)
Expand All @@ -804,7 +804,7 @@ namespace example {
{
lp_queues_[domain_num].queues_[lp_lookup_[
q_index % lp_queues_[domain_num].num_cores]]->
create_thread(data, thrd, initial_state, run_now, ec);
create_thread(data, thrd, initial_state, ec);

LOG_CUSTOM_MSG("create_thread thread_priority_low "
<< "queue " << decnumber(q_index)
Expand All @@ -817,7 +817,7 @@ namespace example {
// normal priority
np_queues_[domain_num].queues_[np_lookup_[
q_index % np_queues_[domain_num].num_cores]]->
create_thread(data, thrd, initial_state, run_now, ec);
create_thread(data, thrd, initial_state, ec);

LOG_CUSTOM_MSG2("create_thread thread_priority_normal "
<< "queue " << decnumber(q_index)
Expand Down Expand Up @@ -1274,76 +1274,6 @@ namespace example {
return result;
}

/// This is a function which gets called periodically by the thread
/// manager to allow for maintenance tasks to be executed in the
/// scheduler. Returns true if the OS thread calling this function
/// has to be terminated (i.e. no more work has to be done).
virtual bool wait_or_add_new(std::size_t thread_num,
bool running, std::int64_t& idle_loop_count) override
{
std::size_t added = 0;
bool result = true;

if (thread_num == std::size_t(-1)) {
HPX_THROW_EXCEPTION(bad_parameter,
"shared_priority_queue_scheduler_example::wait_or_add_new",
"Invalid thread number: " + std::to_string(thread_num));
}

// find the numa domain from the local thread index
std::size_t domain_num = d_lookup_[thread_num];

// is there a high priority task, take first from our numa domain
// and then try to steal from others
for (std::size_t d=0; d<num_domains_; ++d) {
std::size_t dom = (domain_num+d) % num_domains_;
// set the preferred queue for this domain, if applicable
std::size_t q_index = q_lookup_[thread_num];
// get next task, steal if from another domain
result = hp_queues_[dom].wait_or_add_new(q_index, running,
idle_loop_count, added);
if (0 != added) return result;
}

// try a normal priority task
if (!result) {
for (std::size_t d=0; d<num_domains_; ++d) {
std::size_t dom = (domain_num+d) % num_domains_;
// set the preferred queue for this domain, if applicable
std::size_t q_index = q_lookup_[thread_num];
// get next task, steal if from another domain
result = np_queues_[dom].wait_or_add_new(q_index, running,
idle_loop_count, added);
if (0 != added) return result;
}
}

// low priority task
if (!result) {
#ifdef JB_LP_STEALING
for (std::size_t d=domain_num; d<domain_num+num_domains_; ++d) {
std::size_t dom = d % num_domains_;
// set the preferred queue for this domain, if applicable
std::size_t q_index = (dom==domain_num) ?
q_lookup_[thread_num] :
lp_lookup_[(counters_[dom]++ %
lp_queues_[dom].num_cores)];

result = lp_queues_[dom].wait_or_add_new(q_index, running,
idle_loop_count, added);
if (0 != added) return result;
}
#else
// no cross domain stealing for LP queues
result = lp_queues_[domain_num].wait_or_add_new(0, running,
idle_loop_count, added);
if (0 != added) return result;
#endif
}

return result;
}

///////////////////////////////////////////////////////////////////////
void on_start_thread(std::size_t thread_num) override
{
Expand Down
78 changes: 4 additions & 74 deletions hpx/runtime/threads/policies/shared_priority_queue_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ namespace policies {
// create a new thread and schedule it if the initial state
// is equal to pending
void create_thread(thread_init_data& data, thread_id_type* thrd,
thread_state_enum initial_state, bool run_now, error_code& ec) override
thread_state_enum initial_state, error_code& ec) override
{
// safety check that task was created by this thread/scheduler
HPX_ASSERT(data.scheduler_base == this);
Expand Down Expand Up @@ -649,7 +649,7 @@ namespace policies {

hp_queues_[domain_num].queues_[hp_lookup_[
q_index % hp_queues_[domain_num].num_cores]]->
create_thread(data, thrd, initial_state, run_now, ec);
create_thread(data, thrd, initial_state, ec);

return;
}
Expand All @@ -658,15 +658,15 @@ namespace policies {
{
lp_queues_[domain_num].queues_[lp_lookup_[
q_index % lp_queues_[domain_num].num_cores]]->
create_thread(data, thrd, initial_state, run_now, ec);
create_thread(data, thrd, initial_state, ec);

return;
}

// normal priority
np_queues_[domain_num].queues_[np_lookup_[
q_index % np_queues_[domain_num].num_cores]]->
create_thread(data, thrd, initial_state, run_now, ec);
create_thread(data, thrd, initial_state, ec);
}

/// Return the next thread to be executed, return false if none available
Expand Down Expand Up @@ -1062,76 +1062,6 @@ namespace policies {
return result;
}

/// This is a function which gets called periodically by the thread
/// manager to allow for maintenance tasks to be executed in the
/// scheduler. Returns true if the OS thread calling this function
/// has to be terminated (i.e. no more work has to be done).
virtual bool wait_or_add_new(std::size_t thread_num,
bool running, std::int64_t& idle_loop_count) override
{
std::size_t added = 0;
bool result = true;

if (thread_num == std::size_t(-1)) {
HPX_THROW_EXCEPTION(bad_parameter,
"shared_priority_queue_scheduler::wait_or_add_new",
"Invalid thread number: " + std::to_string(thread_num));
}

// find the numa domain from the local thread index
std::size_t domain_num = d_lookup_[thread_num];

// is there a high priority task, take first from our numa domain
// and then try to steal from others
for (std::size_t d=0; d<num_domains_; ++d) {
std::size_t dom = (domain_num+d) % num_domains_;
// set the preferred queue for this domain, if applicable
std::size_t q_index = q_lookup_[thread_num];
// get next task, steal if from another domain
result = hp_queues_[dom].wait_or_add_new(q_index, running,
idle_loop_count, added);
if (0 != added) return result;
}

// try a normal priority task
if (!result) {
for (std::size_t d=0; d<num_domains_; ++d) {
std::size_t dom = (domain_num+d) % num_domains_;
// set the preferred queue for this domain, if applicable
std::size_t q_index = q_lookup_[thread_num];
// get next task, steal if from another domain
result = np_queues_[dom].wait_or_add_new(q_index, running,
idle_loop_count, added);
if (0 != added) return result;
}
}

// low priority task
if (!result) {
#ifdef JB_LP_STEALING
for (std::size_t d=domain_num; d<domain_num+num_domains_; ++d) {
std::size_t dom = d % num_domains_;
// set the preferred queue for this domain, if applicable
std::size_t q_index = (dom==domain_num) ?
q_lookup_[thread_num] :
lp_lookup_[(counters_[dom]++ %
lp_queues_[dom].num_cores)];

result = lp_queues_[dom].wait_or_add_new(q_index, running,
idle_loop_count, added);
if (0 != added) return result;
}
#else
// no cross domain stealing for LP queues
result = lp_queues_[domain_num].wait_or_add_new(0, running,
idle_loop_count, added);
if (0 != added) return result;
#endif
}

return result;
}

///////////////////////////////////////////////////////////////////////
void on_start_thread(std::size_t thread_num) override
{
Expand Down

0 comments on commit 4789991

Please sign in to comment.