Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support lowering CPU priority of background threads #3763

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Expand Up @@ -9,6 +9,7 @@
* Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data.
* TransactionDBOptions::write_policy can be configured to enable WritePrepared 2PC transactions. Read more about them in the wiki.
* Add DB properties "rocksdb.block-cache-capacity", "rocksdb.block-cache-usage", "rocksdb.block-cache-pinned-usage" to show block cache usage.
* Add `Env::LowerThreadPoolCPUPriority(Priority)` method, which lowers the CPU priority of background (esp. compaction) threads to minimize interference with foreground tasks.

### Bug Fixes
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
Expand Down
9 changes: 9 additions & 0 deletions env/env_posix.cc
Expand Up @@ -815,6 +815,15 @@ class PosixEnv : public Env {
#endif
}

virtual void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerCPUPriority();
#else
(void)pool;
#endif
}

virtual std::string TimeToString(uint64_t secondsSince1970) override {
const time_t seconds = (time_t)secondsSince1970;
struct tm t;
Expand Down
7 changes: 7 additions & 0 deletions include/rocksdb/env.h
Expand Up @@ -397,6 +397,9 @@ class Env {
// Lower IO priority for threads from the specified pool.
virtual void LowerThreadPoolIOPriority(Priority /*pool*/ = LOW) {}

// Lower CPU priority for threads from the specified pool.
virtual void LowerThreadPoolCPUPriority(Priority /*pool*/ = LOW) {}

// Converts seconds-since-Jan-01-1970 to a printable string
virtual std::string TimeToString(uint64_t time) = 0;

Expand Down Expand Up @@ -1092,6 +1095,10 @@ class EnvWrapper : public Env {
target_->LowerThreadPoolIOPriority(pool);
}

void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
target_->LowerThreadPoolCPUPriority(pool);
}

std::string TimeToString(uint64_t time) override {
return target_->TimeToString(time);
}
Expand Down
6 changes: 6 additions & 0 deletions tools/db_bench_tool.cc
Expand Up @@ -992,6 +992,8 @@ DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
"memtable insert with hint with the given prefix size.");
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
"threads' IO priority");
DEFINE_bool(enable_cpu_prio, false, "Lower the background flush/compaction "
"threads' CPU priority");
DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
"table becomes an identity function. This is only valid when key "
"is 8 bytes");
Expand Down Expand Up @@ -3321,6 +3323,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
}
if (FLAGS_enable_cpu_prio) {
FLAGS_env->LowerThreadPoolCPUPriority(Env::LOW);
FLAGS_env->LowerThreadPoolCPUPriority(Env::HIGH);
}
options.env = FLAGS_env;

if (FLAGS_rate_limiter_bytes_per_sec > 0) {
Expand Down
28 changes: 28 additions & 0 deletions util/threadpool_imp.cc
Expand Up @@ -18,6 +18,7 @@

#ifdef OS_LINUX
# include <sys/syscall.h>
# include <sys/resource.h>
#endif

#include <stdlib.h>
Expand Down Expand Up @@ -54,6 +55,8 @@ struct ThreadPoolImpl::Impl {

void LowerIOPriority();

void LowerCPUPriority();

void WakeUpAllThreads() {
bgsignal_.notify_all();
}
Expand Down Expand Up @@ -98,6 +101,7 @@ struct ThreadPoolImpl::Impl {
static void* BGThreadWrapper(void* arg);

bool low_io_priority_;
bool low_cpu_priority_;
Env::Priority priority_;
Env* env_;

Expand Down Expand Up @@ -126,6 +130,7 @@ inline
ThreadPoolImpl::Impl::Impl()
:
low_io_priority_(false),
low_cpu_priority_(false),
priority_(Env::LOW),
env_(nullptr),
total_threads_limit_(0),
Expand Down Expand Up @@ -172,9 +177,16 @@ void ThreadPoolImpl::Impl::LowerIOPriority() {
low_io_priority_ = true;
}

inline
void ThreadPoolImpl::Impl::LowerCPUPriority() {
std::lock_guard<std::mutex> lock(mu_);
low_cpu_priority_ = true;
}

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
bool low_io_priority = false;
bool low_cpu_priority = false;

while (true) {
// Wait until there is an item that is ready to run
std::unique_lock<std::mutex> lock(mu_);
Expand Down Expand Up @@ -214,9 +226,20 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
std::memory_order_relaxed);

bool decrease_io_priority = (low_io_priority != low_io_priority_);
bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
lock.unlock();

#ifdef OS_LINUX
if (decrease_cpu_priority) {
setpriority(
PRIO_PROCESS,
// Current thread.
0,
// Lowest priority possible.
19);
low_cpu_priority = true;
}

if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
Expand All @@ -237,6 +260,7 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
}
#else
(void)decrease_io_priority; // avoid 'unused variable' error
(void)decrease_cpu_priority;
#endif
func();
}
Expand Down Expand Up @@ -425,6 +449,10 @@ void ThreadPoolImpl::LowerIOPriority() {
impl_->LowerIOPriority();
}

void ThreadPoolImpl::LowerCPUPriority() {
impl_->LowerCPUPriority();
}

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
impl_->SetBackgroundThreadsInternal(num, false);
}
Expand Down
6 changes: 5 additions & 1 deletion util/threadpool_imp.h
Expand Up @@ -46,10 +46,14 @@ class ThreadPoolImpl : public ThreadPool {
// start yet
void WaitForJobsAndJoinAllThreads() override;

// Make threads to run at a lower kernel priority
// Make threads to run at a lower kernel IO priority
// Currently only has effect on Linux
void LowerIOPriority();

// Make threads to run at a lower kernel CPU priority
// Currently only has effect on Linux
void LowerCPUPriority();

// Ensure there is at aleast num threads in the pool
// but do not kill threads if there are more
void IncBackgroundThreadsIfNeeded(int num);
Expand Down