Skip to content

Commit

Permalink
MB-7966 IOManager integration with KVShard
Browse files Browse the repository at this point in the history
Change-Id: I641dda9ea68bdade2f71b5805da439173dc3d678
Reviewed-on: http://review.couchbase.org/25955
Reviewed-by: Michael Wiederhold <mike@couchbase.com>
Reviewed-by: Jin Lim <jin@couchbase.com>
Tested-by: Michael Wiederhold <mike@couchbase.com>
  • Loading branch information
abhinavdangeti authored and Michael Wiederhold committed May 7, 2013
1 parent 55347af commit 73dac32
Show file tree
Hide file tree
Showing 22 changed files with 1,217 additions and 465 deletions.
2 changes: 1 addition & 1 deletion Doxyfile
Expand Up @@ -574,7 +574,7 @@ WARN_LOGFILE =
# directories like "/usr/src/myproject". Separate the files or directories
# with spaces.

INPUT = src/ src/atomic src/blackhole-kvstore src/couch-kvstore
INPUT = src/ src/atomic src/blackhole-kvstore src/couch-kvstore src/iomanager

# This tag can be used to specify the character encoding of the source files
# that doxygen parses. Internally doxygen uses the UTF-8 encoding, which is
Expand Down
10 changes: 4 additions & 6 deletions Makefile.am
Expand Up @@ -65,6 +65,7 @@ ep_la_SOURCES = include/ep-engine/command_ids.h \
src/flusher.cc src/flusher.hh \
src/histo.hh \
src/htresizer.cc src/htresizer.hh \
src/iomanager/iomanager.cc src/iomanager/iomanager.h \
src/item.cc src/item.hh \
src/item_pager.cc src/item_pager.hh \
src/kvstore.hh \
Expand All @@ -75,28 +76,26 @@ ep_la_SOURCES = include/ep-engine/command_ids.h \
src/priority.cc src/priority.hh \
src/queueditem.cc src/queueditem.hh \
src/ringbuffer.hh \
src/scheduler.cc src/scheduler.h \
src/sizes.cc \
src/stats.hh \
src/stats-info.h src/stats-info.c \
src/statsnap.cc src/statsnap.hh \
src/statwriter.hh \
src/stored-value.cc src/stored-value.hh \
src/syncobject.hh \
src/tapconnection.cc src/tapconnection.hh \
src/tapconnmap.cc src/tapconnmap.hh \
src/tapthrottle.cc src/tapthrottle.hh \
src/tasks.cc src/tasks.h \
src/vbucket.cc src/vbucket.hh \
src/vbucketmap.cc src/vbucketmap.hh \
src/warmup.cc src/warmup.hh


libobjectregistry_la_CPPFLAGS = $(AM_CPPFLAGS)
libobjectregistry_la_SOURCES = src/objectregistry.cc src/objectregistry.hh

libkvstore_la_SOURCES = src/crc32.c src/crc32.h src/kvstore.cc src/kvstore.hh \
src/mutation_log.cc src/mutation_log.hh \
src/mutation_log_compactor.cc \
src/mutation_log_compactor.hh
src/mutation_log.cc src/mutation_log.hh
libkvstore_la_CPPFLAGS = $(AM_CPPFLAGS)

libblackhole_kvstore_la_CPPFLAGS = -I$(top_srcdir)/src/blackhole-kvstore \
Expand Down Expand Up @@ -159,7 +158,6 @@ ep_testsuite_la_DEPENDENCIES = libobjectregistry.la
check_PROGRAMS=\
atomic_ptr_test \
atomic_test \
checkpoint_test \
chunk_creation_test \
dispatcher_test \
hash_table_test \
Expand Down
44 changes: 24 additions & 20 deletions src/bgfetcher.cc
@@ -1,31 +1,35 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "config.h"
#include "ep.hh"
#include "iomanager/iomanager.h"
#include "kvshard.hh"

const double BgFetcher::sleepInterval = 1.0;

bool BgFetcherCallback::callback(Dispatcher &, TaskId &t) {
return bgfetcher->run(t);
}

