Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

okhttp: okhttp client and server transport should use padded length for flow control #10422

Merged
merged 7 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,12 @@ public void transportHeadersReceived(List<Header> headers, boolean endOfStream)
* Must be called with holding the transport lock.
*/
@GuardedBy("lock")
public void transportDataReceived(okio.Buffer frame, boolean endOfStream) {
public void transportDataReceived(okio.Buffer frame, boolean endOfStream, int paddingLen) {
// We only support 16 KiB frames, and the max permitted in HTTP/2 is 16 MiB. This is verified
// in OkHttp's Http2 deframer. In addition, this code is after the data has been read.
int length = (int) frame.size();
window -= length;
window -= length + paddingLen;
processedWindow -= paddingLen;
if (window < 0) {
frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR);
transport.finishStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,8 @@ public void run() {
*/
@SuppressWarnings("GuardedBy")
@Override
public void data(boolean inFinished, int streamId, BufferedSource in, int length)
public void data(boolean inFinished, int streamId, BufferedSource in, int length,
int paddedLength)
throws IOException {
logger.logData(OkHttpFrameLogger.Direction.INBOUND,
streamId, in.getBuffer(), length, inFinished);
Expand All @@ -1166,12 +1167,12 @@ public void data(boolean inFinished, int streamId, BufferedSource in, int length
synchronized (lock) {
// TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
// instead found: 'OkHttpClientTransport.this.lock'
stream.transportState().transportDataReceived(buf, inFinished);
stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
}
}

// connection window update
connectionUnacknowledgedBytesRead += length;
connectionUnacknowledgedBytesRead += paddedLength;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
synchronized (lock) {
frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
Expand Down
6 changes: 4 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,15 @@ public void runOnTransportThread(final Runnable r) {
* Must be called with holding the transport lock.
*/
@Override
public void inboundDataReceived(okio.Buffer frame, int windowConsumed, boolean endOfStream) {
public void inboundDataReceived(okio.Buffer frame, int dataLength, int paddingLength,
boolean endOfStream) {
synchronized (lock) {
PerfMark.event("OkHttpServerTransport$FrameHandler.data", tag);
if (endOfStream) {
this.receivedEndOfStream = true;
}
window -= windowConsumed;
window -= dataLength + paddingLength;
processedWindow -= paddingLength;
super.inboundDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
}
}
Expand Down
33 changes: 21 additions & 12 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -139,6 +140,8 @@
@GuardedBy("lock")
private Long gracefulShutdownPeriod = null;

private FrameHandler handler;

public OkHttpServerTransport(Config config, Socket bareSocket) {
this.config = Preconditions.checkNotNull(config, "config");
this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
Expand Down Expand Up @@ -248,8 +251,8 @@
TimeUnit.NANOSECONDS);
}

transportExecutor.execute(
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
handler = new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false));
transportExecutor.execute(handler);
} catch (Error | IOException | RuntimeException ex) {
synchronized (lock) {
if (!handshakeShutdown) {
Expand All @@ -261,6 +264,11 @@
}
}

@VisibleForTesting
FrameHandler getHandler() {
return handler;
}

@Override
public void shutdown() {
shutdown(null);
Expand Down Expand Up @@ -708,7 +716,7 @@
return;
}
// Ignore the trailers, but still half-close the stream
stream.inboundDataReceived(new Buffer(), 0, true);
stream.inboundDataReceived(new Buffer(), 0, 0, true);
return;
}
} else {
Expand Down Expand Up @@ -799,7 +807,7 @@
listener.streamCreated(streamForApp, method, metadata);
stream.onStreamAllocated();
if (inFinished) {
stream.inboundDataReceived(new Buffer(), 0, inFinished);
stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
}
}
}
Expand All @@ -819,7 +827,8 @@
* Handle an HTTP2 DATA frame.
*/
@Override
public void data(boolean inFinished, int streamId, BufferedSource in, int length)
public void data(boolean inFinished, int streamId, BufferedSource in, int length,
int paddedLength)
throws IOException {
frameLogger.logData(
OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
Expand Down Expand Up @@ -853,19 +862,19 @@
"Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
return;
}
if (stream.inboundWindowAvailable() < length) {
if (stream.inboundWindowAvailable() < paddedLength) {
in.skip(length);
streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
"Received DATA size exceeded window size. RFC7540 section 6.9");
return;
}
Buffer buf = new Buffer();
buf.write(in.getBuffer(), length);
stream.inboundDataReceived(buf, length, inFinished);
stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
}

// connection window update
connectionUnacknowledgedBytesRead += length;
connectionUnacknowledgedBytesRead += paddedLength;
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
if (connectionUnacknowledgedBytesRead
>= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
synchronized (lock) {
Expand Down Expand Up @@ -1064,7 +1073,7 @@
}
streams.put(streamId, stream);
if (inFinished) {
stream.inboundDataReceived(new Buffer(), 0, true);
stream.inboundDataReceived(new Buffer(), 0, 0, true);

Check warning on line 1076 in okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java#L1076

Added line #L1076 was not covered by tests
}
frameWriter.headers(streamId, headers);
outboundFlow.data(
Expand Down Expand Up @@ -1122,7 +1131,7 @@

interface StreamState {
/** Must be holding 'lock' when calling. */
void inboundDataReceived(Buffer frame, int windowConsumed, boolean endOfStream);
void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);

/** Must be holding 'lock' when calling. */
boolean hasReceivedEndOfStream();
Expand Down Expand Up @@ -1159,12 +1168,12 @@
@Override public void onSentBytes(int frameBytes) {}

@Override public void inboundDataReceived(
Buffer frame, int windowConsumed, boolean endOfStream) {
Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
synchronized (lock) {
if (endOfStream) {
receivedEndOfStream = true;
}
window -= windowConsumed;
window -= dataLength + paddingLength;

Check warning on line 1176 in okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java

View check run for this annotation

Codecov / codecov/patch

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java#L1176

Added line #L1176 was not covered by tests
try {
frame.skip(frame.size()); // Recycle segments
} catch (IOException ex) {
Expand Down