Skip to content

Commit

Permalink
MB-37144: Don't set AuxIO & NonIO thread pri to lowest
Browse files Browse the repository at this point in the history
When setting the priority of Writer threads to lowest,
ExecutorThread::start() was incorrectly setting the priority of the
_current_ thread, not the thread just created. As a result this:

- Didn't lower the priority of the first writer thread

- Also lowered the priority of all threads created afterwards
  (i.e. the AuxIO and NonIO threads).

Fix by moving the setpriority() call to after the thread has been
created, to ExecutorThread::run().

Change-Id: I39dcf0aeda216441260204b912689cab0a4af8a3
Reviewed-on: http://review.couchbase.org/118861
Well-Formed: Build Bot <build@couchbase.com>
Reviewed-by: Paolo Cocchi <paolo.cocchi@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
daverigby committed Dec 5, 2019
1 parent c4f9f45 commit f4a3c7c
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 28 deletions.
61 changes: 36 additions & 25 deletions engines/ep/src/executorthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,27 @@ void ExecutorThread::start() {
throw std::runtime_error(ss.str().c_str());
}

EP_LOG_DEBUG("{}: Started", name);
}

void ExecutorThread::stop(bool wait) {
if (!wait && (state == EXECUTOR_SHUTDOWN || state == EXECUTOR_DEAD)) {
return;
}

state = EXECUTOR_SHUTDOWN;

if (!wait) {
EP_LOG_INFO("{}: Stopping", name);
return;
}
cb_join_thread(thread);
EP_LOG_INFO("{}: Stopped", name);
}

