Suspend thread pool #3080
Cover the check for if the OS thread is joinable or not.
{
    while (sched_->Scheduler::get_thread_count() >
        get_background_thread_count())
    {}
Should this at least call sleep(0) instead of unconditionally spinning? Alternatively, we could re-spawn the resume function on an HPX thread instead of spinning, not sure...
sleep(0) (or yield) is a good idea. HPX thread I'm not sure about. It would require returning a future (if we want to know when it's done), and so far I've assumed this can be called from outside the runtime.
See here for how this could be handled: https://github.com/STEllAR-GROUP/hpx/blob/master/hpx/util/detail/yield_k.hpp. In fact, you could use yield_k() instead of having your own loop.
Updated to use yield_k - made most waits nice and short. Thanks for pointing that out.
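For readers following the thread, here is a minimal standard-library sketch of the idea behind replacing the empty spin loop with a progressive backoff. It is not the HPX yield_k implementation: the names backoff and wait_until_at_most and the thresholds 16 and 32 are assumptions made for illustration.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical stand-in for a yield_k-style helper: the reaction to the
// k-th failed check becomes progressively more polite to other threads.
// The thresholds are illustrative assumptions, not HPX's values.
inline void backoff(std::size_t k)
{
    if (k < 16)
    {
        // First few checks: just spin, the wait is probably short.
    }
    else if (k < 32)
    {
        std::this_thread::yield();    // give up the rest of the time slice
    }
    else
    {
        std::this_thread::sleep_for(std::chrono::microseconds(100));
    }
}

// Waits until `count` has dropped to `target` without pinning a core.
// Returns the number of failed checks, which can help with diagnostics.
inline std::size_t wait_until_at_most(
    std::atomic<std::size_t> const& count, std::size_t target)
{
    std::size_t k = 0;
    while (count.load() > target)
    {
        backoff(k);
        ++k;
    }
    return k;
}
```

Short waits stay cheap because the first checks only spin, while long waits stop monopolizing the core.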
@@ -80,7 +80,8 @@ namespace hpx { namespace threads { namespace detail
        virtual void stop(
            std::unique_lock<compat::mutex>& l, bool blocking = true) = 0;

        virtual void resume(error_code& ec = throws) = 0;
        virtual void suspend(error_code& ec = throws) {};
        virtual void resume(error_code& ec = throws) {};
Should the default implementations at least assert, if not throw an exception instead of silently ignoring the request?
Yes, you're right. I went back and forth on this, but I think now it should be required (I will need to implement suspend/resume for the io_service_pool to suspend the runtime). If it should be ignored it can be empty in the inheriting class.
Thanks for this fix. Looks fine now.
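The two options discussed in this thread (a required override versus a default that fails loudly) can be sketched with plain C++ as follows. All class names here are hypothetical stand-ins, not the HPX thread pool classes, and the sketch leaves out the error_code parameter.

```cpp
#include <stdexcept>

// Hypothetical, simplified stand-in for a pool base class.
struct pool_base
{
    virtual ~pool_base() = default;

    // Option A: pure virtual, so every derived pool must decide what to do.
    virtual void suspend() = 0;

    // Option B: a default that reports the problem instead of silently
    // ignoring the request.
    virtual void resume()
    {
        throw std::logic_error("resume() is not implemented for this pool");
    }
};

// A pool that deliberately ignores suspend requests. The empty body lives
// in the derived class, which documents that ignoring is intentional.
struct noop_pool : pool_base
{
    void suspend() override {}
};

// A pool that implements both operations.
struct worker_pool : pool_base
{
    bool suspended = false;

    void suspend() override { suspended = true; }
    void resume() override { suspended = false; }
};
```

With option A a missing implementation is a compile-time error; with option B it only shows up at run time, but it can no longer go unnoticed.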
tests/unit/resource/suspend_pool.cpp
Outdated
    }
    catch (std::runtime_error const&)
    {
    }
Where is the std::runtime_error thrown? If this is thrown by HPX it should be changed to using an hpx::exception instead.
Either way, it looks like the exception is expected in this test case. If so, please add a bool checking that it was indeed thrown:
bool exception_thrown = false;
try { ... } catch(hpx::exception const& e) { exception_thrown = true; }
HPX_TEST(exception_thrown);
Hmm, yes, it is thrown by HPX. I did not realize it will be an hpx::exception then... Will update it the way you've suggested.
Thanks, LGTM.
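A self-contained version of the suggested pattern, using only the standard library, might look like the sketch below. The names pool_error, failing_operation and throws_pool_error are hypothetical: pool_error stands in for hpx::exception, and the returned bool is what the real test would pass to HPX_TEST.

```cpp
#include <stdexcept>

// Hypothetical exception type standing in for hpx::exception.
struct pool_error : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Hypothetical operation under test; `should_fail` simulates the error.
inline void failing_operation(bool should_fail)
{
    if (should_fail)
        throw pool_error("operation rejected");
}

// Returns true only if the expected exception was actually thrown, so a
// test cannot pass merely because nothing happened.
inline bool throws_pool_error(bool should_fail)
{
    bool exception_thrown = false;
    try
    {
        failing_operation(should_fail);
    }
    catch (pool_error const&)
    {
        exception_thrown = true;
    }
    return exception_thrown;
}
```

An empty catch block alone would accept both outcomes; recording the flag makes the expectation explicit.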
tests/unit/resource/suspend_pool.cpp
Outdated
    // Can only suspend once all work is done
    auto f = hpx::when_all(std::move(fs));
    HPX_TEST(f.is_ready());
Is there a way for the suspend() API to return a future<void>, allowing callers to synchronize with the pool actually having been suspended?
I suppose it could. However, I've again assumed that this can be called from outside the runtime in which case waiting for the future will not work. I don't have a strong case for calling it from outside the runtime but thought it's unnecessary to restrict it unless needed. This may be a reason to restrict it though.
Alternatively you could add a callback that is to be called once the operation is finished executing. That also would allow for splitting the API into two functions, one taking the callback (to be used when called from outside HPX), and one (internally) passing a hpx::lcos::local::packaged_task to the first API and returning a future extracted from the packaged_task (to be called from HPX threads). That should decouple the waiting from the actual context the suspend is invoked from.
Would you then spawn an OS thread or HPX thread depending on from where it's called? The downside is that there will be two functions, and the user has to know (find out) that only one of them can be called from outside the runtime (asserts help though, and returning a future is already a good sign it can't be called from the outside). I'll give it a shot in any case as it's a nice idea.
I think you can always spawn an HPX thread. Invoking the callback from an HPX thread should be a viable option, always.
Ah, you're right. The problem is that the suspending thread will then run on the pool it's supposed to suspend. So either it gets stuck or it has to call the callback before it has fully suspended the pool. The latter feels wrong but could be good enough (it does first wait for all work to be done before really suspending). Maybe there's another way.
Two corrections to my previous comments: I have to launch an OS thread in the case of suspending from non-HPX threads, and suspending from non-HPX threads is required for suspending the whole runtime...
I implemented a futurized and a callback version of suspend the way you suggested and the principle works well but I still need to tidy it up. The real question is: do you think it's useful that all of suspend/resume/suspend_processing_unit/resume_processing_unit have a futurized and a callback version? I think the answer is clearly yes if it's useful for suspend.
@hkaiser Besides technical details, do you have an opinion on the last question? (In case this slipped past you...)
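The split into a callback version and a futurized version discussed in this thread can be illustrated with standard C++ only. This is a rough sketch under several assumptions: the class pool and the member suspend_cb are hypothetical names, the suspension work is reduced to setting a flag on a plain OS thread, and a shared std::promise is used in place of the hpx::lcos::local::packaged_task mentioned above.

```cpp
#include <atomic>
#include <functional>
#include <future>
#include <memory>
#include <thread>

// Hypothetical pool with just enough state for the illustration.
class pool
{
public:
    ~pool()
    {
        if (worker_.joinable())
            worker_.join();
    }

    // Callback version: the caller never blocks, so it does not matter
    // which kind of thread the request comes from.
    void suspend_cb(std::function<void()> callback)
    {
        if (worker_.joinable())
            worker_.join();    // finish any previous request first
        worker_ = std::thread([this, callback] {
            suspended_.store(true);    // stand-in for the real suspension
            callback();                // notify once the work is done
        });
    }

    // Futurized version: forwards to the callback version and fulfils a
    // promise from inside the callback.
    std::future<void> suspend()
    {
        auto done = std::make_shared<std::promise<void>>();
        std::future<void> f = done->get_future();
        suspend_cb([done] { done->set_value(); });
        return f;
    }

    bool suspended() const { return suspended_.load(); }

private:
    std::thread worker_;
    std::atomic<bool> suspended_{false};
};
```

The promise is held through a shared_ptr because std::function requires a copyable callable, which mirrors the non-copyable packaged_task problem raised later in the review.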
        return pool;
    });
I still think that the API for creating a new pool is too complicated to be exposed to mere mortal users. This needs to be streamlined somehow - but I understand that this is orthogonal to this PR and should be handled by a separate ticket.
100% agreed on both points.
Opened an issue: #3082
@@ -65,6 +65,13 @@ namespace hpx { namespace threads { namespace detail
        ///////////////////////////////////////////////////////////////////////
        void stop (std::unique_lock<compat::mutex>& l, bool blocking = true);

        ///////////////////////////////////////////////////////////////////////
        hpx::future<void> suspend(error_code& ec = throws);
The API returning a future does not need to take an error_code as all possible exceptions will be reported through the future mechanism. Am I missing something?
I don't think you're missing anything. This is part of what I meant when I said it needs tidying up (just wanted to have the basics working first to try it out).
Fair enough. Things start looking very good now. Thanks for your work on this!
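The point about error reporting can be shown with std::future, which behaves the same way in this respect: an exception thrown by the asynchronous work is stored in the shared state and rethrown from get(). The function suspend_async below is a hypothetical stand-in, not the HPX signature.

```cpp
#include <future>
#include <stdexcept>

// Hypothetical asynchronous operation; `fail` simulates an error inside
// the work. No error_code parameter is needed on the interface.
inline std::future<void> suspend_async(bool fail)
{
    return std::async(std::launch::async, [fail] {
        if (fail)
            throw std::runtime_error("suspend failed");
    });
}

// Returns true if the operation completed without an exception.
inline bool completed_ok(std::future<void> f)
{
    try
    {
        f.get();    // rethrows whatever the asynchronous work threw
        return true;
    }
    catch (std::runtime_error const&)
    {
        return false;
    }
}
```

Because the failure travels with the future, the caller handles it at the point where the result is consumed.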
            return;
        }

        auto suspend_func_wrapper =
            [this, callback{std::move(callback)}, ec]() mutable
This is C++14 only. We try to stay compatible with C++11. Please use the (new) HPX_CAPTURE_FORWARD macro instead of the direct initialization of the callback capture variable (see https://github.com/STEllAR-GROUP/hpx/blob/master/hpx/config/lambda_capture.hpp).
Yep, I'm aware. Again needs tidying up. Sorry, should've been more explicit about what's still to do. Thanks for pointing to the macro, I wasn't quite sure how to do this in a C++11 way.
In C++11 we just copy into the capture :/ The only alternative is to use bind/deferred_call instead of the lambda.
I had problems with the packaged_task as it's non-copyable (I need the move). But I think I can do without the packaged_task and just do an hpx::async for that case (IMO neither more complicated nor simpler). Otherwise it's like you say, bind/deferred_call.
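As a sketch of the bind alternative mentioned above, the following uses std::bind and std::packaged_task from the standard library (not the HPX bind/deferred_call utilities) to move a non-copyable task into a callable with C++11 features only. The function name launch is hypothetical.

```cpp
#include <functional>
#include <future>
#include <thread>
#include <utility>

// Runs a move-only task on a new thread using only C++11 features.
inline std::thread launch(std::packaged_task<int()> task)
{
    // C++14 would allow an init-capture:
    //     [task = std::move(task)]() mutable { task(); }
    // In C++11, std::bind can take ownership of the task instead. The
    // bound argument is stored inside the bind object and handed to the
    // lambda as an lvalue reference when the object is invoked.
    return std::thread(std::bind(
        [](std::packaged_task<int()>& t) { t(); }, std::move(task)));
}
```

The resulting bind object is itself move-only, so it works with std::thread but cannot be stored in a std::function.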
    {
        hpx::this_thread::suspend();
        util::detail::yield_k(k,
            "scheduled_thread_pool::remove_processing_unit_internal");
    }
}
This loop looks like a good candidate for being extracted into a utility function (similar constructs are used during shutdown as well, iirc).
You're right. The loops at shutdown are at least similar, I'll see what common functionality I can get out of there.
I've pulled out a yield_while function into yield_k.hpp, which takes a predicate for terminating the loop and some optional arguments (same as in yield_k). It doesn't really make the code shorter but I think the intention is clearer, and k is now hidden which I think is good.
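The shape of such a predicate-based helper might look roughly like the sketch below. It is an assumption-laden illustration rather than the function added in this PR: the optional arguments are omitted, and a plain std::this_thread::yield() stands in for the yield_k backoff step.

```cpp
#include <thread>

// Hypothetical sketch of a yield_while-style helper: keeps yielding for
// as long as `pred()` returns true. Any iteration counter needed by the
// backoff strategy would live inside this function, hidden from callers.
template <typename Predicate>
void yield_while(Predicate&& pred)
{
    while (pred())
    {
        // Stand-in for the real backoff step.
        std::this_thread::yield();
    }
}
```

A call site then reads as a statement of intent, for example yield_while([&] { return busy.load(); }); where busy is some flag owned by the caller.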
Don't wait for suspended threads. Not compatible with futures and suspending hpx::get_worker_thread_num(). As future.get() suspends the thread and does not wake so that it can be stolen, waiting for suspended threads leads to lockup.
Very nice, thanks! LGTM.
This adds a suspend method to thread_pool_base/scheduled_thread_pool. It waits for all work to be done on the pool before suspending.