MB-26021 [6/6]: Limit #checkpoint items flushed in a single batch
Expand the previous limiting of flusher batch size to also limit the
number of items from the Checkpoint Manager.

In the case of Checkpoint items, we cannot arbitrarily "cut" the batch
in the middle of a checkpoint - as that would result in an
inconsistent state (non-checkpoint boundary) being written in the
couchstore snapshot. In the event of a crash / restart that wouldn't
be a valid state.

This is implemented by adding a new
CheckpointManager::getItemsForCursor() method, which differs from the
existing get*All*ItemsForCursor() in that it takes an approximate
limit argument. Note this is approximate as we only split the batch at
a checkpoint boundary - the "limit" specifies that we will finish
visiting the current checkpoint, but not visit the next.

This results in the following changes to VBucketBench/FlushVBucket - note
the reduction in PeakFlushBytes (from 740M to 7.5M) and in average bytes
per item (from 775 to 7) at larger DWQ sizes:

Before:

    Run on (8 X 2300 MHz CPU s)
    2018-02-16 17:23:25
    -----------------------------------------------------------------------------------------
    Benchmark                                  Time           CPU Iterations UserCounters...
    -----------------------------------------------------------------------------------------
    VBucketBench/FlushVBucket/1           438175 ns     319992 ns       2239 PeakBytesPerItem=175.266k PeakFlushBytes=175.266k   3.05183k items/s
    VBucketBench/FlushVBucket/10          537116 ns     365452 ns       2042 PeakBytesPerItem=18.1953k PeakFlushBytes=181.961k    26.722k items/s
    VBucketBench/FlushVBucket/100         928924 ns     724770 ns       1013 PeakBytesPerItem=2.82715k PeakFlushBytes=282.727k   134.741k items/s
    VBucketBench/FlushVBucket/1000       4414461 ns    4079710 ns        176 PeakBytesPerItem=1000 PeakFlushBytes=977.438k   239.371k items/s
    VBucketBench/FlushVBucket/10000     44486851 ns   43265875 ns         16 PeakBytesPerItem=781 PeakFlushBytes=7.45735M   225.712k items/s
    VBucketBench/FlushVBucket/100000   429518562 ns  423825500 ns          2 PeakBytesPerItem=759 PeakFlushBytes=72.427M   230.416k items/s
    VBucketBench/FlushVBucket/1000000 4025349877 ns 3942721000 ns          1 PeakBytesPerItem=775 PeakFlushBytes=740.04M   247.687k items/s

After:

    Run on (8 X 2300 MHz CPU s)
    2018-02-16 17:19:51
    -----------------------------------------------------------------------------------------
    Benchmark                                  Time           CPU Iterations UserCounters...
    -----------------------------------------------------------------------------------------
    VBucketBench/FlushVBucket/1           479525 ns     340742 ns       2023 PeakBytesPerItem=175.281k PeakFlushBytes=175.281k   2.86599k items/s
    VBucketBench/FlushVBucket/10          526072 ns     375763 ns       1868 PeakBytesPerItem=18.1943k PeakFlushBytes=181.945k   25.9888k items/s
    VBucketBench/FlushVBucket/100         981275 ns     721473 ns       1003 PeakBytesPerItem=2.82617k PeakFlushBytes=282.711k   135.357k items/s
    VBucketBench/FlushVBucket/1000       4459568 ns    4118994 ns        173 PeakBytesPerItem=1000 PeakFlushBytes=977.438k   237.088k items/s
    VBucketBench/FlushVBucket/10000     45353759 ns   44451063 ns         16 PeakBytesPerItem=781 PeakFlushBytes=7.45737M   219.694k items/s
    VBucketBench/FlushVBucket/100000   414823038 ns  406181000 ns          2 PeakBytesPerItem=137 PeakFlushBytes=13.0832M   240.425k items/s
    VBucketBench/FlushVBucket/1000000 3116659340 ns 3000999000 ns          1 PeakBytesPerItem=7 PeakFlushBytes=7.57903M   325.412k items/s

