-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changing gRPC Java inbound flow control symantics #14
Conversation
The goal is to mirror the token-based approach used by the Reactive Streams API.
@ejona86 can you take a look at this? This is a rather large PR, so you can start by just reviewing the changes to |
@ejona86 submitted changes to address first round of comments. |
@ejona86, just submitted more changes. PTAL when you have a moment. |
@ejona86 I just pushed a couple minor changes to address some of your comments. |
} | ||
}, MoreExecutors.directExecutor()); | ||
if (closed) { | ||
log.info("Dropping headers received after closing"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm questioning whether this should be logged. Given that we could have just simply not decoded the incoming data at all (like we used to do), it seems that the log may just be noise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done ... removed all similar logs for both client and server.
Finished a full review. Only small stuff left. It is looking quite good. |
@@ -98,6 +94,14 @@ | |||
public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers); | |||
|
|||
/** | |||
* Requests up to the given number of messages from the call to be delivered to | |||
* {@link Listener#onPayload(Object)}. No additional messages will be delivered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some implementations suggest giving a very big number to effectively unbound the stream. worthwhile noting or better left out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@ejona86 @adriancole I think I've addressed all of the comments. Take another look when you have a moment. |
@Override | ||
public void request(final int numMessages) { | ||
synchronized (lock) { | ||
requestMessagesFromDeframer(numMessages); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a try-catch here, since unlike Netty there isn't anything higher in the transport that will catch an exception here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the best place to handle exceptions is in AbstractStream.requestMessagesFromDeframer
, so that it's consistent with AbstractStream.deframe
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds fine to me.
@nmittler Some small comments, but after those LGTM |
@ejona86 just checked in some changes. If you're ok with changes to |
@nmittler LGTM |
Woot! Thanks for reviewing! :) |
Cherry-picked as de3a131 |
The goal is to mirror the token-based approach used by the Reactive
Streams API.