Skip to content

Commit

Permalink
Partially reverting #2891
Browse files Browse the repository at this point in the history
 - Partially reverting changes to wait_or_add_new and the termination detection
 - Commenting out throttle test, to be fixed within #2955
  • Loading branch information
Thomas Heller committed Oct 17, 2017
1 parent 610f9d4 commit 13c3f67
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 180 deletions.
3 changes: 1 addition & 2 deletions examples/resource_partitioner/shared_priority_scheduler.hpp
Expand Up @@ -565,8 +565,7 @@ namespace threads {
}

///////////////////////////////////////////////////////////////////////
bool cleanup_terminated(std::size_t num_threads = std::size_t(-1),
bool delete_all = false)
bool cleanup_terminated(bool delete_all = false)
{
bool empty = true;
for (std::size_t i = 0; i != queues_.size(); ++i)
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/detail/scheduled_thread_pool.hpp
Expand Up @@ -93,7 +93,7 @@ namespace hpx { namespace threads { namespace detail

// Forwards termination-cleanup to the underlying scheduler.
// NOTE(review): this span is a scraped GitHub diff with the +/- markers
// lost. The two consecutive `return` statements below are the removed
// (pre-commit, two-argument) call followed by its replacement (the
// one-argument overload this commit reverts to) — only ONE of them
// exists in the real post-commit file. Do not compile this fragment
// as-is; confirm against the repository at commit 13c3f67.
bool cleanup_terminated(bool delete_all)
{
return sched_->Scheduler::cleanup_terminated(std::size_t(-1), delete_all);
return sched_->Scheduler::cleanup_terminated(delete_all);
}

std::int64_t get_thread_count(thread_state_enum state,
Expand Down
20 changes: 13 additions & 7 deletions hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp
Expand Up @@ -171,6 +171,9 @@ namespace hpx { namespace threads { namespace detail

if (!threads_.empty())
{
// set state to stopping
sched_->Scheduler::set_all_states(state_stopping);

// make sure we're not waiting
sched_->Scheduler::do_some_work(std::size_t(-1));

Expand Down Expand Up @@ -1392,6 +1395,16 @@ namespace hpx { namespace threads { namespace detail
{
compat::thread t;

// inform the scheduler to stop the virtual core
std::atomic<hpx::state>& state =
sched_->Scheduler::get_state(virt_core);

hpx::state oldstate = state.exchange(state_stopping);

HPX_ASSERT(oldstate == state_starting ||
oldstate == state_running || oldstate == state_suspended ||
oldstate == state_stopping || oldstate == state_stopped);

{
std::lock_guard<pu_mutex_type> l(used_processing_units_mtx_);
if (threads_.size() <= virt_core || !threads_[virt_core].joinable())
Expand All @@ -1413,13 +1426,6 @@ namespace hpx { namespace threads { namespace detail
std::swap(threads_[virt_core], t);
}

// inform the scheduler to stop the virtual core
std::atomic<hpx::state>& state =
sched_->Scheduler::get_state(virt_core);

hpx::state oldstate = state.exchange(state_stopping);
HPX_ASSERT(oldstate == state_running);

if (hpx::get_runtime_ptr())
{
while (virt_core == hpx::get_worker_thread_num())
Expand Down
39 changes: 7 additions & 32 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
Expand Up @@ -667,25 +667,13 @@ namespace hpx { namespace threads { namespace detail
num_thread, running, idle_loop_count))
{
// clean up terminated threads one more time before exiting
if (scheduler.SchedulingPolicy::cleanup_terminated(num_thread, true))
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
{
// if this is an inner scheduler, exit immediately
if (!(scheduler.get_scheduler_mode() & policies::delay_exit))
{
if (background_thread.get() != nullptr)
{
HPX_ASSERT(background_running);
*background_running = false;
scheduler.SchedulingPolicy::schedule_thread(
background_thread.get(), num_thread);
background_thread.reset();
background_running.reset();
}
else
{
this_state.store(state_stopped);
break;
}
this_state.store(state_stopped);
break;
}

// otherwise, keep idling for some time
Expand Down Expand Up @@ -761,29 +749,16 @@ namespace hpx { namespace threads { namespace detail
// break if we were idling after 'may_exit'
if (may_exit)
{
if (background_thread)
if (scheduler.SchedulingPolicy::cleanup_terminated(true))
{
HPX_ASSERT(background_running);
*background_running = false;
scheduler.SchedulingPolicy::schedule_thread(
background_thread.get(), num_thread);
background_thread.reset();
background_running.reset();
}
else
{
if (scheduler.SchedulingPolicy::cleanup_terminated(
num_thread, true))
{
this_state.store(state_stopped);
break;
}
this_state.store(state_stopped);
break;
}
may_exit = false;
}
else
{
scheduler.SchedulingPolicy::cleanup_terminated(std::size_t(-1), true);
scheduler.SchedulingPolicy::cleanup_terminated(true);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/policies/hierarchy_scheduler.hpp
Expand Up @@ -572,7 +572,7 @@ namespace hpx { namespace threads { namespace policies
}

///////////////////////////////////////////////////////////////////////
bool cleanup_terminated(std::size_t num_thread, bool delete_all)
bool cleanup_terminated(bool delete_all = false)
{
HPX_ASSERT(tree.size());
bool empty = true;
Expand Down
33 changes: 9 additions & 24 deletions hpx/runtime/threads/policies/local_priority_queue_scheduler.hpp
Expand Up @@ -426,34 +426,19 @@ namespace hpx { namespace threads { namespace policies
}

///////////////////////////////////////////////////////////////////////
// NOTE(review): scraped GitHub diff hunk with removed and added lines
// interleaved (no +/- markers survived the scrape). The first signature
// is the pre-commit per-thread overload being removed; the second is
// the scheduler-wide overload this commit restores. The body mixes both
// variants (e.g. the `num_thread == std::size_t(-1)` branch belongs to
// the removed version). Do not compile as-is; reconstruct from the
// repository. Labels below are inferences from the diff header
// "@@ -426,34 +426,19" — TODO confirm against commit 13c3f67.
bool cleanup_terminated(std::size_t num_thread, bool delete_all)
bool cleanup_terminated(bool delete_all = false)
{
bool empty = true;
// presumably pre-commit only: size_t(-1) meant "clean every queue"
if (num_thread == std::size_t(-1))
{
for (std::size_t i = 0; i != queues_.size(); ++i)
empty = queues_[i]->cleanup_terminated(delete_all) && empty;
if (!delete_all)
return empty;

for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
empty = high_priority_queues_[i]->
cleanup_terminated(delete_all) && empty;

empty = low_priority_queue_.cleanup_terminated(delete_all) && empty;
}
else
{
// presumably pre-commit only: clean just the caller's own queue
empty = queues_[num_thread]->cleanup_terminated(delete_all);
if (delete_all)
return true;
for (std::size_t i = 0; i != queues_.size(); ++i)
empty = queues_[i]->cleanup_terminated(delete_all) && empty;
if (!delete_all)
return empty;

if (num_thread < high_priority_queues_.size())
empty = high_priority_queues_[num_thread]->
cleanup_terminated(delete_all) && empty;
for (std::size_t i = 0; i != high_priority_queues_.size(); ++i)
empty = high_priority_queues_[i]->
cleanup_terminated(delete_all) && empty;

empty = low_priority_queue_.cleanup_terminated(delete_all) && empty;
}
empty = low_priority_queue_.cleanup_terminated(delete_all) && empty;
return empty;
}

Expand Down
14 changes: 3 additions & 11 deletions hpx/runtime/threads/policies/local_queue_scheduler.hpp
Expand Up @@ -278,19 +278,11 @@ namespace hpx { namespace threads { namespace policies
}

///////////////////////////////////////////////////////////////////////
// NOTE(review): scraped GitHub diff hunk; removed and added lines are
// interleaved with no +/- markers. Two signatures appear (pre-commit
// per-thread overload, then the restored scheduler-wide overload), and
// `bool empty = true;` is declared twice — one declaration per diff
// side. Only one variant exists in the real post-commit file; confirm
// against commit 13c3f67 before reusing this text as code.
bool cleanup_terminated(std::size_t num_thread, bool delete_all)
bool cleanup_terminated(bool delete_all = false)
{
bool empty = true;
// presumably pre-commit only: size_t(-1) meant "clean every queue"
if (num_thread == std::size_t(-1))
{
bool empty = true;
for (std::size_t i = 0; i != queues_.size(); ++i)
empty = queues_[i]->cleanup_terminated(delete_all) && empty;
}
else
{
// presumably pre-commit only: clean just the caller's own queue
empty = queues_[num_thread]->cleanup_terminated(delete_all);
}
for (std::size_t i = 0; i != queues_.size(); ++i)
empty = queues_[i]->cleanup_terminated(delete_all) && empty;
return empty;
}

Expand Down
2 changes: 1 addition & 1 deletion hpx/runtime/threads/policies/scheduler_base.hpp
Expand Up @@ -311,7 +311,7 @@ namespace hpx { namespace threads { namespace policies

virtual void abort_all_suspended_threads() = 0;

virtual bool cleanup_terminated(std::size_t num_thread, bool delete_all) = 0;
virtual bool cleanup_terminated(bool delete_all = false) = 0;

virtual void create_thread(thread_init_data& data, thread_id_type* id,
thread_state_enum initial_state, bool run_now, error_code& ec,
Expand Down
5 changes: 0 additions & 5 deletions hpx/runtime/threads/policies/thread_queue.hpp
Expand Up @@ -1063,11 +1063,6 @@ namespace hpx { namespace threads { namespace policies

cleanup_terminated_locked();
}
bool canexit = cleanup_terminated_locked(true);
if (!running && canexit) {
// we don't have any registered work items anymore
return true; // terminate scheduling loop
}
return false;
}

Expand Down

0 comments on commit 13c3f67

Please sign in to comment.