Change-Id: I2d3c618557f3f5928879f09f7cba58968abd04db
Reviewed-on: http://review.couchbase.org/86391
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Paolo Cocchi <paolo.cocchi@couchbase.com>
daverigby committed Feb 23, 2018
1 parent 90c76d4 commit 4fa4905
Showing 7 changed files with 250 additions and 66 deletions.
47 changes: 39 additions & 8 deletions engines/ep/src/checkpoint.cc
@@ -1249,8 +1249,14 @@ void CheckpointManager::queueSetVBState(VBucket& vb) {
}

snapshot_range_t CheckpointManager::getAllItemsForCursor(
const std::string& name,
std::vector<queued_item> &items) {
const std::string& name, std::vector<queued_item>& items) {
return getItemsForCursor(name, items, std::numeric_limits<size_t>::max());
}

snapshot_range_t CheckpointManager::getItemsForCursor(
const std::string& name,
std::vector<queued_item>& items,
size_t approxLimit) {
LockHolder lh(queueLock);
snapshot_range_t range;
cursor_index::iterator it = connCursors.find(name);
@@ -1265,19 +1271,43 @@ snapshot_range_t CheckpointManager::getAllItemsForCursor(
return range;
}

auto& cursor = it->second;

// Fetch whole checkpoints, as long as we don't exceed the approx item
// limit.
bool moreItems;
range.start = (*it->second.currentCheckpoint)->getSnapshotStartSeqno();
while ((moreItems = incrCursor(it->second))) {
queued_item& qi = *(it->second.currentPos);
range.start = (*cursor.currentCheckpoint)->getSnapshotStartSeqno();
range.end = (*cursor.currentCheckpoint)->getSnapshotEndSeqno();
size_t itemCount = 0;
while ((moreItems = incrCursor(cursor))) {
queued_item& qi = *(cursor.currentPos);
items.push_back(qi);
itemCount++;

if (qi->getOperation() == queue_op::checkpoint_end) {
// Reached the end of a checkpoint; check if we have exceeded
// our limit.
if (itemCount >= approxLimit) {
// Reached our limit - don't want any more items.
range.end = (*cursor.currentCheckpoint)->getSnapshotEndSeqno();

// However, we *do* want to move the cursor into the next
// checkpoint if possible; as that means the checkpoint we just
// completed has one less cursor in it (and could potentially be
// freed).
moveCursorToNextCheckpoint(cursor);
break;
}
}
// May have moved into a new checkpoint - update range.end.
range.end = (*cursor.currentCheckpoint)->getSnapshotEndSeqno();
}
range.end = (*it->second.currentCheckpoint)->getSnapshotEndSeqno();

LOG(EXTENSION_LOG_DEBUG, "CheckpointManager::getItemsForCursor() "
"cursor:%s range:{%" PRIu64 ", %" PRIu64 "}",
name.c_str(), range.start, range.end);

it->second.numVisits++;
cursor.numVisits++;

return range;
}
@@ -1326,7 +1356,8 @@ bool CheckpointManager::incrCursor(CheckpointCursor &cursor) {
cursor.incrMetaItemOffset(1);
}
return true;
} else if (!moveCursorToNextCheckpoint(cursor)) {
}
if (!moveCursorToNextCheckpoint(cursor)) {
--(cursor.currentPos);
return false;
}
28 changes: 27 additions & 1 deletion engines/ep/src/checkpoint.h
@@ -718,8 +718,34 @@ class CheckpointManager {
*/
queued_item nextItem(const std::string &name, bool &isLastMutationItem);

/**
* Add all outstanding items for the given cursor name to the vector.
*
* @param name Cursor to advance.
* @param items container which items will be appended to.
* @return The low/high sequence number added to `items` on success,
* or (0,0) if no items were added.
*/
snapshot_range_t getAllItemsForCursor(const std::string& name,
std::vector<queued_item> &items);
std::vector<queued_item>& items);

