Skip to content

Commit

Permalink
MB-45026: Expelling doesn't remove the checkpoint_start item
Browse files Browse the repository at this point in the history
A good amount of logic around checkpoint assumes the invariant that
the checkpoint_start item always exists in Checkpoint, same as the
empty-item.

When we implemented item-expel we broke that invariant. Expel may remove
the checkpoint_start item because it always assumes that it has to
remove all items that share the same seqno.
We still keep that logic at "expel", but not for the checkpoint_start
item that stays untouched now.

The change requires to preserve our cursor-registering logic by updating
the Checkpoint::getLowSeqno() logic accordingly to the new expel
behaviour.

Change-Id: Ic9dd5534b8888a416f93745e42b13ba1aaf3d324
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/155204
Tested-by: Paolo Cocchi <paolo.cocchi@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
  • Loading branch information
paolococchi committed Jun 16, 2021
1 parent bf2008d commit 9bdac12
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 69 deletions.
55 changes: 44 additions & 11 deletions engines/ep/src/checkpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,33 @@ bool Checkpoint::canDedup(const queued_item& existing,
return !(isDurabilityOp(existing) || isDurabilityOp(in));
}

uint64_t Checkpoint::getMinimumCursorSeqno() const {
auto pos = begin();
Expects((*pos)->isEmptyItem());
const auto seqno = (*pos)->getBySeqno();
++pos;
Expects((*pos)->isCheckpointStart());
Expects(seqno == (*pos)->getBySeqno());

if (highestExpelledSeqno == 0) {
// Old path for the pre-expel behaviour.
// Expel has never modified this checkpoint, so any seqno-gap was
// generated by normal de-duplication.
//
// Note: This path ensures that we don't trigger useless backfills where
// backfilling is not really necessary.
return seqno;
}

// Expel has run and modified the checkpoint, we must have at least one
// item as expel would not remove high-seqno.
Expects(numItems > 0);

// Seek to the first item after checkpoint start
++pos;
return (*pos)->getBySeqno();
}

