Skip to content

Commit

Permalink
Merge "Merge branch 'neo'"
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit Code Review committed Jun 13, 2022
2 parents c2706f4 + 1de7900 commit ec1e88f
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 0 deletions.
21 changes: 21 additions & 0 deletions engines/ep/src/dcp/passive_stream.cc
Expand Up @@ -16,6 +16,7 @@
#include "collections/vbucket_manifest_handles.h"
#include "dcp/consumer.h"
#include "dcp/response.h"
#include "durability/durability_monitor.h"
#include "ep_engine.h"
#include "failover-table.h"
#include "kv_bucket.h"
Expand Down Expand Up @@ -189,6 +190,26 @@ void PassiveStream::acceptStream(cb::mcbp::Status status, uint32_t add_opaque) {
return;
}

// We use the cur_snapshot_prepare member to determine if we should
// notify the PDM of any Memory snapshots. It is set when we see a
// prepare in any snapshot. Consider the following snapshot:
//
// [1:Prepare(A), 2:Mutation(B)] Type = Memory
//
// If we have only received and persisted the following sequence of events
// but then restart, we would fail to notify the PDM of the complete
// snapshot:
//
// 1) SnapshotMarker (1-2) Type = Memory
// 2) Prepare (1) <- Persisted to disk
//
// To solve this, we can fix the cur_snapshot_prepare state on
// PassiveStream acceptance. The PDM already avoids acking back the same
// seqno, so notifying an extra snapshot shouldn't matter, and even if we
// did ack back the same seqno, the ADM should already deal with weakly
// monotonic acks as we ack back the HPS on stream connection.
cur_snapshot_prepare = true;

// SyncReplication: About to commence accepting data on this stream. Check
// if the associated consumer supports SyncReplication, so we can later
// correctly process Snapshot Markers.
Expand Down
96 changes: 96 additions & 0 deletions engines/ep/tests/module_tests/dcp_durability_stream_test.cc
Expand Up @@ -1067,6 +1067,102 @@ void DurabilityPassiveStreamTest::TearDown() {
SingleThreadedPassiveStreamTest::TearDown();
}

TEST_P(DurabilityPassiveStreamPersistentTest,
RestartMidSnapshotAfterPrepareMovesHPSOnReconnect) {
// 1) SnapshotMarker 1 - 2 Memory
auto opaque = 0;
SnapshotMarker marker(
opaque,
vbid,
1 /*snapStart*/,
2 /*snapEnd*/,
dcp_marker_flag_t::MARKER_FLAG_MEMORY | MARKER_FLAG_CHK,
{} /*HCS*/,
{} /*maxVisibleSeqno*/,
{}, // timestamp
{} /*streamId*/);
stream->processMarker(&marker);

// 2) Prepare at 1
using namespace cb::durability;
auto prepare =
makePendingItem(makeStoredDocKey("prepare"),
"value",
Requirements(Level::Majority, Timeout::Infinity()));
prepare->setBySeqno(1);
prepare->setCas(1);

ASSERT_EQ(cb::engine_errc::success,
stream->messageReceived(std::make_unique<MutationConsumerMessage>(
prepare,
stream->getOpaque(),
IncludeValue::Yes,
IncludeXattrs::Yes,
IncludeDeleteTime::No,
IncludeDeletedUserXattrs::Yes,
DocKeyEncodesCollectionId::No,
nullptr,
cb::mcbp::DcpStreamId{})));

// 3) Persist prepare
flushVBucketToDiskIfPersistent(vbid, 1);

// HPS not moved as we don't have a complete snapshot
ASSERT_EQ(0, store->getVBucket(vbid)->getHighPreparedSeqno());
auto res = store->getRWUnderlying(vbid)->getPersistedVBucketState(vbid);
ASSERT_EQ(KVStoreIface::ReadVBStateStatus::Success, res.status);
ASSERT_EQ(0, res.state.highPreparedSeqno);

// 4) Restart
consumer->closeAllStreams();
consumer.reset();

resetEngineAndWarmup();

// HPS state not change
ASSERT_EQ(0, store->getVBucket(vbid)->getHighPreparedSeqno());
res = store->getRWUnderlying(vbid)->getPersistedVBucketState(vbid);
ASSERT_EQ(KVStoreIface::ReadVBStateStatus::Success, res.status);
ASSERT_EQ(0, res.state.highPreparedSeqno);

// 5) Recreate our stream
consumer =
std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
consumer->enableSyncReplication();
consumer->addStream(opaque, vbid, 0 /*flags*/);
stream = static_cast<MockPassiveStream*>(
(consumer->getVbucketStream(vbid)).get());
stream->acceptStream(cb::mcbp::Status::Success, opaque);

// 6) Same marker as before
stream->processMarker(&marker);

// 7) Mutation at 2
auto mutation = makeCommittedItem(makeStoredDocKey("mutation"), "value");
mutation->setBySeqno(2);
mutation->setCas(2);

ASSERT_EQ(cb::engine_errc::success,
stream->messageReceived(std::make_unique<MutationConsumerMessage>(
mutation,
stream->getOpaque(),
IncludeValue::Yes,
IncludeXattrs::Yes,
IncludeDeleteTime::No,
IncludeDeletedUserXattrs::Yes,
DocKeyEncodesCollectionId::No,
nullptr,
cb::mcbp::DcpStreamId{})));
flushVBucketToDiskIfPersistent(vbid, 1);

// Prepare at 1 was Majority level so we should be able to move our HPS now
EXPECT_EQ(1, store->getVBucket(vbid)->getHighPreparedSeqno());

res = store->getRWUnderlying(vbid)->getPersistedVBucketState(vbid);
ASSERT_EQ(KVStoreIface::ReadVBStateStatus::Success, res.status);
EXPECT_EQ(1, res.state.highPreparedSeqno);
}

TEST_P(DurabilityPassiveStreamTest, SendSeqnoAckOnStreamAcceptance) {
// 1) Put something in the vBucket as we won't send a seqno ack if there are
// no items
Expand Down

0 comments on commit ec1e88f

Please sign in to comment.