Skip to content

Commit

Permalink
MB-46738: Early return if FlowControl disabled in FC::handleFlowCtl
Browse files Browse the repository at this point in the history
Change-Id: I188c8c45898bac7609d874685cc7fe700c16374a
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/177014
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Paolo Cocchi <paolo.cocchi@couchbase.com>
  • Loading branch information
paolococchi committed Jul 7, 2022
1 parent 0ea5e2b commit 0339d7a
Showing 1 changed file with 37 additions and 37 deletions.
74 changes: 37 additions & 37 deletions engines/ep/src/dcp/flow-control.cc
Expand Up @@ -40,44 +40,44 @@ FlowControl::~FlowControl() {

cb::engine_errc FlowControl::handleFlowCtl(
DcpMessageProducersIface& producers) {
if (enabled) {
cb::engine_errc ret;
uint32_t ackable_bytes = freedBytes.load();
std::unique_lock<std::mutex> lh(bufferSizeLock);
if (pendingControl) {
pendingControl = false;
std::string buf_size(std::to_string(bufferSize));
lh.unlock();
uint64_t opaque = consumerConn.incrOpaqueCounter();
const std::string& controlMsgKey = consumerConn.getControlMsgKey();
NonBucketAllocationGuard guard;
ret = producers.control(opaque, controlMsgKey, buf_size);
return ret;
} else if (isBufferSufficientlyDrained_UNLOCKED(ackable_bytes)) {
lh.unlock();
/* Send a buffer ack when at least 20% of the buffer is drained */
uint64_t opaque = consumerConn.incrOpaqueCounter();
ret = producers.buffer_acknowledgement(
opaque, Vbid(0), ackable_bytes);
lastBufferAck = ep_current_time();
ackedBytes.fetch_add(ackable_bytes);
freedBytes.fetch_sub(ackable_bytes);
return ret;
} else if (ackable_bytes > 0 &&
(ep_current_time() - lastBufferAck) > 5) {
lh.unlock();
/* Ack at least every 5 seconds */
uint64_t opaque = consumerConn.incrOpaqueCounter();
ret = producers.buffer_acknowledgement(
opaque, Vbid(0), ackable_bytes);
lastBufferAck = ep_current_time();
ackedBytes.fetch_add(ackable_bytes);
freedBytes.fetch_sub(ackable_bytes);
return ret;
} else {
lh.unlock();
}
if (!enabled) {
return cb::engine_errc::failed;
}

cb::engine_errc ret;
uint32_t ackable_bytes = freedBytes.load();
std::unique_lock<std::mutex> lh(bufferSizeLock);
if (pendingControl) {
pendingControl = false;
std::string buf_size(std::to_string(bufferSize));
lh.unlock();
uint64_t opaque = consumerConn.incrOpaqueCounter();
const std::string& controlMsgKey = consumerConn.getControlMsgKey();
NonBucketAllocationGuard guard;
ret = producers.control(opaque, controlMsgKey, buf_size);
return ret;
} else if (isBufferSufficientlyDrained_UNLOCKED(ackable_bytes)) {
lh.unlock();
/* Send a buffer ack when at least 20% of the buffer is drained */
uint64_t opaque = consumerConn.incrOpaqueCounter();
ret = producers.buffer_acknowledgement(opaque, Vbid(0), ackable_bytes);
lastBufferAck = ep_current_time();
ackedBytes.fetch_add(ackable_bytes);
freedBytes.fetch_sub(ackable_bytes);
return ret;
} else if (ackable_bytes > 0 && (ep_current_time() - lastBufferAck) > 5) {
lh.unlock();
/* Ack at least every 5 seconds */
uint64_t opaque = consumerConn.incrOpaqueCounter();
ret = producers.buffer_acknowledgement(opaque, Vbid(0), ackable_bytes);
lastBufferAck = ep_current_time();
ackedBytes.fetch_add(ackable_bytes);
freedBytes.fetch_sub(ackable_bytes);
return ret;
} else {
lh.unlock();
}

return cb::engine_errc::failed;
}

Expand Down

0 comments on commit 0339d7a

Please sign in to comment.