Skip to content

Commit

Permalink
MB-16181: Transfer the collection name over DCP
Browse files Browse the repository at this point in the history
The DCP mutation/deletion callbacks now take a collection_len field,
the data in this field will be sent over DCP streams when a client
has signalled they want collection-aware DCP.

For example "dairy::cheese" will set a collection length of 5,
default collection documents, set a collection length of 0.

Change-Id: I303d9b18bc5d0fd0968708d84e461ee59577c003
Reviewed-on: http://review.couchbase.org/78135
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
jimwwalker authored and daverigby committed May 23, 2017
1 parent 126ed5a commit e708fe3
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 57 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ ADD_LIBRARY(ep_objs OBJECT
src/replicationthrottle.cc
src/linked_list.cc
src/seqlist.cc
src/stats.cc
src/string_utils.cc
src/storeddockey.cc
src/stored-value.cc
Expand Down
4 changes: 2 additions & 2 deletions src/dcp/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
meta.first,
meta.second,
mutationResponse->getItem()->getNRUValue(),
0);
mutationResponse->getCollectionLen());
break;
}
case DcpResponse::Event::Deletion:
Expand All @@ -538,7 +538,7 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
mutationResponse->getRevSeqno(),
meta.first,
meta.second,
0);
mutationResponse->getCollectionLen());
break;
}
case DcpResponse::Event::SnapshotMarker:
Expand Down
52 changes: 0 additions & 52 deletions src/ep_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6349,55 +6349,3 @@ void EpEngineTaskable::logRunTime(TaskId id,
const ProcessClock::duration runTime) {
myEngine->getKVBucket()->logRunTime(id, runTime);
}

void EPStats::memAllocated(size_t sz) {
if (isShutdown) {
return;
}

if (localMemCounter.get() == nullptr) {
// this HAS to be a non-bucket allocation
// or else the callbacks would try to call this
// function again & it would become an infinite loop
SystemAllocationGuard system_alloc_guard;
localMemCounter.set(new TLMemCounter());
}

if (0 == sz) {
return;
}

localMemCounter.get()->used += sz;
mergeMemCounter();
}

void EPStats::memDeallocated(size_t sz) {
if (isShutdown) {
return;
}

if (localMemCounter.get() == nullptr) {
// this HAS to be a non-bucket allocation
// or else the callbacks would try to call this
// function again & it would become an infinite loop
SystemAllocationGuard system_alloc_guard;
localMemCounter.set(new TLMemCounter());
}

if (0 == sz) {
return;
}

localMemCounter.get()->used -= sz;
mergeMemCounter();
}

void EPStats::mergeMemCounter(bool force) {
auto& counter = *(localMemCounter.get());
counter.count++;
if (force || counter.count % mem_merge_count_threshold == 0 ||
std::abs(counter.used) > (long)mem_merge_bytes_threshold) {
totalMemory->fetch_add(counter.used);
counter.used = 0;
}
}
8 changes: 8 additions & 0 deletions src/ep_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,14 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
return isDatatypeSupported(cookie, PROTOCOL_BINARY_DATATYPE_XATTR);
}

bool isCollectionsSupported(const void* cookie) {
EventuallyPersistentEngine* epe =
ObjectRegistry::onSwitchThread(NULL, true);
bool isSupported = serverApi->cookie->is_collections_supported(cookie);
ObjectRegistry::onSwitchThread(epe);
return isSupported;
}

uint8_t getOpcodeIfEwouldblockSet(const void *cookie) {
EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
uint8_t opcode = serverApi->cookie->get_opcode_if_ewouldblock_set(cookie);
Expand Down
70 changes: 70 additions & 0 deletions src/stats.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2017 Couchbase, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "stats.h"

void EPStats::memAllocated(size_t sz) {
if (isShutdown) {
return;
}

if (localMemCounter.get() == nullptr) {
// this HAS to be a non-bucket allocation
// or else the callbacks would try to call this
// function again & it would become an infinite loop
SystemAllocationGuard system_alloc_guard;
localMemCounter.set(new TLMemCounter());
}

if (0 == sz) {
return;
}

localMemCounter.get()->used += sz;
mergeMemCounter();
}

void EPStats::memDeallocated(size_t sz) {
if (isShutdown) {
return;
}

if (localMemCounter.get() == nullptr) {
// this HAS to be a non-bucket allocation
// or else the callbacks would try to call this
// function again & it would become an infinite loop
SystemAllocationGuard system_alloc_guard;
localMemCounter.set(new TLMemCounter());
}

if (0 == sz) {
return;
}

localMemCounter.get()->used -= sz;
mergeMemCounter();
}

void EPStats::mergeMemCounter(bool force) {
auto& counter = *(localMemCounter.get());
counter.count++;
if (force || counter.count % mem_merge_count_threshold == 0 ||
std::abs(counter.used) > (long)mem_merge_bytes_threshold) {
totalMemory->fetch_add(counter.used);
counter.used = 0;
}
}
21 changes: 18 additions & 3 deletions tests/mock/mock_dcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ uint64_t dcp_last_snap_start_seqno;
uint64_t dcp_last_snap_end_seqno;
uint64_t dcp_last_byseqno;
uint64_t dcp_last_revseqno;
uint8_t dcp_last_collection_len;
std::string dcp_last_meta;
std::string dcp_last_value;
std::string dcp_last_key;
Expand Down Expand Up @@ -188,8 +189,17 @@ static ENGINE_ERROR_CODE mock_mutation(const void* cookie,
dcp_last_value.assign(static_cast<const char*>(item->getData()),
item->getNBytes());
dcp_last_nru = nru;
dcp_last_packet_size = 55 + dcp_last_key.length() +

// @todo: MB-24391: We are querying the header length with collections
// off, which if we extended our testapp tests to do collections may not be
// correct. For now collections testing is done via GTEST tests and isn't
// reliant on dcp_last_packet_size so this doesn't cause any problems.
dcp_last_packet_size =
protocol_binary_request_dcp_mutation::getHeaderLength(false);
dcp_last_packet_size = dcp_last_packet_size + dcp_last_key.length() +
item->getNBytes() + nmeta;

dcp_last_collection_len = collectionLen;
if (engine_handle_v1 && engine_handle) {
engine_handle_v1->release(engine_handle, NULL, item);
}
Expand All @@ -216,10 +226,15 @@ static ENGINE_ERROR_CODE mock_deletion(const void* cookie,
dcp_last_byseqno = by_seqno;
dcp_last_revseqno = rev_seqno;
dcp_last_meta.assign(static_cast<const char*>(meta), nmeta);
dcp_last_packet_size =
42 + dcp_last_key.length() + item->getNBytes() + nmeta;

// @todo: MB-24391 as above.
dcp_last_packet_size = dcp_last_key.length() + item->getNBytes() + nmeta;
dcp_last_packet_size +=
protocol_binary_request_dcp_deletion::getHeaderLength(false);

dcp_last_value.assign(static_cast<const char*>(item->getData()),
item->getNBytes());
dcp_last_collection_len = collectionLen;

if (engine_handle_v1 && engine_handle) {
engine_handle_v1->release(engine_handle, nullptr, item);
Expand Down

0 comments on commit e708fe3

Please sign in to comment.