void ExecutorThread::run() {
EP_LOG_DEBUG("Thread {} running..", getName());

// Decrease the priority of Writer threads to lessen their impact on
// other threads (esp front-end workers which should be prioritized ahead
// of non-critical path Writer tasks (both Flusher and Compaction).
Expand All @@ -68,15 +89,16 @@ void ExecutorThread::start() {
// Writer (SyncWrite flushes) on the High IO thread pool; keeping
// non-persist SyncWrites / normal mutations & compaction on the Low IO
// pool.
#if defined(__linux__) || defined(_WIN32)
// Only doing this for Linux & Windows at present:
#if defined(__linux__)
// Only doing this for Linux at present:
// - On Windows folly's getpriority() compatability function changes the
// priority of the entire process.
// - On macOS setpriority(PRIO_PROCESS) affects the entire process (unlike
// Linux where it's only the current thread), hence calling setpriority()
// would be pointless.
if (taskType == WRITER_TASK_IDX) {
// Note Linux uses the range -19..20; whereas the Folly implementation
// of setpriority uses 0..39.
const int lowestPriority = folly::kIsLinux ? 19 : 39;
// Note Linux uses the range -20..19 (highest..lowest).
const int lowestPriority = 19;
if (setpriority(PRIO_PROCESS,
/*Current thread*/ 0,
lowestPriority)) {
Expand All @@ -89,26 +111,7 @@ void ExecutorThread::start() {
}
#endif

EP_LOG_DEBUG("{}: Started", name);
}

void ExecutorThread::stop(bool wait) {
if (!wait && (state == EXECUTOR_SHUTDOWN || state == EXECUTOR_DEAD)) {
return;
}

state = EXECUTOR_SHUTDOWN;

if (!wait) {
EP_LOG_INFO("{}: Stopping", name);
return;
}
cb_join_thread(thread);
EP_LOG_INFO("{}: Stopped", name);
}

void ExecutorThread::run() {
EP_LOG_DEBUG("Thread {} running..", getName());
priority = getpriority(PRIO_PROCESS, 0);

for (uint8_t tick = 1;; tick++) {
resetCurrentTask();
Expand Down Expand Up @@ -286,3 +289,11 @@ const std::string ExecutorThread::getStateName() {
return std::string("dead");
}
}

task_type_t ExecutorThread::getTaskType() const {
return taskType;
}

int ExecutorThread::getPriority() const {
return priority;
}
10 changes: 10 additions & 0 deletions engines/ep/src/executorthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ class ExecutorThread {
now.setTimePoint(std::chrono::steady_clock::now());
}

/// @return the threads' type.
task_type_t getTaskType() const;

/// Return the threads' OS priority.
int getPriority() const;

protected:

cb_thread_t thread;
Expand All @@ -156,4 +162,8 @@ class ExecutorThread {

std::mutex currentTaskMutex; // Protects currentTask
ExTask currentTask;

// OS priority of the thread. Only available once the thread
// has been started.
int priority = 0;
};
79 changes: 76 additions & 3 deletions engines/ep/tests/module_tests/executorpool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ void MockTaskable::logRunTime(
TaskId id, const std::chrono::steady_clock::duration runTime) {
}

ExTask makeTask(Taskable& taskable, ThreadGate& tg, size_t i) {
ExTask makeTask(Taskable& taskable, ThreadGate& tg, TaskId taskId) {
return std::make_shared<LambdaTask>(
taskable, TaskId::StatSnap, 0, true, [&]() -> bool {
taskable, taskId, 0, true, [&]() -> bool {
tg.threadUp();
return false;
});
Expand Down Expand Up @@ -135,7 +135,8 @@ TEST_F(ExecutorPoolTest, increase_workers) {
std::vector<ExTask> tasks;

for (size_t i = 0; i < numWriters + 1; ++i) {
ExTask task = makeTask(taskable, tg, i);
// Use any Writer thread task (StatSnap) for the TaskId.
ExTask task = makeTask(taskable, tg, TaskId::StatSnap);
pool.schedule(task);
tasks.push_back(task);
}
Expand All @@ -154,6 +155,78 @@ TEST_F(ExecutorPoolTest, increase_workers) {
pool.unregisterTaskable(taskable, false);
}

// Verifies the priority of the different thread types. On Windows and Linux
// the Writer threads should be low priority.
TEST_F(ExecutorPoolTest, ThreadPriorities) {
// Create test pool and register a (mock) taskable to start all threads.
TestExecutorPool pool(10, // MaxThreads
NUM_TASK_GROUPS,
ThreadPoolConfig::ThreadCount(2), // MaxNumReaders
ThreadPoolConfig::ThreadCount(2), // MaxNumWriters
2, // MaxNumAuxio
2 // MaxNumNonio
);

const size_t totalNumThreads = 8;

// Given we have to wait for all threads to be running (called
// ::run()) before their priority will be set, use a ThreadGate
// with a simple Task which calls threadUp() to ensure all threads
// have started before checking priorities.
MockTaskable taskable;
pool.registerTaskable(taskable);
std::vector<ExTask> tasks;
ThreadGate tg{totalNumThreads};

// Need 2 tasks of each type, so both threads of each type are
// started.
// Reader
tasks.push_back(makeTask(taskable, tg, TaskId::MultiBGFetcherTask));
tasks.push_back(makeTask(taskable, tg, TaskId::MultiBGFetcherTask));
// Writer
tasks.push_back(makeTask(taskable, tg, TaskId::FlusherTask));
tasks.push_back(makeTask(taskable, tg, TaskId::FlusherTask));
// AuxIO
tasks.push_back(makeTask(taskable, tg, TaskId::AccessScanner));
tasks.push_back(makeTask(taskable, tg, TaskId::AccessScanner));
// NonIO
tasks.push_back(makeTask(taskable, tg, TaskId::ItemPager));
tasks.push_back(makeTask(taskable, tg, TaskId::ItemPager));

for (auto& task : tasks) {
pool.schedule(task);
}
tg.waitFor(std::chrono::seconds(10));
EXPECT_TRUE(tg.isComplete()) << "Timeout waiting for threads to start";

// Windows (via folly portability) uses 20 for default (normal) priority.
const int defaultPriority = folly::kIsWindows ? 20 : 0;

// We only set Writer threads to a non-default level on Linux.
const int expectedWriterPriority = folly::kIsLinux ? 19 : defaultPriority;

auto threads = pool.getThreads();
ASSERT_EQ(totalNumThreads, threads.size());
for (const auto* thread : threads) {
switch (thread->getTaskType()) {
case WRITER_TASK_IDX:
EXPECT_EQ(expectedWriterPriority, thread->getPriority())
<< "for thread: " << thread->getName();
break;
case READER_TASK_IDX:
case AUXIO_TASK_IDX:
case NONIO_TASK_IDX:
EXPECT_EQ(defaultPriority, thread->getPriority())
<< "for thread: " << thread->getName();
break;
default:
FAIL() << "Unexpected task type: " << thread->getTaskType();
}
}

pool.unregisterTaskable(taskable, false);
}

TEST_F(ExecutorPoolDynamicWorkerTest, decrease_workers) {
ASSERT_EQ(2, pool->getNumWriters());
pool->setNumWriters(ThreadPoolConfig::ThreadCount(1));
Expand Down
9 changes: 9 additions & 0 deletions engines/ep/tests/module_tests/executorpool_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ class TestExecutorPool : public ExecutorPool {
return output;
}

// Returns a vector of the registered ExecutorThreads, non-owning.
// WARNING: Not safe to reduce thread pool size while the result of
// this method is still in use.
ThreadQ getThreads() {
LockHolder lh(tMutex);
ThreadQ result = threadQ;
return result;
}

bool threadExists(std::string name) {
auto names = getThreadNames();
return std::find(names.begin(), names.end(), name) != names.end();
Expand Down

0 comments on commit f4a3c7c

Please sign in to comment.