Skip to content

Commit

Permalink
MB-45654: Update purge seqno during Magma implicit compaction
Browse files Browse the repository at this point in the history
Modify MagmaKVStore::compactionCallBack() to update a vbucket's purge
seqno if the we're performing a compaction in the implicit context. To
allow the method to be aware that we're in the implicit context set a
flag in the MagmaKVStore::MagmaCompactionCB after we've called
MagmaKVStore::makeCompactionContext(). Also add a lambda method to
CompactionContext so that we can bind on the EPBucket to give us access
to VBucket::setPurgeSeqno().

Change-Id: I230667da1bb9aff202e97ffc84cd51afc572043c
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/160292
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Jim Walker <jim@couchbase.com>
  • Loading branch information
rdemellow committed Sep 6, 2021
1 parent ab67fab commit aa1faea
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 2 deletions.
14 changes: 14 additions & 0 deletions engines/ep/src/ep_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,20 @@ std::shared_ptr<CompactionContext> EPBucket::makeCompactionContext(
compactionCompletionCallback(ctx);
};

ctx->maybeUpdatePurgeSeqno = [this, vbid](uint64_t seqno) -> void {
auto vbPtr = getVBucket(vbid);
if (!vbPtr) {
throw std::runtime_error(
"KVStore::CompactionContext::maybeUpdatePurgeSeqno(): "
"Unable to get vbucket ptr for " +
vbid.to_string());
}
if (vbPtr && seqno > vbPtr->getPurgeSeqno()) {
vbPtr->setPurgeSeqno(seqno);
}
postPurgeSeqnoImplicitCompactionHook();
};

return ctx;
}

Expand Down
4 changes: 4 additions & 0 deletions engines/ep/src/ep_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ class EPBucket : public KVBucket {

Flusher* getOneFlusher() override;

/// Hook that gets called after a the EPBuckets purge seqno has been update
/// during an implicit compaction
TestingHook<> postPurgeSeqnoImplicitCompactionHook;

protected:
// During the warmup phase we might want to enable external traffic
// at a given point in time.. The LoadStorageKvPairCallback will be
Expand Down
6 changes: 6 additions & 0 deletions engines/ep/src/kvstore/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ struct CompactionContext {

/// Time from which we expire items (if set). Otherwise current time is used
std::optional<time_t> timeToExpireFrom = {};

/**
* Function to call if the purge seqno might need to be update. This will
* only be performed if the param seqno is greater than the current seqno.
*/
std::function<void(uint64_t)> maybeUpdatePurgeSeqno;
};


Expand Down
16 changes: 14 additions & 2 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,11 @@ bool MagmaKVStore::compactionCallBack(MagmaKVStore::MagmaCompactionCB& cbCtx,

if (!cbCtx.ctx) {
// Don't already have a compaction context (i.e. this is the first
// key for an implicit compaction) - atempt to create one.
// key for an implicit compaction) - attempt to create one.

// Note that Magma implicit (internal) compactions can start _as soon
// as_ the Magma instance is Open()'d, which means this method can be
// called beforew Warmup has completed - and hence before
// called before Warmup has completed - and hence before
// makeCompactionContextCallback is assigned to a non-empty value.
// Until warmup *does* complete it isn't possible for us to know how
// to correctly deal with those keys - for example need to have
Expand All @@ -255,11 +255,20 @@ bool MagmaKVStore::compactionCallBack(MagmaKVStore::MagmaCompactionCB& cbCtx,
return false;
}
cbCtx.ctx = makeCompactionContext(vbid);
cbCtx.implicitCompaction = true;
}

auto seqno = magmakv::getSeqNum(metaSlice);
auto exptime = magmakv::getExpiryTime(metaSlice);

// function to update the purge seqno when we're dropping a document if this
// method is being called for implicit compaction
auto maybeUpdatePurgeSeqno = [&cbCtx, &seqno]() -> void {
if (cbCtx.implicitCompaction) {
cbCtx.ctx->maybeUpdatePurgeSeqno(seqno);
}
};

if (cbCtx.ctx->droppedKeyCb) {
// We need to check both committed and prepared documents - if the
// collection has been logically deleted then we need to discard
Expand Down Expand Up @@ -290,6 +299,7 @@ bool MagmaKVStore::compactionCallBack(MagmaKVStore::MagmaCompactionCB& cbCtx,
"{}",
userSanitizedItemStr);
}
maybeUpdatePurgeSeqno();
return true;
}
}
Expand Down Expand Up @@ -334,6 +344,7 @@ bool MagmaKVStore::compactionCallBack(MagmaKVStore::MagmaCompactionCB& cbCtx,
if (dbStats.purgeSeqno < seqno) {
dbStats.purgeSeqno = seqno;
}
maybeUpdatePurgeSeqno();
return true;
}
}
Expand All @@ -352,6 +363,7 @@ bool MagmaKVStore::compactionCallBack(MagmaKVStore::MagmaCompactionCB& cbCtx,
"DROP prepare {}",
userSanitizedItemStr);
}
maybeUpdatePurgeSeqno();
return true;
}
}
Expand Down
5 changes: 5 additions & 0 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,11 @@ class MagmaKVStore : public KVStore {
* Stats updates made during compaction
*/
MagmaDbStats magmaDbStats;

// bool to inform MagmaKVStore::compactionCallBack if the compaction is
// an implicit or not. If true any purged seqnos will also update the
// in memory purge seqno
bool implicitCompaction = false;
};

