Skip to content

Commit

Permalink
MB-50874: Merge branch 'mad-hatter' into cheshire-cat
Browse files Browse the repository at this point in the history
* mad-hatter:
  MB-50874: Reset snap start if less than lastSeqno on new checkpoint
  creation

Change-Id: Ib841b66d79b59b1fc1e906a5c3ba122491f10efc
  • Loading branch information
daverigby committed Feb 17, 2022
2 parents fce037a + bfa0dd8 commit db53ff0
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 7 deletions.
4 changes: 2 additions & 2 deletions engines/ep/src/checkpoint_config.h
Expand Up @@ -56,6 +56,8 @@ class CheckpointConfig {
return persistenceEnabled;
}

static void addConfigChangeListener(EventuallyPersistentEngine& engine);

protected:
friend class CheckpointConfigChangeListener;
friend class EventuallyPersistentEngine;
Expand All @@ -76,8 +78,6 @@ class CheckpointConfig {
keepClosedCheckpoints = value;
}

static void addConfigChangeListener(EventuallyPersistentEngine& engine);

private:
class ChangeListener;

Expand Down
32 changes: 27 additions & 5 deletions engines/ep/src/checkpoint_manager.cc
Expand Up @@ -1218,6 +1218,11 @@ size_t CheckpointManager::getNumOpenChkItems() const {
return getOpenCheckpoint_UNLOCKED(lh).getNumItems();
}

size_t CheckpointManager::getNumCheckpoints() const {
LockHolder lh(queueLock);
return checkpointList.size();
}

