Skip to content

Commit

Permalink
MB-41944: Item::removeUserXattrs() operates on a copy
Browse files Browse the repository at this point in the history
Item::removeUserXattrs is executed in the ActiveStream path for DCP
connections that set IncludeDeletedUserXattrs::No.

When we make any change to the payload being streamed, we must operate
on a copy of Item::value. Otherwise our changes would also be visible to
other connections and to the persistence cursor, as Item::value points to
the shared in-memory blob referenced by items in the CheckpointManager.

Change-Id: I5e8ec8df788b695a3388fdfd95c4db9c79dd0849
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/137837
Reviewed-by: Dave Rigby <daver@couchbase.com>
Well-Formed: Build Bot <build@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
paolococchi committed Oct 19, 2020
1 parent 927dbde commit 897cd88
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 18 deletions.
11 changes: 7 additions & 4 deletions engines/ep/src/item.cc
Expand Up @@ -490,12 +490,15 @@ Item::WasValueInflated Item::removeUserXattrs() {
const auto bodySize = valNBytes - cb::xattr::get_body_offset(valBuf);
Expects(bodySize == 0);

cb::xattr::Blob xattrBlob(valBuf, false);
xattrBlob.prune_user_keys();
setData(xattrBlob.data(), xattrBlob.size());
// Operate on a copy
const cb::xattr::Blob originalBlob(valBuf, false /*compressed*/);
auto copy = cb::xattr::Blob(originalBlob);
copy.prune_user_keys();
const auto final = copy.finalize();
setData(final.data(), final.size());

// We have removed all user-xattrs, clear the xattr dt if no xattr left
if (xattrBlob.get_system_size() == 0) {
if (copy.get_system_size() == 0) {
setDataType(getDataType() & ~PROTOCOL_BINARY_DATATYPE_XATTR);
}

Expand Down
107 changes: 97 additions & 10 deletions engines/ep/tests/module_tests/dcp_stream_test.cc
Expand Up @@ -3015,8 +3015,8 @@ void SingleThreadedActiveStreamTest::testProducerPrunesUserXattrsForDelete(
// only configurations that trigger user-xattr pruning in deletes.
ASSERT_TRUE((flags & DcpOpenFlag::IncludeDeletedUserXattrs) == 0);

auto vb = engine->getVBucket(vbid);
recreateProducerAndStream(*vb, flags);
auto& vb = *engine->getVBucket(vbid);
recreateProducerAndStream(vb, flags);

const auto currIncDelUserXattr =
(flags & DcpOpenFlag::IncludeDeletedUserXattrs) != 0
Expand Down Expand Up @@ -3045,9 +3045,34 @@ void SingleThreadedActiveStreamTest::testProducerPrunesUserXattrsForDelete(

auto* cookie = create_mock_cookie();

// Store a Deleted doc
/**
 * Snapshot of the sizes (in bytes) of the chunks that make up an Item's
 * value: the whole value, the xattr blob, the user and system xattrs, and
 * the body that follows the xattr section.
 */
struct Sizes {
    /**
     * Compute the sizes from the given item's current value.
     *
     * Explicit, so that an Item is never silently converted into a Sizes.
     *
     * @param item the item whose value is inspected
     */
    explicit Sizes(const Item& item) {
        value = item.getNBytes();

        // cb::char_buffer takes a non-const pointer, hence the const_cast.
        cb::char_buffer valBuf{const_cast<char*>(item.getData()),
                               item.getNBytes()};
        cb::xattr::Blob xattrBlob(valBuf, false /*compressed*/);
        xattrs = xattrBlob.size();
        userXattrs = xattrBlob.get_user_size();
        sysXattrs = xattrBlob.get_system_size();
        // Whatever follows the body offset is the document body.
        body = item.getNBytes() - cb::xattr::get_body_offset(valBuf);
    }

    /// Size of the whole value (Item::getNBytes())
    size_t value = 0;
    /// Size of the xattr blob (cb::xattr::Blob::size())
    size_t xattrs = 0;
    /// Size of the user xattrs in the blob
    size_t userXattrs = 0;
    /// Size of the system xattrs in the blob
    size_t sysXattrs = 0;
    /// Size of the value minus the xattr body offset
    size_t body = 0;
};

// Make an item..
auto item = makeCommittedItem(makeStoredDocKey("keyD"), value);
item->setDataType(bodyType | PROTOCOL_BINARY_DATATYPE_XATTR);
// .. and save the payload sizes for later checks.
const auto originalValue = value;
const auto originalSizes = Sizes(*item);

// Store the item as deleted
uint64_t cas = 0;
const auto expectedStoreRes = durReqs ? ENGINE_EWOULDBLOCK : ENGINE_SUCCESS;
ASSERT_EQ(expectedStoreRes,
Expand All @@ -3058,6 +3083,75 @@ void SingleThreadedActiveStreamTest::testProducerPrunesUserXattrsForDelete(
durReqs,
DocumentState::Deleted));

auto& readyQ = stream->public_readyQ();
ASSERT_EQ(0, readyQ.size());

// Verifies that the payload pointed to by the item in the CheckpointManager
// (CM) is the same as the original one: same bytes and same chunk sizes.
const auto checkPayloadInCM =
[&vb, &originalValue, &originalSizes, &durReqs]() -> void {
const auto& manager = *vb.checkpointManager;
const auto& ckptList =
CheckpointManagerTestIntrospector::public_getCheckpointList(
manager);
// Expect exactly 1 checkpoint, and it must be open
ASSERT_EQ(1, ckptList.size());
const auto* ckpt = ckptList.front().get();
ASSERT_EQ(checkpoint_state::CHECKPOINT_OPEN, ckpt->getState());
// The first item is the empty-item
auto it = ckpt->begin();
ASSERT_EQ(queue_op::empty, (*it)->getOperation());
// The checkpoint reports 3 meta-items; the next three items are
// checkpoint-start followed by two set-vbucket-state
it++;
ASSERT_EQ(3, ckpt->getNumMetaItems());
EXPECT_EQ(queue_op::checkpoint_start, (*it)->getOperation());
it++;
EXPECT_EQ(queue_op::set_vbucket_state, (*it)->getOperation());
it++;
EXPECT_EQ(queue_op::set_vbucket_state, (*it)->getOperation());
// 1 non-metaitem, which is our deletion
it++;
ASSERT_EQ(1, ckpt->getNumItems());
ASSERT_TRUE((*it)->isDeleted());
// With durability requirements the deletion is queued as a pending
// sync-write, otherwise as a plain mutation
const auto expectedOp =
durReqs ? queue_op::pending_sync_write : queue_op::mutation;
EXPECT_EQ(expectedOp, (*it)->getOperation());

// Byte-by-byte comparison
EXPECT_EQ(originalValue, (*it)->getValue()->to_s());

// The previous check should already fail if even a single byte in the
// payload has changed, but also check the sizes of the specific value
// chunks.
const auto cmSizes = Sizes(**it);
EXPECT_EQ(originalSizes.value, cmSizes.value);
EXPECT_EQ(originalSizes.xattrs, cmSizes.xattrs);
EXPECT_EQ(originalSizes.userXattrs, cmSizes.userXattrs);
EXPECT_EQ(originalSizes.sysXattrs, cmSizes.sysXattrs);
ASSERT_EQ(originalSizes.body, cmSizes.body);
};

// Verify that the value of the item in CM has not changed
{
SCOPED_TRACE("");
checkPayloadInCM();
}

// Push items to the readyQ and check what we get
stream->nextCheckpointItemTask();
ASSERT_EQ(2, readyQ.size());

// MB-41944: The call to Stream::nextCheckpointItemTask() has removed
// UserXattrs from the payload. Before the fix we modified the item's value
// (which is a reference-counted object in memory) rather than a copy of it.
// So here we check that the item's value in CM is still untouched.
{
SCOPED_TRACE("");
checkPayloadInCM();
}

// Note: Doing this check after Stream::nextCheckpointItemTask() provides
// additional coverage for MB-41944, so it has been moved here.
if (persistent()) {
// Flush and ensure docs on disk
flush_vbucket_to_disk(vbid, 1 /*expectedNumFlushed*/);
Expand All @@ -3074,13 +3168,6 @@ void SingleThreadedActiveStreamTest::testProducerPrunesUserXattrsForDelete(
doc.item->getNBytes()));
}

auto& readyQ = stream->public_readyQ();
ASSERT_EQ(0, readyQ.size());

// Push items to the readyQ and check what we get
stream->nextCheckpointItemTask();
ASSERT_EQ(2, readyQ.size());

auto resp = stream->public_nextQueuedItem();
ASSERT_TRUE(resp);
ASSERT_EQ(DcpResponse::Event::SnapshotMarker, resp->getEvent());
Expand Down
14 changes: 13 additions & 1 deletion include/xattr/blob.h
Expand Up @@ -136,10 +136,15 @@ class XATTR_PUBLIC_API Blob {
}

/**
* Get the size of the system xattr's located in the blob
* Get the size of the system xattrs located in the blob
*/
size_t get_system_size() const;

/**
* Get the size of the user xattrs located in the blob
*/
size_t get_user_size() const;

/**
* Get pointer to the xattr data (raw data, including the len word)
*/
Expand Down Expand Up @@ -273,6 +278,13 @@ class XATTR_PUBLIC_API Blob {
void remove_segment(const size_t offset, const size_t size);

private:
enum class Type : uint8_t { System, User };

/**
* Get the size of the specific category of Xattrs located in the blob
*/
size_t get_xattrs_size(Type type) const;

cb::char_buffer blob;

/// When the incoming data is compressed will auto-decompress into this
Expand Down
25 changes: 22 additions & 3 deletions xattr/blob.cc
Expand Up @@ -277,7 +277,7 @@ uint32_t Blob::read_length(size_t offset) const {
return ntohl(*ptr);
}

size_t Blob::get_system_size() const {
size_t Blob::get_xattrs_size(Type type) const {
// special case.. there are no xattr's
if (blob.len == 0) {
return 0;
Expand All @@ -291,9 +291,20 @@ size_t Blob::get_system_size() const {
while (current < blob.len) {
// Get the length of the next kv-pair
const auto size = read_length(current);
if (blob.buf[current + 4] == '_') {
ret += size + 4;

switch (type) {
case Type::System:
if (blob.buf[current + 4] == '_') {
ret += size + 4;
}
break;
case Type::User:
if (blob.buf[current + 4] != '_') {
ret += size + 4;
}
break;
}

current += 4 + size;
}
} catch (const std::out_of_range&) {
Expand All @@ -302,6 +313,14 @@ size_t Blob::get_system_size() const {
return ret;
}

// Size of the system xattrs located in the blob; delegates to
// get_xattrs_size() with Type::System.
size_t Blob::get_system_size() const {
return get_xattrs_size(Type::System);
}

// Size of the user xattrs located in the blob; delegates to
// get_xattrs_size() with Type::User.
size_t Blob::get_user_size() const {
return get_xattrs_size(Type::User);
}

nlohmann::json Blob::to_json() const {
nlohmann::json ret;

Expand Down

0 comments on commit 897cd88

Please sign in to comment.