
Manual flow-control and back-pressure demo #3119

Merged
merged 18 commits into grpc:master from rmichela:feature/manual-flow-control-demo
Sep 26, 2017

Conversation

rmichela
Contributor

In the process of adapting RxJava to gRPC, I found I had to make use of manual flow-control and back-pressure; however, there were no clear examples of how to use the manual flow-control APIs.

This sample implements a bidirectional streaming service with client and server workers processing messages at different rates using manual flow-control and back-pressure-aware idioms on both ends of the wire.

I've done my best to document how things work. Please correct me if any of the idioms are wrong.

@grpc-kokoro

Thanks for your pull request. The automated tests will run as soon as one of the admins verifies this change is ok for us to run on our infrastructure.

@rmichela rmichela changed the title Feature/manual flow control demo Manual flow-control and back-pressure demo Jun 21, 2017

Member

@ejona86 ejona86 left a comment

Didn't finish the review, but sending what I have. We've really needed an example like this.

}

private static List<String> names() {
List<String> names = new ArrayList<String>();

Member

Arrays.asList() supports varargs, so it is great for a case like this.
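
A minimal sketch of the suggestion (the names here are placeholders, not necessarily the ones in the PR):

```java
import java.util.Arrays;
import java.util.List;

class Names {
    // Arrays.asList() builds the fixed list in one varargs call instead of
    // populating an ArrayList one add() at a time.
    static List<String> names() {
        return Arrays.asList("Sophia", "Jackson", "Emma", "Aiden");
    }
}
```

Note that Arrays.asList() returns a fixed-size list; if the demo ever needs to mutate it, wrap it in new ArrayList<String>(...).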

Contributor Author

I forgot that was available in JDK6.

Contributor Author

Fixed

@@ -25,6 +25,7 @@ package helloworld;
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
rpc SayHelloStreaming (stream HelloRequest) returns (stream HelloReply) {}

Member

This file is shared by other languages, so we can't "just" change our version. But also, this is used by the helloworld, which is supposed to be super-simple. Let's instead create a new .proto for this streaming example. If you wanted to name it "helloworld_streaming.proto" or similar, that'd be no big deal to me.

Contributor Author

Fixed

// Set up a back-pressure-aware producer for the request stream. The onReadyHandler will be invoked
// when the consuming side has enough buffer space to receive more messages.
//
// Note: the onReadyHandler is invoked by gRPC's internal thread pool. You can't block in this in

Member

We use a cached thread pool for callbacks to the application by default. Blocking is "okay", and if doing work to generate a response to a request will provide automatic back-pressure (which is a good thing). The only thing to point out though is that we can't call any other callback while a callback is blocked, since all callbacks are serialized to reduce threading headache for users. That mainly means onError/onCompleted couldn't be called. So in this example, I think it'd be good and fine to do all work within callbacks.

Note also that the current use of pool seems like it may have multithreading issues, because the onReadyHandler can be called while the Runnable is running, thus running multiple Runnables simultaneously. The only way that would happen today is if there was an "invisible" readiness flip-flop between onNext() and isReady() (isReady() never returned false, but if it had been called 1 µs earlier it would have returned false, and you just never happened to see it).

I have considered optimizing the onReady() callback to only call if isReady() returned false (with an exception for the initial readiness at the start of a call). Although even with such an optimization it's unclear whether the application can assume such behavior, since maybe an interceptor called isReady().

Contributor Author

@rmichela rmichela Jun 21, 2017

I introduced the external thread pool on the server side specifically because all the callbacks are serialized. The onReadyHandler was waiting for work and was blocking onNext from accepting that work.

The client side needed an external thread because the onReadyHandler would have to run to completion before any server responses could be processed. This prevented true bi-directional streaming from happening. All the requests would be sent, then all the responses would be consumed.

Basically, if the onReadyHandler is not running on another thread, any delays in sending messages on the outbound stream will prevent the inbound stream from calling onNext(). onReadyHandlers that complete "instantly", such as iterating over a fixed list, aren't really noticeable, but an onReadyHandler with delays will cause blocking problems and prevent bi-directional streaming.

@ejona86
Member

ejona86 commented Jun 21, 2017

okay to test

@ejona86 ejona86 added the kokoro:run Add this label to a PR to tell Kokoro the code is safe and tests can be run label Jun 21, 2017
@kokoro-team kokoro-team removed the kokoro:run Add this label to a PR to tell Kokoro the code is safe and tests can be run label Jun 21, 2017

// Create a channel and a stub
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", 50051)

Contributor

We typically use indent +2 for new scope, and +4 for line wrap. It keeps lines shorter, and wrap less often.

Contributor Author

Fixed

public class ManualFlowControlClient {
public static void main(String[] args) throws InterruptedException {
final ExecutorService pool = Executors.newCachedThreadPool();
final Object done = new Object();

Contributor

optional: this would be cleaner as a CountDownLatch
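
A minimal sketch of the CountDownLatch version; the plain thread below is a stand-in for the response StreamObserver's terminal callback, which would call countDown() from onCompleted()/onError():

```java
import java.util.concurrent.CountDownLatch;

class LatchDemo {
    static String awaitCompletion() throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);

        // Stand-in for the RPC completing on a gRPC callback thread.
        new Thread(new Runnable() {
            @Override
            public void run() {
                done.countDown();
            }
        }).start();

        done.await(); // replaces synchronized (done) { done.wait(); }
        return "done";
    }
}
```

Unlike Object.wait(), a latch cannot miss a signal that fires before the waiter arrives: the count is already zero, so await() returns immediately.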

Contributor Author

Fixed

@@ -0,0 +1,147 @@
/*
* Copyright 2016, gRPC Authors All rights reserved.

Contributor

2017

Contributor Author

Fixed

public void run() {
System.out.println("Shutting down");
server.shutdown();
pool.shutdown();

Contributor

technically, pool needs to await server termination since it could add more events to the pool after server shutdown
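
The ordering can be sketched with standard executors; events below is a hypothetical stand-in for the gRPC server's callback threads, which can keep handing work to pool until the server has fully terminated:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownOrder {
    static int runAndShutdown() throws InterruptedException {
        final ExecutorService pool = Executors.newCachedThreadPool();
        ExecutorService events = Executors.newSingleThreadExecutor();
        final AtomicInteger handled = new AtomicInteger();

        // Each "event" (think: an onNext callback) submits work to pool.
        for (int i = 0; i < 10; i++) {
            events.submit(new Runnable() {
                @Override
                public void run() {
                    pool.submit(new Runnable() {
                        @Override
                        public void run() {
                            handled.incrementAndGet();
                        }
                    });
                }
            });
        }

        // Shut down the event source first and wait for it to drain; only
        // then is it safe to shut down the pool it feeds. Reversing the
        // order risks a RejectedExecutionException from a late event.
        events.shutdown();
        events.awaitTermination(5, TimeUnit.SECONDS);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return handled.get();
    }
}
```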

Contributor Author

Fixed

@jroper
Contributor

jroper commented Jun 27, 2017

I don't think this is a good example; handling back-pressure manually like this should be considered an anti-pattern. The design of the incoming flow control for gRPC intentionally mirrors Reactive Streams (which is being included in JDK9 as the new Flow API), and it is quite trivial to adapt the API to the Reactive Streams Publisher/Subscriber APIs. The thing about Reactive Streams is that it is not meant to be implemented or used by end users in this way; it is considered an anti-pattern for end users to be invoking request() themselves, for example. Even trivial examples are hard to get right, and once you add situations where you want to process multiple messages concurrently, the threading/concurrency concerns become very difficult to manage. Instead, Reactive Streams is an integration API, intended to allow different streaming providers to integrate with each other. If you want more control over flow than what gRPC gives you, best practice would be to use a dedicated Reactive Streams implementation, such as Akka Streams or RxJava, and use the much more powerful, and more importantly safer and easier to get right, APIs that they provide to process the stream.

So to achieve the goals this example is trying to achieve, I think we should show how to bridge gRPC's API to Reactive Streams (it's just a few lines of code, adapting interfaces with identical method signatures to each other), and then integrate that with a third-party stream-processing library. I don't think handling it manually should be encouraged.
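
As a sketch of how direct that bridge is, both interfaces below are simplified stand-ins (the real types are io.grpc.stub.StreamObserver and org.reactivestreams.Subscriber, and a real bridge must also wire up onSubscribe()/request() for flow control):

```java
// Simplified stand-in for io.grpc.stub.StreamObserver.
interface StreamObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Simplified stand-in for org.reactivestreams.Subscriber (minus onSubscribe).
interface SimpleSubscriber<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onComplete();
}

// The adaptation is nearly mechanical: the method signatures line up
// one-for-one, with only onCompleted/onComplete named differently.
class SubscriberAsObserver<T> implements StreamObserver<T> {
    private final SimpleSubscriber<T> subscriber;

    SubscriberAsObserver(SimpleSubscriber<T> subscriber) {
        this.subscriber = subscriber;
    }

    @Override public void onNext(T value) { subscriber.onNext(value); }
    @Override public void onError(Throwable t) { subscriber.onError(t); }
    @Override public void onCompleted() { subscriber.onComplete(); }
}
```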

@rmichela
Contributor Author

rmichela commented Jun 27, 2017

@jroper I completely agree with you. These APIs should not be used directly if at all possible. Bridging gRPC to other reactive technologies is the end goal. In fact, I did exactly that with RxJava2. See: salesforce/grpc-java-contrib#17

The purpose of this demo is to show how manual flow control fits together in gRPC, so that others looking to adapt different reactive technologies can more easily do so.

System.out.println("--> " + name);
work.add(name);
// Signal the sender to send another request.
serverCallStreamObserver.request(1);

Member

As-is, the disableAutoInboundFlowControl() is not adding value and is just busy work. To do this "for real" you wouldn't request() until you had sent everything you needed.

Contributor Author

@rmichela rmichela Jul 10, 2017

I included disableAutoInboundFlowControl() on the server for illustration. It's paired with lines 54 and 109, where request(1) is called. These lines mimic the automatic flow control performed by ServerCalls.

Technically, the scenario presented here could be implemented with automatic flow control, but this demo's purpose is to show how to do everything manually.

Contributor

With automatic flow control, we wouldn't have this extra work queue. The response would be generated and sent out from onNext. So if we can't keep up with the work, we would also be too slow to ask for request(1). As @ejona86 mentioned in the other comment, the problem is that the message is put into an unbounded queue and then we immediately ask for more. One way to address this is by calling request(1) only after we send out a response (line 86).

// Accept and enqueue the request.
String name = request.getName();
System.out.println("--> " + name);
work.add(name);

Member

This is assuming that the Runnable is still running in pool. That's not guaranteed to be the case. You need to try to send here. I really think you should drop the pool and run everything in the callback thread.

Contributor Author

@rmichela rmichela Jul 10, 2017

You can't run everything on the callback thread. The onReadyHandler will livelock the callback thread.

  1. serverCallStreamObserver.isReady() goes true and onReadyHandler is invoked.
  2. onReadyHandler enters a processing loop, processing messages while serverCallStreamObserver.isReady() == true.
  3. Because onReadyHandler is looping, the callback thread is blocked until isReady() == false, but isReady() can never be set to false because doing so requires the callback thread, which is stuck in a loop.
  4. Additionally, since the callback thread is stuck in a loop, no inbound messages are ingested. Ingestion is also handled by the callback thread.

The root problem is that the onReadyHandler is invoked by the same SerializingExecutor that runs the rest of gRPC's internal processing loop. Ideally, gRPC would maintain a separate executor for onReadyHandlers.

Member

I think this is the source of confusion:

Because onReadyHandler is looping, the callback thread is blocked until isReady() == false, but isReady() can never be set to false because doing so requires the callback thread, which is stuck in a loop.

isReady() changes immediately. As soon as onNext() returns when sending a message, isReady() is up-to-date. If it didn't it would be pretty weak, like you are describing.

Additionally, since the callback thread is stuck in a loop, no inbound messages are ingested. Ingestion is also handled by the callback thread.

That is on purpose. Since the server is doing some "processing" it should push-back on the client. It should only request more messages when it is ready to process them. Having an infinite queue of requests is counter-productive.

Contributor

The onReadyHandler can be triggered while the previous handler is still running, because the handler is called every time the stream transitions from notReady->ready. The code right now has the potential to kick off multiple sender threads by mistake. One way to fix this could be to have only a single sender thread that blocks rather than returns, and use the onReadyHandler to wake the thread up.

It's possible to make this all work, but the scope of this example is getting a bit larger than the original goal of showing how to use request(). Perhaps we can simplify the demo to avoid simultaneous reading and writing:

  • The server receives the message and responds in the same thread. The server can still sleep to simulate doing some work.
  • The client sends requests using logic similar to StreamObservers#copyWithFlowControl so that it will get pushed back.
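
The copyWithFlowControl-style client logic can be sketched against a fake observer. FakeCallObserver here is a hand-rolled stand-in for CallStreamObserver, with an explicit credit counter simulating transport buffer space:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Stand-in for CallStreamObserver: isReady() reflects remaining credit.
class FakeCallObserver {
    final List<String> sent = new ArrayList<String>();
    private int credit;

    FakeCallObserver(int credit) {
        this.credit = credit;
    }

    boolean isReady() {
        return credit > 0;
    }

    void onNext(String value) {
        sent.add(value);
        credit--;
    }

    // In real gRPC the transport flips readiness back and re-invokes the
    // onReadyHandler; here the caller grants credit by hand.
    void grantCredit(int n) {
        credit += n;
    }
}

class Drainer {
    // The onReadyHandler body: write only while the transport is ready,
    // then return and wait to be invoked again. No blocking, no queueing.
    static void drain(FakeCallObserver observer, Iterator<String> source) {
        while (observer.isReady() && source.hasNext()) {
            observer.onNext(source.next());
        }
    }
}
```

When credit runs out the loop simply returns; sending resumes on the next readiness callback, which is exactly the push-back the reviewers describe.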


// Send more messages if there are more messages to send.
String name = iterator.next();
System.out.println("--> " + name);

Contributor

@zpencer zpencer Aug 11, 2017

nit: we have been using java.util.logging.Logger to print UI messages like these. I know it's a bit arbitrary but let's use that here as well to be consistent :)

@rmichela
Contributor Author

Thank you all for the feedback. I'll look into simplifying the example.

@rmichela
Contributor Author

@ejona86 @zpencer I have simplified the manual flow control server example to show single message processing using request(1).

String message = "Hello " + name;
logger.info("<-- " + message);
HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
responseObserver.onNext(reply);

Member

For proper flow control on server-side, maybe something like this:

responseObserver.onNext(reply);
if (responseObserver.isReady()) {
  responseObserver.request(1);
} else {
  notReady = true;
}

// onReadyHandler
if (notReady && responseObserver.isReady()) {
  notReady = false;
  responseObserver.request(1);
}

@Override
public void run() {
logger.info("READY");
// Signal the request sender to send one message.

Contributor

Let's expand the comment here to explain that we are requesting a message here because we expect to send a response upon receiving the request. Otherwise it might be confusing to request a message when the outbound direction is ready.

// MANY messages may be buffered, however, they haven't yet been sent to the server. The server must call
// request() to pull a buffered message from the client.
//
// Note: the onReadyHandler is invoked by gRPC's internal thread pool. You can't block here or deadlocks

Contributor

To be more precise, the onReadyHandler executes on a thread pool donated by the user application; we supply a default thread pool if none is passed to ServerBuilder.executor. The thread pool is used to create serialized executor queues, one per RPC. This onReadyHandler is serialized on one of these executors, and the same one is used to run onNext, onCompleted, etc. So just as it is fine to block in onNext, it is technically fine to block in onReady. The risk is that no further progress can be made for this RPC stream, and one of the threads in the thread pool will be eaten up. Let's update this comment and the one in the server to clarify that blocking would not cause all of gRPC to hang.

}
});

// Give gRPC a StreamObserver it can write incoming requests into.

Contributor

s/it can write incoming requests to/that can observe (i.e. process) incoming requests/

examples/pom.xml Outdated
@@ -10,7 +10,8 @@
<name>examples</name>
<url>http://maven.apache.org</url>
<properties>
<grpc.version>1.7.0-SNAPSHOT</grpc.version><!-- CURRENT_GRPC_VERSION -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.6.1</grpc.version><!-- CURRENT_GRPC_VERSION -->

Contributor

It looks like this part is stale now since we've bumped the gRPC version

Contributor Author

oops. didn't mean to check that in.

@rmichela
Contributor Author

@zpencer @ejona86 Documentation updated as requested

@zpencer
Contributor

zpencer commented Sep 21, 2017

Had a chat with @ejona86 and it looks like the notReady = true variable is important to guard against spurious onReadys. This can happen if the stream becomes ready inside of the server's onNext:

responseObserver.onNext(reply);
// time=0 stream becomes ready, and an onReady becomes scheduled
if (responseObserver.isReady()) {
  // time=1 we enter this block and request a message
  responseObserver.request(1);
}
//time=2 onReady runs, and we request yet another message
responseObserver.request(1);

IMO the notReady boolean would be clearer if we flipped its meaning to alreadyRequested, so it feels natural to only request if !alreadyRequested.

@rmichela
Contributor Author

@zpencer @ejona86 I've addressed the subtle race between onNext() and isReady().

Member

@ejona86 ejona86 left a comment

One small change. Looks good.

// toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(),
// request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s
// execution.
final AtomicBoolean wasReady = new AtomicBoolean(false);

Member

Make this a simple boolean field in this anonymous StreamingGreeterImplBase class. There's no need for synchronization and since we're already creating a class we can use a field instead of a final.

@rmichela
Contributor Author

@ejona86 @zpencer I've been thinking about the wasReady flag and I don't think it's right.

Putting it at the class level makes wasReady shared state between all concurrent clients of the service. Doesn't this introduce a race condition between two independent clients?

@zpencer
Contributor

zpencer commented Sep 26, 2017

Actually you're right, sayHelloStreaming should be independent from one call to the next.

@rmichela rmichela force-pushed the feature/manual-flow-control-demo branch from 79f8368 to 9f8b8a3 Compare September 26, 2017 15:49
@rmichela
Contributor Author

rmichela commented Sep 26, 2017

@zpencer I've reverted the change that made wasReady a global variable. It's back to being an AtomicBoolean. I had to use AtomicBoolean over regular boolean because the onReadyHandler is an anonymous inner Runnable and can only reference final outer variables. If wasReady were a simple boolean, I would not be able to modify it from within the onReadyHandler.
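
A compressed, hypothetical sketch of the capture rule and the race guard together; the counter stands in for request(1), and compareAndSet makes a spurious duplicate onReady harmless:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class WasReadyDemo {
    // Returns how many times "request(1)" fired across two onReady callbacks.
    static int duplicateCallbacks() {
        // A final reference with mutable contents: this is what lets the
        // anonymous Runnable both read and update the flag under the
        // final-capture rules for anonymous inner classes.
        final AtomicBoolean wasReady = new AtomicBoolean(false);
        final AtomicInteger requests = new AtomicInteger();

        Runnable onReadyHandler = new Runnable() {
            @Override
            public void run() {
                // Only the first notReady -> ready transition requests;
                // a duplicate onReady callback becomes a no-op.
                if (wasReady.compareAndSet(false, true)) {
                    requests.incrementAndGet(); // stands in for request(1)
                }
            }
        };

        onReadyHandler.run(); // genuine readiness transition
        onReadyHandler.run(); // spurious duplicate
        return requests.get();
    }
}
```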

@ejona86
Member

ejona86 commented Sep 26, 2017

okay to test

Member

@ejona86 ejona86 left a comment

Yeah, you're right; you'd need a single class for all callbacks (which is possible, but meh). AtomicBoolean is the natural thing to use.

@ejona86 ejona86 merged commit 589da07 into grpc:master Sep 26, 2017
@ejona86
Member

ejona86 commented Sep 26, 2017

Thanks @rmichela! Thanks for bearing with us!

@rmichela rmichela deleted the feature/manual-flow-control-demo branch December 18, 2017 04:42
@lock lock bot locked as resolved and limited conversation to collaborators Jan 19, 2019