void CheckpointManager::checkOpenCheckpoint_UNLOCKED(const LockHolder& lh,
bool forceCreation,
bool timeBound) {
Expand Down Expand Up @@ -1424,13 +1429,30 @@ uint64_t CheckpointManager::createNewCheckpoint(bool force) {
LockHolder lh(queueLock);

const auto& openCkpt = getOpenCheckpoint_UNLOCKED(lh);

if (openCkpt.getNumItems() == 0 && !force) {
return openCkpt.getId();
if (openCkpt.getNumItems() > 0 || force) {
addNewCheckpoint_UNLOCKED(openCkpt.getId() + 1);
}

addNewCheckpoint_UNLOCKED(openCkpt.getId() + 1);
return getOpenCheckpointId_UNLOCKED(lh);
auto& openCkpt2 = getOpenCheckpoint_UNLOCKED(lh);

/* MB-50874: Ensure that the snapshot start of our newly-active
* checkpoint is not greater than CheckpointManager::lastBySeqno.
* Note in Neo this issue no longer occurs as the snap_start is sent
* correctly - see MB-50333.
*/
if (static_cast<uint64_t>(lastBySeqno) <
openCkpt2.getSnapshotStartSeqno()) {
EP_LOG_INFO(
"CheckpointManager::createNewCheckpoint(): {} Found "
"lastBySeqno:{} less than snapStart:{}, adjusting snapStart to lastBySeqno + 1",
vbucketId,
lastBySeqno,
openCkpt2.getSnapshotStartSeqno(),
lastBySeqno + 1);
openCkpt2.setSnapshotStartSeqno(lastBySeqno + 1);
}

return openCkpt2.getId();
}

size_t CheckpointManager::getMemoryUsage_UNLOCKED() const {
Expand Down
3 changes: 3 additions & 0 deletions engines/ep/src/checkpoint_manager.h
Expand Up @@ -329,6 +329,9 @@ class CheckpointManager {
*/
size_t getNumOpenChkItems() const;

/// @returns the number of Checkpoints this Manager has.
size_t getNumCheckpoints() const;

/* WARNING! This method can return inaccurate counts - see MB-28431. It
* at *least* can suffer from overcounting by at least 1 (in scenarios as
* yet not clear).
Expand Down
1 change: 1 addition & 0 deletions engines/ep/tests/mock/mock_synchronous_ep_engine.cc
Expand Up @@ -67,6 +67,7 @@ SynchronousEPEngine::SynchronousEPEngine(const cb::ArenaMallocClient& client,

// checkpointConfig is needed by CheckpointManager (via EPStore).
checkpointConfig = std::make_unique<CheckpointConfig>(*this);
CheckpointConfig::addConfigChangeListener(*this);

// Simplified setup for switching FlowControl on/off
if (configuration.getDcpFlowControlPolicy() == "none") {
Expand Down
72 changes: 72 additions & 0 deletions engines/ep/tests/module_tests/dcp_reflection_test.cc
Expand Up @@ -153,6 +153,9 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {

void transferResponseMessage();

/// Inject a CloseStream message into the consumer side of the route.
void closeStreamAtConsumer();

std::pair<ActiveStream*, MockPassiveStream*> getStreams();

Vbid vbid;
Expand Down Expand Up @@ -549,6 +552,10 @@ void DCPLoopbackStreamTest::DcpRoute::transferResponseMessage() {
}
}

void DCPLoopbackStreamTest::DcpRoute::closeStreamAtConsumer() {
this->consumer->closeStream(0, vbid, {});
}

std::pair<cb::engine_errc, uint64_t>
DCPLoopbackStreamTest::DcpRoute::doStreamRequest(int flags) {
// Do the add_stream
Expand Down Expand Up @@ -1061,6 +1068,71 @@ TEST_F(DCPLoopbackStreamTest, MB_36948_SnapshotEndsOnPrepare) {
EXPECT_EQ(2, replicaVB->checkpointManager->getVisibleSnapshotEndSeqno());
}

/**
* Regression test for mB-50874 - a scenario where a replica:
* 1. receives a DCP snapshot marker which has the first seqno de-duplicated
* 2. DCP stream is closed (e.g. ns_server failing over the active)
* 3. vbucket is promoted to active
*
* This results in a Checkpoint where the snapshot start - updated from
* SnapshotMarker at (1) - is greater than the lastBySeqno and this ends
* up throwing an exception in the Flusher when we next persist anything.
*/
TEST_F(DCPLoopbackStreamTest, MB50874_DeDuplicatedMutationsReplicaToActive) {
// We need a new checkpoint (MARKER_FLAG_CHK set) when the active node
// generates markers - reduce chkMaxItems to the minimum to simplify this.
engines[Node0]->getConfiguration().setChkMaxItems(MIN_CHECKPOINT_ITEMS);

// Setup - fill up the initial checkpoint, with items, so when we
// queue the next mutations a new checkpoints is created.
for (int i = 0; i < MIN_CHECKPOINT_ITEMS; i++) {
auto key = makeStoredDocKey("key_" + std::to_string(i));
ASSERT_EQ(cb::engine_errc::success, storeSet(key));
}
auto srcVB = engines[Node0]->getVBucket(vbid);
ASSERT_EQ(1, srcVB->checkpointManager->getNumCheckpoints());

// Now modify one more key, which should create a new Checkpoint.
auto key = makeStoredDocKey("deduplicated_key");
ASSERT_EQ(cb::engine_errc::success, storeSet(key));
// ... and modify again so we de-duplicate and have a seqno gap.
ASSERT_EQ(cb::engine_errc::success, storeSet(key));

// Sanity check our state - should have a 2nd checkpoint now.
ASSERT_EQ(2, srcVB->checkpointManager->getNumCheckpoints());

// Create a DCP connection between node0 and 1, and stream the initial
// marker and the 10 mutations.
auto route0_1 = createDcpRoute(Node0, Node1);
ASSERT_EQ(cb::engine_errc::success, route0_1.doStreamRequest().first);
route0_1.transferSnapshotMarker(
0, 10, MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);
for (int i = 0; i < MIN_CHECKPOINT_ITEMS; i++) {
route0_1.transferMessage(DcpResponse::Event::Mutation);
}

// Test - transfer the snapshot marker (but no mutations), then close stream
// and promote to active; and try to accept a new mutation.
route0_1.transferSnapshotMarker(
12, 12, MARKER_FLAG_MEMORY | MARKER_FLAG_CHK);

route0_1.closeStreamAtConsumer();
engines[Node1]->getKVBucket()->setVBucketState(vbid, vbucket_state_active);

// Prior to the fix, this check fails.
auto& dstCkptMgr = *engines[Node1]->getVBucket(vbid)->checkpointManager;
EXPECT_LE(dstCkptMgr.getOpenSnapshotStartSeqno(),
dstCkptMgr.getHighSeqno() + 1)
<< "Checkpoint start should be less than or equal to next seqno to "
"be assigned (highSeqno + 1)";

// Prior to the fix, this throws std::logic_error from
// CheckpointManager::queueDirty as lastBySeqno is outside snapshot range.
EXPECT_EQ(cb::engine_errc::success,
engines[Node1]->getKVBucket()->set(
*makeCommittedItem(key, "value"), cookie));
}

class DCPLoopbackSnapshots : public DCPLoopbackStreamTest,
public ::testing::WithParamInterface<int> {
public:
Expand Down

0 comments on commit db53ff0

Please sign in to comment.