MB-6796 Prioritize flushing pending vbuckets over regular vbuckets

This change prioritizes pending vbuckets (i.e., vbuckets whose
active ownership is currently being transferred) over regular
vbuckets on each destination node.

The flusher maintains a list of high-priority vbuckets, which are
explicitly requested by the cluster manager, and interleaves
persisting regular vbuckets with those high-priority vbuckets.

A new command, CMD_CHECKPOINT_PERSISTENCE (0xb1), is introduced. It
uses the default binary protocol format, with a message body that
contains the checkpoint id (8 bytes) requested for persistence.
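
For illustration, a minimal sketch of what such a request could look like on
the wire is shown below, assuming the standard 24-byte memcached binary
protocol header. The struct name, the to_network64 helper, and the packed
layout are inventions for this example and are not part of the commit; the
test helper checkpointPersistence in tests/ep_test_apis.cc builds an
equivalent packet via createPacket.

```cpp
// Illustrative sketch (not from this commit): building a
// CMD_CHECKPOINT_PERSISTENCE (0xb1) request. The 24-byte header follows the
// standard memcached binary protocol; the body carries the 8-byte checkpoint
// id in network byte order.
#include <stdint.h>
#include <string.h>
#include <arpa/inet.h>

// Hypothetical helper: host-to-network conversion for 64-bit values.
uint64_t to_network64(uint64_t v) {
    uint8_t bytes[8];
    for (int i = 0; i < 8; ++i) {
        bytes[i] = (uint8_t)(v >> (56 - 8 * i)); // most significant byte first
    }
    uint64_t out;
    memcpy(&out, bytes, sizeof(out));
    return out;
}

#pragma pack(push, 1)
struct chk_persistence_request {
    uint8_t  magic;         // 0x80 (request)
    uint8_t  opcode;        // 0xb1 (CMD_CHECKPOINT_PERSISTENCE)
    uint16_t keylen;        // 0
    uint8_t  extlen;        // 0
    uint8_t  datatype;      // 0
    uint16_t vbucket;       // target vbucket id
    uint32_t bodylen;       // 8 (size of the checkpoint id)
    uint32_t opaque;        // echoed back in the response
    uint64_t cas;           // 0
    uint64_t checkpoint_id; // body: checkpoint id to wait for
};
#pragma pack(pop)

chk_persistence_request make_request(uint16_t vbid, uint64_t chk_id) {
    chk_persistence_request req;
    memset(&req, 0, sizeof(req));
    req.magic = 0x80;
    req.opcode = 0xb1;
    req.vbucket = htons(vbid);
    req.bodylen = htonl((uint32_t)sizeof(uint64_t));
    req.checkpoint_id = to_network64(chk_id);
    return req;
}
```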

When the engine receives this command from the cluster manager, it
registers the requested vbucket and its checkpoint id with the
flusher, so that the flusher can prioritize flushing that vbucket
over other vbuckets. Once the requested checkpoint is persisted, the
flusher wakes up the blocked memcached connection for the cluster
manager and removes the requested vbucket from its high-priority
vbucket list.
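
The interleaving itself lives in EventuallyPersistentStore::flushOutgoingQueue
and flushHighPriorityVBQueue (see the src/ep.cc diff below). The following
simplified sketch, with illustrative names and the item payload elided, shows
the idea: while walking the per-vbucket outgoing queues, the flusher
periodically revisits whichever vbuckets the cluster manager has flagged as
high priority.

```cpp
// Simplified sketch of the interleaving idea (illustrative only; the real
// logic is flushOutgoingQueue/flushHighPriorityVBQueue in src/ep.cc below).
#include <stdint.h>
#include <map>
#include <queue>
#include <vector>

typedef std::map<uint16_t, std::queue<int> > flush_queue_map; // items elided

// Stub standing in for the per-vbucket flush: persists and drains the queue.
void flushVBQueue(std::queue<int> &vb_queue) {
    while (!vb_queue.empty()) {
        vb_queue.pop();
    }
}

void flushAll(flush_queue_map &queues,
              const std::vector<uint16_t> &high_priority_vbs) {
    size_t iteration = 0;
    flush_queue_map::iterator it = queues.begin();
    for (; it != queues.end(); ++it) {
        // Periodically (every high_priority_vbs.size() regular vbuckets here)
        // revisit the vbuckets the cluster manager has flagged as high priority.
        if (!high_priority_vbs.empty() &&
            (iteration % high_priority_vbs.size()) == 0) {
            std::vector<uint16_t>::const_iterator hit = high_priority_vbs.begin();
            for (; hit != high_priority_vbs.end(); ++hit) {
                flush_queue_map::iterator qit = queues.find(*hit);
                if (qit != queues.end()) {
                    flushVBQueue(qit->second);
                }
            }
        }
        flushVBQueue(it->second); // then flush the regular vbucket
        ++iteration;
    }
}
```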

The timeout for a persistence request is 10 seconds by default. If
the timeout expires, the engine sends a TMP_FAIL response to the
cluster manager and removes the requested vbucket from the
high-priority vbucket list.
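
A TMP_FAIL therefore only means that the wait window elapsed, not that
persistence failed permanently. A hypothetical caller-side pattern (the
function names and the transport stub below are assumptions, not part of this
commit) could look like:

```cpp
// Hypothetical caller-side handling of the 10-second timeout (illustrative
// only; the transport wrapper below is a stub, not part of this commit).
#include <stdint.h>

enum wait_status {
    WAIT_SUCCESS,  // requested checkpoint persisted
    WAIT_TMPFAIL   // engine timed out the wait (default 10s)
};

// Stub standing in for "send CMD_CHECKPOINT_PERSISTENCE and block for a reply".
wait_status waitForCheckpointPersistence(uint16_t vbid, uint64_t chk_id) {
    (void)vbid;
    (void)chk_id;
    return WAIT_SUCCESS;
}

bool ensurePersisted(uint16_t vbid, uint64_t chk_id, int max_retries) {
    for (int attempt = 0; attempt < max_retries; ++attempt) {
        if (waitForCheckpointPersistence(vbid, chk_id) == WAIT_SUCCESS) {
            return true; // checkpoint is on disk; safe to proceed
        }
        // TMP_FAIL: the wait window elapsed before persistence; retry.
    }
    return false;
}
```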

Change-Id: I6173a25b81e146f1cb0643ac8f4c3799e086a718
Reviewed-on: http://review.couchbase.org/21523
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Reviewed-by: Jin Lim <jin@couchbase.com>
commit e9ce877041d101efee775e9c65ea6b9eef914926 (parent 4f9646c)
Authored by chiyoung; committed by Farshid Ghods
2  docs/stats.org
@@ -449,6 +449,7 @@ form to describe when time was spent doing various things:
| get_vb_cmd | servicing vbucket status requests |
| set_vb_cmd | servicing vbucket set state commands |
| del_vb_cmd | servicing vbucket deletion commands |
+| chk_persistence_cmd | waiting for checkpoint persistence |
| tap_vb_set | servicing tap vbucket set state commands |
| tap_vb_reset | servicing tap vbucket reset commands |
| tap_mutation | servicing tap mutations |
@@ -689,6 +690,7 @@ Reset Histograms:
| bg_wait |
| bg_tap_load |
| bg_tap_wait |
+| chk_persistence_cmd |
| data_age |
| del_vb_cmd |
| disk_insert |
5 include/ep-engine/command_ids.h
@@ -138,6 +138,11 @@
*/
#define CMD_CHANGE_VB_FILTER 0xb0
+/**
+ * Command to wait for the checkpoint persistence
+ */
+#define CMD_CHECKPOINT_PERSISTENCE 0xb1
+
/**
* TAP OPAQUE command list
339 src/ep.cc
@@ -299,7 +299,8 @@ EventuallyPersistentStore::EventuallyPersistentStore(EventuallyPersistentEngine
engine.getConfiguration().getAlogBlockSize()),
diskFlushAll(false),
tctx(stats, t, mutationLog),
- bgFetchDelay(0)
+ bgFetchDelay(0),
+ snapshotVBState(false)
{
getLogger()->log(EXTENSION_LOG_INFO, NULL,
"Storage props: c=%ld/r=%ld/rw=%ld\n",
@@ -896,6 +897,10 @@ void EventuallyPersistentStore::snapshotVBuckets(const Priority &priority) {
if (priority == Priority::VBucketPersistHighPriority) {
vbuckets.setHighPriorityVbSnapshotFlag(false);
+ size_t numVBs = vbuckets.getSize();
+ for (size_t i = 0; i < numVBs; ++i) {
+ vbuckets.setBucketCreation(static_cast<uint16_t>(i), false);
+ }
} else {
vbuckets.setLowPriorityVbSnapshotFlag(false);
}
@@ -938,6 +943,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
return ENGINE_ERANGE;
}
vbuckets.setPersistenceCheckpointId(vbid, 0);
+ vbuckets.setBucketCreation(vbid, true);
lh.unlock();
scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
}
@@ -945,6 +951,7 @@ ENGINE_ERROR_CODE EventuallyPersistentStore::setVBucketState(uint16_t vbid,
}
void EventuallyPersistentStore::scheduleVBSnapshot(const Priority &p) {
+ snapshotVBState = false;
if (p == Priority::VBucketPersistHighPriority) {
if (!vbuckets.setHighPriorityVbSnapshotFlag(true)) {
return;
@@ -1688,16 +1695,54 @@ void EventuallyPersistentStore::reset() {
}
if (diskFlushAll.cas(false, true)) {
// Increase the write queue size by 1 as flusher will execute flush_all as a single task.
- stats.queue_size.set(getWriteQueueSize() + 1);
+ stats.queue_size.set(incomingQueueSize() + 1);
}
}
bool EventuallyPersistentStore::diskQueueEmpty() {
- return !hasItemsForPersistence() && writing.empty() && !diskFlushAll;
+ if (diskFlushAll) {
+ return false;
+ }
+
+ bool hasItems = false;
+ size_t numOfVBuckets = vbuckets.getSize();
+ assert(numOfVBuckets <= std::numeric_limits<uint16_t>::max());
+ for (size_t i = 0; i < numOfVBuckets; ++i) {
+ uint16_t vbid = static_cast<uint16_t>(i);
+ // Check the outgoing queue first.
+ vb_flush_queue_t::iterator iter = writingQueues.find(vbid);
+ if (iter != writingQueues.end() && !iter->second.empty()) {
+ hasItems = true;
+ break;
+ }
+ // Check the incoming queue.
+ RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
+ if (vb && (vb->getState() != vbucket_state_dead)) {
+ if (vb->checkpointManager.hasNextForPersistence() ||
+ vb->getBackfillSize() > 0) {
+ hasItems = true;
+ break;
+ }
+ }
+ }
+
+ return !hasItems;
}
-std::queue<queued_item>* EventuallyPersistentStore::beginFlush() {
- std::queue<queued_item> *rv(NULL);
+bool EventuallyPersistentStore::outgoingQueueEmpty() {
+ bool hasItems = false;
+ vb_flush_queue_t::iterator iter = writingQueues.begin();
+ for (; iter != writingQueues.end(); ++iter) {
+ if (!iter->second.empty()) {
+ hasItems = true;
+ break;
+ }
+ }
+ return !hasItems;
+}
+
+vb_flush_queue_t* EventuallyPersistentStore::beginFlush() {
+ vb_flush_queue_t *rv(NULL);
if (diskQueueEmpty()) {
// If the persistence queue is empty, reset queue-related stats for each vbucket.
@@ -1720,26 +1765,36 @@ std::queue<queued_item>* EventuallyPersistentStore::beginFlush() {
vbuckets.setPersistenceCheckpointId(vbid, chkid);
schedule_vb_snapshot = true;
}
+ HighPriorityVBEntry vb_entry = flusher->getHighPriorityVBEntry(vbid);
+ if (vb_entry.cookie) {
+ if (vb_entry.checkpoint <= chkid) {
+ engine.notifyIOComplete(vb_entry.cookie, ENGINE_SUCCESS);
+ flusher->removeHighPriorityVBucket(vbid);
+ hrtime_t wall_time(gethrtime() - vb_entry.start);
+ stats.chkPersistenceHisto.add(wall_time / 1000);
+ } else {
+ size_t spent = (gethrtime() - vb_entry.start) / 1000000000;
+ if (spent > MAX_CHECKPOINT_PERSISTENCE_TIME) {
+ engine.notifyIOComplete(vb_entry.cookie, ENGINE_TMPFAIL);
+ flusher->removeHighPriorityVBucket(vbid);
+ }
+ }
+ }
}
}
// Schedule the vbucket state snapshot task to record the latest checkpoint Id
// that was successfully persisted for each vbucket.
- if (schedule_vb_snapshot) {
+ if (schedule_vb_snapshot || snapshotVBState) {
scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
}
} else {
assert(rwUnderlying);
- if (diskFlushAll) {
- queued_item qi(new QueuedItem("", 0xffff, queue_op_flush));
- writing.push(qi);
- stats.memOverhead.incr(sizeof(queued_item));
- assert(stats.memOverhead.get() < GIGANTOR);
- }
+ size_t num_items = 0;
std::vector<queued_item> item_list;
item_list.reserve(getTxnSize());
- const std::vector<int> vblist = vbuckets.getBucketsSortedByState();
+ const std::vector<int> vblist = vbuckets.getBuckets();
std::vector<int>::const_iterator itr;
for (itr = vblist.begin(); itr != vblist.end(); ++itr) {
uint16_t vbid = static_cast<uint16_t>(*itr);
@@ -1747,6 +1802,7 @@ std::queue<queued_item>* EventuallyPersistentStore::beginFlush() {
if (!vb) {
// Undefined vbucket..
+ writingQueues.erase(vbid);
continue;
}
@@ -1755,24 +1811,32 @@ std::queue<queued_item>* EventuallyPersistentStore::beginFlush() {
// Get all dirty items from the checkpoint.
vb->checkpointManager.getAllItemsForPersistence(item_list);
if (item_list.size() > 0) {
- pushToOutgoingQueue(item_list);
+ num_items += pushToOutgoingQueue(item_list, vbid);
}
}
- size_t queue_size = getWriteQueueSize();
- stats.flusher_todo.set(writing.size());
+ size_t queue_size = incomingQueueSize();
+ stats.flusher_todo.set(num_items);
stats.queue_size.set(queue_size);
getLogger()->log(EXTENSION_LOG_DEBUG, NULL,
"Flushing %ld items with %ld still in queue\n",
- writing.size(), queue_size);
- rv = &writing;
+ num_items, queue_size);
+ rv = &writingQueues;
}
return rv;
}
-void EventuallyPersistentStore::pushToOutgoingQueue(std::vector<queued_item> &items) {
+size_t EventuallyPersistentStore::pushToOutgoingQueue(std::vector<queued_item> &items,
+ uint16_t vbid) {
size_t num_items = 0;
rwUnderlying->optimizeWrites(items);
+ vb_flush_queue_t::iterator qit = writingQueues.find(vbid);
+ if (qit == writingQueues.end()) {
+ writingQueues.insert(std::make_pair(vbid, std::queue<queued_item>()));
+ qit = writingQueues.find(vbid);
+ }
+
+ std::queue<queued_item> &writing = qit->second;
std::vector<queued_item>::iterator it = items.begin();
for(; it != items.end(); ++it) {
if (writing.empty() || writing.back()->getKey() != (*it)->getKey()) {
@@ -1789,107 +1853,173 @@ void EventuallyPersistentStore::pushToOutgoingQueue(std::vector<queued_item> &it
items.clear();
stats.memOverhead.incr(num_items * sizeof(queued_item));
assert(stats.memOverhead.get() < GIGANTOR);
-}
-
-void EventuallyPersistentStore::requeueRejectedItems(std::queue<queued_item> *rej) {
- size_t queue_size = rej->size();
- // Requeue the rejects.
- while (!rej->empty()) {
- writing.push(rej->front());
- rej->pop();
- }
- stats.memOverhead.incr(queue_size * sizeof(queued_item));
- assert(stats.memOverhead.get() < GIGANTOR);
- stats.queue_size.set(getWriteQueueSize());
- stats.flusher_todo.set(writing.size());
+ return num_items;
}
void EventuallyPersistentStore::completeFlush(rel_time_t flush_start) {
- size_t numOfVBuckets = vbuckets.getSize();
- bool schedule_vb_snapshot = false;
- assert(numOfVBuckets <= std::numeric_limits<uint16_t>::max());
- for (size_t i = 0; i < numOfVBuckets; ++i) {
- uint16_t vbid = static_cast<uint16_t>(i);
- RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
- if (!vb || vb->getState() == vbucket_state_dead) {
- continue;
- }
- uint64_t pcursor_chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
- if (pcursor_chkid > 0 &&
- pcursor_chkid != vbuckets.getPersistenceCheckpointId(vbid)) {
- vbuckets.setPersistenceCheckpointId(vbid, pcursor_chkid);
- schedule_vb_snapshot = true;
- }
- }
-
// Schedule the vbucket state snapshot task to record the latest checkpoint Id
// that was successfully persisted for each vbucket.
- if (schedule_vb_snapshot) {
+ if (snapshotVBState) {
scheduleVBSnapshot(Priority::VBucketPersistHighPriority);
}
- stats.flusher_todo.set(writing.size());
- stats.queue_size.set(getWriteQueueSize());
+ stats.flusher_todo.set(0);
+ stats.queue_size.set(incomingQueueSize());
rel_time_t complete_time = ep_current_time();
stats.cumulativeFlushTime.incr(complete_time - flush_start);
}
-int EventuallyPersistentStore::flushSome(std::queue<queued_item> *q,
- std::queue<queued_item> *rejectQueue) {
+int EventuallyPersistentStore::flushOutgoingQueue(vb_flush_queue_t *flushQueue,
+ size_t phase) {
+ if (diskFlushAll) {
+ flushOneDeleteAll(); // Reset the database.
+ }
+
+ size_t iteration = 0;
+ int oldest = stats.min_data_age;
+ vb_flush_queue_t::iterator vit = flushQueue->begin();
+ for (; vit != flushQueue->end(); ++vit) {
+ uint16_t vbid = vit->first;
+ std::queue<queued_item> &vb_queue = vit->second;
+ // Interleave regular and high priority vbuckets.
+ size_t priority_vbs = flusher->getNumOfHighPriorityVBs();
+ if (phase == 1 && priority_vbs != 0 && (iteration % priority_vbs) == 0) {
+ oldest = flushHighPriorityVBQueue(flushQueue, oldest);
+ }
+ oldest = flushVBQueue(vb_queue, vbid, oldest);
+ ++iteration;
+ }
+ return oldest;
+}
+
+int EventuallyPersistentStore::flushVBQueue(std::queue<queued_item> &vb_queue,
+ uint16_t vbid,
+ int data_age) {
+ int oldest = data_age;
+
+ if (vb_queue.empty()) {
+ HighPriorityVBEntry vb_entry = flusher->getHighPriorityVBEntry(vbid);
+ size_t spent = (gethrtime() - vb_entry.start) / 1000000000;
+ if (vb_entry.cookie && spent > MAX_CHECKPOINT_PERSISTENCE_TIME) {
+ engine.notifyIOComplete(vb_entry.cookie, ENGINE_TMPFAIL);
+ flusher->removeHighPriorityVBucket(vbid);
+ }
+ return oldest;
+ }
+
if (!tctx.enter()) {
++stats.beginFailed;
getLogger()->log(EXTENSION_LOG_WARNING, NULL,
"Failed to start a transaction.\n");
- // Copy the input queue into the reject queue.
- while (!q->empty()) {
- rejectQueue->push(q->front());
- q->pop();
- }
- return 1; // This will cause us to jump out and delay a second
+ return oldest;
}
- int tsz = tctx.getTxnSize();
- int oldest = stats.min_data_age;
- int completed(0);
- for (; completed < tsz && !q->empty(); ++completed) {
- int n = flushOne(q, rejectQueue);
+
+ std::queue<queued_item> rejectQueue;
+ while (!vb_queue.empty()) {
+ int n = flushOne(vb_queue, rejectQueue);
if (n != 0 && n < oldest) {
oldest = n;
}
}
tctx.commit();
+
+ bool notified = false;
+ HighPriorityVBEntry vb_entry = flusher->getHighPriorityVBEntry(vbid);
+ RCPtr<VBucket> vb = getVBucket(vbid);
+ if (vb) {
+ if (rejectQueue.empty()) {
+ uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
+ if (chkid > 0 && chkid != vbuckets.getPersistenceCheckpointId(vbid)) {
+ vbuckets.setPersistenceCheckpointId(vbid, chkid);
+ snapshotVBState = true;
+ }
+ if (vb_entry.checkpoint <= chkid && vb_entry.cookie) {
+ engine.notifyIOComplete(vb_entry.cookie, ENGINE_SUCCESS);
+ flusher->removeHighPriorityVBucket(vbid);
+ notified = true;
+ hrtime_t wall_time(gethrtime() - vb_entry.start);
+ stats.chkPersistenceHisto.add(wall_time / 1000);
+ }
+ } else {
+ size_t qsize = rejectQueue.size();
+ // Requeue the rejects.
+ while (!rejectQueue.empty()) {
+ vb_queue.push(rejectQueue.front());
+ rejectQueue.pop();
+ }
+ stats.memOverhead.incr(qsize * sizeof(queued_item));
+ assert(stats.memOverhead.get() < GIGANTOR);
+ stats.flusher_todo.incr(qsize);
+ }
+ }
+
+ if (!notified && vb_entry.cookie) {
+ size_t spent = (gethrtime() - vb_entry.start) / 1000000000;
+ if (spent > MAX_CHECKPOINT_PERSISTENCE_TIME) {
+ engine.notifyIOComplete(vb_entry.cookie, ENGINE_TMPFAIL);
+ flusher->removeHighPriorityVBucket(vbid);
+ }
+ }
+
return oldest;
}
-size_t EventuallyPersistentStore::getWriteQueueSize(void) {
- size_t size = 0;
- size_t numOfVBuckets = vbuckets.getSize();
- assert(numOfVBuckets <= std::numeric_limits<uint16_t>::max());
- for (size_t i = 0; i < numOfVBuckets; ++i) {
- uint16_t vbid = static_cast<uint16_t>(i);
+int EventuallyPersistentStore::flushHighPriorityVBQueue(vb_flush_queue_t *flushQueue,
+ int data_age) {
+ int oldest = data_age;
+ size_t num_items = 0;
+
+ std::vector<uint16_t> vblist;
+ flusher->getAllHighPriorityVBuckets(vblist);
+ std::vector<uint16_t>::iterator it = vblist.begin();
+ for (; it != vblist.end(); ++it) {
+ uint16_t vbid = *it;
RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
- if (vb && (vb->getState() != vbucket_state_dead)) {
- size += vb->checkpointManager.getNumItemsForPersistence() + vb->getBackfillSize();
+ if (!vb) {
+ continue;
+ }
+
+ vb_flush_queue_t::iterator vit = flushQueue->find(vbid);
+ if (vit != flushQueue->end()) {
+ std::queue<queued_item> &vb_queue = vit->second;
+ oldest = flushVBQueue(vb_queue, vbid, oldest);
+ // vbucket outgoing queue is empty.
+ if (vb_queue.empty()) {
+ std::vector<queued_item> item_list;
+ // Grab all the backfill items if exist.
+ vb->getBackfillItems(item_list);
+ // Get all dirty items from the checkpoint.
+ vb->checkpointManager.getAllItemsForPersistence(item_list);
+ if (item_list.size() > 0) {
+ num_items += pushToOutgoingQueue(item_list, vbid);
+ }
+ }
}
}
- return size;
+
+ if (num_items > 0) {
+ if (stats.queue_size > num_items) {
+ stats.queue_size.decr(num_items);
+ } else {
+ stats.queue_size.set(0);
+ }
+ stats.flusher_todo.incr(num_items);
+ }
+ return oldest;
}
-bool EventuallyPersistentStore::hasItemsForPersistence(void) {
- bool hasItems = false;
+size_t EventuallyPersistentStore::incomingQueueSize(void) {
+ size_t size = 0;
size_t numOfVBuckets = vbuckets.getSize();
assert(numOfVBuckets <= std::numeric_limits<uint16_t>::max());
for (size_t i = 0; i < numOfVBuckets; ++i) {
uint16_t vbid = static_cast<uint16_t>(i);
RCPtr<VBucket> vb = vbuckets.getBucket(vbid);
if (vb && (vb->getState() != vbucket_state_dead)) {
- if (vb->checkpointManager.hasNextForPersistence() ||
- vb->getBackfillSize() > 0) {
- hasItems = true;
- break;
- }
+ size += vb->checkpointManager.getNumItemsForPersistence() + vb->getBackfillSize();
}
}
- return hasItems;
+ return size;
}
/**
@@ -1903,13 +2033,12 @@ class PersistenceCallback : public Callback<mutation_result>,
public Callback<int> {
public:
- PersistenceCallback(const queued_item &qi, std::queue<queued_item> *q,
+ PersistenceCallback(const queued_item &qi, std::queue<queued_item> &q,
EventuallyPersistentStore *st, MutationLog *ml,
rel_time_t d, EPStats *s, uint64_t c) :
queuedItem(qi), rq(q), store(st), mutationLog(ml),
dirtied(d), stats(s), cas(c) {
- assert(rq);
assert(s);
}
@@ -2032,11 +2161,11 @@ class PersistenceCallback : public Callback<mutation_result>,
queuedItem->getVBucketId(),
&StoredValue::reDirty,
dirtied);
- rq->push(queuedItem);
+ rq.push(queuedItem);
}
const queued_item queuedItem;
- std::queue<queued_item> *rq;
+ std::queue<queued_item> &rq;
EventuallyPersistentStore *store;
MutationLog *mutationLog;
rel_time_t dirtied;
@@ -2064,7 +2193,7 @@ int EventuallyPersistentStore::flushOneDeleteAll() {
// still a bit better off running the older code that figures it out
// based on what's in memory.
int EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
- std::queue<queued_item> *rejectQueue) {
+ std::queue<queued_item> &rejectQueue) {
RCPtr<VBucket> vb = getVBucket(qi->getVBucketId());
if (!vb) {
@@ -2133,19 +2262,19 @@ int EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
} else {
isDirty = false;
v->reDirty(dirtied);
- rejectQueue->push(qi);
+ rejectQueue.push(qi);
++vb->opsReject;
}
}
if (isDirty && !deleted) {
if (!vbuckets.isBucketDeletion(qi->getVBucketId())) {
- // If a vbucket snapshot task with the high priority is currently scheduled,
- // requeue the persistence task and wait until the snapshot task is completed.
- if (vbuckets.isHighPriorityVbSnapshotScheduled()) {
+ // Wait until the vbucket database is created by the vbucket state
+ // snapshot task.
+ if (vbuckets.isBucketCreation(qi->getVBucketId())) {
v->clearPendingId();
lh.unlock();
- rejectQueue->push(qi);
+ rejectQueue.push(qi);
++vb->opsReject;
} else {
assert(rowid == v->getId());
@@ -2172,20 +2301,19 @@ int EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
}
} else if (deleted || !found) {
if (!vbuckets.isBucketDeletion(qi->getVBucketId())) {
- if (vbuckets.isHighPriorityVbSnapshotScheduled()) {
- // wait until the high priority snapshop task is complete
+ if (vbuckets.isBucketCreation(qi->getVBucketId())) {
if (found) {
v->clearPendingId();
}
lh.unlock();
- rejectQueue->push(qi);
+ rejectQueue.push(qi);
++vb->opsReject;
} else {
lh.unlock();
BlockTimer timer(&stats.diskDelHisto, "disk_delete", stats.timingLog);
PersistenceCallback *cb;
cb = new PersistenceCallback(qi, rejectQueue, this, &mutationLog,
- dirtied, &stats, 0);
+ dirtied, &stats, 0);
tctx.addCallback(cb);
rwUnderlying->del(itm, rowid, *cb);
}
@@ -2195,32 +2323,27 @@ int EventuallyPersistentStore::flushOneDelOrSet(const queued_item &qi,
return ret;
}
-int EventuallyPersistentStore::flushOne(std::queue<queued_item> *q,
- std::queue<queued_item> *rejectQueue) {
+int EventuallyPersistentStore::flushOne(std::queue<queued_item> &queue,
+ std::queue<queued_item> &rejectQueue) {
- queued_item qi = q->front();
- q->pop();
+ queued_item qi = queue.front();
+ queue.pop();
stats.memOverhead.decr(sizeof(queued_item));
assert(stats.memOverhead.get() < GIGANTOR);
int rv = 0;
switch (qi->getOperation()) {
- case queue_op_flush:
- rv = flushOneDeleteAll();
- break;
case queue_op_set:
+ case queue_op_del:
{
- size_t prevRejectCount = rejectQueue->size();
+ size_t prevRejectCount = rejectQueue.size();
rv = flushOneDelOrSet(qi, rejectQueue);
- if (rejectQueue->size() == prevRejectCount) {
+ if (rejectQueue.size() == prevRejectCount) {
// flush operation was not rejected
tctx.addUncommittedItem(qi);
}
}
break;
- case queue_op_del:
- rv = flushOneDelOrSet(qi, rejectQueue);
- break;
case queue_op_commit:
tctx.commit();
tctx.enter();
44 src/ep.hh
@@ -104,8 +104,7 @@ protected:
RCPtr<VBucket> currentBucket;
};
-typedef std::pair<int64_t, int64_t> chunk_range_t;
-typedef std::list<chunk_range_t>::iterator chunk_range_iterator_t;
+typedef std::map<uint16_t, std::queue<queued_item> > vb_flush_queue_t;
// Forward declaration
class Flusher;
@@ -780,27 +779,33 @@ private:
return v != NULL;
}
+ /**
+ * Return true if both incoming and outgoing queues are empty
+ */
bool diskQueueEmpty();
- std::queue<queued_item> *beginFlush();
- void pushToOutgoingQueue(std::vector<queued_item> &items);
- void requeueRejectedItems(std::queue<queued_item> *rejects);
+ /**
+ * Return true if the outgoing queues are empty
+ */
+ bool outgoingQueueEmpty();
+
+ vb_flush_queue_t* beginFlush();
+ size_t pushToOutgoingQueue(std::vector<queued_item> &items, uint16_t vbid);
void completeFlush(rel_time_t flush_start);
- int flushSome(std::queue<queued_item> *q,
- std::queue<queued_item> *rejectQueue);
- int flushOne(std::queue<queued_item> *q,
- std::queue<queued_item> *rejectQueue);
+ int flushOutgoingQueue(vb_flush_queue_t *queue, size_t phase);
+ int flushHighPriorityVBQueue(vb_flush_queue_t *queue, int data_age);
+ int flushVBQueue(std::queue<queued_item> &vb_queue, uint16_t vbid, int data_age);
+ int flushOne(std::queue<queued_item> &queue,
+ std::queue<queued_item> &rejectQueue);
int flushOneDeleteAll(void);
- int flushOneDelOrSet(const queued_item &qi, std::queue<queued_item> *rejectQueue);
+ int flushOneDelOrSet(const queued_item &qi, std::queue<queued_item> &rejectQueue);
StoredValue *fetchValidValue(RCPtr<VBucket> &vb, const std::string &key,
int bucket_num, bool wantsDeleted=false,
bool trackReference=true, bool queueExpired=true);
- size_t getWriteQueueSize(void);
-
- bool hasItemsForPersistence(void);
+ size_t incomingQueueSize(void);
GetValue getInternal(const std::string &key, uint16_t vbucket,
const void *cookie, bool queueBG,
@@ -845,12 +850,12 @@ private:
// track of the objects it works on. It should _not_ be used
// by any other threads (because the flusher use it without
// locking...
- std::queue<queued_item> writing;
- Atomic<size_t> bgFetchQueue;
- Atomic<bool> diskFlushAll;
- TransactionContext tctx;
- Mutex vbsetMutex;
- uint32_t bgFetchDelay;
+ vb_flush_queue_t writingQueues;
+ Atomic<size_t> bgFetchQueue;
+ Atomic<bool> diskFlushAll;
+ TransactionContext tctx;
+ Mutex vbsetMutex;
+ uint32_t bgFetchDelay;
struct ExpiryPagerDelta {
ExpiryPagerDelta() : sleeptime(0) {}
Mutex mutex;
@@ -875,6 +880,7 @@ private:
size_t itemExpiryWindow;
size_t vbDelChunkSize;
size_t vbChunkDelThresholdTime;
+ Atomic<bool> snapshotVBState;
DISALLOW_COPY_AND_ASSIGN(EventuallyPersistentStore);
};
34 src/ep_engine.cc
@@ -904,6 +904,7 @@ extern "C" {
case CMD_LAST_CLOSED_CHECKPOINT:
case CMD_CREATE_CHECKPOINT:
case CMD_EXTEND_CHECKPOINT:
+ case CMD_CHECKPOINT_PERSISTENCE:
{
rv = h->handleCheckpointCmds(cookie, request, response);
return rv;
@@ -3171,6 +3172,8 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doTimingStats(const void *cookie,
add_casted_stat("get_vb_cmd", stats.getVbucketCmdHisto, add_stat, cookie);
add_casted_stat("set_vb_cmd", stats.setVbucketCmdHisto, add_stat, cookie);
add_casted_stat("del_vb_cmd", stats.delVbucketCmdHisto, add_stat, cookie);
+ add_casted_stat("chk_persistence_cmd", stats.chkPersistenceHisto,
+ add_stat, cookie);
// Tap commands
add_casted_stat("tap_vb_set", stats.tapVbucketSetHisto, add_stat, cookie);
add_casted_stat("tap_vb_reset", stats.tapVbucketResetHisto, add_stat, cookie);
@@ -3664,6 +3667,37 @@ EventuallyPersistentEngine::handleCheckpointCmds(const void *cookie,
}
}
break;
+ case CMD_CHECKPOINT_PERSISTENCE:
+ {
+ uint16_t keylen = ntohs(req->request.keylen);
+ uint32_t bodylen = ntohl(req->request.bodylen);
+ if ((bodylen - keylen) == 0) {
+ msg << "No checkpoint id is given for CMD_CHECKPOINT_PERSISTENCE!!!";
+ status = PROTOCOL_BINARY_RESPONSE_EINVAL;
+ } else {
+ uint64_t chk_id;
+ memcpy(&chk_id, req->bytes + sizeof(req->bytes) + keylen,
+ bodylen - keylen);
+ chk_id = ntohll(chk_id);
+ void *es = getEngineSpecific(cookie);
+ if (!es) {
+ uint16_t persisted_chk_id =
+ epstore->getVBuckets().getPersistenceCheckpointId(vbucket);
+ if (chk_id > persisted_chk_id) {
+ Flusher *flusher = const_cast<Flusher *>(epstore->getFlusher());
+ flusher->addHighPriorityVBucket(vbucket, chk_id, cookie);
+ storeEngineSpecific(cookie, this);
+ return ENGINE_EWOULDBLOCK;
+ }
+ } else {
+ storeEngineSpecific(cookie, NULL);
+ getLogger()->log(EXTENSION_LOG_INFO, cookie,
+ "Checkpoint %llu persisted for vbucket %d.",
+ chk_id, vbucket);
+ }
+ }
+ }
+ break;
default:
{
msg << "Unknown checkpoint command opcode: " << req->request.opcode;
68 src/flusher.cc
@@ -207,7 +207,7 @@ void Flusher::completeFlush() {
}
double Flusher::computeMinSleepTime() {
- if (flushQueue && !flushQueue->empty()) {
+ if (!store->outgoingQueueEmpty()) {
flushRv = 0;
prevFlushRv = 0;
return 0.0;
@@ -234,38 +234,60 @@ int Flusher::doFlush() {
if (flushQueue) {
getLogger()->log(EXTENSION_LOG_DEBUG, NULL,
"Beginning a write queue flush.\n");
- rejectQueue = new std::queue<queued_item>();
flushStart = ep_current_time();
+ flushPhase = 1;
}
}
// Now do the every pass thing.
if (flushQueue) {
- if (!flushQueue->empty()) {
- int n = store->flushSome(flushQueue, rejectQueue);
- if (_state == pausing) {
- transition_state(paused);
- }
- flushRv = std::min(n, flushRv);
+ int n = store->flushOutgoingQueue(flushQueue, flushPhase++);
+ if (_state == pausing) {
+ transition_state(paused);
}
-
- if (flushQueue->empty()) {
- if (!rejectQueue->empty()) {
- // Requeue the rejects.
- store->requeueRejectedItems(rejectQueue);
- } else {
- store->completeFlush(flushStart);
- getLogger()->log(EXTENSION_LOG_INFO, NULL,
- "Completed a flush, age of oldest item was %ds\n",
- flushRv);
-
- delete rejectQueue;
- rejectQueue = NULL;
- flushQueue = NULL;
- }
+ flushRv = std::min(n, flushRv);
+
+ if (store->outgoingQueueEmpty()) {
+ store->completeFlush(flushStart);
+ getLogger()->log(EXTENSION_LOG_INFO, NULL,
+ "Completed a flush, age of oldest item was %ds\n",
+ flushRv);
+ flushQueue = NULL;
}
}
return flushRv;
}
+void Flusher::addHighPriorityVBucket(uint16_t vbid, uint64_t chkid,
+ const void *cookie) {
+ LockHolder lh(priorityVBMutex);
+ priorityVBList[vbid] = HighPriorityVBEntry(cookie, chkid);
+}
+
+void Flusher::removeHighPriorityVBucket(uint16_t vbid) {
+ LockHolder lh(priorityVBMutex);
+ priorityVBList.erase(vbid);
+}
+
+void Flusher::getAllHighPriorityVBuckets(std::vector<uint16_t> &vbs) {
+ LockHolder lh(priorityVBMutex);
+ std::map<uint16_t, HighPriorityVBEntry>::iterator it = priorityVBList.begin();
+ for (; it != priorityVBList.end(); ++it) {
+ vbs.push_back(it->first);
+ }
+}
+
+HighPriorityVBEntry Flusher::getHighPriorityVBEntry(uint16_t vbid) {
+ LockHolder lh(priorityVBMutex);
+ std::map<uint16_t, HighPriorityVBEntry>::iterator it = priorityVBList.find(vbid);
+ if (it != priorityVBList.end()) {
+ return it->second;
+ } else {
+ return HighPriorityVBEntry(NULL, 0);
+ }
+}
+
+size_t Flusher::getNumOfHighPriorityVBs() {
+ return priorityVBList.size();
+}
35 src/flusher.hh
@@ -19,6 +19,7 @@ enum flusher_state {
class Flusher;
const double DEFAULT_MIN_SLEEP_TIME = 0.1;
+const size_t MAX_CHECKPOINT_PERSISTENCE_TIME = 10; // 10 sec.
/**
* A DispatcherCallback adaptor over Flusher.
@@ -42,6 +43,16 @@ private:
Flusher *flusher;
};
+struct HighPriorityVBEntry {
+ HighPriorityVBEntry() :
+ cookie(NULL), checkpoint(0), start(gethrtime()) { }
+ HighPriorityVBEntry(const void *c, uint64_t chk) :
+ cookie(c), checkpoint(chk), start(gethrtime()) { }
+
+ const void *cookie;
+ uint64_t checkpoint;
+ hrtime_t start;
+};
/**
* Manage persistence of data for an EventuallyPersistentStore.
@@ -52,8 +63,8 @@ public:
Flusher(EventuallyPersistentStore *st, Dispatcher *d) :
store(st), _state(initializing), dispatcher(d),
flushRv(0), prevFlushRv(0), minSleepTime(0.1),
- flushQueue(NULL), rejectQueue(NULL),
- forceShutdownReceived(false) {
+ flushQueue(NULL),
+ forceShutdownReceived(false), flushPhase(0) {
}
~Flusher() {
@@ -63,12 +74,6 @@ public:
stateName(_state));
}
- if (rejectQueue != NULL) {
- getLogger()->log(EXTENSION_LOG_WARNING, NULL,
- "Flusher being destroyed with %ld tasks in the reject queue\n",
- rejectQueue->size());
- delete rejectQueue;
- }
}
bool stop(bool isForceShutdown = false);
@@ -85,6 +90,13 @@ public:
enum flusher_state state() const;
const char * stateName() const;
+ void addHighPriorityVBucket(uint16_t vbid, uint64_t chkid,
+ const void *cookie);
+ void removeHighPriorityVBucket(uint16_t vbid);
+ void getAllHighPriorityVBuckets(std::vector<uint16_t> &vbs);
+ HighPriorityVBEntry getHighPriorityVBEntry(uint16_t vbid);
+ size_t getNumOfHighPriorityVBs();
+
private:
bool transition_state(enum flusher_state to);
int doFlush();
@@ -104,12 +116,15 @@ private:
int flushRv;
int prevFlushRv;
double minSleepTime;
- std::queue<queued_item> *flushQueue;
- std::queue<queued_item> *rejectQueue;
+ vb_flush_queue_t *flushQueue;
rel_time_t flushStart;
Atomic<bool> forceShutdownReceived;
+ Mutex priorityVBMutex;
+ std::map<uint16_t, HighPriorityVBEntry> priorityVBList;
+ size_t flushPhase;
+
DISALLOW_COPY_AND_ASSIGN(Flusher);
};
4 src/stats.hh
@@ -320,6 +320,9 @@ public:
//! Histogram of get_stats commands.
Histogram<hrtime_t> getStatsCmdHisto;
+ //! Histogram of wait_for_checkpoint_persistence command
+ Histogram<hrtime_t> chkPersistenceHisto;
+
//
// DB timers.
//
@@ -406,6 +409,7 @@ public:
tapVbucketSetHisto.reset();
notifyIOHisto.reset();
getStatsCmdHisto.reset();
+ chkPersistenceHisto.reset();
diskInsertHisto.reset();
diskUpdateHisto.reset();
diskDelHisto.reset();
6 src/vbucket.hh
@@ -170,7 +170,11 @@ public:
// Get age sum in millisecond
uint64_t getQueueAge() {
- return (ep_current_time() * dirtyQueueSize - dirtyQueueAge) * 1000;
+ rel_time_t currentAge = ep_current_time() * dirtyQueueSize;
+ if (currentAge < dirtyQueueAge) {
+ return 0;
+ }
+ return (currentAge - dirtyQueueAge) * 1000;
}
void fireAllOps(EventuallyPersistentEngine &engine);
15 src/vbucketmap.cc
@@ -6,6 +6,7 @@
VBucketMap::VBucketMap(Configuration &config) :
buckets(new RCPtr<VBucket>[config.getMaxVbuckets()]),
bucketDeletion(new Atomic<bool>[config.getMaxVbuckets()]),
+ bucketCreation(new Atomic<bool>[config.getMaxVbuckets()]),
persistenceCheckpointIds(new Atomic<uint64_t>[config.getMaxVbuckets()]),
size(config.getMaxVbuckets())
{
@@ -13,6 +14,7 @@ VBucketMap::VBucketMap(Configuration &config) :
lowPriorityVbSnapshot.set(false);
for (size_t i = 0; i < size; ++i) {
bucketDeletion[i].set(false);
+ bucketCreation[i].set(false);
persistenceCheckpointIds[i].set(0);
}
}
@@ -20,6 +22,7 @@ VBucketMap::VBucketMap(Configuration &config) :
VBucketMap::~VBucketMap() {
delete[] buckets;
delete[] bucketDeletion;
+ delete[] bucketCreation;
delete[] persistenceCheckpointIds;
}
@@ -93,7 +96,17 @@ bool VBucketMap::setBucketDeletion(uint16_t id, bool delBucket) {
return bucketDeletion[id].cas(!delBucket, delBucket);
}
-uint64_t VBucketMap::getPersistenceCheckpointId(uint16_t id) {
+bool VBucketMap::isBucketCreation(uint16_t id) const {
+ assert(id < size);
+ return bucketCreation[id].get();
+}
+
+bool VBucketMap::setBucketCreation(uint16_t id, bool rv) {
+ assert(id < size);
+ return bucketCreation[id].cas(!rv, rv);
+}
+
+uint64_t VBucketMap::getPersistenceCheckpointId(uint16_t id) const {
assert(id < size);
return persistenceCheckpointIds[id].get();
}
6 src/vbucketmap.hh
@@ -22,7 +22,9 @@ public:
std::vector<int> getBucketsSortedByState(void) const;
bool isBucketDeletion(uint16_t id) const;
bool setBucketDeletion(uint16_t id, bool delBucket);
- uint64_t getPersistenceCheckpointId(uint16_t id);
+ bool isBucketCreation(uint16_t id) const;
+ bool setBucketCreation(uint16_t id, bool rv);
+ uint64_t getPersistenceCheckpointId(uint16_t id) const;
void setPersistenceCheckpointId(uint16_t id, uint64_t checkpointId);
/**
* Check if a vbucket snapshot task is currently scheduled with
@@ -84,10 +86,12 @@ public:
* "false".
*/
bool setLowPriorityVbSnapshotFlag(bool lowPrioritySnapshot);
+
private:
RCPtr<VBucket> *buckets;
Atomic<bool> *bucketDeletion;
+ Atomic<bool> *bucketCreation;
Atomic<uint64_t> *persistenceCheckpointIds;
Atomic<bool> highPriorityVbSnapshot;
Atomic<bool> lowPriorityVbSnapshot;
11 tests/ep_test_apis.cc
@@ -284,6 +284,17 @@ void extendCheckpoint(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
free(request);
}
+ENGINE_ERROR_CODE checkpointPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+ uint64_t checkpoint_id) {
+ checkpoint_id = htonll(checkpoint_id);
+ protocol_binary_request_header *request;
+ request = createPacket(CMD_CHECKPOINT_PERSISTENCE, 0, 0, NULL, 0, NULL, 0,
+ (const char *)&checkpoint_id, sizeof(uint64_t));
+ ENGINE_ERROR_CODE rv = h1->unknown_command(h, NULL, request, add_response);
+ free(request);
+ return rv;
+}
+
void gat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char* key,
uint16_t vb, uint32_t exp, bool quiet) {
char ext[4];
2  tests/ep_test_apis.h
@@ -103,6 +103,8 @@ bool verify_vbucket_state(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, uint16_t vb,
void createCheckpoint(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1);
void extendCheckpoint(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
uint32_t checkpoint_num);
+ENGINE_ERROR_CODE checkpointPersistence(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
+ uint64_t checkpoint_id);
// Stats Operations
int get_int_stat(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1, const char *statname,
37 tests/ep_testsuite.cc
@@ -4409,6 +4409,38 @@ static enum test_result test_extend_open_checkpoint(ENGINE_HANDLE *h, ENGINE_HAN
return SUCCESS;
}
+static enum test_result test_checkpoint_persistence(ENGINE_HANDLE *h,
+ ENGINE_HANDLE_V1 *h1) {
+ for (int j = 0; j < 1000; ++j) {
+ std::stringstream ss;
+ ss << "key" << j;
+ item *i;
+ check(store(h, h1, NULL, OPERATION_SET,
+ ss.str().c_str(), ss.str().c_str(), &i, 0, 0) == ENGINE_SUCCESS,
+ "Failed to store a value");
+ h1->release(h, NULL, i);
+ }
+
+ createCheckpoint(h, h1);
+ check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+ "Expected success response from creating a new checkpoint");
+
+ // Last closed checkpoint id for vbucket 0.
+ int closed_chk_id = get_int_stat(h, h1, "vb_0:last_closed_checkpoint_id",
+ "checkpoint 0");
+ // Request to prioritize persisting vbucket 0.
+ check(checkpointPersistence(h, h1, closed_chk_id) == ENGINE_SUCCESS,
+ "Failed to request checkpoint persistence");
+ check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
+ "Expected success response for checkpoint persistence");
+
+ // Issue another request with unexpected larger checkpoint id 100.
+ check(checkpointPersistence(h, h1, 100) == ENGINE_TMPFAIL,
+ "Expected temp failure for checkpoint persistence request");
+
+ return SUCCESS;
+}
+
static enum test_result test_validate_checkpoint_params(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
set_param(h, h1, engine_param_checkpoint, "chk_max_items", "1000");
check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
@@ -6451,6 +6483,11 @@ engine_test_t* get_tests(void) {
test_setup, teardown,
"chk_max_items=5000;chk_period=600",
prepare, cleanup),
+ TestCase("checkpoint: wait for persistence",
+ test_checkpoint_persistence,
+ test_setup, teardown,
+ "chk_max_items=500;max_checkpoints=5;item_num_based_new_chk=true",
+ prepare, cleanup),
// revision id's
TestCase("revision sequence numbers", test_revid, test_setup,