MB-51572: Remove EPBucket::FlushResult::WakeCkptRemover
Legacy class, used only for handling checkpoint removal in Lazy mode.

Change-Id: Ie51a29c14a48098730c2e576f6fbf81e44d17d90
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/173020
Reviewed-by: James H <james.harrison@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
paolococchi committed Apr 8, 2022
1 parent 36bb71a commit 28c9d43
Showing 16 changed files with 91 additions and 225 deletions.
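
For orientation before the per-file hunks: after this change EPBucket::FlushResult carries only MoreAvailable and numFlushed, and the Flusher no longer decides whether to wake the checkpoint remover after persistence. A minimal sketch of the resulting struct, reconstructed from the engines/ep/src/ep_bucket.h hunk below (names and defaults taken from that hunk; not part of the original commit message):

// Sketch only: the post-change shape of EPBucket::FlushResult.
// The WakeCkptRemover enum and member are removed, so callers now
// build results as {MoreAvailable::Yes, 0}, {MoreAvailable::No, numFlushed}, etc.
struct FlushResult {
    FlushResult(MoreAvailable m, size_t n) : moreAvailable(m), numFlushed(n) {
    }

    bool operator==(const FlushResult& other) const {
        return moreAvailable == other.moreAvailable &&
               numFlushed == other.numFlushed;
    }

    MoreAvailable moreAvailable = MoreAvailable::No;
    size_t numFlushed = 0;
};
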
37 changes: 0 additions & 37 deletions engines/ep/src/checkpoint_manager.cc
@@ -721,43 +721,6 @@ bool CheckpointManager::hasClosedCheckpointWhichCanBeRemoved() const {
(oldestCkpt->isNoCursorsInCheckpoint());
}

-bool CheckpointManager::isEligibleForCheckpointRemovalAfterPersistence() const {
-    std::lock_guard<std::mutex> lh(queueLock);
-
-    const auto& oldestCkpt = checkpointList.front();
-
-    // Just 1 (open) checkpoint in CM
-    if (oldestCkpt->getState() == CHECKPOINT_OPEN) {
-        Expects(checkpointList.size() == 1);
-        return false;
-    }
-    Expects(checkpointList.size() > 1);
-
-    // Is the oldest checkpoint closed and unreferenced?
-    const auto numCursors = oldestCkpt->getNumCursorsInCheckpoint();
-    if (numCursors == 0) {
-        return true;
-    }
-
-    // Some cursors in oldest checkpoint
-
-    // If more than 1 cursor, then no checkpoint is eligible for removal
-    if (numCursors > 1) {
-        return false;
-    }
-
-    // Just 1 cursor in oldest checkpoint, is it the backup pcursor?
-    const auto backupIt = cursors.find(backupPCursorName);
-    if (backupIt != cursors.end() &&
-        backupIt->second->getCheckpoint()->get() == oldestCkpt.get()) {
-        // Backup cursor in oldest checkpoint, checkpoint(s) will be eligible
-        // for removal after backup cursor has gone
-        return true;
-    }
-    // No backup cursor in CM, some other cursor is in oldest checkpoint
-    return false;
-}

void CheckpointManager::updateStatsForNewQueuedItem(
const std::lock_guard<std::mutex>& lh, const queued_item& qi) {
++stats.totalEnqueued;
7 changes: 0 additions & 7 deletions engines/ep/src/checkpoint_manager.h
@@ -475,13 +475,6 @@ class CheckpointManager {
*/
bool hasClosedCheckpointWhichCanBeRemoved() const;

-    /**
-     * @return true if only the backup pcursor is blocking checkpoint removal.
-     * Ie, some checkpoints will be eligible for removal as soon as the backup
-     * pcursor is removed.
-     */
-    bool isEligibleForCheckpointRemovalAfterPersistence() const;

void createSnapshot(uint64_t snapStartSeqno,
uint64_t snapEndSeqno,
std::optional<uint64_t> highCompletedSeqno,
27 changes: 9 additions & 18 deletions engines/ep/src/ep_bucket.cc
@@ -368,11 +368,11 @@ EPBucket::FlushResult EPBucket::flushVBucket(Vbid vbid) {
auto vb = getLockedVBucket(vbid, std::try_to_lock);
if (!vb.owns_lock()) {
// Try another bucket if this one is locked to avoid blocking flusher.
-        return {MoreAvailable::Yes, 0, WakeCkptRemover::No};
+        return {MoreAvailable::Yes, 0};
}

if (!vb) {
-        return {MoreAvailable::No, 0, WakeCkptRemover::No};
+        return {MoreAvailable::No, 0};
}

return flushVBucket_UNLOCKED(std::move(vb));
@@ -401,15 +401,8 @@ EPBucket::FlushResult EPBucket::flushVBucket_UNLOCKED(LockedVBucketPtr vb) {
const auto moreAvailable =
toFlush.moreAvailable ? MoreAvailable::Yes : MoreAvailable::No;

-    // The Flusher will wake up the CheckpointRemover if necessary.
-    const auto wakeupCheckpointRemover =
-            vb->checkpointManager
-                    ->isEligibleForCheckpointRemovalAfterPersistence()
-                    ? WakeCkptRemover::Yes
-                    : WakeCkptRemover::No;

if (toFlush.items.empty()) {
-        return {moreAvailable, 0, wakeupCheckpointRemover};
+        return {moreAvailable, 0};
}

// The range becomes initialised only when an item is flushed
@@ -707,7 +700,7 @@ EPBucket::FlushResult EPBucket::flushVBucket_UNLOCKED(LockedVBucketPtr vb) {

// Just return if nothing to flush
if (!mustPersistVBState && flushBatchSize == 0) {
-        return {moreAvailable, 0, wakeupCheckpointRemover};
+        return {moreAvailable, 0};
}

if (proposedVBState.transition.state == vbucket_state_active) {
@@ -788,7 +781,7 @@ EPBucket::FlushResult EPBucket::flushVBucket_UNLOCKED(LockedVBucketPtr vb) {
if (!rwUnderlying->snapshotVBucket(vbid, commitData.proposedVBState)) {
flushFailureEpilogue(*vb, toFlush);

-        return {MoreAvailable::Yes, 0, WakeCkptRemover::No};
+        return {MoreAvailable::Yes, 0};
}

// The new vbstate was the only thing to flush. All done.
@@ -798,7 +791,7 @@ EPBucket::FlushResult EPBucket::flushVBucket_UNLOCKED(LockedVBucketPtr vb) {
aggStats,
commitData.collections);

-    return {moreAvailable, 0, wakeupCheckpointRemover};
+    return {moreAvailable, 0};
}

// The flush-batch must be non-empty by logic at this point.
@@ -824,7 +817,7 @@ EPBucket::FlushResult EPBucket::flushVBucket_UNLOCKED(LockedVBucketPtr vb) {
if (!commit(*rwUnderlying, std::move(ctx), commitData)) {
flushFailureEpilogue(*vb, toFlush);

-        return {MoreAvailable::Yes, 0, WakeCkptRemover::No};
+        return {MoreAvailable::Yes, 0};
}

// Note: We want to update the snap-range only if we have flushed at least
@@ -880,7 +873,7 @@ EPBucket::FlushResult EPBucket::flushVBucket_UNLOCKED(LockedVBucketPtr vb) {
// Handle Seqno Persistence requests
vb->notifyHighPriorityRequests(engine, vb->getPersistenceSeqno());

-    return {moreAvailable, flushBatchSize, wakeupCheckpointRemover};
+    return {moreAvailable, flushBatchSize};
}

void EPBucket::flushSuccessEpilogue(
@@ -2342,9 +2335,7 @@ bool EPBucket::maybeScheduleManifestPersistence(
std::ostream& operator<<(std::ostream& os, const EPBucket::FlushResult& res) {
os << std::boolalpha << "moreAvailable:{"
<< (res.moreAvailable == EPBucket::MoreAvailable::Yes) << "} "
<< "numFlushed:{" << res.numFlushed << "} "
<< "wakeupCkptRemover:{"
<< (res.wakeupCkptRemover == EPBucket::WakeCkptRemover::Yes) << "}";
<< "numFlushed:{" << res.numFlushed << "}";
return os;
}

9 changes: 3 additions & 6 deletions engines/ep/src/ep_bucket.h
@@ -50,21 +50,18 @@ class EPBucket : public KVBucket {
void deinitialize() override;

enum class MoreAvailable : uint8_t { No = 0, Yes };
-    enum class WakeCkptRemover : uint8_t { No = 0, Yes };

struct FlushResult {
-        FlushResult(MoreAvailable m, size_t n, WakeCkptRemover w)
-            : moreAvailable(m), wakeupCkptRemover(w), numFlushed(n) {
+        FlushResult(MoreAvailable m, size_t n)
+            : moreAvailable(m), numFlushed(n) {
}

bool operator==(const FlushResult& other) const {
return (moreAvailable == other.moreAvailable &&
-                    numFlushed == other.numFlushed &&
-                    wakeupCkptRemover == other.wakeupCkptRemover);
+                    numFlushed == other.numFlushed);
}

MoreAvailable moreAvailable = MoreAvailable::No;
-        WakeCkptRemover wakeupCkptRemover = WakeCkptRemover::No;
size_t numFlushed = 0;
};

17 changes: 0 additions & 17 deletions engines/ep/src/flusher.cc
@@ -252,9 +252,6 @@ bool Flusher::flushVB() {
doHighPriority = true;
}

-    const auto checkpointRemovalMode =
-            store->getEPEngine().getCheckpointConfig().getCheckpointRemoval();

// Flush a high priority vBucket if applicable
Vbid vbid;
if (doHighPriority && hpVbs.popFront(vbid)) {
@@ -265,13 +262,6 @@
hpVbs.pushUnique(vbid);
}

-    if (checkpointRemovalMode == CheckpointRemoval::Lazy) {
-        // Flushing may move the persistence cursor to a new checkpoint.
-        if (res.wakeupCkptRemover == EPBucket::WakeCkptRemover::Yes) {
-            store->wakeUpCheckpointMemRecoveryTask();
-        }
-    }

// Return false (don't re-wake) if the lpVbs is empty (i.e. nothing to
// do on our next iteration). If another vBucket joins this queue after
// then it will wake the task.
@@ -297,13 +287,6 @@
lpVbs.pushUnique(vbid);
}

-    if (checkpointRemovalMode == CheckpointRemoval::Lazy) {
-        // Flushing may move the persistence cursor to a new checkpoint.
-        if (res.wakeupCkptRemover == EPBucket::WakeCkptRemover::Yes) {
-            store->wakeUpCheckpointMemRecoveryTask();
-        }
-    }

// Return more (as we may have low priority vBuckets to flush)
return true;
}
@@ -2040,7 +2040,6 @@ TEST_P(CollectionsEraserPersistentOnly, DropDuringFlush) {
res = epBucket.flushVBucket(vbid);
EXPECT_EQ(EPBucket::MoreAvailable::No, res.moreAvailable);
EXPECT_EQ(1, res.numFlushed);
-    EXPECT_EQ(EPBucket::WakeCkptRemover::No, res.wakeupCkptRemover);

std::tie(status, persistedStats) =
kvstore->getCollectionStats(vbid, CollectionEntry::dairy);
3 changes: 1 addition & 2 deletions engines/ep/tests/module_tests/couchstore_bucket_tests.cc
@@ -30,7 +30,6 @@

using FlushResult = EPBucket::FlushResult;
using MoreAvailable = EPBucket::MoreAvailable;
-using WakeCkptRemover = EPBucket::WakeCkptRemover;

class STParamCouchstoreBucketTest : public STParamPersistentBucketTest {};

@@ -147,7 +146,7 @@ TEST_P(STParamCouchstoreBucketTest, FlusherMarksCleanBySeqno) {
const auto flush = [this]() -> void {
auto& epBucket = dynamic_cast<EPBucket&>(*store);
const auto res = epBucket.flushVBucket(vbid);
-        EXPECT_EQ(FlushResult(MoreAvailable::No, 1, WakeCkptRemover::No), res);
+        EXPECT_EQ(FlushResult(MoreAvailable::No, 1), res);
};
auto flusher = std::thread(flush);

16 changes: 5 additions & 11 deletions engines/ep/tests/module_tests/dcp_stream_test.cc
@@ -52,7 +52,6 @@

using FlushResult = EPBucket::FlushResult;
using MoreAvailable = EPBucket::MoreAvailable;
-using WakeCkptRemover = EPBucket::WakeCkptRemover;

using namespace std::string_view_literals;

@@ -2979,8 +2978,7 @@ void SingleThreadedPassiveStreamTest::testConsumerReceivesUserXattrsInDelete(
}

auto& epBucket = dynamic_cast<EPBucket&>(*store);
-    EXPECT_EQ(FlushResult(MoreAvailable::No, 1, WakeCkptRemover::No),
-              epBucket.flushVBucket(vbid));
+    EXPECT_EQ(FlushResult(MoreAvailable::No, 1), epBucket.flushVBucket(vbid));

// Check item persisted

@@ -4656,8 +4654,7 @@ TEST_P(STPassiveStreamPersistentTest, VBStateNotLostAfterFlushFailure) {

// This flush fails, we have not written HCS to disk
auto& epBucket = dynamic_cast<EPBucket&>(*store);
-    EXPECT_EQ(FlushResult(MoreAvailable::Yes, 0, WakeCkptRemover::No),
-              epBucket.flushVBucket(vbid));
+    EXPECT_EQ(FlushResult(MoreAvailable::Yes, 0), epBucket.flushVBucket(vbid));
EXPECT_EQ(3, vb.dirtyQueueSize);
{
SCOPED_TRACE("");
@@ -4671,8 +4668,7 @@ TEST_P(STPassiveStreamPersistentTest, VBStateNotLostAfterFlushFailure) {

// This flush succeeds, we must write all the expected SnapRange info in
// vbstate on disk
-    EXPECT_EQ(FlushResult(MoreAvailable::No, 3, WakeCkptRemover::No),
-              epBucket.flushVBucket(vbid));
+    EXPECT_EQ(FlushResult(MoreAvailable::No, 3), epBucket.flushVBucket(vbid));
EXPECT_EQ(0, vb.dirtyQueueSize);
{
SCOPED_TRACE("");
@@ -4756,8 +4752,7 @@ TEST_P(STPassiveStreamPersistentTest, MB_37948) {
// the flusher wrongly uses the state on disk (state=active) for computing
// the new snapshot range to be persisted.
auto& epBucket = dynamic_cast<EPBucket&>(*store);
-    EXPECT_EQ(FlushResult(MoreAvailable::No, 2, WakeCkptRemover::Yes),
-              epBucket.flushVBucket(vbid));
+    EXPECT_EQ(FlushResult(MoreAvailable::No, 2), epBucket.flushVBucket(vbid));

// Before the fix this fails because we have persisted snapEnd=2
// Note: We have persisted a partial snapshot at replica, snapStart must
@@ -4772,8 +4767,7 @@ TEST_P(STPassiveStreamPersistentTest, MB_37948) {
stream->messageReceived(makeMutationConsumerMessage(
3 /*seqno*/, vbid, value, opaque)));

-    EXPECT_EQ(FlushResult(MoreAvailable::No, 1, WakeCkptRemover::No),
-              epBucket.flushVBucket(vbid));
+    EXPECT_EQ(FlushResult(MoreAvailable::No, 1), epBucket.flushVBucket(vbid));

checkPersistedSnapshot(3, 3);
}
4 changes: 0 additions & 4 deletions engines/ep/tests/module_tests/evp_store_durability_test.cc
@@ -3285,7 +3285,6 @@ TEST_P(DurabilityEPBucketTest, MB_36739) {
auto res = dynamic_cast<EPBucket&>(*store).flushVBucket(vbid);
EXPECT_EQ(EPBucket::MoreAvailable::Yes, res.moreAvailable);
EXPECT_EQ(0, res.numFlushed);
-    EXPECT_EQ(EPBucket::WakeCkptRemover::No, res.wakeupCkptRemover);
EXPECT_EQ(1, stats.commitFailed);
EXPECT_EQ(1, stats.flusherCommits);
EXPECT_EQ(vbs, *store->getRWUnderlying(vbid)->getCachedVBucketState(vbid));
@@ -3294,7 +3293,6 @@ TEST_P(DurabilityEPBucketTest, MB_36739) {
res = dynamic_cast<EPBucket&>(*store).flushVBucket(vbid);
EXPECT_EQ(EPBucket::MoreAvailable::No, res.moreAvailable);
EXPECT_EQ(1, res.numFlushed);
-    EXPECT_EQ(EPBucket::WakeCkptRemover::No, res.wakeupCkptRemover);
EXPECT_EQ(1, engine->getEpStats().commitFailed);
EXPECT_EQ(2, engine->getEpStats().flusherCommits);

@@ -3326,7 +3324,6 @@ TEST_P(DurabilityCouchstoreBucketTest, MB_43964) {
auto res = dynamic_cast<EPBucket&>(*store).flushVBucket(vbid);
EXPECT_EQ(EPBucket::MoreAvailable::No, res.moreAvailable);
EXPECT_EQ(1, res.numFlushed);
-    EXPECT_EQ(EPBucket::WakeCkptRemover::No, res.wakeupCkptRemover);
EXPECT_EQ(2, stats.flusherCommits);

// Still 0
@@ -3341,7 +3338,6 @@ TEST_P(DurabilityCouchstoreBucketTest, MB_43964) {
res = dynamic_cast<EPBucket&>(*store).flushVBucket(vbid);
EXPECT_EQ(EPBucket::MoreAvailable::No, res.moreAvailable);
EXPECT_EQ(1, res.numFlushed);
-    EXPECT_EQ(EPBucket::WakeCkptRemover::No, res.wakeupCkptRemover);
EXPECT_EQ(3, stats.flusherCommits);

// Still 0
