
Optimize thread pool implementation #4172

Closed · wants to merge 3 commits

Conversation

@nbougalis (Contributor) commented May 23, 2022

The existing thread pool code uses several layers of indirection, relies on a custom lock-free stack, and offers functionality that is never used (e.g. the ability to dynamically adjust the number of threads in the pool).

This refactoring aims to simplify the code, making it easier to reason about what is happening (although lock-free multi-threaded code is always tricky), and to reduce the latency of the thread pool internals.

High Level Overview of Change

Context of Change

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Refactor (non-breaking change that only restructures code)
  • Tests (You added tests for code that already exists, or your new feature includes tests in this PR)
  • Documentation Updates
  • Release

@scottschurr (Collaborator) left a comment

First, thanks for simplifying this unnecessarily complicated part of the codebase. Several years back I spent some time fixing a corner case when the number of threads in Workers decreased. That bug, and the time spent fixing and testing it, was pointless since the feature was gratuitous. Less complexity means fewer bugs. So this pull request is a win.

However, as it stands, this pull request makes the full suite of unit tests take more than twice as long to execute. Not good. It's fixable however.

I have a number of suggestions for what I think are improvements for Workers. Most of them are naming and tidiness. But using std::thread::join() rather than detach() is critical to avoid losing shutdown performance. You can see all of those accumulated Workers changes here: scottschurr@0c345a0

There are also some less important suggestions I made for JobQueue. You can see those implemented here: scottschurr@9ce5347

Feel free to cherry-pick those commits if you would like.

Review threads on:
src/ripple/core/impl/Workers.h
src/test/core/Workers_test.cpp
src/ripple/beast/core/LockFreeStack.h
src/ripple/core/JobQueue.h (×2)
src/ripple/core/impl/Workers.cpp (×4)
src/ripple/core/impl/semaphore.h
@nbougalis (Contributor, Author):

So having looked at your proposed changes, I'm not sure I'm sold. I dislike the need for a vector to hold the thread instances. I suspect that simply changing the std::this_thread::sleep_for(std::chrono::seconds(1)); to something like std::this_thread::yield() will be sufficient to restore the performance.

@scottschurr (Collaborator):

One person's cake is another person's poison. Indeed, another approach would be to see if a yield() instead of a sleep_for() returns the performance. But I like the vector of threads because it leaves an artifact behind that future maintainers can easily see. A thread that is detached is virtually invisible to a maintainer. A join() on shutdown is ubiquitous in the code base, effective, and easy for a maintainer to understand.
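
For illustration, a minimal sketch of the join-based shape being argued for here; the class layout is assumed, not taken from the actual commit:

#include <atomic>
#include <thread>
#include <vector>

class Workers
{
    std::atomic<bool> stop_{false};
    std::vector<std::thread> threads_;

public:
    explicit Workers(unsigned int count)
    {
        for (unsigned int i = 0; i != count; ++i)
            threads_.emplace_back([this] {
                while (!stop_.load(std::memory_order_acquire))
                {
                    // ... wait for and run queued tasks ...
                }
            });
    }

    ~Workers()
    {
        stop_.store(true, std::memory_order_release);
        for (auto& t : threads_)  // the visible "artifact": explicit joins
            t.join();
    }
};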

@nbougalis (Contributor, Author):

> One person's cake is another person's poison. Indeed, another approach would be to see if a yield() instead of a sleep_for() returns the performance. But I like the vector of threads because it leaves an artifact behind that future maintainers can easily see. A thread that is detached is virtually invisible to a maintainer. A join() on shutdown is ubiquitous in the code base, effective, and easy for a maintainer to understand.

So, yield does return the performance back to where it was. And while I personally prefer not having the std::vector to be joined (especially since C++20 brings with it the ability to wait on std::atomic) I see your point: the std::vector does expose the "artifact" for future maintainers, which is something I should be mindful of.

I'll think about this some more.
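
For reference, a sketch of the C++20 std::atomic wait/notify facility mentioned above, applied to a detached-thread design: each worker decrements a live-thread counter on exit, and shutdown blocks until it reaches zero. Names are illustrative, not the PR's actual members:

#include <atomic>

std::atomic<int> threads_{0};  // incremented as each worker starts

void onWorkerExit()
{
    // Last worker out wakes anyone blocked in waitForExit().
    if (threads_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        threads_.notify_all();
}

void waitForExit()
{
    for (int n = threads_.load(std::memory_order_acquire); n != 0;
         n = threads_.load(std::memory_order_acquire))
    {
        threads_.wait(n);  // blocks until threads_ changes from n
    }
}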

@seelabs (Collaborator) left a comment

Very nice code cleanups. We need to fix the slow-downs, but from a code complexity POV this is a nice win.

I have a concern over changing the behavior of uncaught exceptions; I'm not totally against the change, but I don't think it's a small change either.

@@ -61,7 +61,8 @@ Job::queue_time() const
 void
 Job::doJob()
 {
-    beast::setCurrentThreadName("doJob: " + mName);
+    beast::setCurrentThreadName(beast::getCurrentThreadName() + ": " + mName);
Collaborator:

This works because we reset the thread name elsewhere when running a new task. Still, this seems fragile and we could get thread names that grow without bound. Consider resetting the thread name immediately after the job finishes running.
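
One way to implement that suggestion, sketched with a hypothetical m_work member standing in for the job body:

void
Job::doJob()
{
    auto const base = beast::getCurrentThreadName();
    beast::setCurrentThreadName(base + ": " + mName);
    m_work();  // hypothetical stand-in for the actual job body
    beast::setCurrentThreadName(base);  // restore, so names cannot grow without bound
}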

Comment on lines 356 to 281
JobQueue::uncaughtException(unsigned int instance, std::exception_ptr eptr)
{
    try
    {
        if (eptr)
            std::rethrow_exception(eptr);
    }
    catch (std::exception const& e)
    {
        JLOG(m_journal.fatal())
            << beast::getCurrentThreadName()
            << ": Exception caught during task processing: " << e.what();
    }
    catch (...)
    {
        JLOG(m_journal.fatal())
            << beast::getCurrentThreadName()
            << ": Unknown exception caught during task processing.";
    }
}
Collaborator:

This is a large behavior change that should probably get its own patch and not be folded into an unrelated change. There are times when we purposely crash the app on unhandled exceptions, because we don't want to continue in an insane state. We've discussed making changes like this in the past and have always opted to keep the same behavior.

I'm not necessarily against making this change, but at minimum we need to make this change much more visible.

-    perf::PerfLog* perfLog,
-    std::string const& threadNames,
-    int numberOfThreads)
+Workers::Workers(Callback& callback, std::string name, unsigned int count)
Collaborator:

Pass name by const&

Review threads on:
src/ripple/core/impl/Workers.cpp (×2)
            threads_--;
        },
        count);
    t.detach();
Collaborator:

I'd probably keep a collection of threads in the object rather than detach here.

@nbougalis (Contributor, Author):

@scottschurr, @seelabs: I thought about this and actually wrote the version with the std::vector<std::thread> and something about it rubbed me the wrong way. I guess it's because actually holding onto the handle doesn't really serve a purpose.

Unless you're vetoing the current version (and if you feel strongly about it, please do!), I'd prefer to stick with the version as is.

@nbougalis (Contributor, Author) commented Jun 6, 2022

@seelabs writes:

> This is a large behavior change that should probably get its own patch and not be folded into an unrelated change. There are times when we purposely crash the app on unhandled exceptions, because we don't want to continue in an insane state. We've discussed making changes like this in the past and have always opted to keep the same behavior.
>
> I'm not necessarily against making this change, but at minimum we need to make this change much more visible.

I hear what you're saying; the question is whether this behavior change is appropriate for this PR.

Let's take a step back. My position is that relying on this "hidden" behavior (i.e. that the job queue doesn't catch exceptions and that to throw signals "ABANDON SHIP!") is anathema to good software engineering practices. We've both encountered core dumps that might be traceable to this behavior.

So at some point we're going to need to fix this. The way I see it is that:

  • Either the job handlers need to be marked as noexcept, so that the compiler knows they won't throw and can warn at compile time if some code path is unprotected and can throw; then, if something slips through, the runtime is required by the standard to kill the process through std::terminate; or
  • The job queue itself must replicate these semantics by catching these exceptions, warning loudly, and terminating.

I think that terminating is reasonable because, as you said, it's hard for the job queue to properly handle exceptions. So I would suggest that we leave the new try/catch in place and simply add an explicit call to LogicError (which, honestly, is horribly named and ought to be renamed to ripple::abort(std::string const& msg) and should just call std::abort, but that's another story) in the catch handler.
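
Concretely, the proposal would look something like this (a sketch reusing the PR's catch block, abridged to one handler; only the LogicError call is new):

void
JobQueue::uncaughtException(unsigned int instance, std::exception_ptr eptr)
{
    try
    {
        if (eptr)
            std::rethrow_exception(eptr);
    }
    catch (std::exception const& e)
    {
        JLOG(m_journal.fatal())
            << beast::getCurrentThreadName()
            << ": Exception caught during task processing: " << e.what();
        LogicError("JobQueue: unhandled exception in job handler");  // warn loudly, then terminate
    }
}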

@seelabs (Collaborator) commented Jun 6, 2022

@nbougalis Re: catching the exception (I wish github would let us respond to top level comments): That's a reasonable position - my real concern was putting this behavior change in without giving it the attention it deserves (because it looks innocuous and is part of an optimization patch). And now it's getting attention, so I'm happy.

We have two behaviors that need to be accounted for:

  1. We threw an exception, but it's not a big deal, we can continue on just fine, but we didn't catch it and terminated the server. You're right, we've encountered those exceptions in the past and needed to patch.

  2. We threw an exception, and the exception means we're in an insane state and the best thing we can do is terminate the application.

Unfortunately, it's really hard to audit the code for this type of behavior. There are no good tools that I know of that let me look at where an exception is thrown and find where it might be caught, or find the call paths where it won't be caught (and if anyone knows of such a tool I'd LOVE to hear about it - I made a feature request to rtags a while back, but no love so far: Andersbakken/rtags#1383).

Without an audit we have to decide to either terminate where we shouldn't have, or continue in an insane state where we shouldn't have. Neither choice is good. The advantage of terminating when we shouldn't have is we will be notified and we can patch these issues. If we paper them over we'll never fix them. On the other hand, these issues can be a potential source of DOS attacks on a server.

Bottom line: I vote to terminate, but either choice is reasonable and I'm fine with whatever you decide.

One minor note: I don't think marking a function noexcept will get us compiler warnings if the function actually throws - at least I've never seen this. I could see it working one level deep, but it almost certainly won't do this more than one level. Heck, new can throw, so we'd see warnings EVERYWHERE.
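
A small illustration of that point: noexcept is enforced at run time, not compile time, so a throw hidden behind a call typically produces no diagnostic:

#include <stdexcept>

void helper()  // may throw; f()'s compiler cannot see that from the signature
{
    throw std::runtime_error("escapes");
}

void f() noexcept
{
    helper();  // no warning here; if helper() throws, std::terminate is called
}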

@scottschurr (Collaborator):

Just FYI, the Refactor JobQueue commit message has some junk tagged onto the end. You may wish to clean that up.

@scottschurr (Collaborator) left a comment

Identified a few more changes for you to consider.

FWIW, I timed unit tests with your change, which uses std::thread::detach() and std::this_thread::yield(). I also timed unit tests with an implementation that replaces those with std::thread::join(). The unit test timings came out comparable. So, although I think the join() approach is more maintainable, I'm okay with your approach using detach() and yield().

Review threads on:
src/ripple/core/JobQueue.h (×3)
src/ripple/core/JobTypes.h (×2)
src/ripple/core/impl/LoadEvent.cpp (×3)
src/ripple/core/impl/Workers.cpp
src/ripple/perflog/impl/PerfLogImp.h
@nbougalis nbougalis mentioned this pull request Aug 4, 2022
@scottschurr (Collaborator):

I tried out the most recent version of these changes on macOS. I see two things that need to be addressed:

  1. Clang pointed out that JobTypeData(JobTypeData&& other) = default is invalid. Since LoadMonitor is immovable, JobTypeData must also be immovable. This line could be removed, or you could use = delete instead.

  2. While running the unit tests on a debug build, the Coroutine test failed with a segmentation fault. Here is the message I got:

ripple.core.Coroutine thread specific storage
Logic error: Job: LocalValue-Test: Exception caught during task processing: mutex lock failed: Invalid argument
Segmentation fault: 11

The stack trace points at JobQueue.cpp line 265: https://github.com/XRPLF/rippled/pull/4172/files#diff-85e882e68cc55760882821eba100a325cad49b5bce66e21047bd861b3a7e6fedR262-R264

The good news is that the exception message got out. The bad news is that since we caught the exception the trace to the actual source of the exception is lost.

The Coroutine unit test crash is not consistent, but if I loop the unit tests I get a crash every 20 repetitions or so. So it's not hard to reproduce.

I removed the try/catch in Workers.cpp to see what I could find out from the stack trace. Here's that stack trace from the crashed thread:

Thread 6 Crashed:: Job: LocalValue-Test
0   libsystem_kernel.dylib        	0x00007fff6996e33a __pthread_kill + 10
1   libsystem_pthread.dylib       	0x00007fff69a2ae60 pthread_kill + 430
2   libsystem_c.dylib             	0x00007fff698f5808 abort + 120
3   libc++abi.dylib               	0x00007fff66b54458 abort_message + 231
4   libc++abi.dylib               	0x00007fff66b458a7 demangling_terminate_handler() + 238
5   libobjc.A.dylib               	0x00007fff686805b1 _objc_terminate() + 104
6   libc++abi.dylib               	0x00007fff66b53887 std::__terminate(void (*)()) + 8
7   libc++abi.dylib               	0x00007fff66b561a2 __cxxabiv1::failed_throw(__cxxabiv1::__cxa_exception*) + 27
8   libc++abi.dylib               	0x00007fff66b56169 __cxa_throw + 113
9   libc++.1.dylib                	0x00007fff66b3055b std::__1::__throw_system_error(int, char const*) + 77
10  libc++.1.dylib                	0x00007fff66b2754d std::__1::mutex::lock() + 29
11  rippled                       	0x000000010da12e83 std::__1::lock_guard<std::__1::mutex>::lock_guard(std::__1::mutex&) + 35 (__mutex_base:91)
12  rippled                       	0x000000010da1260d std::__1::lock_guard<std::__1::mutex>::lock_guard(std::__1::mutex&) + 29 (__mutex_base:91)
13  rippled                       	0x000000010da2698a ripple::test::Coroutine_test::gate::signal() + 42
14  rippled                       	0x000000010da35bf9 ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()::operator()() const + 89 (Coroutine_test.cpp:172)
15  rippled                       	0x000000010da35b8e ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>::operator()() + 30 (ClosureCounter.h:140)
16  rippled                       	0x000000010da35b4d decltype(std::__1::forward<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(fp)()) std::__1::__invoke<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&) + 29 (type_traits:3545)
17  rippled                       	0x000000010da35afd void std::__1::__invoke_void_return_wrapper<void>::__call<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&>(ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>&) + 29 (__functional_base:349)
18  rippled                       	0x000000010da35acd std::__1::__function::__alloc_func<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>, std::__1::allocator<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()> >, void ()>::operator()() + 29 (functional:1546)
19  rippled                       	0x000000010da3491e std::__1::__function::__func<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()>, std::__1::allocator<ripple::ClosureCounter<void>::Substitute<ripple::test::Coroutine_test::thread_specific_storage()::'lambda0'()> >, void ()>::operator()() + 30 (functional:1720)
20  rippled                       	0x000000010b3fb1b5 std::__1::__function::__value_func<void ()>::operator()() const + 53 (functional:1873)
21  rippled                       	0x000000010b3fb155 std::__1::function<void ()>::operator()() const + 21 (functional:2548)
22  rippled                       	0x000000010bd19b7d ripple::Job::doJob() + 141 (Job.cpp:68)
23  rippled                       	0x000000010bd20593 ripple::JobQueue::processTask(unsigned int) + 723 (JobQueue.cpp:328)
24  rippled                       	0x000000010bd64341 ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0::operator()(unsigned int) const + 977 (Workers.cpp:80)
25  rippled                       	0x000000010bd63ee1 decltype(std::__1::forward<ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0>(fp)(std::__1::forward<unsigned int>(fp0))) std::__1::__invoke<ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int>(ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0&&, unsigned int&&) + 49 (type_traits:3545)
26  rippled                       	0x000000010bd63e2e void std::__1::__thread_execute<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int, 2ul>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int>&, std::__1::__tuple_indices<2ul>) + 62 (thread:274)
27  rippled                       	0x000000010bd63576 void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, ripple::Workers::Workers(ripple::Workers::Callback&, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, unsigned int)::$_0, unsigned int> >(void*) + 118 (thread:284)
28  libsystem_pthread.dylib       	0x00007fff69a2b109 _pthread_start + 148
29  libsystem_pthread.dylib       	0x00007fff69a26b8b thread_start + 15

@nbougalis nbougalis mentioned this pull request Aug 9, 2022
@nbougalis (Contributor, Author):

The issue that @scottschurr encountered had to do with timing, but it did lead me to discover an implementation issue that could cause a worker thread to sleep despite the fact that work was waiting.

I've reworked the code to address this and now leverage more C++20 functionality, since the PR to enable this was merged into develop.

I know it's annoying to have to do this, but please re-review the first commit (that changes the Workers logic) in its entirety.
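
For context, a sketch of the bug class being described, with illustrative names rather than the PR's code: a worker that tests for work and then sleeps can miss a task queued in between, whereas C++20 atomic wait() re-checks the value before blocking:

#include <atomic>

std::atomic<int> pending{0};  // tasks queued but not yet claimed

void workerLoop()
{
    for (;;)
    {
        int n = pending.load(std::memory_order_acquire);
        if (n == 0)
        {
            // A sleep here could sleep through a task queued after the load
            // above; wait(0) blocks only while pending is still 0.
            pending.wait(0);
            continue;
        }
        if (pending.compare_exchange_strong(n, n - 1, std::memory_order_acq_rel))
        {
            // ... claim and run one task ...
        }
    }
}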

@scottschurr (Collaborator):

With the most recent code I consistently get the following failure on the JobQueue unit test:

$ build/rippled --unittest=JobQueue
ripple.core.JobQueue
Assertion failed: (!stopped_), function postCoro, file JobQueue.h, line 391.
Abort trap: 6

I'm running macOS 12.5.1, Apple clang version 13.1.6 (clang-1316.0.21.2.5).

@nbougalis (Contributor, Author):

> With the most recent code I consistently get the following failure on the JobQueue unit test:
>
> $ build/rippled --unittest=JobQueue
> ripple.core.JobQueue
> Assertion failed: (!stopped_), function postCoro, file JobQueue.h, line 391.
> Abort trap: 6
>
> I'm running macOS 12.5.1, Apple clang version 13.1.6 (clang-1316.0.21.2.5).

Yeah, that was just sloppy of me. It's been fixed and the ugly-looking code in the unit tests has been removed.


-    // The number of jobs currently in processTask()
-    int m_processCount;
+    int activeThreads_ = 0;
@ckeshava (Collaborator) commented Sep 7, 2022

I'm trying to understand the semantic meaning of variable names like activeThreads_, coroutines, and workers. In parallel computing, these terms are used in a loosely interchangeable fashion. There are a lot of comments about the different methods of the Coro class, but can we give a definition of these terms too?

My understanding is that: Coroutines (objects of the type Coro) are function pointers which are queued for execution in order of their respective priorities. (I know that this is not an accurate description of Coro class, but something like this would be very helpful).

@scottschurr (Collaborator) left a comment

I left a number of comments, many of which are nits. However I think I spotted a couple of thread race conditions that should be fixed. There's also the for loop in Coroutine_test.cpp that runs thread_specific_storage 1000 times. I'm pretty sure we don't want that to happen every time anyone runs the unit tests.

Review threads on:
src/test/core/Coroutine_test.cpp
src/ripple/core/impl/Workers.cpp (×4)
src/ripple/core/LoadEvent.h (×2)
src/ripple/core/impl/Job.cpp
src/ripple/core/impl/JobQueue.cpp
src/ripple/core/impl/LoadEvent.cpp
@scottschurr (Collaborator):

The most recent version of this pull request fails the clang-format check. The top-most commit on this branch will fix that: https://github.com/scottschurr/rippled/commits/nik-jqt-clang-format

Other than the clang-format problems, is this branch ready to be re-reviewed? Or is there more work to be done?

@nbougalis (Contributor, Author):

@scottschurr Thanks for the patience, the comments and the clang-format fix. I believe I've addressed everything, rebased the code to the latest and it should be ready for review.

I know there are still a couple of clang-format failures; I'll work around them. The existing formatting rules don't like attributes very much (e.g. [[likely]]), which is unfortunate.

@nbougalis (Contributor, Author):

Ping...

@intelliot (Collaborator):

@thejohnfreeman @seelabs @scottschurr - looks like this is ready for (re)review.

If you won't be able to have an initial look at this within 2 weeks (and provide some feedback), please comment here to let us know.

@scottschurr (Collaborator):

@intelliot, I'm not ignoring you, but I have another higher priority code review I'm working on. I can't predict whether or not I'll get to this one in the coming two weeks.

@nbougalis (Contributor, Author) commented Jan 23, 2023

This isn't a high priority item, imo. I'm sure Scott et al are focused on business priorities and that's fine; this can wait.

@intelliot intelliot removed the request for review from scottschurr January 27, 2023 19:03
@intelliot intelliot requested a review from mtrippled April 5, 2023 06:34
@scottschurr (Collaborator) left a comment

I should start out by saying that I'm really sorry that this pull request has been sitting stagnant for so long. It's hard to have code that you've put effort into just sit and be ignored for such a long time.

At a request from @intelliot I moved this pull request to the top of my review queue. I was requested to only verify that the changes I had flagged had been made. So I did that. But even with that cursory look I stumbled across a place that sure looks like it is missing a mutex lock.

So I can say that it looks like all of the important review comments I had made earlier have been addressed. But the most recent rework contains enough significant changes that it probably should have a full review. I'm fine with either me or someone else doing that additional review. But I'm not ready to give this code a thumbs up yet.

         return {};

-    return std::make_unique<LoadEvent>(iter->second.load(), name, true);
+    return {jobData_[t].load().sample(), std::move(name), true};
@scottschurr (Collaborator):

Pretty much all of the other accesses to jobData_ are done with mutex_ locked. I have the uneasy sensation that there's a lock missing here.

@scottschurr (Collaborator):

I spent some more time spelunking through the code today. As far as I can tell it's okay that there is no lock here. Sorry for the false alarm. My instinct tells me that it would be good to leave a comment for why that lock is not needed. But that's not essential.

In my defense, I think it's hard to review code using a type that requires locking only some of the time. But I also know that part of the design predates your modifications, @nbougalis.


-    A number of initial threads may be optionally specified. The
+    The number of initial threads must be specified. The
Collaborator:

Nit: The comment about "The default is to create one thread per CPU" is still present. There can be no default, since the count is required by the signature.

@nbougalis (Contributor, Author):

@scottschurr, no worries. I know this was (a) a large PR; (b) not a priority; and (c) you're swamped. I appreciate you taking another look.

Re: your comment about an apparently missing lock in this bit of code:

return {jobData_[t].load().sample(), std::move(name), true};

You say:

> Pretty much all of the other accesses to jobData_ are done with mutex_ locked. I have the uneasy sensation that there's a lock missing here.

Here's my (preliminary) rationale for why a lock isn't missing:

The jobData_ array is constructed once and each entry in the array contains a LoadMonitor instance load_. The load() member function returns a reference to the given member.

The LoadMonitor instance contains a LoadSampler member variable which is:

using LoadSampler = std::function<void(
    char const*,
    std::chrono::steady_clock::duration,
    std::chrono::steady_clock::duration)>;

And sample() returns a std::reference_wrapper<LoadSampler const> to that member.

Given that, I think that it's fine as is, although it's been a long time since I've even looked at this code and I will have to re-check to be sure.

I'll take a closer look when I rebase this.
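
A sketch of that rationale, using the LoadSampler alias shown above and an assumed sampler_ member: the path only forms references into state that is immutable after construction, so no lock is taken:

#include <functional>

class LoadMonitor
{
    LoadSampler sampler_;  // assumed: set at construction, never reassigned

public:
    std::reference_wrapper<LoadSampler const>
    sample() const
    {
        // Only reads construct-once state; no shared mutable data is touched.
        return std::cref(sampler_);
    }
};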

@scottschurr (Collaborator) left a comment

I dug into the details regarding the comment I left in a previous review. It looks like my suspicions were unfounded. As far as I can tell no lock is needed at that location.

Since I don't anticipate spending more time on this review, and I don't want to be responsible for holding up this pull request any longer, I'm giving the thumbs up.

The removal of beast::LockFreeStack and semaphore, constexpr-izing jobTypes, and the massive simplification of Workers are all good steps forward for this code base. Thanks for the contribution, @nbougalis.


@scottschurr (Collaborator) left a comment

👍

nbougalis and others added 3 commits May 2, 2023 07:38
The existing thread pool code uses several layers of indirection, relies on a custom lock-free stack, and offers functionality that is never used (e.g. the ability to dynamically adjust the number of threads in the pool).

This refactoring aims to simplify the code, making it easier to reason about what is happening (although lock-free multi-threaded code is always tricky), and to reduce the latency of the thread pool internals.

This commit cleans up and modernizes the JobQueue but does not change
the queueing logic. It focuses on simplifying the code by eliminating
awkward code constructs, like "invalid jobs" and the need for default
constructors.

It leverages modern C++ to initialize tables and data structures at
compile time and replaces `std::map` instances with directly indexed
arrays.

Lastly, it restructures the load tracking infrastructure and reduces
the need for dynamic memory allocations by supporting move semantics
and value types.
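
A sketch of the compile-time table pattern that commit message describes, with illustrative names rather than the actual rippled tables:

#include <array>
#include <cstddef>

enum class JobType : std::size_t { heartbeat = 0, ledgerData, count };

struct JobTypeInfo
{
    char const* name;
    int priority;
};

constexpr std::array<JobTypeInfo, static_cast<std::size_t>(JobType::count)>
    jobTypes{{{"heartbeat", 1}, {"ledgerData", 2}}};

constexpr JobTypeInfo const&
info(JobType t)
{
    return jobTypes[static_cast<std::size_t>(t)];  // direct index, no map lookup
}
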
The existing code attempted to restrict the instantiation of `Coro`
only to a subset of helper functions, by using the `Coro_create_t`
helper structure. But the structure was public, which limited the
effectiveness of this method.

This commit uses a private type, fixing the issue.
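
For illustration, the private-tag ("passkey") idiom the commit describes looks roughly like this; the shape is assumed, not the actual Coro declaration:

#include <functional>
#include <memory>
#include <utility>

class Coro
{
    struct PrivateTag {};  // private: only Coro's own members can name it

public:
    // Public, yet uncallable from outside, because callers cannot form a PrivateTag:
    Coro(PrivateTag, std::function<void()> fn) : fn_(std::move(fn)) {}

    static std::shared_ptr<Coro>
    create(std::function<void()> fn)
    {
        return std::make_shared<Coro>(PrivateTag{}, std::move(fn));
    }

private:
    std::function<void()> fn_;
};
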
@nbougalis nbougalis mentioned this pull request May 4, 2023
@intelliot (Collaborator):

@nbougalis if this PR is ready for review and merging (in your opinion), then please avoid force-pushing to the branch. Best practice requires that any change to the PR after a review invalidate that review.

Also, when a branch is force-pushed or any changes are made to it, please comment on the PR to explain why the push / change was made.

@nbougalis (Contributor, Author):

How can I avoid force-pushing if I rebase a branch to resolve merge conflicts with upstream?

@intelliot (Collaborator):

@nbougalis Instead of rebasing, just do a merge commit (git merge). The merge commit resolves merge conflicts with upstream. The merge commit will go away when the PR is merged to develop because we use Squash and merge.

@intelliot (Collaborator):

@thejohnfreeman what's the status of your review of this?

@intelliot (Collaborator):

@seelabs will you be able to review this?

@intelliot intelliot added this to the refactors milestone Jul 19, 2023
@seelabs (Collaborator) commented Jul 19, 2023

@intelliot I won't be able to review this week for sure

Comment on lines 106 to 110
@note Calling this when the coroutine can result in undefined
behavior if (a) the coroutine has finished executing by
using 'return' instead of a call to 'yield()'; or (b)
the coroutine is either executing or scheduled for execution
and a call to to yield().
Collaborator:

            @note Calling `post` can result in undefined
                  behavior if (a) the coroutine has finished execution by
                  returning instead of calling `yield()`; or (b)
                  the coroutine is already executing or scheduled for execution.


Once the job starts running, the coroutine execution context is
set up and execution begins either at the start of the coroutine
or, if yield() was previous called, at the statement after that
Collaborator:

previous -> previously


    LogicError(
        beast::getCurrentThreadName() +
        ": Uncaught exception handler invoked with no exception_ptr");
Collaborator:

I'm assuming we're talking about the exception pointed to by eptr. For this function to be called, that exception must already have been caught in order to be captured in eptr. If what you're saying is true, then the stack trace is already lost when this function is entered. Are we sure that is true? Or maybe I'm misunderstanding.
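
A small illustration of that point: an exception ends up in a std::exception_ptr via std::current_exception() inside a catch handler, so the throw-site stack has already unwound before the handler function ever receives eptr:

#include <exception>
#include <stdexcept>

std::exception_ptr capture()
{
    try
    {
        throw std::runtime_error("boom");
    }
    catch (...)
    {
        return std::current_exception();  // stack already unwound here
    }
}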

-    // See if we can reuse a paused worker
-    Worker* worker = m_paused.pop_front();
+    std::thread th(
+        [this, name](unsigned int instance) {
Collaborator:

Just a preference, but why capture this and name but not instance / count? Could pass them all as parameters and turn this lambda into a function pointer. No biggie, just wondering how the distinction was made.
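
To illustrate the point about the lambda: only a capture-less lambda converts to a plain function pointer, so passing this and name as parameters would keep that option open:

// Capture-less: converts to a function pointer.
void (*fp)(unsigned int) = [](unsigned int) { /* worker body */ };

// Capturing: no such conversion exists.
// auto bad = [this, name](unsigned int instance) { /* ... */ };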

without a corresponding yield.
/** Resume execution of a suspended coroutine on the current thread.

The coroutine will continues execution from where it last left,
Collaborator:

continues -> continue

@intelliot (Collaborator):

@nbougalis - does this PR still make sense to merge? If so, please consider the above comments and update the branch to be up-to-date with develop. If not, then share a brief explanation and close. Thanks!

@nbougalis nbougalis closed this Oct 16, 2023
@nbougalis nbougalis deleted the jqt branch October 16, 2023 05:56