Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): Adjust batching behavior to reduce network latency on MULTI blocks #1777

Merged
merged 2 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion helio
30 changes: 27 additions & 3 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
stats_->io_read_bytes += *recv_sz;
++stats_->io_read_cnt;
phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;

if (redis_parser_) {
parse_status = ParseRedis(orig_builder);
Expand All @@ -735,11 +736,20 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
if (redis_parser_)
parser_hint = redis_parser_->parselen_hint(); // Could be done for MC as well.

// If we got a partial request and we managed to parse its
// length, make sure we have space to store it instead of
// increasing space incrementally.
// (Note: The buffer object is only working in power-of-2 sizes,
// so there's no danger of accidental O(n^2) behavior.)
if (parser_hint > capacity) {
io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint));
}

if (io_buf_.AppendLen() < 64u) {
// If we got a partial request and we couldn't parse the length, just
// double the capacity.
// If we got a partial request because iobuf was full, grow it up to
// a reasonable limit to save on Recv() calls.
if (io_buf_.AppendLen() < 64u || (is_iobuf_full && capacity < 4096)) {
// Last io used most of the io_buf to the end.
io_buf_.Reserve(capacity * 2); // Valid growth range.
}
Expand Down Expand Up @@ -784,12 +794,28 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {

size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);

uint64_t prev_epoch = fb2::FiberSwitchEpoch();
while (!builder->GetError()) {
evc_.await(
[this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); });
if (cc_->conn_closing)
break;

// We really want to have batching in the builder if possible. This is especially
// critical in situations where Nagle's algorithm can introduce unwanted high
// latencies. However we can only batch if we're sure that there are more commands
// on the way that will trigger a flush. To know if there are, we sometimes yield before
// executing the last command in the queue and let the producer fiber push more commands if it
// wants to.
// As an optimization, we only yield if the fiber was not suspended since the last dispatch.
uint64_t cur_epoch = fb2::FiberSwitchEpoch();
if (dispatch_q_.size() == 1 && cur_epoch == prev_epoch) {
ThisFiber::Yield();
DVLOG(1) << "After yielding to producer, dispatch_q_.size()=" << dispatch_q_.size();
}
prev_epoch = cur_epoch;
builder->SetBatchMode(dispatch_q_.size() > 1);

auto recycle = [this, request_cache_limit](MessageHandle msg) {
dispatch_q_bytes_.fetch_sub(msg.UsedMemory(), memory_order_relaxed);

Expand All @@ -803,8 +829,6 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
}
};

builder->SetBatchMode(dispatch_q_.size() > 1);

// Special case: if the dispatch queue accumulated a big number of commands,
// we can try to squash them
// It is only enabled if the threshold is reached and the whole dispatch queue
Expand Down
18 changes: 16 additions & 2 deletions src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <absl/strings/str_cat.h>
#include <double-conversion/double-to-string.h>

#include "absl/strings/escaping.h"
#include "base/logging.h"
#include "facade/error.h"

Expand Down Expand Up @@ -57,7 +58,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) {
for (unsigned i = 0; i < len; ++i) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
DVLOG(2) << "Appending to stream " << src;
DVLOG(2) << "Appending to stream " << absl::CHexEscape(src);
batch_.append(src.data(), src.size());
}
return;
Expand All @@ -71,7 +72,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
if (batch_.empty()) {
ec = sink_->Write(v, len);
} else {
DVLOG(2) << "Sending batch to stream " << sink_ << "\n" << batch_;
DVLOG(2) << "Sending batch to stream " << sink_ << ": " << absl::CHexEscape(batch_);

io_write_bytes_ += batch_.size();

Expand Down Expand Up @@ -120,7 +121,13 @@ void SinkReplyBuilder::SendRawVec(absl::Span<const std::string_view> msg_vec) {
Send(arr.data(), msg_vec.size());
}

// Begin an aggregation window: while should_aggregate_ is set, Send() buffers
// outgoing replies into batch_ instead of writing them to the sink immediately.
// Paired with StopAggregate().
void SinkReplyBuilder::StartAggregate() {
  should_aggregate_ = true;
  DVLOG(1) << "StartAggregate";
}

void SinkReplyBuilder::StopAggregate() {
DVLOG(1) << "StopAggregate";
should_aggregate_ = false;

if (should_batch_ || batch_.empty())
Expand All @@ -129,6 +136,11 @@ void SinkReplyBuilder::StopAggregate() {
FlushBatch();
}

// Toggle batching of replies. When enabled, Send() appends payloads to batch_
// (up to kMaxBatchSize) rather than issuing a sink write per reply.
void SinkReplyBuilder::SetBatchMode(bool enable) {
  const char* state = enable ? "true" : "false";
  DVLOG(1) << "SetBatchMode(" << state << ")";
  should_batch_ = enable;
}

void SinkReplyBuilder::FlushBatch() {
error_code ec = sink_->Write(io::Buffer(batch_));
batch_.clear();
Expand Down Expand Up @@ -391,6 +403,8 @@ void RedisReplyBuilder::StartCollection(unsigned len, CollectionType type) {
type = ARRAY;
}

DVLOG(2) << "StartCollection(" << len << ", " << type << ")";

// We do not want to send multiple packets for small responses because these
// trigger TCP-related artifacts (e.g. Nagle's algorithm) that slow down the delivery of the whole
// response.
Expand Down
9 changes: 2 additions & 7 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ class SinkReplyBuilder {
// In order to reduce interrupt rate we allow coalescing responses together using
// Batch mode. It is controlled by Connection state machine because it makes sense only
// when pipelined requests are arriving.
void SetBatchMode(bool batch) {
should_batch_ = batch;
}
void SetBatchMode(bool batch);

void FlushBatch();

Expand Down Expand Up @@ -121,10 +119,7 @@ class SinkReplyBuilder {

void Send(const iovec* v, uint32_t len);

void StartAggregate() {
should_aggregate_ = true;
}

void StartAggregate();
void StopAggregate();

std::string batch_;
Expand Down