From 8c3189f72887d19ab3d491a43fc627b5b5ada258 Mon Sep 17 00:00:00 2001 From: Paolo Cocchi Date: Tue, 19 Jan 2021 14:57:37 +0100 Subject: [PATCH] MB-43205: Consumer enforces allowSanitizeValueInDeletion at DCP_PREPARE At DCP_PREPARE, if the sanitizer is enabled then the consumer removes any invalid body in the payload for SyncDelete. ENGINE_EINVAL is returned otherwise. Change-Id: I33970e329517101ad6e7f93c90fca5e509cd2176 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/143799 Well-Formed: Build Bot Reviewed-by: Dave Rigby Tested-by: Build Bot --- engines/ep/src/dcp/consumer.cc | 15 +- .../ep/tests/module_tests/dcp_stream_test.cc | 130 +++++++++++++----- .../ep/tests/module_tests/dcp_stream_test.h | 6 +- 3 files changed, 108 insertions(+), 43 deletions(-) diff --git a/engines/ep/src/dcp/consumer.cc b/engines/ep/src/dcp/consumer.cc index b607626132..60f1529e07 100644 --- a/engines/ep/src/dcp/consumer.cc +++ b/engines/ep/src/dcp/consumer.cc @@ -1757,11 +1757,16 @@ ENGINE_ERROR_CODE DcpConsumer::prepare(uint32_t opaque, datatype, {reinterpret_cast(value.data()), value.size()}) > 0) { - logger->warn( - "DcpConsumer::prepare: ({}) Value cannot contain a body " - "for SyncDelete", - vbucket); - return ENGINE_EINVAL; + if (!allowSanitizeValueInDeletion) { + logger->error( + "DcpConsumer::prepare: ({}) Value cannot contain a " + "body " + "for SyncDelete", + vbucket); + return ENGINE_EINVAL; + } + + item->removeBody(); } } diff --git a/engines/ep/tests/module_tests/dcp_stream_test.cc b/engines/ep/tests/module_tests/dcp_stream_test.cc index 30a197dbb7..f4edf8116c 100644 --- a/engines/ep/tests/module_tests/dcp_stream_test.cc +++ b/engines/ep/tests/module_tests/dcp_stream_test.cc @@ -3535,70 +3535,122 @@ TEST_P(SingleThreadedPassiveStreamTest, ConsumerRejectsBodyInSyncDeletion) { testConsumerRejectsBodyInDeletion(cb::durability::Requirements()); } -void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion() { +void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion( + const boost::optional& durReqs) { ASSERT_TRUE(consumer->isAllowSanitizeValueInDeletion()); consumer->public_setIncludeDeletedUserXattrs(IncludeDeletedUserXattrs::Yes); + auto& vb = *store->getVBucket(vbid); + ASSERT_EQ(0, vb.getHighSeqno()); + + const uint64_t initialEndSeqno = 10; EXPECT_EQ(ENGINE_SUCCESS, consumer->snapshotMarker(1 /*opaque*/, vbid, 1 /*startSeqno*/, - 10 /*endSeqno*/, + initialEndSeqno, MARKER_FLAG_CHK, {} /*HCS*/, {} /*maxVisibleSeqno*/)); + const auto key = makeStoredDocKey("key"); + const auto verifyDCPSuccess = [this, &key, &durReqs]( + const cb::const_byte_buffer& value, + protocol_binary_datatype_t datatype, + int64_t bySeqno) -> void { + const uint32_t opaque = 1; + if (durReqs) { + EXPECT_EQ(ENGINE_SUCCESS, + consumer->prepare(opaque, + key, + value, + 0 /*priv_bytes*/, + datatype, + 0 /*cas*/, + vbid, + 0 /*flags*/, + bySeqno, + 0 /*revSeqno*/, + 0 /*exp*/, + 0 /*lockTime*/, + 0 /*nru*/, + DocumentState::Deleted, + durReqs->getLevel())); + } else { + EXPECT_EQ(ENGINE_SUCCESS, + consumer->deletion(opaque, + key, + value, + 0 /*priv_bytes*/, + datatype, + 0 /*cas*/, + vbid, + bySeqno, + 0 /*revSeqno*/, + {} /*meta*/)); + } + }; + // Build up a value with just raw body and verify that DCP deletion succeeds // and the Body has been removed from the payload. const std::string body = "body"; cb::const_byte_buffer value{reinterpret_cast(body.data()), body.size()}; - const uint32_t opaque = 1; - const auto key = makeStoredDocKey("key"); - EXPECT_EQ(ENGINE_SUCCESS, - consumer->deletion(opaque, - key, - value, - 0 /*priv_bytes*/, - PROTOCOL_BINARY_RAW_BYTES, - 0 /*cas*/, - vbid, - 1 /*bySeqno*/, - 0 /*revSeqno*/, - {} /*meta*/)); - // Verify that the body has been removed - auto& vb = *store->getVBucket(vbid); + { + SCOPED_TRACE(""); + verifyDCPSuccess(value, PROTOCOL_BINARY_RAW_BYTES, 1 /*bySeqno*/); + } auto& ht = vb.ht; { - const auto& res = ht.findForUpdate(key); - const auto* sv = res.committed; + auto res = ht.findForUpdate(key); + const auto* sv = durReqs ? res.pending.getSV() : res.committed; EXPECT_TRUE(sv); EXPECT_EQ(1, sv->getBySeqno()); - EXPECT_FALSE(sv->getValue().get()); + if (durReqs) { + EXPECT_EQ(0, sv->getValue()->valueSize()); + } else { + // Note: Normal deletion with 0-size value goes through DelWithMeta + // that sets the value to nullptr. + EXPECT_FALSE(sv->getValue().get()); + } + } + + int64_t nextSeqno = 2; + if (durReqs) { + // Need to commit the first prepare for queuing a new one in the next + // steps. + EXPECT_EQ(ENGINE_SUCCESS, + vb.commit(key, 1, {}, vb.lockCollections(key))); + // Replica doesn't like 2 prepares for the same key into the same + // checkpoint. + const int64_t newStartSeqno = initialEndSeqno + 1; + EXPECT_EQ(ENGINE_SUCCESS, + consumer->snapshotMarker(1 /*opaque*/, + vbid, + newStartSeqno, + newStartSeqno + 10 /*endSeqno*/, + MARKER_FLAG_CHK, + {} /*HCS*/, + {} /*maxVisibleSeqno*/)); + nextSeqno = newStartSeqno; } // Verify the same for body + user-xattrs + sys-xattrs const auto xattrValue = createXattrValue(body); value = {reinterpret_cast(xattrValue.data()), xattrValue.size()}; - EXPECT_EQ(ENGINE_SUCCESS, - consumer->deletion(opaque, - key, - value, - 0 /*priv_bytes*/, - PROTOCOL_BINARY_RAW_BYTES | - PROTOCOL_BINARY_DATATYPE_XATTR, - 0 /*cas*/, - vbid, - 2 /*bySeqno*/, - 0 /*revSeqno*/, - {} /*meta*/)); - { - const auto& res = ht.findForUpdate(key); - const auto* sv = res.committed; + SCOPED_TRACE(""); + verifyDCPSuccess( + value, + PROTOCOL_BINARY_RAW_BYTES | PROTOCOL_BINARY_DATATYPE_XATTR, + nextSeqno); + } + { + auto res = ht.findForUpdate(key); + const auto* sv = durReqs ? res.pending.getSV() : res.committed; EXPECT_TRUE(sv); - EXPECT_EQ(2, sv->getBySeqno()); + EXPECT_EQ(nextSeqno, sv->getBySeqno()); EXPECT_TRUE(sv->getValue().get()); EXPECT_LT(sv->getValue()->valueSize(), xattrValue.size()); @@ -3618,7 +3670,11 @@ void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion() { } TEST_P(SingleThreadedPassiveStreamTest, ConsumerSanitizesBodyInDeletion) { - testConsumerSanitizesBodyInDeletion(); + testConsumerSanitizesBodyInDeletion({}); +} + +TEST_P(SingleThreadedPassiveStreamTest, ConsumerSanitizesBodyInSyncDeletion) { + testConsumerSanitizesBodyInDeletion(cb::durability::Requirements()); } void SingleThreadedPassiveStreamTest::testConsumerReceivesUserXattrsInDelete( diff --git a/engines/ep/tests/module_tests/dcp_stream_test.h b/engines/ep/tests/module_tests/dcp_stream_test.h index ea6c6d36f3..7fc14e3149 100644 --- a/engines/ep/tests/module_tests/dcp_stream_test.h +++ b/engines/ep/tests/module_tests/dcp_stream_test.h @@ -161,8 +161,12 @@ class SingleThreadedPassiveStreamTest /** * Test that the Consumer removes the body (if any) in deletion's value. + * That is the behaviour when (allowSanitizeValueInDeletion = true). + * + * @param durReqs (optional) The Dur Reqs, if we are testing SyncDelete */ - void testConsumerSanitizesBodyInDeletion(); + void testConsumerSanitizesBodyInDeletion( + const boost::optional& durReqs); /** * Test that the Consumer accepts user-xattrs in deletion for enabled