Skip to content

Commit

Permalink
Simplify handling of chunkBufferSize = 0
Browse files Browse the repository at this point in the history
Summary:
We currently send a 0 credit message and then immediately a 1 credit message on every loop iteration. We can instead reduce this case to the size = 1 case before the loop and remove the extra check.

The purpose of supporting 0 here is to allow deferring payload receipt until the subscription begins; once it has there is no need to treat this differently from a size of 1.

Reviewed By: rhodo

Differential Revision: D25537142

fbshipit-source-id: 48245e1b4d8db865f1aa2fe22bc445346f50c43d
  • Loading branch information
iahs authored and facebook-github-bot committed Dec 15, 2020
1 parent 29ef30e commit d03d99e
Showing 1 changed file with 18 additions and 27 deletions.
45 changes: 18 additions & 27 deletions thrift/lib/cpp2/async/ClientBufferedStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class ClientBufferedStream {
void subscribeInline(OnNextTry&& onNextTry) && {
auto streamBridge = std::move(streamBridge_);

if (bufferSize_ == 0) {
streamBridge->requestN(1);
++bufferSize_;
}

int32_t outstanding = bufferSize_;
int32_t payloadDataSize = 0;

Expand All @@ -74,11 +79,6 @@ class ClientBufferedStream {

while (true) {
if (queue.empty()) {
if (outstanding == 0) {
streamBridge->requestN(1);
++outstanding;
}

ReadyCallback callback;
if (streamBridge->wait(&callback)) {
callback.wait();
Expand Down Expand Up @@ -110,10 +110,7 @@ class ClientBufferedStream {
}
}

outstanding--;
// we request credits only if bufferSize_ > 0.
// For bufferSize_ = 0, the loop requests 1 credit at a time.
if ((outstanding <= bufferSize_ / 2) ||
if ((--outstanding <= bufferSize_ / 2) ||
(payloadDataSize >= kRequestCreditPayloadSize)) {
streamBridge->requestN(bufferSize_ - outstanding);
outstanding = bufferSize_;
Expand Down Expand Up @@ -141,6 +138,11 @@ class ClientBufferedStream {

template <typename Callback>
auto subscribeExTry(folly::Executor::KeepAlive<> e, Callback&& onNextTry) && {
if (bufferSize_ == 0) {
streamBridge_->requestN(1);
++bufferSize_;
}

auto c = new Continuation<std::decay_t<Callback>>(
e,
std::forward<Callback>(onNextTry),
Expand All @@ -161,6 +163,11 @@ class ClientBufferedStream {
apache::thrift::detail::ClientStreamBridge::ClientPtr streamBridge,
int32_t bufferSize,
folly::Try<T> (*decode)(folly::Try<StreamPayload>&&)) {
if (bufferSize == 0) {
streamBridge->requestN(1);
++bufferSize;
}

int32_t outstanding = bufferSize;
int32_t payloadDataSize = 0;

Expand All @@ -184,11 +191,6 @@ class ClientBufferedStream {
co_yield folly::coro::co_error(folly::OperationCancelled());
}
if (queue.empty()) {
if (outstanding == 0) {
streamBridge->requestN(1);
++outstanding;
}

ReadyCallback callback;
if (streamBridge->wait(&callback)) {
folly::CancellationCallback cb{
Expand Down Expand Up @@ -246,10 +248,7 @@ class ClientBufferedStream {
co_yield folly::coro::co_result(std::move(value));
}

outstanding--;
// we request credits only if bufferSize_ > 0.
// For bufferSize_ = 0, the loop requests 1 credit at a time.
if ((outstanding <= bufferSize / 2) ||
if ((--outstanding <= bufferSize / 2) ||
(payloadDataSize >= kRequestCreditPayloadSize)) {
streamBridge->requestN(bufferSize - outstanding);
outstanding = bufferSize;
Expand Down Expand Up @@ -358,11 +357,6 @@ class ClientBufferedStream {

while (!state_->streamBridge->isCanceled()) {
if (queue.empty()) {
if (outstanding_ == 0) {
state_->streamBridge->requestN(1);
++outstanding_;
}

if (Continuation::wait(cb)) {
// The filler will schedule us back on the executor once the queue
// is refilled so we return here
Expand Down Expand Up @@ -400,10 +394,7 @@ class ClientBufferedStream {
}
}

outstanding_--;
// we request credits only if bufferSize_ > 0.
// For bufferSize_ = 0, the loop requests 1 credit at a time.
if ((outstanding_ <= bufferSize_ / 2) ||
if ((--outstanding_ <= bufferSize_ / 2) ||
(payloadDataSize_ >= kRequestCreditPayloadSize)) {
state_->streamBridge->requestN(bufferSize_ - outstanding_);
outstanding_ = bufferSize_;
Expand Down

0 comments on commit d03d99e

Please sign in to comment.