Skip to content

Commit

Permalink
Support lowering CPU priority of background threads
Browse files Browse the repository at this point in the history
Summary:
Background activities like compaction can negatively affect
latency of higher-priority tasks like request processing. To avoid this,
rocksdb already lowers the IO priority of background threads on Linux
systems. While this takes care of typical IO-bound systems, it does not
help much when CPU (temporarily) becomes the bottleneck. This is
especially likely when using more expensive compression settings.

This patch adds an API to allow for lowering the CPU priority of
background threads, modeled on the IO priority API. Benchmarks (see
below) show significant latency and throughput improvements when CPU
bound. As a result, workloads with some CPU usage bursts should benefit
from lower latencies at a given utilization, or should be able to push
utilization higher at a given request latency target.

A useful side effect is that compaction CPU usage is now easily visible
in common tools, allowing for an easier estimation of the contribution
of compaction vs. request processing threads.

As with IO priority, the implementation is limited to Linux, degrading
to a no-op on other systems.
Closes #3763

Differential Revision: D7740096

Pulled By: gwicke

fbshipit-source-id: e5d32373e8dc403a7b0c2227023f9ce4f22b413c
  • Loading branch information
Gabriel Wicke authored and facebook-github-bot committed Apr 24, 2018
1 parent affe01b commit 090c78a
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 1 deletion.
1 change: 1 addition & 0 deletions HISTORY.md
Expand Up @@ -10,6 +10,7 @@
* Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data.
* TransactionDBOptions::write_policy can be configured to enable WritePrepared 2PC transactions. Read more about them in the wiki.
* Add DB properties "rocksdb.block-cache-capacity", "rocksdb.block-cache-usage", "rocksdb.block-cache-pinned-usage" to show block cache usage.
* Add `Env::LowerThreadPoolCPUPriority(Priority)` method, which lowers the CPU priority of background (esp. compaction) threads to minimize interference with foreground tasks.

### Bug Fixes
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
Expand Down
9 changes: 9 additions & 0 deletions env/env_posix.cc
Expand Up @@ -815,6 +815,15 @@ class PosixEnv : public Env {
#endif
}

virtual void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerCPUPriority();
#else
(void)pool;
#endif
}

virtual std::string TimeToString(uint64_t secondsSince1970) override {
const time_t seconds = (time_t)secondsSince1970;
struct tm t;
Expand Down
7 changes: 7 additions & 0 deletions include/rocksdb/env.h
Expand Up @@ -397,6 +397,9 @@ class Env {
// Lower IO priority for threads from the specified pool.
virtual void LowerThreadPoolIOPriority(Priority /*pool*/ = LOW) {}

// Lower CPU priority for threads from the specified pool.
virtual void LowerThreadPoolCPUPriority(Priority /*pool*/ = LOW) {}

// Converts seconds-since-Jan-01-1970 to a printable string
virtual std::string TimeToString(uint64_t time) = 0;

Expand Down Expand Up @@ -1092,6 +1095,10 @@ class EnvWrapper : public Env {
target_->LowerThreadPoolIOPriority(pool);
}

void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
target_->LowerThreadPoolCPUPriority(pool);
}

std::string TimeToString(uint64_t time) override {
return target_->TimeToString(time);
}
Expand Down
6 changes: 6 additions & 0 deletions tools/db_bench_tool.cc
Expand Up @@ -992,6 +992,8 @@ DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
"memtable insert with hint with the given prefix size.");
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
"threads' IO priority");
DEFINE_bool(enable_cpu_prio, false, "Lower the background flush/compaction "
"threads' CPU priority");
DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
"table becomes an identity function. This is only valid when key "
"is 8 bytes");
Expand Down Expand Up @@ -3321,6 +3323,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
}
if (FLAGS_enable_cpu_prio) {
FLAGS_env->LowerThreadPoolCPUPriority(Env::LOW);
FLAGS_env->LowerThreadPoolCPUPriority(Env::HIGH);
}
options.env = FLAGS_env;

if (FLAGS_rate_limiter_bytes_per_sec > 0) {
Expand Down
28 changes: 28 additions & 0 deletions util/threadpool_imp.cc
Expand Up @@ -18,6 +18,7 @@

#ifdef OS_LINUX
# include <sys/syscall.h>
# include <sys/resource.h>
#endif

#include <stdlib.h>
Expand Down Expand Up @@ -54,6 +55,8 @@ struct ThreadPoolImpl::Impl {

void LowerIOPriority();

void LowerCPUPriority();

void WakeUpAllThreads() {
bgsignal_.notify_all();
}
Expand Down Expand Up @@ -98,6 +101,7 @@ struct ThreadPoolImpl::Impl {
static void* BGThreadWrapper(void* arg);

bool low_io_priority_;
bool low_cpu_priority_;
Env::Priority priority_;
Env* env_;

Expand Down Expand Up @@ -126,6 +130,7 @@ inline
ThreadPoolImpl::Impl::Impl()
:
low_io_priority_(false),
low_cpu_priority_(false),
priority_(Env::LOW),
env_(nullptr),
total_threads_limit_(0),
Expand Down Expand Up @@ -172,9 +177,16 @@ void ThreadPoolImpl::Impl::LowerIOPriority() {
low_io_priority_ = true;
}

inline
void ThreadPoolImpl::Impl::LowerCPUPriority() {
std::lock_guard<std::mutex> lock(mu_);
low_cpu_priority_ = true;
}

void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
bool low_io_priority = false;
bool low_cpu_priority = false;

while (true) {
// Wait until there is an item that is ready to run
std::unique_lock<std::mutex> lock(mu_);
Expand Down Expand Up @@ -214,9 +226,20 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
std::memory_order_relaxed);

bool decrease_io_priority = (low_io_priority != low_io_priority_);
bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
lock.unlock();

#ifdef OS_LINUX
if (decrease_cpu_priority) {
setpriority(
PRIO_PROCESS,
// Current thread.
0,
// Lowest priority possible.
19);
low_cpu_priority = true;
}

if (decrease_io_priority) {
#define IOPRIO_CLASS_SHIFT (13)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
Expand All @@ -237,6 +260,7 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
}
#else
(void)decrease_io_priority; // avoid 'unused variable' error
(void)decrease_cpu_priority;
#endif
func();
}
Expand Down Expand Up @@ -425,6 +449,10 @@ void ThreadPoolImpl::LowerIOPriority() {
impl_->LowerIOPriority();
}

void ThreadPoolImpl::LowerCPUPriority() {
impl_->LowerCPUPriority();
}

void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
impl_->SetBackgroundThreadsInternal(num, false);
}
Expand Down
6 changes: 5 additions & 1 deletion util/threadpool_imp.h
Expand Up @@ -46,10 +46,14 @@ class ThreadPoolImpl : public ThreadPool {
// start yet
void WaitForJobsAndJoinAllThreads() override;

// Make threads to run at a lower kernel priority
// Make threads to run at a lower kernel IO priority
// Currently only has effect on Linux
void LowerIOPriority();

// Make threads to run at a lower kernel CPU priority
// Currently only has effect on Linux
void LowerCPUPriority();

// Ensure there is at aleast num threads in the pool
// but do not kill threads if there are more
void IncBackgroundThreadsIfNeeded(int num);
Expand Down

0 comments on commit 090c78a

Please sign in to comment.