Skip to content

Commit

Permalink
MB-18453: Give all tasks their own stats and priority
Browse files Browse the repository at this point in the history
MB-18453 identified that tasks have copied & pasted
constructors which leads to some tasks all having the
same Priority object.

The fallout of this is that many tasks now all contribute
to the same histogram for runtime and scheduling waittime.
When debugging issues which lead to MB-18453 it is near
impossible at times to know which real task was delayed
as the stats can be attributed to many tasks.

This commit introduces makes all tasks have their own ID
and thus their own histograms and also makes it easier
for new tasks to be created without forgetting to create
a new Priority instance.

tasks.defs.h is a new file that captures every sub-class
of GlobalTask and shows the priority of all tasks.

TASK macros are now used to generate various switch
statements and enums used in task accounting.

The new system is not strict, MyTask could still be
assigned the priority/id of OldTask, however this
flexibility can be useful in some circumstances.

Note this patch has changed ep_testsuite test_item_pager
to increase the max_size value in the test config. This
is because this patch increases the baseline heap usage
of a bucket as we've increased the number of Histogram
object allocated by EventuallyPersistentStore.

Prior to this patch 28 were allocated, with this patch
51 are allocated (1 per task). Each Histogram<hrtime_t
is approx 1568 bytes (on OSX clang build).

The new max_size is 2.5MiB

Change-Id: I209c67945b964023615af37a12f83ca97142ce53
Reviewed-on: http://review.couchbase.org/65253
Well-Formed: buildbot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: buildbot <build@couchbase.com>
  • Loading branch information
