Skip to content

Commit

Permalink
MB-49526: Modify MagmaKVStore MagmaCompactionCB & compactionCallBack
Browse files Browse the repository at this point in the history
Modify MagmaCompactionCB so that we create CompactionContext in the
ctor of MagmaCompactionCB. Rather than on the first call to
MagmaKVStore::compactionCallBack(). Also throw if we're unable to create
a CompactionContext, preventing Magma from running a compaction that
can't do anything.

A few unit tests are skipped when running with Nexus, as they trigger
Magma's implicit compaction. This causes issues as we currently don't
support implicit compaction when running under the Nexus back end,
meaning we don't have MagmaKVStore::makeCompactionContextCallback set,
which intern will cause us to throw in the MagmaCompactionCB()
constructor.

Change-Id: Ie4f0a176f51bbbdb5d55c3c41711aaac8f47bf15
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/165921
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Dave Rigby <daver@couchbase.com>
  • Loading branch information
rdemellow committed Nov 24, 2021
1 parent f92ac63 commit 97f2218
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 78 deletions.
75 changes: 34 additions & 41 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,24 +205,46 @@ class MagmaKVFileHandle : public KVFileHandle {
MagmaKVStore::MagmaCompactionCB::MagmaCompactionCB(
MagmaKVStore& magmaKVStore,
Vbid vbid,
std::shared_ptr<CompactionContext> ctx)
: magmaKVStore(magmaKVStore), vbid(vbid), ctx(std::move(ctx)) {
std::shared_ptr<CompactionContext> compactionContext)
: vbid(vbid),
ctx(std::move(compactionContext)),
magmaKVStore(magmaKVStore) {
magmaKVStore.logger->TRACE("MagmaCompactionCB constructor");
setCtxPurgedItemCtx();
}

MagmaKVStore::MagmaCompactionCB::~MagmaCompactionCB() {
magmaKVStore.logger->debug("MagmaCompactionCB destructor");
}

void MagmaKVStore::MagmaCompactionCB::setCtxPurgedItemCtx() {
if (ctx) {
// If we've not got a CompactionContext then this must be an implicit
// compaction so we need to create a CompactionContext
if (!ctx) {
if (!magmaKVStore.makeCompactionContextCallback) {
// We can't perform an implicit compaction if
// makeCompactionContextCallback isn't set.
throw std::invalid_argument(
"MagmaKVStore::MagmaCompactionCB::MagmaCompactionCB() no "
"makeCompactionContextCallback set");
}
ctx = magmaKVStore.makeImplicitCompactionContext(vbid);
// If we don't have a valid compaction context then throw to prevent us
// creating a MagmaCompactionCB that won't be able to do any compaction
if (!ctx) {
throw std::runtime_error(
"MagmaKVStore::MagmaCompactionCB::MagmaCompactionCB() "
"couldn't create compaction context");
}
ctx->purgedItemCtx->rollbackPurgeSeqnoCtx =
std::make_unique<MagmaImplicitCompactionPurgedItemContext>(
ctx->getRollbackPurgeSeqno(),
magmaDbStats,
ctx->maybeUpdateVBucketPurgeSeqno);
} else {
ctx->purgedItemCtx->rollbackPurgeSeqnoCtx =
std::make_unique<MagmaRollbackPurgeSeqnoCtx>(
ctx->getRollbackPurgeSeqno(), magmaDbStats);
}
}

MagmaKVStore::MagmaCompactionCB::~MagmaCompactionCB() {
magmaKVStore.logger->debug("MagmaCompactionCB destructor");
}

bool MagmaKVStore::MagmaCompactionCB::operator()(
const magma::Slice& keySlice,
const magma::Slice& metaSlice,
Expand Down Expand Up @@ -279,43 +301,14 @@ bool MagmaKVStore::compactionCallBack(MagmaKVStore::MagmaCompactionCB& cbCtx,
logger->TRACE("MagmaCompactionCB: {} {}", vbid, userSanitizedItemStr);
}

if (cbCtx.magmaKVStore.configuration.isSanityCheckingVBucketMapping()) {
if (configuration.isSanityCheckingVBucketMapping()) {
validateKeyMapping("MagmaKVStore::compactionCallback",
cbCtx.magmaKVStore.configuration
.getVBucketMappingErrorHandlingMethod(),
configuration.getVBucketMappingErrorHandlingMethod(),
makeDiskDocKey(keySlice).getDocKey(),
vbid,
configuration.getMaxVBuckets());
}

if (!cbCtx.ctx) {
// Don't already have a compaction context (i.e. this is the first
// key for an implicit compaction) - attempt to create one.

// Note that Magma implicit (internal) compactions can start _as soon
// as_ the Magma instance is Open()'d, which means this method can be
// called before Warmup has completed - and hence before
// makeCompactionContextCallback is assigned to a non-empty value.
// Until warmup *does* complete it isn't possible for us to know how
// to correctly deal with those keys - for example need to have
// initialised document counters during warmup to be able to update
// counter on TTL expiry. As such, until we have a non-empty
// makeCompactionContextCallback, simply return false.
if (!makeCompactionContextCallback) {
return false;
}
cbCtx.ctx = makeImplicitCompactionContext(vbid);
// If we don't have a valid compaction context return false
if (!cbCtx.ctx) {
return false;
}
cbCtx.implicitCompaction = true;
cbCtx.ctx->purgedItemCtx->rollbackPurgeSeqnoCtx =
std::make_unique<MagmaImplicitCompactionPurgedItemContext>(
cbCtx.ctx->getRollbackPurgeSeqno(),
cbCtx.magmaDbStats,
cbCtx.ctx->maybeUpdateVBucketPurgeSeqno);
}
return compactionCore(
cbCtx, keySlice, metaSlice, valueSlice, userSanitizedItemStr);
}
Expand Down
20 changes: 6 additions & 14 deletions engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -625,14 +625,10 @@ class MagmaKVStore : public KVStore {
*/
class MagmaCompactionCB : public magma::Magma::CompactionCallback {
public:
MagmaCompactionCB(MagmaKVStore& magmaKVStore,
Vbid vbid,
std::shared_ptr<CompactionContext> ctx = nullptr);

/**
* Set the PurgedItemCtx to the magma type
*/
void setCtxPurgedItemCtx();
MagmaCompactionCB(
MagmaKVStore& magmaKVStore,
Vbid vbid,
std::shared_ptr<CompactionContext> compactionContext = nullptr);

~MagmaCompactionCB() override;
bool operator()(const magma::Slice& keySlice,
Expand All @@ -641,7 +637,6 @@ class MagmaKVStore : public KVStore {
const magma::UserStats* GetUserStats() override {
return &magmaDbStats;
}
MagmaKVStore& magmaKVStore;

/**
* Vbucket being compacted - required so that we can work out which
Expand All @@ -655,15 +650,12 @@ class MagmaKVStore : public KVStore {
*/
std::shared_ptr<CompactionContext> ctx;

private:
MagmaKVStore& magmaKVStore;
/**
* Stats updates made during compaction
*/
MagmaDbStats magmaDbStats;

// bool to inform MagmaKVStore::compactionCallBack if the compaction is
// an implicit or not. If true any purged seqnos will also update the
// in memory purge seqno
bool implicitCompaction = false;
};

/**
Expand Down
10 changes: 10 additions & 0 deletions engines/ep/tests/module_tests/evp_store_durability_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,11 @@ TEST_P(DurabilityEPBucketTest,
* that our on disk accounting remain consistent.
*/
TEST_P(DurabilityEPBucketTest, PersistSyncWriteSyncDeleteTenDocs3Times) {
if (isNexus()) {
// TODO MB-47604 enable for nexus as magma's implicit compaction kicks
// in for this test but isn't supported for nexus
GTEST_SKIP();
}
setVBucketStateAndRunPersistTask(
vbid,
vbucket_state_active,
Expand Down Expand Up @@ -1837,6 +1842,11 @@ TEST_P(DurabilityEPBucketTest, PersistSyncWriteSyncDeleteTenDocs3Times) {
/// Sanity test to make sure our accounting is consitant when we create
/// Multiple documents on disk.
TEST_P(DurabilityEPBucketTest, PersistSyncWrite20SyncDelete20) {
if (isNexus()) {
// TODO MB-47604 enable for nexus as magma's implicit compaction kicks
// in for this test but isn't supported for nexus
GTEST_SKIP();
}
setVBucketStateAndRunPersistTask(
vbid,
vbucket_state_active,
Expand Down
5 changes: 5 additions & 0 deletions engines/ep/tests/module_tests/kvstore_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,11 @@ TEST_P(KVStoreParamTest, InvalidSnapshotDetected) {
// Expect ThreadSanitizer to pick this.
// Rocks has race condition issues
TEST_P(KVStoreParamTestSkipRocks, DelVBucketConcurrentOperationsTest) {
if (isNexus()) {
// TODO MB-47604 enable for nexus as magma's implicit compaction kicks
// in for this test but isn't supported for nexus
GTEST_SKIP();
}
std::atomic<bool> stop{false};
bool okToDelete{false};
uint32_t deletes{0};
Expand Down
33 changes: 10 additions & 23 deletions engines/ep/tests/module_tests/magma-kvstore_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,22 +357,13 @@ TEST_F(MagmaKVStoreTest, initializeWithHeaderButNoVBState) {
}

// Check that if Magma performs an internal (implicit) compaction before
// ep-engine has completed warmup, then any compaction callbacks into
// MagmaKVStore are ignored - until a later compaction after warmup.
// ep-engine has completed warmup, then we will throw to prevent an implicit
// compaction that can't drop keys from running
TEST_F(MagmaKVStoreTest, MB39669_CompactionBeforeWarmup) {
// Simulate a compaction callback early on - before Warmup has completed.
auto newCompaction = kvstoreConfig->magmaCfg.MakeCompactionCallback(
magma::Magma::KVStoreID(0));
magma::Slice key;
magma::Slice value;
// Require a valid metadata slice to (a) ensure the item isn't just
// skipped (zero-length meta == local document) and (b) to provide a valid
// Vbid.
magmakv::MetaData metadata;
auto encoded = metadata.encode();
magma::Slice meta{encoded};
// Compaction callback should return false for anything before warmup.
EXPECT_FALSE(newCompaction->operator()(key, meta, value));
EXPECT_THROW(kvstoreConfig->magmaCfg.MakeCompactionCallback(
magma::Magma::KVStoreID(0)),
std::invalid_argument);
}

TEST_F(MagmaKVStoreTest, MetadataEncoding) {
Expand Down Expand Up @@ -673,13 +664,9 @@ TEST_F(MagmaKVStoreTest, MB_49465) {
[](Vbid vbid, CompactionConfig& config, uint64_t purgeSeqno) {
return nullptr;
});
// Create a MagmaCompactionCB so that we can ensure it's bool operator
// returns false if makeCompactionContextCallback returns nullptr
auto compaction = kvstoreConfig->magmaCfg.MakeCompactionCallback(
magma::Magma::KVStoreID(99));
magma::Slice key;
magma::Slice value;
magma::Slice meta;
// Ensure we return false as we were unable to get a CompactionContext
EXPECT_FALSE(compaction->operator()(key, meta, value));
// Create a MagmaCompactionCB so that we can ensure that we throw a run time
// error if we're unable to create a compaction context
EXPECT_THROW(kvstoreConfig->magmaCfg.MakeCompactionCallback(
magma::Magma::KVStoreID(99)),
std::runtime_error);
}

0 comments on commit 97f2218

Please sign in to comment.