void Checkpoint::addItemToCheckpoint(const queued_item& qi) {
toWrite.push_back(qi);
// Increase the size of the checkpoint by the item being added
Expand All @@ -495,18 +522,24 @@ void Checkpoint::addItemToCheckpoint(const queued_item& qi) {
CheckpointQueue Checkpoint::expelItems(const ChkptQueueIterator& last) {
CheckpointQueue expelledItems(toWrite.get_allocator());

// Record the seqno of the last item to be expelled.
highestExpelledSeqno = (*last)->getBySeqno();

// Expel from the the first item after the checkpoint_start item (included)
// to 'last' (included).
const auto dummy = begin();
Expects((*dummy)->isEmptyItem());
auto first = std::next(dummy);
Expects((*first)->isCheckpointStart());
// This function expects that there is at least one item to expel, caller is
// responsible to ensure that.
++first;
if (first == end()) {
throw std::logic_error(
"Checkpoint::expelItems: Called on an empty checkpoint");
}
// The last item to be expelled is not expected to be a meta-item.
Expects(!(*last)->isCheckPointMetaItem());

// @todo: MB-45026 - don't expel checkpoint_start
// Expel from the the first item after the empty item (included) to 'last'
// (included).
auto dummy = begin();
Expects((*dummy)->isEmptyItem());
const auto first = std::next(dummy);
// Record the seqno of the last item to be expelled.
highestExpelledSeqno = (*last)->getBySeqno();

expelledItems.splice(
ChkptQueueIterator::const_underlying_iterator{
Expand Down Expand Up @@ -622,8 +655,8 @@ void Checkpoint::addStats(const AddStatFn& add_stat,

std::ostream& operator <<(std::ostream& os, const Checkpoint& c) {
os << "Checkpoint[" << &c << "] with"
<< " id:" << c.checkpointId << " seqno:{" << c.getLowSeqno() << ","
<< c.getHighSeqno() << "}"
<< " id:" << c.checkpointId << " seqno:{" << c.getMinimumCursorSeqno()
<< "," << c.getHighSeqno() << "}"
<< " snap:{" << c.getSnapshotStartSeqno() << ","
<< c.getSnapshotEndSeqno()
<< ", visible:" << c.getVisibleSnapshotEndSeqno() << "}"
Expand Down
21 changes: 13 additions & 8 deletions engines/ep/src/checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ class CheckpointCursor {
return isValid;
}

const StoredDocKey& getKey() const {
return (*currentPos)->getKey();
}

private:
/**
* Move the cursor's iterator back one if it is not currently pointing to
Expand Down Expand Up @@ -495,15 +499,16 @@ class Checkpoint {
bool canDedup(const queued_item& existing, const queued_item& in) const;

/**
* Returns the seqno of a non-expelled (and non-dummy) item that has the
* lowest seqno.
* Returns the first seqno available in this checkpoint for a cursor to pick
* up. Used for registering cursors at the right position.
* Logically the returned seqno is a different quantity depending on whether
* expelling has modified the checkpoint queue:
*
* 1. Expel hasn't run -> that's the seqno of checkpoint_start
* 2. Expel has run -> that's the seqno of the first item after the
* checkpoint_start
*/
uint64_t getLowSeqno() const {
auto pos = begin();
// Skip passed the dummy item
++pos;
return (*pos)->getBySeqno();
}
uint64_t getMinimumCursorSeqno() const;

uint64_t getHighSeqno() const {
auto pos = end();
Expand Down
6 changes: 3 additions & 3 deletions engines/ep/src/checkpoint_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void CheckpointManager::addNewCheckpoint_UNLOCKED(
"snapEnd:{}]",
vbucketId,
oldOpenCkpt.getId(),
oldOpenCkpt.getLowSeqno(),
oldOpenCkpt.getMinimumCursorSeqno(),
oldOpenCkpt.getHighSeqno());
queued_item qi = createCheckpointItem(
oldOpenCkpt.getId(), vbucketId, queue_op::checkpoint_end);
Expand Down Expand Up @@ -268,7 +268,7 @@ CursorRegResult CheckpointManager::registerCursorBySeqno_UNLOCKED(
auto itr = checkpointList.begin();
for (; itr != checkpointList.end(); ++itr) {
uint64_t en = (*itr)->getHighSeqno();
uint64_t st = (*itr)->getLowSeqno();
uint64_t st = (*itr)->getMinimumCursorSeqno();

if (startBySeqno < st) {
// Requested sequence number is before the start of this
Expand Down Expand Up @@ -583,7 +583,7 @@ CheckpointManager::expelUnreferencedCheckpointItems() {
<< ") lowest found cursor is not in the oldest "
"checkpoint. Oldest checkpoint ID: "
<< oldestCheckpoint->getId()
<< " lowSeqno: " << oldestCheckpoint->getLowSeqno()
<< " lowSeqno: " << oldestCheckpoint->getMinimumCursorSeqno()
<< " highSeqno: " << oldestCheckpoint->getHighSeqno()
<< " snapStart: " << oldestCheckpoint->getSnapshotStartSeqno()
<< " snapEnd: " << oldestCheckpoint->getSnapshotEndSeqno()
Expand Down
4 changes: 4 additions & 0 deletions engines/ep/src/item.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,10 @@ class Item : public ItemIface, public RCValue {
return op == queue_op::checkpoint_start;
}

bool isCheckpointEnd() const {
return op == queue_op::checkpoint_end;
}

private:
/**
* Set the item's data. This is only used by constructors, so we
Expand Down
26 changes: 13 additions & 13 deletions engines/ep/tests/module_tests/checkpoint_remover_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -624,32 +624,32 @@ TEST_F(CheckpointRemoverEPTest, expelsOnlyIfOldestCheckpointIsReferenced) {
* dummy << cursor starts here
* chk start |
* vb state |
* key_1 V
* key_2 << advance to here
* key_3
* key_0 V
* key_1 << advance to here
* key_2
* chk end
*/

size_t expellItemCount = 4;

for (size_t i = 0; i < expellItemCount; i++) {
while (cursor->getKey().to_string() != "cid:0x0:key_1") {
cm->incrCursor(*cursor);
}

// can now expel the 4 items before the above cursor
// Can now expel the 2 items in (ckpt_start, cursor)
auto result = cm->expelUnreferencedCheckpointItems();

EXPECT_EQ(3, result.expelCount);
EXPECT_EQ(2, result.expelCount);

/* items in first checkpoint
*
* dummy
* chk start
* key_1
* key_2
* key_3
*/

afterCount = getItemsWithCursor("Cursor4", 0, true).size();

EXPECT_EQ(beforeCount - 3, afterCount);
EXPECT_EQ(beforeCount - 2, afterCount);
}

TEST_F(CheckpointRemoverEPTest, earliestCheckpointSelectedCorrectly) {
Expand Down Expand Up @@ -746,7 +746,7 @@ TEST_F(CheckpointRemoverEPTest, NewSyncWriteCreatesNewCheckpointIfCantDedupe) {
flushVBucketToDiskIfPersistent(vbid, 3);

auto result = cm->expelUnreferencedCheckpointItems();
EXPECT_EQ(3, result.expelCount);
EXPECT_EQ(2, result.expelCount);

EXPECT_EQ(cb::engine_errc::success,
vb->commit(prepareKey,
Expand Down Expand Up @@ -788,7 +788,7 @@ TEST_F(CheckpointRemoverEPTest, UseOpenCheckpointIfCanDedupeAfterExpel) {
flushVBucketToDiskIfPersistent(vbid, 3);

auto result = cm->expelUnreferencedCheckpointItems();
EXPECT_EQ(3, result.expelCount);
EXPECT_EQ(2, result.expelCount);

store_item(vbid, makeStoredDocKey("key_2"), "value");

Expand Down Expand Up @@ -850,7 +850,7 @@ TEST_F(CheckpointRemoverEPTest,
// expel from the checkpoint. This will invalidate keyIndex entries
// for all expelled items.
auto result = cm->expelUnreferencedCheckpointItems();
EXPECT_EQ(3, result.expelCount);
EXPECT_EQ(2, result.expelCount);

EXPECT_EQ(1, cm->getNumCheckpoints());

Expand Down
61 changes: 27 additions & 34 deletions engines/ep/tests/module_tests/checkpoint_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1774,26 +1774,24 @@ void CheckpointTest::testExpelCheckpointItems() {

CheckpointManager::ExpelResult expelResult =
this->manager->expelUnreferencedCheckpointItems();
EXPECT_EQ(2, expelResult.expelCount);
EXPECT_EQ(1, expelResult.expelCount);
EXPECT_LT(0, expelResult.estimateOfFreeMemory);
EXPECT_EQ(2, this->global_stats.itemsExpelledFromCheckpoints);
EXPECT_EQ(1, this->global_stats.itemsExpelledFromCheckpoints);

/*
* After expelling checkpoint now looks as follows:
* We have expelled:
* 1001 - 1st item (key 0)
*
* Now the checkpoint looks as follows:
* 1000 - dummy Item
* 1001 - checkpoint start
* 1002 - 2nd item (key1) <<<<<<< persistenceCursor
* 1003 - 3rd item (key 2)
*/
if (persistent()) {
EXPECT_EQ(1002, (*manager->getPersistenceCursorPos())->getBySeqno());
}

/*
* We have expelled:
* 1001 - checkpoint start
* 1001 - 1st item (key 0)
*/

// The full checkpoint still contains the 3 items added.
EXPECT_EQ(itemCount, this->manager->getNumOpenChkItems());

Expand Down Expand Up @@ -1849,16 +1847,17 @@ TEST_P(CheckpointTest, expelCheckpointItemsWithDuplicateTest) {

CheckpointManager::ExpelResult expelResult =
this->manager->expelUnreferencedCheckpointItems();
EXPECT_EQ(2, expelResult.expelCount);
EXPECT_EQ(1, expelResult.expelCount);
EXPECT_LT(0, expelResult.estimateOfFreeMemory);
EXPECT_EQ(2, this->global_stats.itemsExpelledFromCheckpoints);
EXPECT_EQ(1, this->global_stats.itemsExpelledFromCheckpoints);

// Item count doens't change
EXPECT_EQ(3, this->manager->getNumOpenChkItems());

/*
* After expelling checkpoint now looks as follows:
* 1000 - dummy Item
* 1000 - checkpoint_start
* 1002 - 2nd item (key1) <<<<<<< persistenceCursor
* 1003 - 3rd item (key 2)
*/
Expand All @@ -1873,6 +1872,7 @@ TEST_P(CheckpointTest, expelCheckpointItemsWithDuplicateTest) {
/*
* Checkpoint now looks as follows:
* 1000 - dummy Item
* 1000 - checkpoint_start
* 1002 - 2nd item (key1) <<<<<<< persistenceCursor
* 1003 - 3rd item (key2)
* 1004 - 4th item (key0) << The New item added >>
Expand Down Expand Up @@ -1915,20 +1915,20 @@ void CheckpointTest::testExpelCursorPointingToLastItem() {
/*
* Checkpoint now looks as follows:
* 1000 - dummy item
* 1001 - checkpoint start
* 1000 - checkpoint start
* 1001 - 1st item
* 1002 - 2nd item <<<<<<< persistenceCursor
*/

// Only expel seqno 1001 and earlier - the cursor points to item that
// Only expel seqno 1001 - the cursor points to item that
// has the highest seqno for the checkpoint so we move the expel point back
// one. That item isn't a metadata item nor is it's successor item
// (1002) the same seqno as itself (1001) so can expel from there.
CheckpointManager::ExpelResult expelResult =
this->manager->expelUnreferencedCheckpointItems();
EXPECT_EQ(2, expelResult.expelCount);
EXPECT_EQ(1, expelResult.expelCount);
EXPECT_GT(expelResult.estimateOfFreeMemory, 0);
EXPECT_EQ(2, this->global_stats.itemsExpelledFromCheckpoints);
EXPECT_EQ(1, this->global_stats.itemsExpelledFromCheckpoints);
}

TEST_P(CheckpointTest, testExpelCursorPointingToLastItemMemory) {
Expand Down Expand Up @@ -2155,6 +2155,7 @@ void CheckpointTest::testExpelCheckpointItemsMemoryRecovered() {
}

/*
*
* Checkpoint now looks as follows:
* 1000 - dummy item
* 1001 - checkpoint start
Expand All @@ -2167,25 +2168,24 @@ void CheckpointTest::testExpelCheckpointItemsMemoryRecovered() {
const auto checkpointMemoryUsageBeforeExpel =
this->manager->getMemoryUsage();

CheckpointManager::ExpelResult expelResult =
this->manager->expelUnreferencedCheckpointItems();
const auto expelResult = manager->expelUnreferencedCheckpointItems();
EXPECT_EQ(1, expelResult.expelCount);
EXPECT_EQ(1, global_stats.itemsExpelledFromCheckpoints);

/*
* After expelling checkpoint now looks as follows:
* We have expelled:
* 1001 - 1st item (key 0)
*
* Checkpoint now looks as follows:
* 1000 - dummy Item
* 1001 - checkpoint start
* 1002 - 2nd item (key 1) <<<<<<< Cursor
* 1003 - 3rd item (key 2)
*/
if (persistent()) {
EXPECT_EQ(1002, (*manager->getPersistenceCursorPos())->getBySeqno());
}

/*
* We have expelled:
* 1001 - checkpoint start
* 1001 - 1st item (key 0)
*/

// Get the memory usage after expelling
auto checkpointMemoryUsageAfterExpel = this->manager->getMemoryUsage();

Expand All @@ -2207,19 +2207,12 @@ void CheckpointTest::testExpelCheckpointItemsMemoryRecovered() {
checkpointMemoryUsageBeforeExpel - checkpointMemoryUsageAfterExpel;
const size_t checkpointListSaving =
(perElementOverhead * expelResult.expelCount);
const auto& checkpointStartItem =
this->manager->public_createCheckpointItem(
0, Vbid(0), queue_op::checkpoint_start);
// Expelled checkpoint_start + 1 mutation
const size_t queuedItemSaving = checkpointStartItem->size() + sizeOfItem;
const size_t expectedMemoryRecovered =
checkpointListSaving + queuedItemSaving;

EXPECT_EQ(2, expelResult.expelCount);
// List saving + 1 mutation
const size_t expectedMemoryRecovered = checkpointListSaving + sizeOfItem;

EXPECT_EQ(expectedMemoryRecovered,
expelResult.estimateOfFreeMemory - extra);
EXPECT_EQ(expectedMemoryRecovered, reductionInCheckpointMemoryUsage);
EXPECT_EQ(2, this->global_stats.itemsExpelledFromCheckpoints);
}

TEST_P(CheckpointTest, testExpelCheckpointItemsMemoryRecoveredMemory) {
Expand Down
2 changes: 2 additions & 0 deletions engines/ep/tests/module_tests/dcp_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,8 @@ TEST_P(StreamTest, MB17653_ItemsRemaining) {

setup_dcp_stream();

ASSERT_TRUE(stream->isInMemory());

// Should start with one item remaining.
EXPECT_EQ(1, stream->getItemsRemaining())
<< "Unexpected initial stream item count";
Expand Down

0 comments on commit 9bdac12

Please sign in to comment.