jimwwalker authored and daverigby committed Jul 4, 2016
1 parent 7d9dc5d commit 6403bc0
Show file tree
Hide file tree
Showing 34 changed files with 783 additions and 593 deletions.
8 changes: 1 addition & 7 deletions CMakeLists.txt
Expand Up @@ -131,7 +131,7 @@ ADD_LIBRARY(ep SHARED
src/failover-table.cc src/flusher.cc src/htresizer.cc
src/item.cc src/item_pager.cc src/kvshard.cc
src/memory_tracker.cc src/murmurhash3.cc
src/mutex.cc src/priority.cc
src/mutex.cc
src/executorthread.cc
src/sizes.cc
${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
Expand Down Expand Up @@ -194,7 +194,6 @@ ADD_EXECUTABLE(ep-engine_stream_test
src/mutation_log.cc
src/mutex.cc
src/objectregistry.cc
src/priority.cc
src/tapconnection.cc
src/stored-value.cc
src/tapthrottle.cc
Expand Down Expand Up @@ -258,9 +257,6 @@ ADD_EXECUTABLE(ep-engine_mutex_test
tests/module_tests/mutex_test.cc src/testlogger.cc src/mutex.cc)
TARGET_LINK_LIBRARIES(ep-engine_mutex_test platform)

ADD_EXECUTABLE(ep-engine_priority_test tests/module_tests/priority_test.cc src/priority.cc)
TARGET_LINK_LIBRARIES(ep-engine_priority_test platform)

ADD_EXECUTABLE(ep-engine_ringbuffer_test tests/module_tests/ringbuffer_test.cc)
TARGET_LINK_LIBRARIES(ep-engine_ringbuffer_test platform)

Expand Down Expand Up @@ -306,7 +302,6 @@ ADD_EXECUTABLE(ep-engine_kvstore_test
src/mutation_log.cc
src/mutex.cc
src/objectregistry.cc
src/priority.cc
src/tapconnection.cc
src/stored-value.cc
src/tapthrottle.cc
Expand Down Expand Up @@ -334,7 +329,6 @@ ADD_TEST(ep-engine_histo_test ep-engine_histo_test)
ADD_TEST(ep-engine_hrtime_test ep-engine_hrtime_test)
ADD_TEST(ep-engine_misc_test ep-engine_misc_test)
ADD_TEST(ep-engine_mutex_test ep-engine_mutex_test)
ADD_TEST(ep-engine_priority_test ep-engine_priority_test)
ADD_TEST(ep-engine_ringbuffer_test ep-engine_ringbuffer_test)
ADD_TEST(ep-engine_stream_test ep-engine_stream_test)
ADD_TEST(ep-engine_kvstore_test ep-engine_kvstore_test)
Expand Down
270 changes: 270 additions & 0 deletions README.md
@@ -0,0 +1,270 @@
# Eventually Persistent Engine
## Threads
Code in ep-engine is executing in a multithreaded environment, two classes of
thread exist.

1. memcached's threads, for servicing a client and calling in via the
[engine API] (https://github.com/couchbase/memcached/blob/master/include/memcached/engine.h)
2. ep-engine's threads, for running tasks such as the document expiry pager
(see subclasses of `GlobalTasks`).

## Synchronisation Primitives

There are three mutual-exclusion primitives available in ep-engine.

1. `Mutex` exclusive lock - [mutex.h](./src/mutex.h)
2. `RWLock` shared, reader/writer lock - [rwlock.h](./src/rwlock.h)
3. `SpinLock` 1-byte exclusive lock - [atomix.h](./src/atomic.h)

A conditional-variable is also available called `SyncObject`
[syncobject.h](./src/syncobject.h). `SyncObject` glues a `Mutex` and
conditional-variable together in one object.

These primitives are managed via RAII wrappers - [locks.h](./src/locks.h).

1. `LockHolder` - for acquiring a `Mutex` or `SyncObject`.
2. `MultiLockHolder` - for acquiring an array of `Mutex` or `SyncObject`.
3. `WriterLockHolder` - for acquiring write access to a `RWLock`.
4. `ReaderLockHolder` - for acquiring read access to a `RWLock`.
5. `SpinLockHolder` - for acquiring a `SpinLock`.

## Mutex
The general style is to create a `LockHolder` when you need to acquire a
`Mutex`, the constructor will acquire and when the `LockHolder` goes out of
scope, the destructor will release the `Mutex`. For certain use-cases the
caller can explicitly lock/unlock a `Mutex` via the `LockHolder` class.

```c++
Mutex mutex;
void example1() {
LockHolder lockHolder(&mutex);
...
return;
}

void example2() {
LockHolder lockHolder(&mutex);
...
lockHolder.unlock();
...
lockHolder.lock();
...
return;
}
```

A `MultiLockHolder` allows an array of locks to be conveniently acquired and
released, and similarly to `LockHolder` the caller can choose to manually
lock/unlock at any time (with all locks locked/unlocked via one call).

```c++
Mutex mutexes[10];
Object objects[10];
void foo() {
MultiLockHolder lockHolder(&mutexes, 10);
for (int ii = 0; ii < 10; ii++) {
objects[ii].doStuff();
}
return;
}
```

## RWLock

`RWLock` allows many readers to acquire it and exclusive access for a writer.
`ReadLockHolder` acquires the lock for a reader and `WriteLockHolder` acquires
the lock for a writer. Neither classes enable manual lock/unlock, all
acquisitions and release are performed via the constructor and destructor.

```c++
RWLock rwLock;
Object thing;

void foo1() {
ReaderLockHolder rlh(&rwLock);
if (thing.getData()) {
...
}
}

void foo2() {
WriterLockHolder wlh(&rwLock);
thing.setData(...);
}
```

## SyncObject

`SyncObject` inherits from `Mutex` and is thus managed via a `LockHolder` or
`MultiLockHolder`. The `SyncObject` provides the conditional-variable
synchronisation primitive enabling threads to block and be woken.

The wait/wakeOne/wake method is provided by the `SyncObject`.

Note that `wake` will wake up a single blocking thread, `wakeOne` will wake up
every thread that is blocking on the `SyncObject`.

```c++
SyncObject syncObject;
bool sleeping = false;
void foo1() {
LockHolder lockHolder(&syncObject);
sleeping = true;
syncObject.wait(); // the mutex is released and the thread put to sleep
// when wait returns the mutex is reacquired
sleeping = false;
}

void foo2() {
LockHolder lockHolder(&syncObject);
if (sleeping) {
syncObject.notifyOne();
}
}
```

## SpinLock

A `SpinLock` uses a single byte for the lock and our own code to spin until the
lock is acquired. The intention for this lock is for low contention locks.

The RAII pattern is just like for a Mutex.


```c++
SpinLock spinLock;
void example1() {
SpinLockHolder lockHolder(&spinLock);
...
return;
}
```

## _UNLOCKED convention

ep-engine has a function naming convention that indicates the function should
be called with a lock acquired.

For example the following `doStuff_UNLOCKED` method indicates that it expect a
lock to be held before the function is called. What lock should be acquired
before calling is not defined by the convention.

```c++
void Object::doStuff_UNLOCKED() {
}

void Object::run() {
LockHolder lockHolder(&mutex);
doStuff_UNLOCKED();
return;
}
```
## Thread Local Storage (ObjectRegistry).

Threads in ep-engine are servicing buckets and when a thread is dispatched to
serve a bucket, the pointer to the `EventuallyPersistentEngine` representing
the bucket is placed into thread local storage, this avoids the need for the
pointer to be passed along the chain of execution as a formal parameter.

Both threads servicing frontend operations (memcached's threads) and ep-engine's
own task threads will save the bucket's engine pointer before calling down into
engine code.

Calling `ObjectRegistry::onSwitchThread(enginePtr)` will save the `enginePtr`
in thread-local-storage so that subsequent task code can retrieve the pointer
with `ObjectRegistry::getCurrentEngine()`.

## Tasks

A task is created by creating a sub-class (the `run()` method is the entry point
of the task) of the `GlobalTask` class and it is scheduled onto one of 4 task
queue types. Each task should be declared in `src/tasks.defs.h` using the TASK
macro. Using this macro ensures correct generation of a task-type ID, priority,
task name and ultimately ensures each task gets its own scheduling statistics.

The recipe is simple.

### Add your task's class name with its priority into `src/tasks.defs.h`
* A lower value priority is 'higher'.
```
TASK(MyNewTask, 1) // MyNewTask has priority 1.
```

### Create your class and set its ID using `MY_TASK_ID`.

```
class MyNewTask : public GlobalTask {
public:
MyNewTask(EventuallyPersistentEngine* e)
: GlobalTask(e/*engine/,
MY_TASK_ID(MyNewTask),
0.0/*snooze*/){}
...
```

### Define pure-virtual methods in `MyNewTask`
* run method

The run method is invoked when the task is executed. The method should return
true if it should be scheduled again. If false is returned, the instance of the
task is never re-scheduled and will deleted once all references to the instance are
gone.

```
bool run() {
// Task code here
return schedule again?;
}
```

* Define the `getDescription` method to aid debugging and statistics.
```
std::string getDescription() {
return "A brief description of what MyNewTask does";
}
```

### Schedule your task to the desired queue.
```
ExTask myNewTask = new MyNewTask(&engine);
myNewTaskId = ExecutorPool::get()->schedule(myNewTask, NONIO_TASK_IDX);
```

The 4 task queue types are:
* Readers - `READER_TASK_IDX`
* Tasks that should primarily only read from 'disk'. They generally read from
the vbucket database files, for example background fetch of a non-resident document.
* Writers (they are allowed to read too) `WRITER_TASK_IDX`
* Tasks that should primarily only write to 'disk'. They generally write to
the vbucket database files, for example when flushing the write queue.
* Auxilliary IO `AUXIO_TASK_IDX`
* Tasks that read and write 'disk', but not necessarily the vbucket data files.
* Non IO `NONIO_TASK_IDX`
* Tasks that do not perform 'disk' I/O.

### Utilise `snooze`

The snooze value of the task sets when the task should be executed. The initial snooze
value is set when constructing `GlobalTask`. A value of 0.0 means attempt to execute
the task as soon as scheduled and 5.0 would be 5 seconds from being scheduled
(scheduled meaning when `ExecutorPool::get()->schedule(...)` is called).

The `run()` function can also call `snooze(double snoozeAmount)` to set how long
before the task is rescheduled.

It is **best practice** for most tasks to actually do a sleep forever from their run function:

```
snooze(INT_MAX);
```

Using `INT_MAX` means sleep forever and tasks should always sleep until they have
real work todo. Tasks **should not periodically poll for work** with a snooze of
n seconds.

### Utilise `wake()`
When a task has work todo, some other function should be waking the task using the wake method.

```
ExecutorPool::get()->wake(myNewTaskId)`
```
4 changes: 2 additions & 2 deletions src/access_scanner.h
Expand Up @@ -34,9 +34,9 @@ class AccessScanner : public GlobalTask {
friend class AccessScannerValueChangeListener;
public:
AccessScanner(EventuallyPersistentStore &_store, EPStats &st,
const Priority &p, double sleeptime = 0,
double sleeptime = 0,
bool completeBeforeShutdown = false)
: GlobalTask(&_store.getEPEngine(), p, sleeptime,
: GlobalTask(&_store.getEPEngine(), TaskId::AccessScanner, sleeptime,
completeBeforeShutdown),
completedCount(0), store(_store), stats(st), sleepTime(sleeptime),
available(true) { }
Expand Down
3 changes: 1 addition & 2 deletions src/backfill.cc
Expand Up @@ -156,7 +156,6 @@ bool BackFillVisitor::visitBucket(RCPtr<VBucket> &vb) {
"Schedule a full backfill from disk for vbucket %d.", vb->getId());
ExTask task = new BackfillDiskLoad(name, engine, connMap,
underlying, vb->getId(), 0, connToken,
Priority::TapBgFetcherPriority,
0, false);
ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
}
Expand Down Expand Up @@ -210,6 +209,6 @@ bool BackFillVisitor::checkValidity() {

bool BackfillTask::run(void) {
engine->getEpStore()->visit(bfv, "Backfill task", NONIO_TASK_IDX,
Priority::BackfillTaskPriority, 1);
TaskId::BackfillVisitorTask, 1);
return false;
}
6 changes: 3 additions & 3 deletions src/backfill.h
Expand Up @@ -51,9 +51,9 @@ class BackfillDiskLoad : public GlobalTask {

BackfillDiskLoad(const std::string &n, EventuallyPersistentEngine* e,
TapConnMap &cm, KVStore *s, uint16_t vbid,
uint64_t start_seqno, hrtime_t token, const Priority &p,
uint64_t start_seqno, hrtime_t token,
double sleeptime = 0, bool shutdown = false)
: GlobalTask(e, p, sleeptime, shutdown),
: GlobalTask(e, TaskId::BackfillDiskLoad, sleeptime, shutdown),
name(n), engine(e), connMap(cm), store(s), vbucket(vbid),
startSeqno(start_seqno), connToken(token) {
ScheduleDiskBackfillTapOperation tapop;
Expand Down Expand Up @@ -123,7 +123,7 @@ class BackfillTask : public GlobalTask {

BackfillTask(EventuallyPersistentEngine *e, TapConnMap &cm, Producer *tc,
const VBucketFilter &backfillVBFilter):
GlobalTask(e, Priority::BackfillTaskPriority, 0, false),
GlobalTask(e, TaskId::BackfillTask, 0, false),
bfv(new BackFillVisitor(e, cm, tc, backfillVBFilter)), engine(e) {}

virtual ~BackfillTask() {}
Expand Down
3 changes: 1 addition & 2 deletions src/bgfetcher.cc
Expand Up @@ -31,8 +31,7 @@ void BgFetcher::start() {
bool inverse = false;
pendingFetch.compare_exchange_strong(inverse, true);
ExecutorPool* iom = ExecutorPool::get();
ExTask task = new BgFetcherTask(&(store->getEPEngine()), this,
Priority::BgFetcherPriority, false);
ExTask task = new MultiBGFetcherTask(&(store->getEPEngine()), this, false);
this->setTaskId(task->getId());
iom->schedule(task, READER_TASK_IDX);
cb_assert(taskId > 0);
Expand Down
2 changes: 1 addition & 1 deletion src/checkpoint_remover.cc
Expand Up @@ -91,7 +91,7 @@ bool ClosedUnrefCheckpointRemoverTask::run(void) {
shared_ptr<CheckpointVisitor> pv(new CheckpointVisitor(store, stats,
available));
store->visit(pv, "Checkpoint Remover", NONIO_TASK_IDX,
Priority::CheckpointRemoverPriority);
TaskId::ClosedUnrefCheckpointRemoverVisitorTask);
}
snooze(sleepTime);
return true;
Expand Down
2 changes: 1 addition & 1 deletion src/checkpoint_remover.h
Expand Up @@ -43,7 +43,7 @@ class ClosedUnrefCheckpointRemoverTask : public GlobalTask {
*/
ClosedUnrefCheckpointRemoverTask(EventuallyPersistentEngine *e,
EPStats &st, size_t interval) :
GlobalTask(e, Priority::CheckpointRemoverPriority, interval, false),
GlobalTask(e, TaskId::ClosedUnrefCheckpointRemoverTask, interval, false),
engine(e), stats(st), sleepTime(interval), available(true) {}

bool run(void);
Expand Down

0 comments on commit 6403bc0

Please sign in to comment.