Suspend thread pool #3080

Merged: 27 commits, Jan 17, 2018
Changes from 20 commits
Commits (27)
9c8e2d2
Add suspend function to scheduled thread pool
msimberg Dec 1, 2017
9fef6b4
Fix checks for suspending pool
msimberg Dec 5, 2017
9bde104
Add suspend function to thread_pool_base
msimberg Dec 6, 2017
8946ed6
Fix missing else when suspending pool
msimberg Dec 15, 2017
1009777
Fix waiting for suspension with multiple pools
msimberg Dec 15, 2017
d808cda
Add test for suspending pool
msimberg Dec 15, 2017
a884c5a
Make asserts after scheduling loop exits more strict
msimberg Dec 15, 2017
966a96a
Add test for suspending own pool
msimberg Dec 15, 2017
3f67f0c
Make thread_pool_base suspend/resume empty by default
msimberg Dec 15, 2017
58b7366
Stop background thread only when no more work is left
msimberg Dec 15, 2017
d1e0fcd
Move locks for remove/add/suspend/resume_processing_unit
msimberg Dec 15, 2017
dcbb24f
Allow suspending a pu which is already suspended
msimberg Dec 15, 2017
85d8041
Add test for suspending a pool with some threads already suspended
msimberg Dec 15, 2017
dc3d5ea
Yield OS thread when waiting for suspension outside runtime
msimberg Dec 18, 2017
9c90a2f
Catch hpx::exception and explicitly test that it was thrown in suspen…
msimberg Dec 18, 2017
62eee0a
Require suspend for thread pools, assert in io_service_thread_pool im…
msimberg Dec 18, 2017
3ff81cb
Add missing include
msimberg Dec 19, 2017
f7bab55
Add future-returning and callback version of thread pool suspend func…
msimberg Jan 5, 2018
cd4e4d8
Add test for suspending pool from outside runtime
msimberg Jan 5, 2018
305c769
Use yield_k for waiting in scheduled_thread_pool
msimberg Jan 5, 2018
027d260
Merge branch 'master' into suspend-pool
msimberg Jan 8, 2018
cb71087
Futurize all pool/thread suspension functions
msimberg Jan 9, 2018
e1704ff
Use futurized suspension functions in resource tests
msimberg Jan 10, 2018
25a1ee5
Change scheduling exit criteria for suspension
msimberg Jan 10, 2018
4687e72
Factor out loops using yield_k
msimberg Jan 15, 2018
ec97d9e
Remove unnecessary yield loop in scheduled_thread_pool_impl
msimberg Jan 15, 2018
0273adc
Shorten long comment line in libfabric receiver.cpp
msimberg Jan 16, 2018
7 changes: 7 additions & 0 deletions hpx/runtime/threads/detail/io_service_thread_pool.hpp
@@ -65,6 +65,13 @@ namespace hpx { namespace threads { namespace detail
///////////////////////////////////////////////////////////////////////
void stop (std::unique_lock<compat::mutex>& l, bool blocking = true);

///////////////////////////////////////////////////////////////////////
hpx::future<void> suspend(error_code& ec = throws);
Member
The API returning a future does not need to take an error_code, as all possible exceptions will be reported through the future mechanism. Am I missing something?

Contributor Author
I don't think you're missing anything. This is part of what I meant when I said it needs tidying up (I just wanted to have the basics working first to try it out).

Member
Fair enough. Things are starting to look very good now. Thanks for your work on this!
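
A minimal usage sketch of the future-returning API discussed above (the pool variable and the error handling are illustrative, not code from this PR): any exception raised while suspending surfaces when the future is queried, which is why the error_code parameter becomes redundant.

    // Illustrative only: errors reach the caller through the returned future.
    hpx::future<void> f = pool.suspend();
    try
    {
        f.get();    // rethrows any hpx::exception raised during suspension
    }
    catch (hpx::exception const&)
    {
        // handle or report the failure; no error_code needed at the call site
    }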

void suspend_cb(std::function<void(void)> callback,
error_code& ec = throws);

void resume(error_code& ec = throws);

///////////////////////////////////////////////////////////////////////
compat::thread& get_os_thread_handle(std::size_t global_thread_num);

11 changes: 10 additions & 1 deletion hpx/runtime/threads/detail/scheduled_thread_pool.hpp
@@ -11,6 +11,7 @@
#include <hpx/compat/mutex.hpp>
#include <hpx/compat/thread.hpp>
#include <hpx/error_code.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/runtime/threads/detail/thread_pool_base.hpp>
#include <hpx/runtime/threads/policies/callback_notifier.hpp>
#include <hpx/runtime/threads/policies/scheduler_base.hpp>
@@ -131,7 +132,10 @@ namespace hpx { namespace threads { namespace detail

template <typename Lock>
void stop_locked(Lock& l, bool blocking = true);
void stop (std::unique_lock<compat::mutex>& l, bool blocking = true);
void stop(std::unique_lock<compat::mutex>& l, bool blocking = true);

hpx::future<void> suspend(error_code& ec = throws);
void suspend_cb(std::function<void(void)> callback, error_code& ec = throws);

void resume(error_code& ec = throws);

@@ -275,6 +279,11 @@ namespace hpx { namespace threads { namespace detail
protected:
friend struct init_tss_helper<Scheduler>;

template <typename F>
void suspend_func(F&& callback, error_code& ec);
template <typename F>
void suspend_internal(F&& callback, error_code& ec);

void remove_processing_unit_internal(
std::size_t virt_core, error_code& = hpx::throws);
void add_processing_unit_internal(std::size_t virt_core,
165 changes: 121 additions & 44 deletions hpx/runtime/threads/detail/scheduled_thread_pool_impl.hpp
@@ -6,11 +6,14 @@
#if !defined(HPX_SCHEDULED_THREAD_POOL_IMPL_HPP)
#define HPX_SCHEDULED_THREAD_POOL_IMPL_HPP

#include <hpx/apply.hpp>
#include <hpx/compat/barrier.hpp>
#include <hpx/compat/mutex.hpp>
#include <hpx/compat/thread.hpp>
#include <hpx/exception.hpp>
#include <hpx/exception_info.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/lcos/local/packaged_task.hpp>
#include <hpx/runtime/resource/detail/partitioner.hpp>
#include <hpx/runtime/threads/detail/create_thread.hpp>
#include <hpx/runtime/threads/detail/create_work.hpp>
@@ -27,6 +30,7 @@
#include <hpx/state.hpp>
#include <hpx/throw_exception.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/detail/yield_k.hpp>
#include <hpx/util/unlock_guard.hpp>

#include <boost/system/system_error.hpp>
@@ -316,13 +320,81 @@ namespace hpx { namespace threads { namespace detail
}

template <typename Scheduler>
void scheduled_thread_pool<Scheduler>::resume(error_code& ec)
template <typename F>
void scheduled_thread_pool<Scheduler>::suspend_func(F&& callback, error_code& ec)
{
if (!(mode_ & threads::policies::enable_elasticity))
for (std::size_t k = 0;
sched_->Scheduler::get_thread_count() >
get_background_thread_count();
++k)
{
util::detail::yield_k(k, "scheduled_thread_pool::suspend_func");
}

for (std::size_t i = 0; i != threads_.size(); ++i)
{
suspend_processing_unit_internal(i, ec);
}

callback();
}

template <typename Scheduler>
template <typename F>
void scheduled_thread_pool<Scheduler>::suspend_internal(F&& callback, error_code& ec)
{
if (threads::get_self_ptr() && hpx::this_thread::get_pool() == this)
{
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend",
"cannot suspend a pool from itself");
return;
}

auto suspend_func_wrapper =
[this, callback{std::move(callback)}, ec]() mutable
Member
This is C++14 only. We try to stay compatible with C++11. Please use the (new) HPX_CAPTURE_FORWARD macro instead of directly initializing the callback capture variable (see https://github.com/STEllAR-GROUP/hpx/blob/master/hpx/config/lambda_capture.hpp).

Contributor Author (msimberg, Jan 8, 2018)
Yep, I'm aware. Again, this needs tidying up. Sorry, I should have been more explicit about what's still to do. Thanks for pointing to the macro; I wasn't quite sure how to do this in a C++11 way.

Member (hkaiser, Jan 8, 2018)
In C++11 we just copy into the capture :/ The only alternative is to use bind/deferred_call instead of the lambda.

Contributor Author
I had problems with the packaged_task as it's non-copyable (I need the move). But I think I can do without the packaged_task and just do an hpx::async for that case (IMO neither more complicated nor simpler). Otherwise it's as you say, bind/deferred_call.

{
this->suspend_func(callback, ec);
};

if (threads::get_self_ptr())
{
hpx::apply(std::move(suspend_func_wrapper));
}
else
{
compat::thread(std::move(suspend_func_wrapper)).detach();
}
}
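
A hedged sketch of the C++11-compatible capture the reviewer asks for above. The assumption (taken from the comment, not verified here) is that HPX_CAPTURE_FORWARD(x) from hpx/config/lambda_capture.hpp expands to an init-capture under C++14 and to a plain copy capture under C++11, so the wrapper in suspend_internal could read:

    // Sketch only, assuming HPX_CAPTURE_FORWARD(callback) falls back to a
    // copy capture under C++11; with that fallback the callable must be
    // copyable, which is why the author raises the packaged_task issue in
    // the same thread.
    auto suspend_func_wrapper =
        [this, HPX_CAPTURE_FORWARD(callback), ec]() mutable
        {
            this->suspend_func(callback, ec);
        };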

template <typename Scheduler>
future<void> scheduled_thread_pool<Scheduler>::suspend(error_code& ec)
{
if (!threads::get_self_ptr())
{
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend",
"cannot call suspend from outside HPX, use suspend_cb instead");
return make_ready_future();
}

lcos::local::packaged_task<void(void)> pt([](){});
hpx::future<void> fut = pt.get_future();

suspend_internal(std::move(pt), ec);

return fut;
}

template <typename Scheduler>
void scheduled_thread_pool<Scheduler>::suspend_cb(std::function<void(void)> callback, error_code& ec)
{
suspend_internal(callback, ec);
}

template <typename Scheduler>
void scheduled_thread_pool<Scheduler>::resume(error_code& ec)
{
for (std::size_t i = 0; i != threads_.size(); ++i)
{
resume_processing_unit_internal(i, ec);
@@ -439,8 +511,9 @@ namespace hpx { namespace threads { namespace detail
// the OS thread is allowed to exit only if no more HPX
// threads exist or if some other thread has terminated
HPX_ASSERT(
!sched_->Scheduler::get_thread_count(
suspended, thread_priority_default, thread_num) ||
(sched_->Scheduler::get_thread_count(
suspended, thread_priority_default, thread_num) == 0 &&
sched_->Scheduler::get_queue_length(thread_num) == 0) ||
sched_->Scheduler::get_state(thread_num) > state_stopping);
}
catch (hpx::exception const& e)
@@ -1366,21 +1439,22 @@ namespace hpx { namespace threads { namespace detail
std::size_t virt_core, std::size_t thread_num,
std::shared_ptr<compat::barrier> startup, error_code& ec)
{
std::unique_lock<compat::mutex>
l(sched_->Scheduler::get_pu_mutex(virt_core));

if (threads_.size() <= virt_core)
threads_.resize(virt_core + 1);

if (threads_[virt_core].joinable())
{
l.unlock();
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::add_processing_unit",
"the given virtual core has already been added to this "
"thread pool");
return;
}

std::unique_lock<compat::mutex>
l(sched_->Scheduler::get_pu_mutex(virt_core));

resource::get_partitioner().assign_pu(id_.name(), virt_core);

std::atomic<hpx::state>& state =
@@ -1435,20 +1509,19 @@ namespace hpx { namespace threads { namespace detail
void scheduled_thread_pool<Scheduler>::remove_processing_unit_internal(
std::size_t virt_core, error_code& ec)
{
std::unique_lock<compat::mutex>
l(sched_->Scheduler::get_pu_mutex(virt_core));

if (threads_.size() <= virt_core || !threads_[virt_core].joinable())
{
l.unlock();
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::remove_processing_unit",
"the given virtual core has already been stopped to run on "
"this thread pool");
return;
}

std::unique_lock<compat::mutex>
l(sched_->Scheduler::get_pu_mutex(virt_core));

compat::thread t;

std::atomic<hpx::state>& state =
sched_->Scheduler::get_state(virt_core);

@@ -1460,13 +1533,19 @@ namespace hpx { namespace threads { namespace detail
oldstate == state_stopped);

resource::get_partitioner().unassign_pu(id_.name(), virt_core);
compat::thread t;
std::swap(threads_[virt_core], t);

if (threads::get_self_ptr())
{
while (virt_core == hpx::get_worker_thread_num())
std::size_t thread_num = thread_offset_ + virt_core;

for (std::size_t k = 0;
thread_num == hpx::get_worker_thread_num();
++k)
{
hpx::this_thread::suspend();
util::detail::yield_k(k,
"scheduled_thread_pool::remove_processing_unit_internal");
}
}
Member
This loop looks like a good candidate for extraction into a utility function (similar constructs are used during shutdown as well, IIRC).

Contributor Author
You're right. The loops at shutdown are at least similar; I'll see what common functionality I can get out of there.

Contributor Author
I've pulled out a yield_while function into yield_k.hpp, which takes a predicate for terminating the loop and some optional arguments (same as in yield_k). It doesn't really make the code shorter, but I think the intention is clearer, and k is now hidden, which I think is good.
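
A rough sketch of the yield_while helper described in the comment above, assuming it simply wraps the k-counting back-off loop around a caller-supplied predicate (the exact signature in hpx/util/detail/yield_k.hpp may differ):

    // Sketch only: spin until the predicate returns false, hiding the
    // iteration counter k and backing off via yield_k.
    template <typename Predicate>
    void yield_while(Predicate&& predicate, char const* description = nullptr)
    {
        for (std::size_t k = 0; predicate(); ++k)
        {
            yield_k(k, description);
        }
    }

    // The loop above could then become:
    // yield_while([&]() { return thread_num == hpx::get_worker_thread_num(); },
    //     "scheduled_thread_pool::remove_processing_unit_internal");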


@@ -1477,41 +1556,51 @@ namespace hpx { namespace threads { namespace detail
void scheduled_thread_pool<Scheduler>::suspend_processing_unit_internal(
std::size_t virt_core, error_code& ec)
{
std::unique_lock<compat::mutex>
l(sched_->Scheduler::get_pu_mutex(virt_core));

if (threads_.size() <= virt_core || !threads_[virt_core].joinable())
{
l.unlock();
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend_processing_unit",
"the given virtual core has already been stopped to run on "
"this thread pool");
return;
}

std::unique_lock<compat::mutex>
l(sched_->Scheduler::get_pu_mutex(virt_core));

// inform the scheduler to suspend the virtual core
std::atomic<hpx::state>& state =
sched_->Scheduler::get_state(virt_core);

// check if already suspended
hpx::state current_state = state.load();
if (current_state == state_pre_sleep || current_state == state_sleeping)
{
return;
}

hpx::state oldstate = state.exchange(state_pre_sleep);

HPX_ASSERT(oldstate == state_running);

if (threads::get_self_ptr())
{
while (virt_core == hpx::get_worker_thread_num())
{
hpx::this_thread::suspend();
}
std::size_t thread_num = thread_offset_ + virt_core;

while (state.load() == state_pre_sleep)
for (std::size_t k = 0;
thread_num == hpx::get_worker_thread_num();
++k)
{
hpx::this_thread::suspend();
util::detail::yield_k(k,
"scheduled_thread_pool::suspend_processing_unit_internal");
}
}
else

for (std::size_t k = 0; state.load() == state_pre_sleep; ++k)
{
while (state.load() == state_pre_sleep) {}
util::detail::yield_k(k,
"scheduled_thread_pool::suspend_processing_unit_internal");
}
}

@@ -1535,39 +1624,27 @@ namespace hpx { namespace threads { namespace detail
void scheduled_thread_pool<Scheduler>::resume_processing_unit_internal(
std::size_t virt_core, error_code& ec)
{
std::unique_lock<compat::mutex>
l(sched_->Scheduler::get_pu_mutex(virt_core));

if (threads_.size() <= virt_core || !threads_[virt_core].joinable())
{
l.unlock();
HPX_THROWS_IF(ec, bad_parameter,
"scheduled_thread_pool<Scheduler>::suspend_processing_unit",
"the given virtual core has already been stopped to run on "
"this thread pool");
return;
}

std::unique_lock<compat::mutex>
l(sched_->Scheduler::get_pu_mutex(virt_core));

std::atomic<hpx::state>& state =
sched_->Scheduler::get_state(virt_core);

if (threads::get_self_ptr())
for (std::size_t k = 0; state.load() == state_sleeping; ++k)
{
sched_->Scheduler::resume(virt_core);

while (state.load() == state_sleeping)
{
sched_->Scheduler::resume(virt_core);
hpx::this_thread::suspend();
}
}
else
{
sched_->Scheduler::resume(virt_core);

while (state.load() == state_sleeping)
{
sched_->Scheduler::resume(virt_core);
}
util::detail::yield_k(k,
"scheduled_thread_pool::resume_processing_unit_internal");
}
}

8 changes: 0 additions & 8 deletions hpx/runtime/threads/detail/scheduling_loop.hpp
@@ -643,9 +643,6 @@ namespace hpx { namespace threads { namespace detail
// this might happen, if some thread has been added to the
// scheduler queue already but the state has not been reset
// yet
//
// REVIEW: Passing a specific target thread may set off
// the round robin queuing.
scheduler.SchedulingPolicy::schedule_thread(thrd, num_thread);
}

@@ -717,11 +714,6 @@
}
}

// let our background threads terminate
if (background_running)
{
*background_running = running;
}
// do background work in parcel layer and in agas
if (!call_background_thread(background_thread, next_thrd, scheduler,
num_thread, running))
6 changes: 6 additions & 0 deletions hpx/runtime/threads/detail/thread_pool_base.hpp
@@ -12,6 +12,7 @@
#include <hpx/compat/thread.hpp>
#include <hpx/error_code.hpp>
#include <hpx/exception_fwd.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/lcos/local/no_mutex.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/runtime/thread_pool_helpers.hpp>
@@ -29,6 +30,7 @@
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <iosfwd>
#include <memory>
#include <mutex>
@@ -82,6 +84,10 @@ namespace hpx { namespace threads { namespace detail

virtual void resume(error_code& ec = throws) = 0;

virtual hpx::future<void> suspend(error_code& ec = throws) = 0;
virtual void suspend_cb(
std::function<void(void)> callback, error_code& ec = throws) = 0;

public:
std::size_t get_worker_thread_num() const;
virtual std::size_t get_os_thread_count() const = 0;