/**
* Add items for the given cursor to the vector, stopping at the first
* checkpoint boundary at which at least `approxLimit` items have been
* added. The cursor is
* advanced to point after the items fetched.
*
* Note: It is only valid to fetch complete checkpoints; as such we cannot
* limit to a precise number of items.
*
* @param name Cursor to advance.
* @param items container which items will be appended to.
* @param approxLimit Approximate number of items to add.
* @return sequenceRange - the low/high sequence number added to `items`
* on success, or (0,0) if no items were added.
*/
snapshot_range_t getItemsForCursor(const std::string& name,
std::vector<queued_item>& items,
size_t approxLimit);

/**
* Return the total number of items (including meta items) that belong to
31 changes: 8 additions & 23 deletions engines/ep/src/ep_bucket.cc
@@ -255,29 +255,14 @@ std::pair<bool, size_t> EPBucket::flushVBucket(uint16_t vbid) {
return {true, 0};
}
if (vb) {
std::vector<queued_item> items;
KVStore *rwUnderlying = getRWUnderlying(vbid);

while (!vb->rejectQueue.empty()) {
items.push_back(vb->rejectQueue.front());
vb->rejectQueue.pop();
}

// Append any 'backfill' items (mutations added by a DCP stream).
moreAvailable = vb->getBackfillItems(items, flusherBatchSplitTrigger);

// Append all items outstanding for the persistence cursor, as long as
// we haven't already hit the batch limit.
snapshot_range_t range{0, 0};
if (!moreAvailable) {
auto _begin_ = ProcessClock::now();
range = vb->checkpointManager->getAllItemsForCursor(
CheckpointManager::pCursorName, items);
moreAvailable = false;
stats.persistenceCursorGetItemsHisto.add(
std::chrono::duration_cast<std::chrono::microseconds>(
ProcessClock::now() - _begin_));
}
// Obtain the set of items to flush, up to the maximum allowed for
// a single flush.
auto toFlush = vb->getItemsToPersist(flusherBatchSplitTrigger);
auto& items = toFlush.items;
auto& range = toFlush.range;
moreAvailable = toFlush.moreAvailable;

KVStore* rwUnderlying = getRWUnderlying(vb->getId());

if (!items.empty()) {
while (!rwUnderlying->begin(
53 changes: 40 additions & 13 deletions engines/ep/src/vbucket.cc
@@ -311,26 +311,53 @@ void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
}
}

bool VBucket::getBackfillItems(std::vector<queued_item>& items, size_t limit) {
if (limit == 0) {
limit = std::numeric_limits<size_t>::max();
VBucket::ItemsToFlush VBucket::getItemsToPersist(size_t approxLimit) {
// Fetch up to approxLimit items from rejectQueue, backfill items and
// checkpointManager (in that order); then check if we obtained everything
// which is available.
ItemsToFlush result;

// First add any items from the rejectQueue.
while (result.items.size() < approxLimit && !rejectQueue.empty()) {
result.items.push_back(rejectQueue.front());
rejectQueue.pop();
}

bool moreAvailable = false;
size_t num_items = 0;
{ // Locking scope
// Transfer up to `limit` items from backfill queue.
// Append any 'backfill' items (mutations added by a DCP stream).
bool backfillEmpty;
{
LockHolder lh(backfill.mutex);
for (; num_items < limit && !backfill.items.empty(); num_items++) {
items.push_back(backfill.items.front());
size_t num_items = 0;
while (result.items.size() < approxLimit && !backfill.items.empty()) {
result.items.push_back(backfill.items.front());
backfill.items.pop();
num_items++;
}
moreAvailable = !backfill.items.empty();
backfillEmpty = backfill.items.empty();
stats.vbBackfillQueueSize.fetch_sub(num_items);
stats.memOverhead->fetch_sub(num_items * sizeof(queued_item));
}

// Append up to approxLimit checkpoint items outstanding for the persistence
// cursor, if we haven't yet hit the limit.
// Note that it is only valid to queue a complete checkpoint - this is where
// the "approx" in the limit comes from.
const auto ckptMgrLimit = approxLimit - result.items.size();
if (ckptMgrLimit > 0) {
auto _begin_ = ProcessClock::now();
result.range = checkpointManager->getItemsForCursor(
CheckpointManager::pCursorName, result.items, ckptMgrLimit);
stats.persistenceCursorGetItemsHisto.add(
std::chrono::duration_cast<std::chrono::microseconds>(
ProcessClock::now() - _begin_));
}

stats.vbBackfillQueueSize.fetch_sub(num_items);
stats.memOverhead->fetch_sub(num_items * sizeof(queued_item));
return moreAvailable;
// Check if there's any more items remaining.
result.moreAvailable = !rejectQueue.empty() || !backfillEmpty ||
(checkpointManager->getNumItemsForCursor(
CheckpointManager::pCursorName) > 0);

return result;
}

void VBucket::setState(vbucket_state_t to) {
17 changes: 17 additions & 0 deletions engines/ep/src/vbucket.h
@@ -414,6 +414,23 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
*/
bool getBackfillItems(std::vector<queued_item>& items, size_t limit = 0);