/**
Expand Down
87 changes: 87 additions & 0 deletions engines/ep/tests/module_tests/magma_bucket_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "../mock/mock_ep_bucket.h"
#include "../mock/mock_magma_kvstore.h"
#include "../tests/module_tests/thread_gate.h"
#include "checkpoint_manager.h"
#include "kvstore/magma-kvstore/magma-kvstore_config.h"
#include "tests/module_tests/test_helpers.h"
Expand Down Expand Up @@ -246,6 +247,92 @@ TEST_P(STParamMagmaBucketTest, makeCompactionContextSetupAtWarmup) {
EXPECT_NO_THROW(magmaKVStore->makeCompactionContext(vbid));
}

/**
* Test to check that we correctly update the in memory purge seqno when magma
* performs an implicit compaction.
*/
TEST_P(STParamMagmaBucketTest, CheckImplicitCompactionUpdatePurgeSeqno) {
// Function to perform 15 writes so that the next flush will hit the
// LSMMaxNumLevel0Tables threshold which will trigger implicit compaction
auto perform15Writes = [this]() -> void {
for (int i = 0; i < 15; i++) {
store_item(
vbid, makeStoredDocKey("key" + std::to_string(i)), "value");
flushVBucketToDiskIfPersistent(vbid, 1);
}
};
// Re-set the engine and warmup adding the magma rollback test config
// settings, so that we create a checkpoint at every flush
resetEngineAndWarmup(magmaRollbackConfig);

setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
auto vb = store->getVBucket(vbid);
ASSERT_EQ(0, vb->getPurgeSeqno());

// ensure we meet the LSMMaxNumLevel0Tables threshold
perform15Writes();

auto firstDeletedKey = makeStoredDocKey("keyA");
store_item(vbid, firstDeletedKey, "value");
delete_item(vbid, firstDeletedKey);
flushVBucketToDiskIfPersistent(vbid, 1);
const auto expectedPurgeSeqno = vb->getHighSeqno();

// Check that the purge seqno is still 0, but we should have a tombstone on
// disk
EXPECT_EQ(0, vb->getPurgeSeqno());

// Time travel 5 days, we want to drop the tombstone for this when we
// compact
TimeTraveller timmy{60 * 60 * 24 * 5};

ThreadGate tg(2);
auto& bucket = dynamic_cast<EPBucket&>(*store);
bucket.postPurgeSeqnoImplicitCompactionHook = [&tg]() -> void {
tg.threadUp();
};

// ensure we meet the LSMMaxNumLevel0Tables threshold
perform15Writes();

auto secondDeletedKey = makeStoredDocKey("keyB");
// Add a second tombstone to check that we don't drop everything
store_item(vbid, secondDeletedKey, "value");
delete_item(vbid, secondDeletedKey);

// And a dummy item because we can't drop the final seqno
store_item(vbid, makeStoredDocKey("dummy"), "value");
// Flush the dummy value and second deleted value
flushVBucketToDiskIfPersistent(vbid, 2);

// Wait till the purge seqno has been set
tg.threadUp();

// Write and flush another value to cause a Sync in magma to occur which
// will ensure that firstDeletedKey is no longer visible
store_item(vbid, makeStoredDocKey("dummy2"), "value");
flushVBucketToDiskIfPersistent(vbid, 1);

auto magmaKVStore =
dynamic_cast<MagmaKVStore*>(store->getRWUnderlying(vbid));
ASSERT_TRUE(magmaKVStore);

// Assert that the first key no longer has a tomb stone
auto gv = magmaKVStore->get(DiskDocKey(firstDeletedKey), vbid);
ASSERT_EQ(cb::engine_errc::no_such_key, gv.getStatus());

// Assert that the second key is still a tomb stone on disk as it hasn't hit
// its purge threshold yet
gv = magmaKVStore->get(DiskDocKey(secondDeletedKey), vbid);
ASSERT_EQ(cb::engine_errc::success, gv.getStatus());
ASSERT_TRUE(gv.item);
ASSERT_TRUE(gv.item->isDeleted());

// Ensure that the purge seqno has been set during the second flush to where
// the first tombstone was
EXPECT_EQ(expectedPurgeSeqno, vb->getPurgeSeqno());
}

INSTANTIATE_TEST_SUITE_P(STParamMagmaBucketTest,
STParamMagmaBucketTest,
STParameterizedBucketTest::magmaConfigValues(),
Expand Down

0 comments on commit aa1faea

Please sign in to comment.