Skip to content

Commit

Permalink
OkHttpServer: support maxConcurrentCallsPerConnection (Fixes #11062). (
Browse files Browse the repository at this point in the history
…#11063)

* Add option in OkHttpServerBuilder
* Add value as MAX_CONCURRENT_STREAM setting in settings frame sent by the server to the client per connection
* Enforce limit by sending a RST frame with REFUSED_STREAM error
  • Loading branch information
hypnoce committed Apr 1, 2024
1 parent e36f099 commit 8050723
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 0 deletions.
14 changes: 14 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);
static final long MAX_CONNECTION_AGE_NANOS_DISABLED = Long.MAX_VALUE;
static final long MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE = Long.MAX_VALUE;
static final int MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L);

private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
Expand Down Expand Up @@ -129,6 +130,7 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5);
long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
int maxConcurrentCallsPerConnection = MAX_CONCURRENT_STREAMS;

OkHttpServerBuilder(
SocketAddress address, HandshakerSocketFactory handshakerSocketFactory) {
Expand Down Expand Up @@ -350,6 +352,18 @@ public OkHttpServerBuilder maxInboundMetadataSize(int bytes) {
return this;
}

/**
* The maximum number of concurrent calls permitted for each incoming connection. Defaults to no
* limit.
*/
@CanIgnoreReturnValue
public OkHttpServerBuilder maxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
checkArgument(maxConcurrentCallsPerConnection > 0,
"max must be positive: %s", maxConcurrentCallsPerConnection);
this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
return this;
}

/**
* Sets the maximum message size allowed to be received on the server. If not called, defaults to
* defaults to 4 MiB. The default provides protection to servers who haven't considered the
Expand Down
11 changes: 11 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
OkHttpSettingsUtil.set(settings,
OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
if (config.maxConcurrentStreams != Integer.MAX_VALUE) {
OkHttpSettingsUtil.set(settings,
OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, config.maxConcurrentStreams);
}
frameWriter.settings(settings);
if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
frameWriter.windowUpdate(
Expand Down Expand Up @@ -520,6 +524,7 @@ static final class Config {
final long permitKeepAliveTimeInNanos;
final long maxConnectionAgeInNanos;
final long maxConnectionAgeGraceInNanos;
final int maxConcurrentStreams;

public Config(
OkHttpServerBuilder builder,
Expand All @@ -544,6 +549,7 @@ public Config(
permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
maxConcurrentStreams = builder.maxConcurrentCallsPerConnection;
}
}

Expand Down Expand Up @@ -638,6 +644,11 @@ public void headers(boolean outFinished,
newStream = streamId > lastStreamId;
if (newStream) {
lastStreamId = streamId;
if (config.maxConcurrentStreams <= streams.size()) {
streamError(streamId, ErrorCode.REFUSED_STREAM,
"Max concurrent stream reached. RFC7540 section 5.1.2");
return;
}
}
}

Expand Down
29 changes: 29 additions & 0 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,35 @@ public void keepAliveEnforcer_noticesActive() throws Exception {
eq(ByteString.encodeString("too_many_pings", GrpcUtil.US_ASCII)));
}

@Test
public void maxConcurrentCallsPerConnection_failsWithRst() throws Exception {
int maxConcurrentCallsPerConnection = 1;
serverBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection);
initTransport();
handshake();

ArgumentCaptor<Settings> settingsCaptor = ArgumentCaptor.forClass(Settings.class);
verify(clientFramesRead).settings(eq(false), settingsCaptor.capture());
final Settings settings = settingsCaptor.getValue();
assertThat(OkHttpSettingsUtil.get(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS))
.isEqualTo(maxConcurrentCallsPerConnection);

final List<Header> headers = Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
new Header(Header.TARGET_PATH, "/com.example/SimpleService/doit"),
CONTENT_TYPE_HEADER,
TE_HEADER);

clientFrameWriter.headers(1, headers);
clientFrameWriter.headers(3, headers);
clientFrameWriter.flush();

assertThat(clientFrameReader.nextFrame(clientFramesRead)).isTrue();
verify(clientFramesRead).rstStream(3, ErrorCode.REFUSED_STREAM);
}

private void initTransport() throws Exception {
serverTransport = new OkHttpServerTransport(
new OkHttpServerTransport.Config(serverBuilder, Arrays.asList()),
Expand Down

0 comments on commit 8050723

Please sign in to comment.