-
Notifications
You must be signed in to change notification settings - Fork 6.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support reservation in thread pool #10278
Support reservation in thread pool #10278
Conversation
a223e93
to
c9ac504
Compare
edd08b6
to
1683b55
Compare
f9a747b
to
5be7758
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, had a few comments.
Let's mention the Public API change in HISTORY.md so users know to update their Env
implementation.
I think total_threads_limit_ is always accessed with ThreadPoolImpl mutex held, in which case we don't need to use atomic operations.
8509ca3
to
f9be4b7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
HISTORY.md
Outdated
@@ -11,6 +11,7 @@ | |||
* `rocksdb_level_metadata_t` and its and its get functions & destroy function. | |||
* `rocksdb_file_metadata_t` and its and get functions & destroy functions. | |||
* Add suggest_compact_range() and suggest_compact_range_cf() to C API. | |||
* Add two functions `int ReserveThreads(int threads_to_be_reserved)` and `int ReleaseThreads(threads_to_be_released)` into `Env` class. In the default implementation, both return 0. Newly added `xxxEnv` class that inherits `Env` should implement these two functions for compatibility. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should implement these two functions for compatibility
I think the main reason to implement them is to enable new features, initially subcompactions for round-robin compactions. The default implementations should already provide compatible API/behavior by not using subcompactions in round-robin.
util/threadpool_imp.cc
Outdated
return static_cast<int>(thread_id) + reserved_threads_ >= | ||
total_threads_limit_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems inconsistent with HasExcessiveThread()
. Personally I'd not terminate/recreate thread-pool threads for reservations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your concern. But if we do that, how do we pend incoming tasks when most threads are reserved?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh good point. I guess we could resolve the inconsistency either by changing the name IsExcessiveThread()
to something else like IsBlockedThread()
, or by adding back the reserved_threads_
into HasExcessiveThread()
. Up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not quite following why the new IsBlockedThread()
implementation is correct in preventing extra threads being woken up as a result of not considering reserved threads. See (4) in my review summary #10278 (comment) for more.
4aa4236
to
937b831
Compare
util/threadpool_imp.cc
Outdated
if (reserved_threads_ + threads_to_be_reserved > num_waiting_) { | ||
reserved_threads_in_success = num_waiting_ - reserved_threads_; | ||
reserved_threads_ = num_waiting_; | ||
} else { | ||
reserved_threads_ += threads_to_be_reserved; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it equivalent to
reserved_threads_in_success = std::min(num_waiting_ - reserved_threads_, threads_to_be_reserved)
reserved_threads_ += reserved_threads_in_success;
Similar simplification for ReleaseThreads()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! Will apply this change soon.
util/threadpool_imp.cc
Outdated
// Number of reserved threads by other tasks in bgthreads_ | ||
int reserved_threads_; | ||
// Number of waiting threads (Maximum number of threads that can be reserved) | ||
int num_waiting_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: num_waiting_threads_
(I generally prefer more explicit naming for variable so that people don't have to look at comment to understand its intention)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
@@ -110,6 +142,10 @@ struct ThreadPoolImpl::Impl { | |||
|
|||
int total_threads_limit_; | |||
std::atomic_uint queue_len_; // Queue length. Used for stats reporting | |||
// Number of reserved threads by other tasks in bgthreads_ | |||
int reserved_threads_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: num_reserved_threads_ to be consistent with below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"other tasks" sounds a bit random/unclear, although I understand it refers to tasks that reserve threads (e.g, subcompaction) as opposed to tasks do not reserve threads (e.g, flush) because I was exposed to this PR's motivation.
Is there a way to better clarify the comment so that people w/o much context of your PR understand it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point. I will revise it accordingly.
util/threadpool_imp.cc
Outdated
@@ -189,10 +229,14 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { | |||
// Wait until there is an item that is ready to run | |||
std::unique_lock<std::mutex> lock(mu_); | |||
// Stop waiting if the thread needs to do work or needs to terminate. | |||
// increase num_waiting_ once this task has finished |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be "increase num_waiting_ once the thread starts waiting" to be consistent with "decrease num_waiting_ once the thread is not waiting" below?
nit: Start with a capitalized letter in the comments to be consistent with others such as "Increase ..." or "Decrease ..." (below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was my typo. Thanks for pointing out this.
util/threadpool_imp.h
Outdated
// scheduled jobs. (This function will reset reserved_threads to 0, | ||
// reset is only triggered when total_threads_limit changes, typically, | ||
// (num > total_threads_limit) || (num < total_threads_limit && allow_reduce) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments not needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohh, thanks a lot for catching this! This was an old comment for my earlier implementation. I will remove this.
66dbda7
to
fad97e1
Compare
@littlepig2013 has updated the pull request. You must reimport the pull request before landing. |
fad97e1
to
90a70ad
Compare
@littlepig2013 has updated the pull request. You must reimport the pull request before landing. |
90a70ad
to
f332ca1
Compare
@littlepig2013 has updated the pull request. You must reimport the pull request before landing. |
0861082
to
e7f8bb9
Compare
@littlepig2013 has updated the pull request. You must reimport the pull request before landing. |
e7f8bb9
to
5bce90c
Compare
@littlepig2013 has updated the pull request. You must reimport the pull request before landing. |
@littlepig2013 has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
5bce90c
to
db8370d
Compare
@littlepig2013 has updated the pull request. You must reimport the pull request before landing. |
@littlepig2013 has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
@@ -155,6 +191,8 @@ void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) { | |||
// prevent threads from being recreated right after they're joined, in case | |||
// the user is concurrently submitting jobs. | |||
total_threads_limit_ = 0; | |||
reserved_threads_ = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to take action - I just want to note that it's interesting we are setting reserved_threads_ = 0 while ThreadPoolImpl does not actually have the power to terminate those reserved threads. IMO, this is the biggest drawback of this approach exchanged for the benefit of implementational simplicity.
@@ -1271,7 +1401,6 @@ TEST_P(EnvPosixTestWithParam, MultiRead) { | |||
} | |||
} | |||
}); | |||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unrelated change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit weird since i did not change that.
env/env_test.cc
Outdated
"EnvTest::ReserveThreads:0"}}); | ||
// Empty the thread pool to ensure all the threads can start later | ||
env_->SetBackgroundThreads(0, Env::Priority::HIGH); | ||
ASSERT_EQ(env_->GetBackgroundThreads(Env::HIGH), 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it seems to me that it's possible for ASSERT_EQ(env_->GetBackgroundThreads(Env::HIGH), 0);
to be true while ThreadPoolImpl::BGThread::Termination:th0
has not been run?
More of a question from me than suggestion - should we put ASSERT_EQ(env_->GetBackgroundThreads(Env::HIGH), 0);
after TEST_SYNC_POINT("EnvTest::ReserveThreads:0");
or remove ASSERT_EQ(env_->GetBackgroundThreads(Env::HIGH), 0);
as what we need is actually the snyc point guarantee?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think we are safe to remove it
env/env_test.cc
Outdated
// Set the number of threads to 3 so that Task 3 can dequeue | ||
env_->SetBackgroundThreads(3, Env::Priority::HIGH); | ||
// Wakup Task 1 | ||
tasks[1].WakeUp(); | ||
ASSERT_FALSE(tasks[1].TimedWaitUntilDone(kWaitMicros)); | ||
// Task 2, 3 running (Task 3 dequeue); 0/1 reserved threads | ||
ASSERT_FALSE(tasks[3].TimedWaitUntilSleeping(kWaitMicros)); | ||
ASSERT_TRUE(tasks[3].IsSleeping()); | ||
ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); | ||
|
||
// At most 1 thread can be released | ||
ASSERT_GT(2, env_->ReleaseThreads(3, Env::Priority::HIGH)); | ||
tasks[2].WakeUp(); | ||
ASSERT_FALSE(tasks[2].TimedWaitUntilDone(kWaitMicros)); | ||
tasks[3].WakeUp(); | ||
ASSERT_FALSE(tasks[3].TimedWaitUntilDone(kWaitMicros)); | ||
WaitThreadPoolsEmpty(); | ||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these tests scenarios that haven't been covered previously OR just clean-up? Just want to make sure we are also not making the test unnecessarily long due to duplicating testing scenarios.
Not really sure what 0/1 reserved threads
means - is it "0 or 1 reserved thread"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 964 - 968 is just clean-up.
yes, I will change it to make it more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your efforts to cover different scenarios in the UT along with great comments! Left a few nits!
db8370d
to
dcdfa5b
Compare
@littlepig2013 has updated the pull request. You must reimport the pull request before landing. |
@littlepig2013 has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator. |
Summary: Update HISTORY.md for CompactionPri::kRoundRobin. Detailed implementation can be found in [PR10107](#10107), [PR10227](#10227), [PR10250](#10250), [PR10278](#10278), [PR10316](#10316), and [PR10341](#10341) Pull Request resolved: #10421 Reviewed By: ajkr Differential Revision: D38194070 Pulled By: littlepig2013 fbshipit-source-id: 4ce153dc0bf22cd865d09c5429955023dbc90f37
Summary:
Add
ReserveThreads
andReleaseThreads
functions in thread pool to support reservation in for a specific thread pool. With this feature, a thread will be blocked if the number of waiting threads (noted bynum_waiting_threads_
) equals the number of reserved threads (noted byreserved_threads_
), normallyreserved_threads_
is upper bounded bynum_waiting_threads_
; in rare cases (e.g.SetBackgroundThreadsInternal
is called when some threads are already reserved),num_waiting_threads_
can be less thanreserved_threads
.Test Plan:
Add
ReserveThreads
unit test inenv_test
. Update the unit testSimpleColumnFamilyInfoTest
inthread_list_test
with addingReserveThreads
related assertions.