Navigation Menu

Skip to content

Commit

Permalink
MB-47318: [BP] Add DcpControl to consumer and allow flow control over…
Browse files Browse the repository at this point in the history
…ride

Allow the client of the consumer to override the flow control
buffer size which can be used by tests to give greater control
over the flow-control logic.

The use of DcpControl against a consumer is only permitted when the
bucket is configured to allow it - and by default this is disabled.
The intention is that tests will manually enable this with the correct
bucket config.

Change-Id: I5c4da722cb2660112e3b651c0e414dd79fc9524e
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/165015
Well-Formed: Restriction Checker
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: James H <james.harrison@couchbase.com>
  • Loading branch information
jimwwalker committed Dec 1, 2021
1 parent 75d6f95 commit caac463
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 0 deletions.
6 changes: 6 additions & 0 deletions engines/ep/configuration.json
Expand Up @@ -310,6 +310,12 @@
"dynamic": false,
"type": "std::string"
},
"dcp_consumer_control_enabled" : {
"default": "false",
"descr": "When true, a DcpConsumer can accept control messages. This is intended to be used by tests.",
"dynamic": false,
"type": "bool"
},
"dcp_noop_mandatory_for_v5_features": {
"default": "true",
"descr": "Forces clients to enable noop for v5 features",
Expand Down
22 changes: 22 additions & 0 deletions engines/ep/src/dcp/consumer.cc
Expand Up @@ -19,6 +19,7 @@

#include "bucket_logger.h"
#include "checkpoint_manager.h"
#include "common.h"
#include "dcp/dcpconnmap.h"
#include "dcp/passive_stream.h"
#include "dcp/response.h"
Expand Down Expand Up @@ -1893,3 +1894,24 @@ std::shared_ptr<PassiveStream> DcpConsumer::removeStream(Vbid vbid) {
engine_.getDcpConnMap().removeVBConnByVBId(getCookie(), vbid);
return eraseResult;
}

ENGINE_ERROR_CODE DcpConsumer::control(uint32_t opaque,
cb::const_char_buffer key,
cb::const_char_buffer value) {
if (engine_.getConfiguration().isDcpConsumerControlEnabled()) {
if (key == "connection_buffer_size") {
uint32_t result{0};
std::string valueStr(value.data(), value.size());
if (parseUint32(valueStr.c_str(), &result)) {
logger->warn("changing flow control buffer size:{}", result);
setFlowControlBufSize(result);
return ENGINE_SUCCESS;
}
}

logger->warn("Invalid ctrl parameter {} {}", key, value);
return ENGINE_EINVAL;
}

return ConnHandler::control(opaque, key, value);
}
4 changes: 4 additions & 0 deletions engines/ep/src/dcp/consumer.h
Expand Up @@ -247,6 +247,10 @@ class DcpConsumer : public ConnHandler,
uint64_t prepareSeqno,
uint64_t abortSeqno) override;

ENGINE_ERROR_CODE control(uint32_t opaque,
cb::const_char_buffer key,
cb::const_char_buffer value) override;

bool doRollback(uint32_t opaque, Vbid vbid, uint64_t rollbackSeqno);

/**
Expand Down
2 changes: 2 additions & 0 deletions engines/ep/tests/ep_testsuite.cc
Expand Up @@ -7007,6 +7007,7 @@ static enum test_result test_mb19687_fixed(EngineIface* h) {
"ep_dcp_noop_mandatory_for_v5_features",
"ep_dcp_noop_tx_interval",
"ep_dcp_producer_snapshot_marker_yield_limit",
"ep_dcp_consumer_control_enabled",
"ep_dcp_consumer_process_buffered_messages_yield_limit",
"ep_dcp_consumer_process_buffered_messages_batch_size",
"ep_dcp_scan_byte_limit",
Expand Down Expand Up @@ -7205,6 +7206,7 @@ static enum test_result test_mb19687_fixed(EngineIface* h) {
"ep_dcp_conn_buffer_size_aggressive_perc",
"ep_dcp_conn_buffer_size_max",
"ep_dcp_conn_buffer_size_perc",
"ep_dcp_consumer_control_enabled",
"ep_dcp_consumer_process_buffered_messages_batch_size",
"ep_dcp_consumer_process_buffered_messages_yield_limit",
"ep_dcp_enable_noop",
Expand Down

0 comments on commit caac463

Please sign in to comment.