void BgFetcher::start(Dispatcher *d) {
void BgFetcher::start() {
LockHolder lh(taskMutex);
if (d) {
dispatcher = d;
}
assert(dispatcher);
pendingFetch.cas(false, true);
dispatcher->schedule(shared_ptr<BgFetcherCallback>(new BgFetcherCallback(this)),
&task, Priority::BgFetcherPriority);
assert(task.get());
IOManager* iom = IOManager::get();
iom->scheduleMultiBGFetcher(&(store->getEPEngine()), this,
Priority::BgFetcherPriority,
shard->getId());
assert(taskId > 0);
}

void BgFetcher::stop() {
LockHolder lh(taskMutex);
assert(task.get());
pendingFetch.cas(true, false);
dispatcher->cancel(task);
assert(taskId > 0);
IOManager::get()->cancel(taskId);
}

void BgFetcher::notifyBGEvent(void) {
++stats.numRemainingBgJobs;
if (pendingFetch.cas(false, true)) {
LockHolder lh(taskMutex);
assert(taskId > 0);
IOManager::get()->wake(taskId);
}
}

void BgFetcher::doFetch(uint16_t vbId) {
Expand Down Expand Up @@ -101,8 +105,8 @@ void BgFetcher::clearItems(uint16_t vbId) {
}
}

bool BgFetcher::run(TaskId &tid) {
assert(tid.get());
bool BgFetcher::run(size_t tid) {
assert(tid > 0);
size_t num_fetched_items = 0;

pendingFetch.cas(true, false);
Expand All @@ -122,12 +126,12 @@ bool BgFetcher::run(TaskId &tid) {
if (!pendingFetch.get()) {
// wait a bit until next fetch request arrives
double sleep = std::max(store->getBGFetchDelay(), sleepInterval);
dispatcher->snooze(tid, sleep);
IOManager::get()->snooze(taskId, sleep);

if (pendingFetch.get()) {
// check again a new fetch request could have arrived
// right before calling above snooze()
dispatcher->snooze(tid, 0);
// check again a new fetch request could have arrived
// right before calling above snooze()
IOManager::get()->snooze(taskId, 0);
}
}
return true;
Expand Down
42 changes: 9 additions & 33 deletions src/bgfetcher.hh
Expand Up @@ -8,6 +8,8 @@

#include "common.hh"
#include "dispatcher.hh"
#include "item.hh"
#include "stats.hh"

const uint16_t MAX_BGFETCH_RETRY=5;

Expand Down Expand Up @@ -44,23 +46,6 @@ typedef unordered_map<uint64_t, std::list<VBucketBGFetchItem *> > vb_bgfetch_que

// Forward declaration.
class EventuallyPersistentStore;
class BgFetcher;

/**
* A DispatcherCallback for BgFetcher
*/
class BgFetcherCallback : public DispatcherCallback {
public:
BgFetcherCallback(BgFetcher *b) : bgfetcher(b) { }

bool callback(Dispatcher &d, TaskId &t);
std::string description() {
return std::string("Batching background fetch.");
}

private:
BgFetcher *bgfetcher;
};

class KVShard;

Expand All @@ -77,33 +62,24 @@ public:
* @param s the store
* @param d the dispatcher
*/
BgFetcher(EventuallyPersistentStore *s,
KVShard *k, Dispatcher *d, EPStats &st) :
store(s), shard(k), dispatcher(d), stats(st) {}
BgFetcher(EventuallyPersistentStore *s, KVShard *k, EPStats &st) :
store(s), shard(k), taskId(0), stats(st) {}

void start(Dispatcher *d = NULL);
void start(void);
void stop(void);
bool run(TaskId &tid);
bool run(size_t tid);
bool pendingJob(void);

void notifyBGEvent(void) {
++stats.numRemainingBgJobs;
if (pendingFetch.cas(false, true)) {
LockHolder lh(taskMutex);
assert(task.get());
dispatcher->wake(task);
}
}
void notifyBGEvent(void);
void setTaskId(size_t newId) { taskId = newId; }

private:
void doFetch(uint16_t vbId);
void clearItems(uint16_t vbId);

EventuallyPersistentStore *store;
KVShard *shard;
Dispatcher *dispatcher;
vb_bgfetch_queue_t items2fetch;
TaskId task;
size_t taskId;
Mutex taskMutex;
EPStats &stats;

Expand Down

0 comments on commit 73dac32

Please sign in to comment.