diff --git a/engines/ep/src/dcp/passive_stream.cc b/engines/ep/src/dcp/passive_stream.cc index 24b4de4f7f..d13a154f0f 100644 --- a/engines/ep/src/dcp/passive_stream.cc +++ b/engines/ep/src/dcp/passive_stream.cc @@ -16,6 +16,7 @@ #include "collections/vbucket_manifest_handles.h" #include "dcp/consumer.h" #include "dcp/response.h" +#include "durability/durability_monitor.h" #include "ep_engine.h" #include "failover-table.h" #include "kv_bucket.h" @@ -189,6 +190,26 @@ void PassiveStream::acceptStream(cb::mcbp::Status status, uint32_t add_opaque) { return; } + // We use the cur_snapshot_prepare member to determine if we should + // notify the PDM of any Memory snapshots. It is set when we see a + // prepare in any snapshot. Consider the following snapshot: + // + // [1:Prepare(A), 2:Mutation(B)] Type = Memory + // + // If we have only received and persisted the following sequence of events + // but then restart, we would fail to notify the PDM of the complete + // snapshot: + // + // 1) SnapshotMarker (1-2) Type = Memory + // 2) Prepare (1) <- Persisted to disk + // + // To solve this, we can fix the cur_snapshot_prepare state on + // PassiveStream acceptance. The PDM already avoids acking back the same + // seqno, so notifying an extra snapshot shouldn't matter, and even if we + // did ack back the same seqno, the ADM should already deal with weakly + // monotonic acks as we ack back the HPS on stream connection. + cur_snapshot_prepare = true; + // SyncReplication: About to commence accepting data on this stream. Check // if the associated consumer supports SyncReplication, so we can later // correctly process Snapshot Markers. diff --git a/engines/ep/tests/module_tests/dcp_durability_stream_test.cc b/engines/ep/tests/module_tests/dcp_durability_stream_test.cc index e18c111c75..cb5a7634f8 100644 --- a/engines/ep/tests/module_tests/dcp_durability_stream_test.cc +++ b/engines/ep/tests/module_tests/dcp_durability_stream_test.cc @@ -1067,6 +1067,102 @@ void DurabilityPassiveStreamTest::TearDown() { SingleThreadedPassiveStreamTest::TearDown(); } +TEST_P(DurabilityPassiveStreamPersistentTest, + RestartMidSnapshotAfterPrepareMovesHPSOnReconnect) { + // 1) SnapshotMarker 1 - 2 Memory + auto opaque = 0; + SnapshotMarker marker( + opaque, + vbid, + 1 /*snapStart*/, + 2 /*snapEnd*/, + dcp_marker_flag_t::MARKER_FLAG_MEMORY | MARKER_FLAG_CHK, + {} /*HCS*/, + {} /*maxVisibleSeqno*/, + {}, // timestamp + {} /*streamId*/); + stream->processMarker(&marker); + + // 2) Prepare at 1 + using namespace cb::durability; + auto prepare = + makePendingItem(makeStoredDocKey("prepare"), + "value", + Requirements(Level::Majority, Timeout::Infinity())); + prepare->setBySeqno(1); + prepare->setCas(1); + + ASSERT_EQ(cb::engine_errc::success, + stream->messageReceived(std::make_unique( + prepare, + stream->getOpaque(), + IncludeValue::Yes, + IncludeXattrs::Yes, + IncludeDeleteTime::No, + IncludeDeletedUserXattrs::Yes, + DocKeyEncodesCollectionId::No, + nullptr, + cb::mcbp::DcpStreamId{}))); + + // 3) Persist prepare + flushVBucketToDiskIfPersistent(vbid, 1); + + // HPS not moved as we don't have a complete snapshot + ASSERT_EQ(0, store->getVBucket(vbid)->getHighPreparedSeqno()); + auto res = store->getRWUnderlying(vbid)->getPersistedVBucketState(vbid); + ASSERT_EQ(KVStoreIface::ReadVBStateStatus::Success, res.status); + ASSERT_EQ(0, res.state.highPreparedSeqno); + + // 4) Restart + consumer->closeAllStreams(); + consumer.reset(); + + resetEngineAndWarmup(); + + // HPS state not change + ASSERT_EQ(0, store->getVBucket(vbid)->getHighPreparedSeqno()); + res = store->getRWUnderlying(vbid)->getPersistedVBucketState(vbid); + ASSERT_EQ(KVStoreIface::ReadVBStateStatus::Success, res.status); + ASSERT_EQ(0, res.state.highPreparedSeqno); + + // 5) Recreate our stream + consumer = + std::make_shared(*engine, cookie, "test_consumer"); + consumer->enableSyncReplication(); + consumer->addStream(opaque, vbid, 0 /*flags*/); + stream = static_cast( + (consumer->getVbucketStream(vbid)).get()); + stream->acceptStream(cb::mcbp::Status::Success, opaque); + + // 6) Same marker as before + stream->processMarker(&marker); + + // 7) Mutation at 2 + auto mutation = makeCommittedItem(makeStoredDocKey("mutation"), "value"); + mutation->setBySeqno(2); + mutation->setCas(2); + + ASSERT_EQ(cb::engine_errc::success, + stream->messageReceived(std::make_unique( + mutation, + stream->getOpaque(), + IncludeValue::Yes, + IncludeXattrs::Yes, + IncludeDeleteTime::No, + IncludeDeletedUserXattrs::Yes, + DocKeyEncodesCollectionId::No, + nullptr, + cb::mcbp::DcpStreamId{}))); + flushVBucketToDiskIfPersistent(vbid, 1); + + // Prepare at 1 was Majority level so we should be able to move our HPS now + EXPECT_EQ(1, store->getVBucket(vbid)->getHighPreparedSeqno()); + + res = store->getRWUnderlying(vbid)->getPersistedVBucketState(vbid); + ASSERT_EQ(KVStoreIface::ReadVBStateStatus::Success, res.status); + EXPECT_EQ(1, res.state.highPreparedSeqno); +} + TEST_P(DurabilityPassiveStreamTest, SendSeqnoAckOnStreamAcceptance) { // 1) Put something in the vBucket as we won't send a seqno ack if there are // no items