struct ItemsToFlush {
std::vector<queued_item> items;
snapshot_range_t range;
bool moreAvailable = false;
};

/**
* Obtain the series of items to be flushed for this vBucket.
*
* @param approxLimit Upper bound on how many items to fetch.
* @return The items to flush, along with their seqno range and whether
*         more items are available for this vBucket (i.e. the limit
*         was reached).
*/
ItemsToFlush getItemsToPersist(size_t approxLimit);

bool isBackfillPhase() {
return backfill.isBackfillPhase.load();
}
44 changes: 44 additions & 0 deletions engines/ep/tests/module_tests/checkpoint_test.cc
@@ -31,6 +31,7 @@
#include "thread_gate.h"
#include "ep_vb.h"

#include <engines/ep/src/ep_types.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <valgrind/valgrind.h>
@@ -784,6 +785,49 @@ TYPED_TEST(CheckpointTest, ItemsForCheckpointCursor) {
EXPECT_EQ(1000 + 2 * MIN_CHECKPOINT_ITEMS, range.end);
}

// Test getItemsForCursor() when it is limited to fewer items than exist
// in total. Cursor should only advance to the start of the 2nd checkpoint.
TYPED_TEST(CheckpointTest, ItemsForCheckpointCursorLimited) {
/* We want to have items across 2 checkpoints. Size down the default number
of items to create a new checkpoint and recreate the manager */
this->checkpoint_config = CheckpointConfig(DEFAULT_CHECKPOINT_PERIOD,
MIN_CHECKPOINT_ITEMS,
/*numCheckpoints*/ 2,
/*itemBased*/ true,
/*keepClosed*/ false,
/*enableMerge*/ false,
/*persistenceEnabled*/ true);

this->createManager();

/* Add items such that we have 2 checkpoints */
queued_item qi;
for (unsigned int ii = 0; ii < 2 * MIN_CHECKPOINT_ITEMS; ii++) {
ASSERT_TRUE(this->queueNewItem("key" + std::to_string(ii)));
}

/* Verify we have desired number of checkpoints and desired number of
items */
ASSERT_EQ(2, this->manager->getNumCheckpoints());
ASSERT_EQ(MIN_CHECKPOINT_ITEMS, this->manager->getNumOpenChkItems());

/* Get items for persistence. Specify a limit of 1 so we should only
* fetch the first checkpoint's worth.
*/
std::vector<queued_item> items;
auto range = this->manager->getItemsForCursor(
CheckpointManager::pCursorName, items, 1);
EXPECT_EQ(0, range.start);
EXPECT_EQ(1000 + MIN_CHECKPOINT_ITEMS, range.end);
EXPECT_EQ(MIN_CHECKPOINT_ITEMS + 2, items.size())
<< "Should have MIN_CHECKPOINT_ITEMS + 2 (ckpt start & end) items";
EXPECT_EQ(2,
this->manager->getCheckpointIdForCursor(
CheckpointManager::pCursorName))
<< "Cursor should have moved into second checkpoint.";

}

// Test the checkpoint cursor movement
TYPED_TEST(CheckpointTest, CursorMovement) {
/* We want to have items across 2 checkpoints. Size down the default number
