Skip to content

Commit

Permalink
fix(server): Adjust batching behavior to reduce network latency on MULTI blocks (#1777)
Browse files Browse the repository at this point in the history

* 1. Add a Yield() call before executing the last command in the async queue when needed.
2. Allow the receive buffer to grow when needed.
3. Improve debugging logs for batching behavior.

* Update helio and use the new epoch interface for deciding on yields.
  • Loading branch information
royjacobson committed Sep 4, 2023
1 parent 502f76f commit f94c4be
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 12 deletions.
30 changes: 27 additions & 3 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
stats_->io_read_bytes += *recv_sz;
++stats_->io_read_cnt;
phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;

if (redis_parser_) {
parse_status = ParseRedis(orig_builder);
Expand All @@ -735,11 +736,20 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
if (redis_parser_)
parser_hint = redis_parser_->parselen_hint(); // Could be done for MC as well.

// If we got a partial request and we managed to parse its
// length, make sure we have space to store it instead of
// increasing space incrementally.
// (Note: The buffer object is only working in power-of-2 sizes,
// so there's no danger of accidental O(n^2) behavior.)
if (parser_hint > capacity) {
io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint));
}

if (io_buf_.AppendLen() < 64u) {
// If we got a partial request and we couldn't parse the length, just
// double the capacity.
// If we got a partial request because iobuf was full, grow it up to
// a reasonable limit to save on Recv() calls.
if (io_buf_.AppendLen() < 64u || (is_iobuf_full && capacity < 4096)) {
// Last io used most of the io_buf to the end.
io_buf_.Reserve(capacity * 2); // Valid growth range.
}
Expand Down Expand Up @@ -784,12 +794,28 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {

size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);

uint64_t prev_epoch = fb2::FiberSwitchEpoch();
while (!builder->GetError()) {
evc_.await(
[this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); });
if (cc_->conn_closing)
break;

// We really want to have batching in the builder if possible. This is especially
// critical in situations where Nagle's algorithm can introduce unwanted high
// latencies. However we can only batch if we're sure that there are more commands
// on the way that will trigger a flush. To know if there are, we sometimes yield before
// executing the last command in the queue and let the producer fiber push more commands if it
// wants to.
// As an optimization, we only yield if the fiber was not suspended since the last dispatch.
uint64_t cur_epoch = fb2::FiberSwitchEpoch();
if (dispatch_q_.size() == 1 && cur_epoch == prev_epoch) {
ThisFiber::Yield();
DVLOG(1) << "After yielding to producer, dispatch_q_.size()=" << dispatch_q_.size();
}
prev_epoch = cur_epoch;
builder->SetBatchMode(dispatch_q_.size() > 1);

auto recycle = [this, request_cache_limit](MessageHandle msg) {
dispatch_q_bytes_.fetch_sub(msg.UsedMemory(), memory_order_relaxed);

Expand All @@ -803,8 +829,6 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
}
};

builder->SetBatchMode(dispatch_q_.size() > 1);

// Special case: if the dispatch queue accumulated a big number of commands,
// we can try to squash them
// It is only enabled if the threshold is reached and the whole dispatch queue
Expand Down
18 changes: 16 additions & 2 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <absl/strings/str_cat.h>
#include <double-conversion/double-to-string.h>

#include "absl/strings/escaping.h"
#include "base/logging.h"
#include "facade/error.h"

Expand Down Expand Up @@ -57,7 +58,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) {
for (unsigned i = 0; i < len; ++i) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
DVLOG(2) << "Appending to stream " << src;
DVLOG(2) << "Appending to stream " << absl::CHexEscape(src);
batch_.append(src.data(), src.size());
}
return;
Expand All @@ -71,7 +72,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
if (batch_.empty()) {
ec = sink_->Write(v, len);
} else {
DVLOG(2) << "Sending batch to stream " << sink_ << "\n" << batch_;
DVLOG(2) << "Sending batch to stream " << sink_ << ": " << absl::CHexEscape(batch_);

io_write_bytes_ += batch_.size();

Expand Down Expand Up @@ -120,7 +121,13 @@ void SinkReplyBuilder::SendRawVec(absl::Span<const std::string_view> msg_vec) {
Send(arr.data(), msg_vec.size());
}

// Enter aggregation mode: from here until StopAggregate(), Send() appends
// replies to the internal batch_ buffer instead of writing each one to the
// sink, so a multi-part response goes out as a single write.
void SinkReplyBuilder::StartAggregate() {
  should_aggregate_ = true;
  DVLOG(1) << "StartAggregate";
}

void SinkReplyBuilder::StopAggregate() {
DVLOG(1) << "StopAggregate";
should_aggregate_ = false;

if (should_batch_ || batch_.empty())
Expand All @@ -129,6 +136,11 @@ void SinkReplyBuilder::StopAggregate() {
FlushBatch();
}

// Toggle reply batching. The connection's dispatch fiber drives this flag:
// it enables batching only while more pipelined commands are queued (see
// DispatchFiber), so replies coalesce into fewer socket writes.
void SinkReplyBuilder::SetBatchMode(bool batch) {
  should_batch_ = batch;
  DVLOG(1) << "SetBatchMode(" << (batch ? "true" : "false") << ")";
}

void SinkReplyBuilder::FlushBatch() {
error_code ec = sink_->Write(io::Buffer(batch_));
batch_.clear();
Expand Down Expand Up @@ -391,6 +403,8 @@ void RedisReplyBuilder::StartCollection(unsigned len, CollectionType type) {
type = ARRAY;
}

DVLOG(2) << "StartCollection(" << len << ", " << type << ")";

// We do not want to send multiple packets for small responses because these
// trigger TCP-related artifacts (e.g. Nagle's algorithm) that slow down the delivery of the whole
// response.
Expand Down
9 changes: 2 additions & 7 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ class SinkReplyBuilder {
// In order to reduce interrupt rate we allow coalescing responses together using
// Batch mode. It is controlled by Connection state machine because it makes sense only
// when pipelined requests are arriving.
void SetBatchMode(bool batch) {
should_batch_ = batch;
}
void SetBatchMode(bool batch);

void FlushBatch();

Expand Down Expand Up @@ -121,10 +119,7 @@ class SinkReplyBuilder {

void Send(const iovec* v, uint32_t len);

void StartAggregate() {
should_aggregate_ = true;
}

void StartAggregate();
void StopAggregate();

std::string batch_;
Expand Down

0 comments on commit f94c4be

Please sign in to comment.