Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suspend thread pool #3080

Merged
merged 27 commits into from Jan 17, 2018
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9c8e2d2
Add suspend function to scheduled thread pool
msimberg Dec 1, 2017
9fef6b4
Fix checks for suspending pool
msimberg Dec 5, 2017
9bde104
Add suspend function to thread_pool_base
msimberg Dec 6, 2017
8946ed6
Fix missing else when suspending pool
msimberg Dec 15, 2017
1009777
Fix waiting for suspension with multiple pools
msimberg Dec 15, 2017
d808cda
Add test for suspending pool
msimberg Dec 15, 2017
a884c5a
Make asserts after scheduling loop exits more strict
msimberg Dec 15, 2017
966a96a
Add test for suspending own pool
msimberg Dec 15, 2017
3f67f0c
Make thread_pool_base suspend/resume empty by default
msimberg Dec 15, 2017
58b7366
Stop background thread only when no more work is left
msimberg Dec 15, 2017
d1e0fcd
Move locks for remove/add/suspend/resume_processing_unit
msimberg Dec 15, 2017
dcbb24f
Allow suspending a pu which is already suspended
msimberg Dec 15, 2017
85d8041
Add test for suspending a pool with some threads already suspended
msimberg Dec 15, 2017
dc3d5ea
Yield OS thread when waiting for suspension outside runtime
msimberg Dec 18, 2017
9c90a2f
Catch hpx::exception and explicitly test that it was thrown in suspen…
msimberg Dec 18, 2017
62eee0a
Require suspend for thread pools, assert in io_service_thread_pool im…
msimberg Dec 18, 2017
3ff81cb
Add missing include
msimberg Dec 19, 2017
f7bab55
Add future-returning and callback version of thread pool suspend func…
msimberg Jan 5, 2018
cd4e4d8
Add test for suspending pool from outside runtime
msimberg Jan 5, 2018
305c769
Use yield_k for waiting in scheduled_thread_pool
msimberg Jan 5, 2018
027d260
Merge branch 'master' into suspend-pool
msimberg Jan 8, 2018
cb71087
Futurize all pool/thread suspension functions
msimberg Jan 9, 2018
e1704ff
Use futurized suspension functions in resource tests
msimberg Jan 10, 2018
25a1ee5
Change scheduling exit criteria for suspension
msimberg Jan 10, 2018
4687e72
Factor out loops using yield_k
msimberg Jan 15, 2018
ec97d9e
Remove unnecessary yield loop in scheduled_thread_pool_impl
msimberg Jan 15, 2018
0273adc
Shorten long comment line in libfabric receiver.cpp
msimberg Jan 16, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 21 additions & 0 deletions hpx/runtime/threads/detail/io_service_thread_pool.hpp
Expand Up @@ -9,6 +9,7 @@
#include <hpx/config.hpp>
#include <hpx/compat/barrier.hpp>
#include <hpx/compat/mutex.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/runtime/threads/detail/thread_pool_base.hpp>
#include <hpx/runtime/threads/policies/callback_notifier.hpp>
#include <hpx/runtime/threads/policies/scheduler_mode.hpp>
Expand Down Expand Up @@ -65,6 +66,26 @@ namespace hpx { namespace threads { namespace detail
///////////////////////////////////////////////////////////////////////
void stop (std::unique_lock<compat::mutex>& l, bool blocking = true);

///////////////////////////////////////////////////////////////////////
hpx::future<void> resume();
void resume_cb(std::function<void(void)> callback,
error_code& ec = throws);

hpx::future<void> suspend();
void suspend_cb(std::function<void(void)> callback,
error_code& ec = throws);

///////////////////////////////////////////////////////////////////////
hpx::future<void> suspend_processing_unit(std::size_t virt_core);
void suspend_processing_unit_cb(
std::function<void(void)> callback, std::size_t virt_core,
error_code& ec = throws);

hpx::future<void> resume_processing_unit(std::size_t virt_core);
void resume_processing_unit_cb(
std::function<void(void)> callback, std::size_t virt_core,
error_code& ec = throws);

///////////////////////////////////////////////////////////////////////
compat::thread& get_os_thread_handle(std::size_t global_thread_num);

Expand Down
20 changes: 16 additions & 4 deletions hpx/runtime/threads/detail/scheduled_thread_pool.hpp
Expand Up @@ -11,6 +11,7 @@
#include <hpx/compat/mutex.hpp>
#include <hpx/compat/thread.hpp>
#include <hpx/error_code.hpp>
#include <hpx/lcos/future.hpp>
#include <hpx/runtime/threads/detail/thread_pool_base.hpp>
#include <hpx/runtime/threads/policies/callback_notifier.hpp>
#include <hpx/runtime/threads/policies/scheduler_base.hpp>
Expand Down Expand Up @@ -131,9 +132,13 @@ namespace hpx { namespace threads { namespace detail

template <typename Lock>
void stop_locked(Lock& l, bool blocking = true);
void stop (std::unique_lock<compat::mutex>& l, bool blocking = true);
void stop(std::unique_lock<compat::mutex>& l, bool blocking = true);

void resume(error_code& ec = throws);
hpx::future<void> suspend();
void suspend_cb(std::function<void(void)> callback, error_code& ec = throws);

hpx::future<void> resume();
void resume_cb(std::function<void(void)> callback, error_code& ec = throws);

///////////////////////////////////////////////////////////////////
compat::thread& get_os_thread_handle(std::size_t global_thread_num)
Expand Down Expand Up @@ -267,14 +272,21 @@ namespace hpx { namespace threads { namespace detail
std::size_t virt_core, error_code& = hpx::throws);

// Suspend the given processing unit on the scheduler.
void suspend_processing_unit(std::size_t virt_core, error_code&);
hpx::future<void> suspend_processing_unit(std::size_t virt_core);
void suspend_processing_unit_cb(std::function<void(void)> callback,
std::size_t virt_core, error_code& = hpx::throws);

// Resume the given processing unit on the scheduler.
void resume_processing_unit(std::size_t virt_core, error_code&);
hpx::future<void> resume_processing_unit(std::size_t virt_core);
void resume_processing_unit_cb(std::function<void(void)> callback,
std::size_t virt_core, error_code& = hpx::throws);

protected:
friend struct init_tss_helper<Scheduler>;

void resume_internal(error_code& ec);
void suspend_internal(error_code& ec);

void remove_processing_unit_internal(
std::size_t virt_core, error_code& = hpx::throws);
void add_processing_unit_internal(std::size_t virt_core,
Expand Down