Skip to content

Commit

Permalink
Merge "Merge mad-hatter into master"
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit Code Review committed Jan 26, 2021
2 parents d7081bc + 5f5c82f commit 322ae5d
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 43 deletions.
15 changes: 10 additions & 5 deletions engines/ep/src/dcp/consumer.cc
Expand Up @@ -1799,11 +1799,16 @@ ENGINE_ERROR_CODE DcpConsumer::prepare(uint32_t opaque,
datatype,
{reinterpret_cast<const char*>(value.data()),
value.size()}) > 0) {
logger->warn(
"DcpConsumer::prepare: ({}) Value cannot contain a body "
"for SyncDelete",
vbucket);
return ENGINE_EINVAL;
if (!allowSanitizeValueInDeletion) {
logger->error(
"DcpConsumer::prepare: ({}) Value cannot contain a "
"body "
"for SyncDelete",
vbucket);
return ENGINE_EINVAL;
}

item->removeBody();
}
}

Expand Down
130 changes: 93 additions & 37 deletions engines/ep/tests/module_tests/dcp_stream_test.cc
Expand Up @@ -2779,70 +2779,122 @@ TEST_P(SingleThreadedPassiveStreamTest, ConsumerRejectsBodyInSyncDeletion) {
testConsumerRejectsBodyInDeletion(cb::durability::Requirements());
}

