MB-18453: Make task scheduling fairer
The MB identified that we can starve tasks by repeatedly waking
a higher-priority task via ExecutorPool::wake().

This occurs because ExecutorPool::wake() pushes the woken task
directly into the readyQueue, so frequent wakes let a high-priority
task keep jumping ahead of lower-priority tasks and starve them.

The fix is to remove readyQueue.push from wake(), so that we only
push to the futureQueue. The fetch side of scheduling only looks at
the futureQueue once the readyQueue is empty, thus the identified
starvation cannot happen.
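
To make that invariant concrete, the following is a minimal sketch of the
two-queue model the fix relies on. It is illustrative C++ only, not the
actual TaskQueue implementation: the Task/SketchQueue types and the use of
std::priority_queue are assumptions made for the example.

    // Minimal sketch (assumed types) of the readyQueue/futureQueue model.
    #include <chrono>
    #include <memory>
    #include <queue>
    #include <vector>

    using Clock = std::chrono::steady_clock;

    struct Task {
        int priority; // lower value == higher priority
        Clock::time_point wakeTime;
    };
    using TaskPtr = std::shared_ptr<Task>;

    // readyQueue ordering: best (lowest) priority value on top.
    struct ByPriority {
        bool operator()(const TaskPtr& a, const TaskPtr& b) const {
            return a->priority > b->priority;
        }
    };

    // futureQueue ordering: earliest wakeTime on top.
    struct ByWakeTime {
        bool operator()(const TaskPtr& a, const TaskPtr& b) const {
            return a->wakeTime > b->wakeTime;
        }
    };

    class SketchQueue {
    public:
        void schedule(const TaskPtr& task) {
            futureQueue.push(task);
        }

        // Post-fix wake(): adjust wakeTime and rebuild the futureQueue; the
        // woken task becomes eligible but cannot leap-frog tasks that are
        // already sitting in the readyQueue.
        void wake(const TaskPtr& task) {
            task->wakeTime = Clock::now();
            std::vector<TaskPtr> drained;  // drain and rebuild so the heap
            while (!futureQueue.empty()) { // sees the updated wakeTime
                drained.push_back(futureQueue.top());
                futureQueue.pop();
            }
            for (const auto& t : drained) {
                futureQueue.push(t); // MB-18453: push to the futureQueue only
            }
        }

        // Fetch serves the readyQueue first; only when it is empty are tasks
        // whose wakeTime has passed moved across and priority-ordered.
        TaskPtr fetchNextTask() {
            if (readyQueue.empty()) {
                const auto now = Clock::now();
                while (!futureQueue.empty() &&
                       futureQueue.top()->wakeTime <= now) {
                    readyQueue.push(futureQueue.top());
                    futureQueue.pop();
                }
            }
            if (readyQueue.empty()) {
                return nullptr; // nothing eligible yet
            }
            TaskPtr next = readyQueue.top();
            readyQueue.pop();
            return next;
        }

    private:
        std::priority_queue<TaskPtr, std::vector<TaskPtr>, ByPriority> readyQueue;
        std::priority_queue<TaskPtr, std::vector<TaskPtr>, ByWakeTime> futureQueue;
    };

With this shape, a burst of wake() calls can at most reorder the futureQueue;
tasks already promoted to the readyQueue always run first, which is the
fairness property the new unit test asserts.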

A unit test demonstrates the fix using the single-threaded harness and
expects that two tasks of differing priorities both get executed, rather
than the wake() starving the low-priority task.

This test drives:
 - ExecutorPool::schedule
 - ExecutorPool::reschedule
 - ExecutorPool::wake

These are all the methods which can add tasks into the scheduler
queue.

The fetch side is also covered:
 - ExecutorPool::fetchNextTask

Change-Id: Ie797a637ce4e7066e3155751ff467bc65d083646
Reviewed-on: http://review.couchbase.org/65385
Well-Formed: buildbot <build@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
jimwwalker authored and daverigby committed Jul 7, 2016
1 parent cae60e3 commit e22c9eb
Showing 3 changed files with 94 additions and 3 deletions.
49 changes: 49 additions & 0 deletions src/executorpool.h
@@ -14,6 +14,55 @@
* limitations under the License.
*/

/*
* === High-level overview of the task execution system. ===
*
* ExecutorPool is the core interface for users wishing to run tasks on our
* worker threads.
*
* Under the covers we have a configurable number of system threads that are
* labeled with a type (see task_type_t). These threads service all buckets.
*
* Each thread operates by reading from a shared TaskQueue: it wakes up,
* fetches a task (TaskQueue::fetchNextTask) and executes it by calling
* GlobalTask::run().
*
* The pool also has the concept of high and low priority, which is achieved
* by having two TaskQueue objects per task-type. When a thread wakes up to
* run a task, it will service the high-priority queue more frequently than
* the low-priority queue.
*
* Within a single queue there is also a per-task priority. The task priority
* is a value where lower is better. When many tasks are ready for execution
* they are moved to a ready queue and sorted by their priority. Thus tasks
* with priority 0 run before tasks with priority 1. Only once the ready
* queue of tasks is empty will we consider looking for more eligible tasks.
* In this context, an eligible task is one that has a wakeTime <= now.
*
* === Important methods of the ExecutorPool ===
*
* ExecutorPool* ExecutorPool::get()
* The ExecutorPool is accessed via the static get() method. Calling get()
* returns the process's global ExecutorPool object, an instance that is
* shared between all buckets.
*
* ExecutorPool::schedule(ExTask task, task_type_t qidx)
* The schedule method allows a task to be scheduled for future execution by
* a thread of type 'qidx'. The task's 'wakeTime' determines approximately
* when the task will be executed (no guarantees).
*
* ExecutorPool::wake(size_t taskId)
* The wake method allows a caller to request that the task matching
* taskId be executed by its thread-type now. The task's wakeTime is modified
* so that it has a wakeTime of now, and a thread of the correct type is
* signaled to wake up and perform fetching. The woken task will have to wait
* for any current tasks to be executed first, but it will jump ahead of other
* tasks, as tasks that are ready to run are ordered by their priority.
*
* ExecutorPool::snooze(size_t taskId, double toSleep)
* The pool's snooze method will locate the task matching taskId and adjust
* its wakeTime to account for the toSleep value.
*/
#ifndef SRC_EXECUTORPOOL_H_
#define SRC_EXECUTORPOOL_H_ 1

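To round out the header's documentation, here is a hedged usage sketch of
the interface described above. ExampleTask and driveExecutorPool are
hypothetical names invented for illustration; the GlobalTask constructor
shape is taken from the TestTask in the unit test added by this commit.

    // Illustrative caller of the documented ExecutorPool API.
    // ExampleTask is hypothetical, not part of this change.
    class ExampleTask : public GlobalTask {
    public:
        ExampleTask(EventuallyPersistentEngine* e, const Priority& prio)
            : GlobalTask(e, prio, /*sleeptime*/0.0,
                         /*completeBeforeShutdown*/false) {}

        bool run() {
            // ... perform one slice of work ...
            return true; // true => run again via ExecutorPool::reschedule
        }

        std::string getDescription() {
            return "ExampleTask";
        }
    };

    void driveExecutorPool(EventuallyPersistentEngine* engine) {
        // get() returns the process-global pool shared by all buckets.
        ExecutorPool* pool = ExecutorPool::get();

        ExTask task = new ExampleTask(engine,
                                      Priority::CheckpointRemoverPriority);
        pool->schedule(task, NONIO_TASK_IDX); // future run on a NONIO thread

        pool->wake(task->getId());   // make eligible now; it joins the
                                     // priority-ordered ready tasks once
                                     // the readyQueue drains
        pool->snooze(task->getId(), 5.0); // push its wakeTime out ~5 seconds
    }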
6 changes: 3 additions & 3 deletions src/taskqueue.cc
@@ -276,11 +276,11 @@ void TaskQueue::_wake(ExTask &task) {
     while (!notReady.empty()) {
         ExTask tid = notReady.front();
         if (tid->getWaketime() <= now || tid->isdead()) {
-            readyQueue.push(tid);
             numReady++;
-        } else {
-            futureQueue.push(tid);
         }
+
+        // MB-18453: Only push to the futureQueue
+        futureQueue.push(tid);
         notReady.pop();
     }

42 changes: 42 additions & 0 deletions tests/module_tests/evp_store_single_threaded_test.cc
@@ -324,3 +324,45 @@ TEST_F(SingleThreadedEPStoreTest, MB18452_yield_dcp_processor) {
    // Drop the stream
    consumer->closeStream(/*opaque*/0, vbid);
}

/*
 * MB-18453 is triggered by the ExecutorPool wake path moving tasks directly
 * into the readyQueue, thus allowing high-priority tasks to dominate
 * a taskqueue.
 */
TEST_F(SingleThreadedEPStoreTest, MB18453_taskWake) {
    auto& lpNonioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];

    class TestTask : public GlobalTask {
    public:
        TestTask(EventuallyPersistentEngine* e, const Priority& prio)
            : GlobalTask(e, prio, 0.0, false) {}

        // returning true will also drive the ExecutorPool::reschedule path.
        bool run() { return true; }

        std::string getDescription() {
            return "TestTask " + std::to_string(priority.getPriorityValue());
        }
    };

    ExTask hpTask = new TestTask(engine.get(),
                                 Priority::PendingOpsPriority);
    task_executor->schedule(hpTask, NONIO_TASK_IDX);

    ExTask lpTask = new TestTask(engine.get(),
                                 Priority::CheckpointRemoverPriority);
    task_executor->schedule(lpTask, NONIO_TASK_IDX);

    runNextTask(lpNonioQ, "TestTask 0"); // hpTask goes first
    // Ensure that a wake of the hpTask doesn't mean the lpTask gets ignored
    lpNonioQ.wake(hpTask);
    runNextTask(lpNonioQ, "TestTask 6"); // lpTask goes second

    // Run the tasks again to check that, coming from ::reschedule, our
    // expectations are still met.
    runNextTask(lpNonioQ, "TestTask 0"); // hpTask goes first
    // Ensure that a wake of the hpTask doesn't mean the lpTask gets ignored
    lpNonioQ.wake(hpTask);
    runNextTask(lpNonioQ, "TestTask 6"); // lpTask goes second
}
