Skip to content

Commit

Permalink
MB-35053: Extend allowedDuplicatePrepareSeqnos to highSeqno
Browse files Browse the repository at this point in the history
When reconnecting, we can see duplicate prepares (when the intervening
commit has been deduped) *after* the hps, which were not previously
accepted by the PassiveDM. This is because the HPS is dependent on
persistence; PersistToMajority prepares "hold up" the HPS until they
have been persisted. BUT, we can still receive a duplicate for the
PersistToMaajority even if it has not been persisted.

   1           2                 3      4      5      6      7      8
 PRE(a) PRE(b, l=PersistToMaj) CMT(a) CMT(b) PRE(a) PRE(b) CMT(a) CMT(b)

If the replica received the following (but hasn't persisted anything
yet)

 PRE(a) PRE(b, l=PersistToMaj)
   ^
   |
  HPS = 1
then disconnected, and reconnected and saw

                                             PRE(a) PRE(b) CMT(a) CMT(b)

The replica has then seen a duplicate prepare that needs to replace the
prepare with seqno 2 - and 2 > HPS.

Therefore, the replica should permit duplicate prepares for any seqno
meeting the following:

HighCompletedSeqno < seqno <= highSeqno

No prepare prior to the HCS should be replaced, because they have been
completed - we have received a commit/abort for them.

Prepares with seqno > highSeqno are in the "future" - they were
received after the snapshot marker, and are not expected to be duped.

Change-Id: I32ff808b0538f245ba9fcf45859ab186e5854de6
Reviewed-on: http://review.couchbase.org/112054
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
jameseh96 committed Jul 17, 2019
1 parent 70a4d0a commit 9bfe8e2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 13 deletions.
8 changes: 4 additions & 4 deletions engines/ep/src/vbucket.cc
Expand Up @@ -3908,14 +3908,14 @@ void VBucket::setDuplicateSyncWriteWindow(uint64_t highSeqno) {
void VBucket::setUpAllowedDuplicatePrepareWindow() {
auto& dm = getDurabilityMonitor();
auto hcs = dm.getHighCompletedSeqno();
auto hps = dm.getHighPreparedSeqno();
Expects(hcs <= hps);
auto highSeqno = getHighSeqno();
Expects(hcs <= highSeqno);

int64_t newDuplicateCount = hps - hcs;
int64_t newDuplicateCount = highSeqno - hcs;
allowedDuplicatePrepareSeqnos.reserve(allowedDuplicatePrepareSeqnos.size() +
newDuplicateCount);

for (int64_t dupSeqno = hcs + 1; dupSeqno <= hps; dupSeqno++) {
for (int64_t dupSeqno = hcs + 1; dupSeqno <= highSeqno; dupSeqno++) {
allowedDuplicatePrepareSeqnos.insert(dupSeqno);
}
}
Expand Down
35 changes: 29 additions & 6 deletions engines/ep/tests/module_tests/dcp_durability_stream_test.cc
Expand Up @@ -1222,7 +1222,10 @@ TEST_P(DurabilityPassiveStreamPersistentTest, DurabilityFence) {
}

queued_item DurabilityPassiveStreamTest::makeAndReceiveDcpPrepare(
const StoredDocKey& key, uint64_t cas, uint64_t seqno) {
const StoredDocKey& key,
uint64_t cas,
uint64_t seqno,
cb::durability::Level level) {
using namespace cb::durability;

// The consumer receives snapshot-marker [seqno, seqno]
Expand All @@ -1237,7 +1240,7 @@ queued_item DurabilityPassiveStreamTest::makeAndReceiveDcpPrepare(
stream->processMarker(&marker);

queued_item qi = makePendingItem(
key, "value", Requirements(Level::Majority, Timeout::Infinity()));
key, "value", Requirements(level, Timeout::Infinity()));
qi->setBySeqno(seqno);
qi->setCas(cas);

Expand Down Expand Up @@ -1387,19 +1390,39 @@ void DurabilityPassiveStreamTest::testReceiveMultipleDuplicateDcpPrepares() {
// PRE1 PRE2 PRE3 ||Disconnect|| PRE1 PRE2 PRE3 CMT1 CMT2 CMT3
// All 3 duplicate prepares should be accepted by
// allowedDuplicatePrepareSeqnos

// NB: A mix of prepares levels is used intentionally - they allow
// us to test that duplicate prepares are permitted:
// - regardless of level of the replaced prepare
// - regardless of persistence of the replaced prepare
// - regardless of the HPS
// They should only be rejected if they would replace a prepare with a seqno
// outside the "allowed window". This window is specified as the range
// [highCompletedSeqno+1, highSeqno] at the time the snapshot marker
// is received.
// No prepare with seqno <= highCompletedSeqno should ever be replaced,
// because it has already been completed and should not be being tracked any
// more No prepare with seqno > highSeqno (latest seqno seen by VB) should
// be replaced, because these were received *after* the snapshot marker.
const uint64_t cas = 999;
uint64_t seqno = 1;
std::vector<StoredDocKey> keys = {makeStoredDocKey("key1"),
makeStoredDocKey("key2"),
makeStoredDocKey("key3")};

// Do first prepare for each of three keys
// Send the first prepare for each of three keys
// PRE1 PRE2 PRE3 CMT1 CMT2 CMT3 PRE1 PRE2 PRE3 CMT1 CMT2 CMT3
// ^^^^ ^^^^ ^^^^
std::vector<queued_item> queued_items;
for (const auto& key : keys) {
queued_items.push_back(makeAndReceiveDcpPrepare(key, cas, seqno++));
}
queued_items.push_back(makeAndReceiveDcpPrepare(
keys[0], cas, seqno++, cb::durability::Level::Majority));
queued_items.push_back(makeAndReceiveDcpPrepare(
keys[1],
cas,
seqno++,
cb::durability::Level::MajorityAndPersistOnMaster));
queued_items.push_back(makeAndReceiveDcpPrepare(
keys[2], cas, seqno++, cb::durability::Level::PersistToMajority));

// The consumer now "disconnects" then "re-connects" and misses the commits
// at seqnos 4, 5, 6.
Expand Down
8 changes: 5 additions & 3 deletions engines/ep/tests/module_tests/dcp_durability_stream_test.h
Expand Up @@ -63,9 +63,11 @@ class DurabilityPassiveStreamTest : public SingleThreadedPassiveStreamTest {
* processes it on the DCP stream.
* Returns the SyncWrite prepare item.
*/
queued_item makeAndReceiveDcpPrepare(const StoredDocKey& key,
uint64_t cas,
uint64_t seqno);
queued_item makeAndReceiveDcpPrepare(
const StoredDocKey& key,
uint64_t cas,
uint64_t seqno,
cb::durability::Level level = cb::durability::Level::Majority);

/*
* Simulates a Replica receiving a DCP_PREPARE and checks that it is
Expand Down

0 comments on commit 9bfe8e2

Please sign in to comment.