void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion() {
void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion(
const std::optional<cb::durability::Requirements>& durReqs) {
ASSERT_TRUE(consumer->isAllowSanitizeValueInDeletion());
consumer->public_setIncludeDeletedUserXattrs(IncludeDeletedUserXattrs::Yes);

auto& vb = *store->getVBucket(vbid);
ASSERT_EQ(0, vb.getHighSeqno());

const uint64_t initialEndSeqno = 10;
EXPECT_EQ(ENGINE_SUCCESS,
consumer->snapshotMarker(1 /*opaque*/,
vbid,
1 /*startSeqno*/,
10 /*endSeqno*/,
initialEndSeqno,
MARKER_FLAG_CHK,
{} /*HCS*/,
{} /*maxVisibleSeqno*/));

const auto key = makeStoredDocKey("key");
const auto verifyDCPSuccess = [this, &key, &durReqs](
const cb::const_byte_buffer& value,
protocol_binary_datatype_t datatype,
int64_t bySeqno) -> void {
const uint32_t opaque = 1;
if (durReqs) {
EXPECT_EQ(ENGINE_SUCCESS,
consumer->prepare(opaque,
key,
value,
0 /*priv_bytes*/,
datatype,
0 /*cas*/,
vbid,
0 /*flags*/,
bySeqno,
0 /*revSeqno*/,
0 /*exp*/,
0 /*lockTime*/,
0 /*nru*/,
DocumentState::Deleted,
durReqs->getLevel()));
} else {
EXPECT_EQ(ENGINE_SUCCESS,
consumer->deletion(opaque,
key,
value,
0 /*priv_bytes*/,
datatype,
0 /*cas*/,
vbid,
bySeqno,
0 /*revSeqno*/,
{} /*meta*/));
}
};

// Build up a value with just raw body and verify that DCP deletion succeeds
// and the Body has been removed from the payload.
const std::string body = "body";
cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(body.data()),
body.size()};
const uint32_t opaque = 1;
const auto key = makeStoredDocKey("key");
EXPECT_EQ(ENGINE_SUCCESS,
consumer->deletion(opaque,
key,
value,
0 /*priv_bytes*/,
PROTOCOL_BINARY_RAW_BYTES,
0 /*cas*/,
vbid,
1 /*bySeqno*/,
0 /*revSeqno*/,
{} /*meta*/));
// Verify that the body has been removed
auto& vb = *store->getVBucket(vbid);
{
SCOPED_TRACE("");
verifyDCPSuccess(value, PROTOCOL_BINARY_RAW_BYTES, 1 /*bySeqno*/);
}
auto& ht = vb.ht;
{
const auto& res = ht.findForUpdate(key);
const auto* sv = res.committed;
auto res = ht.findForUpdate(key);
const auto* sv = durReqs ? res.pending.getSV() : res.committed;
EXPECT_TRUE(sv);
EXPECT_EQ(1, sv->getBySeqno());
EXPECT_FALSE(sv->getValue().get());
if (durReqs) {
EXPECT_EQ(0, sv->getValue()->valueSize());
} else {
// Note: Normal deletion with 0-size value goes through DelWithMeta
// that sets the value to nullptr.
EXPECT_FALSE(sv->getValue().get());
}
}

int64_t nextSeqno = 2;
if (durReqs) {
// Need to commit the first prepare for queuing a new one in the next
// steps.
EXPECT_EQ(ENGINE_SUCCESS,
vb.commit(key, 1, {}, vb.lockCollections(key)));
// Replica doesn't like 2 prepares for the same key into the same
// checkpoint.
const int64_t newStartSeqno = initialEndSeqno + 1;
EXPECT_EQ(ENGINE_SUCCESS,
consumer->snapshotMarker(1 /*opaque*/,
vbid,
newStartSeqno,
newStartSeqno + 10 /*endSeqno*/,
MARKER_FLAG_CHK,
{} /*HCS*/,
{} /*maxVisibleSeqno*/));
nextSeqno = newStartSeqno;
}

// Verify the same for body + user-xattrs + sys-xattrs
const auto xattrValue = createXattrValue(body);
value = {reinterpret_cast<const uint8_t*>(xattrValue.data()),
xattrValue.size()};
EXPECT_EQ(ENGINE_SUCCESS,
consumer->deletion(opaque,
key,
value,
0 /*priv_bytes*/,
PROTOCOL_BINARY_RAW_BYTES |
PROTOCOL_BINARY_DATATYPE_XATTR,
0 /*cas*/,
vbid,
2 /*bySeqno*/,
0 /*revSeqno*/,
{} /*meta*/));

{
const auto& res = ht.findForUpdate(key);
const auto* sv = res.committed;
SCOPED_TRACE("");
verifyDCPSuccess(
value,
PROTOCOL_BINARY_RAW_BYTES | PROTOCOL_BINARY_DATATYPE_XATTR,
nextSeqno);
}
{
auto res = ht.findForUpdate(key);
const auto* sv = durReqs ? res.pending.getSV() : res.committed;
EXPECT_TRUE(sv);
EXPECT_EQ(2, sv->getBySeqno());
EXPECT_EQ(nextSeqno, sv->getBySeqno());
EXPECT_TRUE(sv->getValue().get());
EXPECT_LT(sv->getValue()->valueSize(), xattrValue.size());

Expand All @@ -2866,7 +2918,11 @@ void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion() {
}

TEST_P(SingleThreadedPassiveStreamTest, ConsumerSanitizesBodyInDeletion) {
testConsumerSanitizesBodyInDeletion();
testConsumerSanitizesBodyInDeletion({});
}

TEST_P(SingleThreadedPassiveStreamTest, ConsumerSanitizesBodyInSyncDeletion) {
testConsumerSanitizesBodyInDeletion(cb::durability::Requirements());
}

void SingleThreadedPassiveStreamTest::testConsumerReceivesUserXattrsInDelete(
Expand Down
6 changes: 5 additions & 1 deletion engines/ep/tests/module_tests/dcp_stream_test.h
Expand Up @@ -181,8 +181,12 @@ class SingleThreadedPassiveStreamTest

/**
* Test that the Consumer removes the body (if any) in deletion's value.
* That is the behaviour when (allowSanitizeValueInDeletion = true).
*
* @param durReqs (optional) The Dur Reqs, if we are testing SyncDelete
*/
void testConsumerSanitizesBodyInDeletion();
void testConsumerSanitizesBodyInDeletion(
const std::optional<cb::durability::Requirements>& durReqs);

/**
* Test that the Consumer accepts user-xattrs in deletion for enabled
Expand Down

0 comments on commit 322ae5d

Please sign in to comment.