
Implement backpressure using the grpc-java API #17

Merged (5 commits, Apr 6, 2018)

Conversation

@btlines (Owner) commented Jan 21, 2018

grpc-java provides a way to manage back pressure by controlling the flow manually.
This is done by disabling the automatic flow control and then checking that the observer can accept more elements by calling isReady before calling onNext.
If the observer is not ready, the registered onReady handler will be invoked once the observer is able to accept more elements.
In the other direction, request asks the producer to send more elements.

This PR is a first stab at wiring the grpc-java back-pressure mechanism into akka-streams.
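For illustration, a minimal sketch of the sending-side half of that pattern, using the real grpc-java calls (isReady, setOnReadyHandler, onNext) with an otherwise made-up helper; this is not code from this PR:

import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}

// Only call onNext while isReady is true; resume from the registered onReady handler.
def streamResponses[O](observer: StreamObserver[O], elements: Iterator[O]): Unit = {
  // a server-side response observer can be cast to reach the flow-control API
  val flowObserver = observer.asInstanceOf[ServerCallStreamObserver[O]]
  var done = false

  def drain(): Unit = if (!done) {
    while (flowObserver.isReady && elements.hasNext)
      flowObserver.onNext(elements.next())        // the transport buffer still has room
    if (!elements.hasNext) { done = true; flowObserver.onCompleted() }
  }

  flowObserver.setOnReadyHandler(new Runnable {   // invoked when isReady flips back to true
    def run(): Unit = drain()
  })
  drain()
}

The rest of the thread is about bridging these calls, plus request on the receiving side, to Akka Streams demand.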

getAsyncCallback((_: Unit) => complete(out)).invoke(())

override def onNext(value: I) =
  getAsyncCallback((value: I) => emit(out, value)).invoke(value)

push should be fine instead of emit; emit is a more advanced tool that you're not really making use of there (it can model state transitions)
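For example, a hedged sketch of that change, assuming request(1) is only issued once the outlet has been pulled, so that every incoming element already has downstream demand:

override def onNext(value: I): Unit =
  getAsyncCallback((v: I) => push(out, v)).invoke(value)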


var requestStream = new AtomicReference[Option[ClientCallStreamObserver[I]]](None)
val element = new AtomicReference[Option[I]](None)
val requested = new AtomicBoolean(false)

ok for a start; in general though you'll want to move these to an integer count of "how much was requested" and also make the element a buffer instead, similar to: https://github.com/akka/akka/pull/24230/files#diff-d061ab9a88e42dd22fe19bd18271b95dR75

Notice also the request strategy used in there.

I think it's good to get it working with the simple impl you're starting on here, and then we can polish it and make it more performant (requesting 1 by 1 will be a bit slow), so good that you started here!
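A hedged sketch of that direction, as fields inside the GraphStageLogic; the watermark values and names are illustrative, and all mutation is assumed to happen on the stage's thread (e.g. via async callbacks):

import scala.collection.mutable
import io.grpc.stub.CallStreamObserver

// a buffer plus an outstanding-request counter instead of a single Option[I] slot
// and a Boolean flag, requesting in batches rather than one element at a time
private val buffer        = mutable.Queue.empty[I]
private var outstanding   = 0              // requested from grpc-java but not yet received
private val lowWatermark  = 4
private val highWatermark = 16

private def maybeRequestMore(stream: CallStreamObserver[_]): Unit =
  if (outstanding + buffer.size < lowWatermark) {
    val batch = highWatermark - outstanding - buffer.size
    stream.request(batch)                  // one batched request instead of request(1) per element
    outstanding += batch
  }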


I wonder if we can get rid of the atomics, and instead also delegate the run() via an async callback and ride on the Stage's guarantee of "single threaded illusion". Would have to benchmark what's faster.
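A hedged sketch of that idea, again as fields inside the GraphStageLogic (names illustrative): every event coming from a gRPC thread hops over via an AsyncCallback, so plain vars can replace the atomics.

// only ever touched on the stage's thread, so no Atomic wrappers needed
private var requestStream: Option[ClientCallStreamObserver[I]] = None
private var grpcReady = false

private val onReadyCallback = getAsyncCallback[Unit] { _ =>
  grpcReady = true
  // drain a parked element / pull upstream here
}

// registered with gRPC via setOnReadyHandler; runs on a gRPC thread and
// immediately hops back onto the stage's thread
val onReadyHandler: Runnable = new Runnable {
  def run(): Unit = onReadyCallback.invoke(())
}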

btlines (Owner, Author):

Buffering makes sense, I'll add it.

btlines (Owner, Author):

I could probably get rid of the atomics by using the async callbacks as you suggested. I'll give it a try.

Thanks for the feedback!

val in = Inlet[I]("grpc.in")
override val shape: SinkShape[I] = SinkShape.of(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) with InHandler with Runnable {

why the runnable, who invokes that run()? Could we instead materialize some value if we need to set things on this from the outside?

btlines (Owner, Author):

The Runnable is invoked by the grpc framework when the grpc StreamObserver becomes ready to accept more messages.

I don't think it runs on the same thread as the stream logic ... hence the atomic variables.

It is set by reqStream.setOnReadyHandler(this). Not sure it should be exposed to the user though.

@ktoso left a comment

The general idea seems good! Commented a bit, basically just some improvements.

I think making sure this works would be awesome, and then we can work on performance and cleanups :) Thanks for taking up the effort, please ping here if you have any questions

override def onCompleted(): Unit = ()

override def onNext(request: I): Unit =
  ClientCalls.asyncServerStreamingCall(call, request, outputObserver)
btlines (Owner, Author):

This actually doesn't allow controlling the flow, as grpc-java always requests more elements.
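One hedged way around that, for reference: pass a ClientResponseObserver so the automatic requesting can be switched off in beforeStart, before the call starts. The grpc-java types and calls are real; the helper name and the handle callback are made up.

import io.grpc.ClientCall
import io.grpc.stub.{ClientCallStreamObserver, ClientCalls, ClientResponseObserver}

// sketch: a server-streaming call whose responses are only requested explicitly
def serverStreamWithManualFlow[I, O](call: ClientCall[I, O], request: I)(handle: O => Unit): Unit = {
  val observer = new ClientResponseObserver[I, O] {
    private[this] var reqStream: ClientCallStreamObserver[I] = _

    override def beforeStart(stream: ClientCallStreamObserver[I]): Unit = {
      reqStream = stream
      stream.disableAutoInboundFlowControl() // stop the implicit request(1) after every message
    }

    override def onNext(value: O): Unit = {
      handle(value)
      reqStream.request(1)                   // in a stage this would be driven by downstream demand
    }
    override def onError(t: Throwable): Unit = ()
    override def onCompleted(): Unit = ()
  }
  ClientCalls.asyncServerStreamingCall(call, request, observer)
}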

@btlines (Owner, Author) commented Jan 23, 2018

I had a go with the async handlers and it doesn't play nicely with the graph stage initialisation.

The grpc callbacks (beforeStart or onReadyHandler) seem to be called before the stage is fully initialised, which gives the following error:

java.lang.IllegalStateException: not yet initialized: only setHandler is allowed in GraphStageLogic constructor


@ktoso commented Jan 24, 2018

Regarding: "java.lang.IllegalStateException: not yet initialized: only setHandler ..."

You have to call things in preStart() of the GraphStage; then things are ready.
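A hedged sketch of that fix, reusing names from the excerpts above; exactly what gets kicked off in preStart depends on the stage's design:

new GraphStageLogic(shape) with InHandler {
  setHandler(in, this)    // setHandler is the only stage operation legal in the constructor

  override def preStart(): Unit =
    // the logic is fully initialised here: pull/push and async-callback invocations are legal,
    // so anything that can trigger grpc's beforeStart or the onReady handler (starting the
    // call, registering setOnReadyHandler) belongs here rather than in the constructor
    pull(in)

  override def onPush(): Unit = {
    val value = grab(in) // hand this to the grpc observer; pull(in) again once it reports isReady
  }
}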
