
Simplify implementation of back-pressure in StreamObserver-based stub #1549

Open
jhump opened this Issue Mar 14, 2016 · 24 comments

@jhump
Member

jhump commented Mar 14, 2016

Pending API changes can allow reactive/async pattern for interacting with flow control and applying back pressure: https://github.com/grpc/grpc-java/pull/1545/files

In many cases, automatic back-pressure in generated stubs could be very useful -- e.g. having calls to StreamObserver#onNext(T) block instead of queueing.

It's been pointed out that this could cause deadlock for bidi-streaming operations, so perhaps we can just not expose this functionality for bidi-streaming calls?

It may also be worth pointing out that most other runtimes (wrapped languages and Go) already expose streams via blocking operations and already require that apps be aware of, and work around, possible deadlock issues resulting therefrom. So maybe providing similar mechanisms in Java is fine, with said caveats.

Another alternative could be implemented in an extension/add-on instead of in gRPC. For example, wrapping streaming requests and responses with RxJava Observables may simplify the async case enough to make the synchronous (and possibly deadlock-prone) case unnecessary.

@ejona86

Member

ejona86 commented Mar 14, 2016

It may also be worth pointing out that most other runtimes (wrapped languages and Go) already expose streams via blocking operations and already require that apps be aware of, and work around, possible deadlock issues resulting therefrom. So maybe providing similar mechanisms in Java is fine, with said caveats.

This could be offered as an alternative to StreamObserver. For example, we could make a more robust blocking API that could handle bidi. To my knowledge, the various languages' APIs are either completely blocking or completely async; they don't "randomly" block.

But alternatively, simply having the blocking behavior be opt-in could be enough to address deadlock concerns.

@ejona86 ejona86 added this to the Unscheduled milestone Apr 8, 2016

@ejona86 ejona86 modified the milestones: 1.0, Unscheduled May 11, 2016

@ejona86 ejona86 added the P1 label May 11, 2016

@ejona86 ejona86 changed the title Simplify implementation of back-pressure in generated stubs Simplify implementation of back-pressure in StreamObserver-based stub May 11, 2016

@ejona86

Member

ejona86 commented May 11, 2016

The StreamObserver used for sending messages currently ignores outbound flow control completely. The original argument for this was that most applications don't care. Without debating that point, it does seem that enough applications do care that they experience a rude awakening.

In discussing this with @louiscryan, we seem to agree that it may have been a misstep to ignore flow control by default. If doing it again, we might have enabled blocking for sends by default. However, changing the behavior now would impact existing applications, and it isn't clear how much pain that would cause our existing users. So it may be prudent to introduce something new that we encourage users to migrate to.

In any case, we do still feel the need for a non-blocking variant, where the application should use isReady()/onReady() (which is not yet available on the async client stub).

In a world where some StreamObservers block when sending and others do not, we need to make it easy for our users to be sure they are using the correct one. So although it would be easy for us to default to blocking and offer an option on the returned StreamObserver for non-blocking, this seems error-prone. Similarly, defaulting to non-blocking with a utility to convert to blocking seems error-prone. Having to audit every call site to confirm a method call is present to get the preferred behavior seems untenable. So instead we may want a different stub creation method, although this could complicate injection, since the type would not fully define the behavior.

This discussion is also affected by the client-side stub not having an easy way to add onReady() notification, due to races. One existing race would worsen: an implementation of onReady() needs a reference to the outbound StreamObserver, but that is only available after the call has started. The opposite ordering is also a problem: if onReady() is configured after the call is started, notifications may be missed. Attempts to replay missed notifications get complex, because the executor to use isn't available and it is important not to call onReady() after onClose().

Our answer for client-side is still to use Channel/ClientCall directly, but the lack of a generated stub, being forced to use a different API, and the need to call request() leave users seemingly disappointed with that solution.

Using RxJava's Observables (or similar) would simplify the flow control aspect of this discussion, and would make reversing the inputs/outputs (#1475) more natural. However, it also causes a dependency problem for our stub generation, since users of the blocking stub should not need to depend on RxJava. We'd also want to ensure that we'd be able to add additional gRPC-specific methods over time.

One important side point is that flow control currently includes push-back related to RPC creation. onReady() is only called after the call has been accepted by a transport (the transport must be connected and must not exceed MAX_CONCURRENT_STREAMS), so even unary RPCs would be impacted. Granted, it is good that we are observing flow control in more cases, but it is worrisome that unary RPCs would be impacted since they are so much more prevalent. It's unclear whether we would want to make tweaks for the unary use-case.

@ejona86

Member

ejona86 commented May 20, 2016

We discovered a fundamental issue yesterday with doing blocking from the async interface. If you send from a callback, you block until onReady is called. But onReady will be queued behind the currently running callback (the one you are blocking in) and never runs. Oops.

StreamObserver<Response> responseObserver = new StreamObserver<Response>() {
  @Override
  public void onNext(Response resp) {
    // This onNext callback is blocking any future onReady callback
    requestObserver.onNext(req); // Permanent blocking!
  }
  ...
};
requestObserver = asyncStub.recordRoute(responseObserver);

It is possible to fix, but the fix is ugly enough (executors, executors everywhere) that it doesn't seem like a great idea. So we're trying to figure out what to do now.
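To make the failure mode concrete, here is a self-contained sketch (class and method names are illustrative, not gRPC APIs) that models grpc-java's per-listener serializing executor. The "onNext" task blocks waiting for an "onReady" signal, but that signal is queued on the same single-threaded executor behind it, so it can never fire:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SerializedDeadlockDemo {
  /**
   * Returns false when "blocking send inside a callback" deadlocks:
   * the onReady signal is queued on the same serializing executor as
   * the callback that is waiting for it.
   */
  public static boolean blockingSendInsideCallback() throws Exception {
    ExecutorService callbacks = Executors.newSingleThreadExecutor();
    CountDownLatch onReady = new CountDownLatch(1);
    // Simulated onNext callback: blocks until onReady fires.
    Future<?> onNextTask = callbacks.submit(() -> {
      try {
        onReady.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    // The onReady notification is queued behind the still-running
    // onNext callback on the SAME executor, so it can never run.
    callbacks.submit(onReady::countDown);
    try {
      onNextTask.get(200, TimeUnit.MILLISECONDS);
      return true; // would mean no deadlock occurred
    } catch (TimeoutException e) {
      return false; // deadlock observed
    } finally {
      callbacks.shutdownNow();
    }
  }
}
```

The timeout stands in for "forever"; in a real application the callback thread would simply hang.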

@jhump

Member Author

jhump commented May 20, 2016

I'm not sure I follow. I'm guessing this is because of how the library serializes calls to a given listener. But it's not clear why these two operations would be serialized together -- I would have thought they'd be independent. Or is this a concern with using a parallelism-constrained Executor, which could deadlock due to not creating an additional thread to run onReady?

@ejona86

Member

ejona86 commented May 20, 2016

@jhump, all callbacks are serialized. That is pretty much necessary for onMessage and onClose. But we serialize all callbacks for a listener so that listener implementations don't need to worry about thread-safety (because they'd get it wrong).

@jhump

Member Author

jhump commented May 20, 2016

Yeah, I see now. I think I just misunderstood your example, so the crux of the problem didn't jump out at me.

@louiscryan

Contributor

louiscryan commented May 23, 2016

This PR #1842 is somewhat relevant here

@louiscryan

Contributor

louiscryan commented May 23, 2016

... in particular, we can now layer a 'blocking' stream observer over the underlying stream observer returned by the runtime. This could be provided as a utility, but I'm not sure we could do anything beyond that for 1.0.

@hsaliak hsaliak modified the milestones: 1.1, 1.0 Jun 14, 2016

@hsaliak

Member

hsaliak commented Jun 14, 2016

Removing this from 1.0 /cc @louiscryan

@ejona86 ejona86 removed the P1 label Aug 30, 2016

@ejona86 ejona86 modified the milestones: Next, 1.1 Jan 17, 2017

@cbornet


cbornet commented Feb 2, 2017

Is there any way to apply back-pressure today? There seem to be some utility functions, but I have no clue how to use them. Can someone point to docs or examples?
Also, to prevent deadlocks, it would be nice to use the reactive streams pattern through RxJava 2's Flowable.

@carl-mastrangelo

Member

carl-mastrangelo commented Feb 2, 2017

I don't think there is documentation today, but the intended way is to cast the StreamObserver to a CallStreamObserver, disable automatic flow control, and call request() and isReady() appropriately.
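The readiness-driven pattern described above can be sketched with a self-contained stand-in (FakeCallStreamObserver and its readyBudget/transportDrained are illustrative models of isReady()/setOnReadyHandler(); the real CallStreamObserver is wired to HTTP/2 window updates, not a counter):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ManualFlowControlDemo {
  // Illustrative stand-in for CallStreamObserver's isReady()/onNext()
  // plus a setOnReadyHandler(...) hook.
  static class FakeCallStreamObserver<T> {
    final List<T> sent = new ArrayList<>();
    int readyBudget; // messages accepted before isReady() turns false
    Runnable onReadyHandler = () -> {};

    FakeCallStreamObserver(int budget) { readyBudget = budget; }
    boolean isReady() { return readyBudget > 0; }
    void setOnReadyHandler(Runnable r) { onReadyHandler = r; }
    void onNext(T msg) { sent.add(msg); readyBudget--; }
    // Simulates the transport draining n messages and signaling readiness.
    void transportDrained(int n) { readyBudget += n; onReadyHandler.run(); }
  }

  /** Writes only while isReady(), and resumes from the onReady handler. */
  public static int sendAll(List<Integer> msgs) {
    FakeCallStreamObserver<Integer> obs = new FakeCallStreamObserver<>(2);
    Iterator<Integer> it = msgs.iterator();
    Runnable drain = () -> {
      while (obs.isReady() && it.hasNext()) {
        obs.onNext(it.next());
      }
    };
    obs.setOnReadyHandler(drain);
    drain.run();             // first burst stops as soon as !isReady()
    obs.transportDrained(2); // readiness restored -> handler drains more
    obs.transportDrained(2);
    return obs.sent.size();
  }
}
```

The key property is that the producer never buffers ahead of readiness: each burst stops at !isReady() and the onReady handler picks up where it left off.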

@stephenh

Contributor

stephenh commented Feb 3, 2017

@cbornet FWIW this is what I came up with ~several months ago (poke around the rest of the project for the callers/usage):

https://github.com/stephenh/mirror/blob/master/src/main/java/mirror/BlockingStreamObserver.java

Disclaimer: this is my hobby project and it "works great" AFAICT, but I don't claim to intimately understand grpc-java internals, so YMMV. Just passing along the link in case it saves you some time.
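The core idea behind a blocking observer like the one linked above can be sketched as follows. This is a simplified model with stand-in types (FakeOutbound and its budget/drained are hypothetical, not gRPC classes): the wrapper parks the calling application thread in onNext() until readiness returns, with the wake-up driven by the onReady hook.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingObserverSketch {
  // Stand-in for the outbound CallStreamObserver.
  static class FakeOutbound {
    final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    final AtomicInteger budget = new AtomicInteger(1);
    volatile Runnable onReady = () -> {};
    boolean isReady() { return budget.get() > 0; }
    void onNext(String m) { sent.add(m); budget.decrementAndGet(); }
    void drained() { budget.incrementAndGet(); onReady.run(); }
  }

  static class BlockingObserver {
    final FakeOutbound delegate;
    final Object lock = new Object();
    BlockingObserver(FakeOutbound d) {
      delegate = d;
      d.onReady = () -> { synchronized (lock) { lock.notifyAll(); } };
    }
    // Blocks the calling application thread (never a gRPC callback
    // thread!) until the transport can accept another message.
    void onNext(String m) throws InterruptedException {
      synchronized (lock) {
        while (!delegate.isReady()) lock.wait();
        delegate.onNext(m);
      }
    }
  }

  public static int runDemo() throws InterruptedException {
    FakeOutbound out = new FakeOutbound();
    BlockingObserver obs = new BlockingObserver(out);
    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 3; i++) obs.onNext("msg" + i);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    // Simulate the transport draining twice so the producer can finish.
    for (int i = 0; i < 2; i++) { Thread.sleep(50); out.drained(); }
    producer.join(2000);
    return out.sent.size();
  }
}
```

As the thread discusses below, this is only safe when onNext() is called from application-owned threads, not from the executor serving gRPC callbacks.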

@cbornet


cbornet commented Feb 3, 2017

@stephenh thanks a lot. That will be very helpful. My ultimate goal is non-blocking back-pressure, but this is certainly a good step in that direction.

@carl-mastrangelo

Member

carl-mastrangelo commented Feb 3, 2017

@stephenh that makes it really easy to consume all your executor threads. Also, it will mysteriously and silently drop messages if an InterruptedException comes in.

@cbornet


cbornet commented Feb 3, 2017

@carl-mastrangelo I'm trying to understand whether onReady/request can provide end-to-end non-blocking back-pressure from a client to a server (or vice versa). Is that the case? I mean: is there flow control at the transport level, so that the client receives onReady only when the server calls request, or does onReady only mean that the message was sent to the server and buffered in some incoming queue there?

Also, it would be great to have a grpc-rx library with ready-to-use adapters for people wanting to use RxJava or Reactor.

@stephenh

Contributor

stephenh commented Feb 3, 2017

@carl-mastrangelo I'm very likely being naive here, but do you mean my application's own threads, or the internal grpc threads?

My intention was specifically to block my application thread(s): I'm sending N files out over grpc-java, and I don't want to read in the bytes for all N files and have them queue up in the gRPC buffer, as that would OOME. So I want my app thread that calls onNext(messageWithFileBytesInIt) to block.

If this BlockingStreamObserver would cause some internal gRPC I/O threads to block, then yes, that's unintended, and I would appreciate pointers on a better approach.

Per dropping messages on InterruptedException: yes, that's true, and I should probably at least log it. But AFAIU, for my app an IE would only happen when the JVM/app is shutting down, e.g. on Ctrl-C, so dropping a message there is fine/desirable for my app.

@cbornet


cbornet commented Feb 6, 2017

After some reading, it does seem there is flow control on the wire at the HTTP/2 level. So it should be possible to build a reactive streams / back-pressured implementation. The main problem is the lack of documentation and examples, which makes it very hard for non-experts in gRPC like me to produce something working.

@carl-mastrangelo

Member

carl-mastrangelo commented Feb 6, 2017

@stephenh Yes, your app threads, as provided to executor() when you set up your Channel/Server. If you stall all of those threads, none of them will be able to take more traffic. The push-back logic probably needs to live outside gRPC callbacks, not inline as in your example above. If you are calling the stubs from your own threads (not the executor you gave to the channel), then you are probably fine blocking.
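The thread-starvation point above can be sketched with a plain executor (the two-thread pool is a hypothetical stand-in for the channel's executor): once blocking callbacks occupy every pool thread, the task that would unblock them can never be scheduled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorStarvationDemo {
  /**
   * Models a channel executor with two threads. If application code
   * blocks inside callbacks on both of them, the "release" task that
   * would unblock them can never run: the pool is starved.
   */
  public static boolean starves() throws InterruptedException {
    ExecutorService channelExecutor = Executors.newFixedThreadPool(2);
    CountDownLatch release = new CountDownLatch(1);
    for (int i = 0; i < 2; i++) {
      channelExecutor.submit(() -> {
        try {
          release.await(); // callback blocking inline, as warned against
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    // This task would release the blocked callbacks, but no thread is free.
    channelExecutor.submit(release::countDown);
    boolean released = release.await(200, TimeUnit.MILLISECONDS);
    channelExecutor.shutdownNow();
    return !released; // true = starved
  }
}
```

Blocking from threads outside that pool avoids this, which is why blocking application threads (as in the file-sending use case above) is acceptable.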

InterruptedException is actually a "collaborative" feature: you have to want an IE in the receiver, and something has to intentionally interrupt the thread (such as Future.cancel(true)). The trouble comes from code you may not own interrupting your code. If you aren't expecting an IE, it's better to just let it propagate.

@cbornet There are two levels of flow control: wire and API. The gRPC API is message-based, which the library converts to byte-based. I agree this is not well covered anywhere. One non-obvious effect is that unary calls generally don't honor message flow control but do honor byte flow control. That means it is still easy to overload your sender if you don't pay attention to isReady, because messages will buffer up in memory. One day I will write a comprehensive guide on how this works, but that is extremely time-consuming and thus hasn't been a priority.

@stephenh

Contributor

stephenh commented Feb 6, 2017

If you are calling the stubs from your own threads (not on the executor you gave to the channel) then you are probably fine blocking.

Yep, that's what I'm doing. Thanks!

@bobbymicroby


bobbymicroby commented Feb 13, 2017

@carl-mastrangelo Hi Carl. Since writing proper docs can be time-consuming, could you make a simplified gist showing how to do bidi streaming with flow control, using the reactive streams API for example?

@bobbymicroby


bobbymicroby commented Feb 13, 2017

I've seen people already trying to do it, but having a hard time: xiaodongw/grpc-rx#1

@cbornet


cbornet commented Oct 19, 2017

See #3119 for a flow control example
and https://github.com/salesforce/grpc-java-contrib/tree/master/rxgrpc for RxJava2 generator plugin with flow control enabled.

@davidraleigh


davidraleigh commented Dec 7, 2017

@cbornet


cbornet commented Dec 7, 2017

@davidraleigh exactly! You can now generate RxJava 2 and Reactor bindings.
