Skip to content

Commit

Permalink
Add a limit of the number of pipe-lined requests over an HTTP/1 clien…
Browse files Browse the repository at this point in the history
…t connection
  • Loading branch information
vietj committed May 31, 2016
1 parent aa269df commit 10dab41
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 64 deletions.
6 changes: 5 additions & 1 deletion src/main/asciidoc/dataobjects.adoc
Expand Up @@ -668,7 +668,7 @@ Set to <code>true</code> when an <i>h2c</i> connection is established using an H
+++
Set the maximum pool size for HTTP/2 connections
+++
|[[http2MaxStreams]]`http2MaxStreams`|`Number (int)`|
|[[http2MultiplexingLimit]]`http2MultiplexingLimit`|`Number (int)`|
+++
Set a client limit of the number concurrent streams for each HTTP/2 connection, this limits the number
of streams the client can create for a connection. The effective number of streams for a
Expand Down Expand Up @@ -740,6 +740,10 @@ Set the trust options in pfx format
+++
Set whether pipe-lining is enabled on the client
+++
|[[pipeliningLimit]]`pipeliningLimit`|`Number (int)`|
+++
Set the limit of pending requests a pipe-lined HTTP/1 connection can send.
+++
|[[protocolVersion]]`protocolVersion`|`link:enums.html#HttpVersion[HttpVersion]`|
+++
Set the protocol version.
Expand Down
14 changes: 9 additions & 5 deletions src/main/asciidoc/java/http.adoc
Expand Up @@ -1571,29 +1571,33 @@ By default pipe-lining is disabled.

When pipe-lining is enabled requests will be written to connections without waiting for previous responses to return.

The number of pipe-lined requests over a single connection is limited by `link:../../apidocs/io/vertx/core/http/HttpClientOptions.html#setPipeliningLimit-int-[setPipeliningLimit]`.
This option defines the maximum number of http requests sent to the server awaiting for a response. This limit ensures the
fairness of the distribution of the client requests over the connections to the same server.

=== HTTP/2 multiplexing

HTTP/2 advocates to use a single connection to a server, by default the http client uses a single
connection for each server, all the streams to the same server are multiplexed on the same connection.
connection for each server, all the streams to the same server are multiplexed over the same connection.

When the clients needs to use more than a single connection and use pooling, the `link:../../apidocs/io/vertx/core/http/HttpClientOptions.html#setHttp2MaxPoolSize-int-[setHttp2MaxPoolSize]`
shall be used.

When it is desirable to limit the number of concurrent streams per server and use a connection
pool instead of a single connection, `link:../../apidocs/io/vertx/core/http/HttpClientOptions.html#setHttp2MaxStreams-int-[setHttp2MaxStreams]`
When it is desirable to limit the number of multiplexed streams per connection and use a connection
pool instead of a single connection, `link:../../apidocs/io/vertx/core/http/HttpClientOptions.html#setHttp2MultiplexingLimit-int-[setHttp2MultiplexingLimit]`
can be used.

[source,java]
----
HttpClientOptions clientOptions = new HttpClientOptions().
setHttp2MaxStreams(10).
setHttp2MultiplexingLimit(10).
setHttp2MaxPoolSize(3);
// Uses up to 3 connections and up to 10 streams per connection
HttpClient client = vertx.createHttpClient(clientOptions);
----

The maximum streams for a connection is a setting set on the client that limits the streams
The multiplexing limit for a connection is a setting set on the client that limits the number of streams
of a single connection. The effective value can be even lower if the server sets a lower limit
with the `link:../../apidocs/io/vertx/core/http/Http2Settings.html#setMaxConcurrentStreams-long-[SETTINGS_MAX_CONCURRENT_STREAMS]` setting.

Expand Down
Expand Up @@ -47,8 +47,8 @@ public static void fromJson(JsonObject json, HttpClientOptions obj) {
if (json.getValue("http2MaxPoolSize") instanceof Number) {
obj.setHttp2MaxPoolSize(((Number)json.getValue("http2MaxPoolSize")).intValue());
}
if (json.getValue("http2MaxStreams") instanceof Number) {
obj.setHttp2MaxStreams(((Number)json.getValue("http2MaxStreams")).intValue());
if (json.getValue("http2MultiplexingLimit") instanceof Number) {
obj.setHttp2MultiplexingLimit(((Number)json.getValue("http2MultiplexingLimit")).intValue());
}
if (json.getValue("initialSettings") instanceof JsonObject) {
obj.setInitialSettings(new io.vertx.core.http.Http2Settings((JsonObject)json.getValue("initialSettings")));
Expand All @@ -71,6 +71,9 @@ public static void fromJson(JsonObject json, HttpClientOptions obj) {
if (json.getValue("pipelining") instanceof Boolean) {
obj.setPipelining((Boolean)json.getValue("pipelining"));
}
if (json.getValue("pipeliningLimit") instanceof Number) {
obj.setPipeliningLimit(((Number)json.getValue("pipeliningLimit")).intValue());
}
if (json.getValue("protocolVersion") instanceof String) {
obj.setProtocolVersion(io.vertx.core.http.HttpVersion.valueOf((String)json.getValue("protocolVersion")));
}
Expand All @@ -96,7 +99,7 @@ public static void toJson(HttpClientOptions obj, JsonObject json) {
json.put("defaultPort", obj.getDefaultPort());
json.put("h2cUpgrade", obj.isH2cUpgrade());
json.put("http2MaxPoolSize", obj.getHttp2MaxPoolSize());
json.put("http2MaxStreams", obj.getHttp2MaxStreams());
json.put("http2MultiplexingLimit", obj.getHttp2MultiplexingLimit());
if (obj.getInitialSettings() != null) {
json.put("initialSettings", obj.getInitialSettings().toJson());
}
Expand All @@ -106,6 +109,7 @@ public static void toJson(HttpClientOptions obj, JsonObject json) {
json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize());
json.put("maxWebsocketFrameSize", obj.getMaxWebsocketFrameSize());
json.put("pipelining", obj.isPipelining());
json.put("pipeliningLimit", obj.getPipeliningLimit());
if (obj.getProtocolVersion() != null) {
json.put("protocolVersion", obj.getProtocolVersion().name());
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/examples/HTTP2Examples.java
Expand Up @@ -285,7 +285,7 @@ public void example28(HttpConnection connection) {
public void useMaxStreams(Vertx vertx) {

HttpClientOptions clientOptions = new HttpClientOptions().
setHttp2MaxStreams(10).
setHttp2MultiplexingLimit(10).
setHttp2MaxPoolSize(3);

// Uses up to 3 connections and up to 10 streams per connection
Expand Down
52 changes: 39 additions & 13 deletions src/main/java/io/vertx/core/http/HttpClientOptions.java
Expand Up @@ -42,7 +42,7 @@
public class HttpClientOptions extends ClientOptionsBase {

/**
* The default maximum number of connections a client will pool = 5
* The default maximum number of HTTP/1 connections a client will pool = 5
*/
public static final int DEFAULT_MAX_POOL_SIZE = 5;

Expand All @@ -52,9 +52,9 @@ public class HttpClientOptions extends ClientOptionsBase {
public static final int DEFAULT_HTTP2_MAX_POOL_SIZE = 1;

/**
* The default maximum number of concurrent stream per connection for HTTP/2 = -1
* The default maximum number of concurrent streams per connection for HTTP/2 = -1
*/
public static final int DEFAULT_HTTP2_MAX_STREAMS = -1;
public static final int DEFAULT_HTTP2_MULTIPLEXING_LIMIT = -1;

/**
* Default value of whether keep-alive is enabled = true
Expand All @@ -66,6 +66,11 @@ public class HttpClientOptions extends ClientOptionsBase {
*/
public static final boolean DEFAULT_PIPELINING = false;

/**
* The default maximum number of requests an HTTP/1.1 pipe-lined connection can send = 10
*/
public static final int DEFAULT_PIPELINING_LIMIT = 10;

/**
* Default value of whether the client will attempt to use compression = false
*/
Expand Down Expand Up @@ -119,9 +124,10 @@ public class HttpClientOptions extends ClientOptionsBase {
private boolean verifyHost = true;
private int maxPoolSize;
private boolean keepAlive;
private int pipeliningLimit;
private boolean pipelining;
private int http2MaxPoolSize;
private int http2MaxStreams;
private int http2MultiplexingLimit;

private boolean tryUseCompression;
private int maxWebsocketFrameSize;
Expand Down Expand Up @@ -153,8 +159,9 @@ public HttpClientOptions(HttpClientOptions other) {
this.maxPoolSize = other.getMaxPoolSize();
this.keepAlive = other.isKeepAlive();
this.pipelining = other.isPipelining();
this.pipeliningLimit = other.getPipeliningLimit();
this.http2MaxPoolSize = other.getHttp2MaxPoolSize();
this.http2MaxStreams = other.http2MaxStreams;
this.http2MultiplexingLimit = other.http2MultiplexingLimit;
this.tryUseCompression = other.isTryUseCompression();
this.maxWebsocketFrameSize = other.maxWebsocketFrameSize;
this.defaultHost = other.defaultHost;
Expand Down Expand Up @@ -183,7 +190,8 @@ private void init() {
maxPoolSize = DEFAULT_MAX_POOL_SIZE;
keepAlive = DEFAULT_KEEP_ALIVE;
pipelining = DEFAULT_PIPELINING;
http2MaxStreams = DEFAULT_HTTP2_MAX_STREAMS;
pipeliningLimit = DEFAULT_PIPELINING_LIMIT;
http2MultiplexingLimit = DEFAULT_HTTP2_MULTIPLEXING_LIMIT;
http2MaxPoolSize = DEFAULT_HTTP2_MAX_POOL_SIZE;
tryUseCompression = DEFAULT_TRY_USE_COMPRESSION;
maxWebsocketFrameSize = DEFAULT_MAX_WEBSOCKET_FRAME_SIZE;
Expand Down Expand Up @@ -362,8 +370,8 @@ public HttpClientOptions setMaxPoolSize(int maxPoolSize) {
* @return the maximum number of concurrent streams for an HTTP/2 connection, {@code -1} means
* the value sent by the server
*/
public int getHttp2MaxStreams() {
return http2MaxStreams;
public int getHttp2MultiplexingLimit() {
return http2MultiplexingLimit;
}

/**
Expand All @@ -374,11 +382,11 @@ public int getHttp2MaxStreams() {
* Setting the value to {@code -1} means to use the value sent by the server's initial settings.
* {@code -1} is the default value.
*
* @param http2MaxStreams the maximum concurrent for an HTTP/2 connection
* @param http2MultiplexingLimit the maximum concurrent for an HTTP/2 connection
* @return a reference to this, so the API can be used fluently
*/
public HttpClientOptions setHttp2MaxStreams(int http2MaxStreams) {
this.http2MaxStreams = http2MaxStreams;
public HttpClientOptions setHttp2MultiplexingLimit(int http2MultiplexingLimit) {
this.http2MultiplexingLimit = http2MultiplexingLimit;
return this;
}

Expand Down Expand Up @@ -445,6 +453,24 @@ public HttpClientOptions setPipelining(boolean pipelining) {
return this;
}

/**
* @return the limit of pending requests a pipe-lined HTTP/1 connection can send
*/
public int getPipeliningLimit() {
return pipeliningLimit;
}

/**
* Set the limit of pending requests a pipe-lined HTTP/1 connection can send.
*
* @param pipeliningLimit the limit of pending requests
* @return a reference to this, so the API can be used fluently
*/
public HttpClientOptions setPipeliningLimit(int pipeliningLimit) {
this.pipeliningLimit = pipeliningLimit;
return this;
}

/**
* Is hostname verification (for SSL/TLS) enabled?
*
Expand Down Expand Up @@ -699,7 +725,7 @@ public boolean equals(Object o) {
if (defaultPort != that.defaultPort) return false;
if (keepAlive != that.keepAlive) return false;
if (maxPoolSize != that.maxPoolSize) return false;
if (http2MaxStreams != that.http2MaxStreams) return false;
if (http2MultiplexingLimit != that.http2MultiplexingLimit) return false;
if (maxWebsocketFrameSize != that.maxWebsocketFrameSize) return false;
if (pipelining != that.pipelining) return false;
if (tryUseCompression != that.tryUseCompression) return false;
Expand All @@ -720,7 +746,7 @@ public int hashCode() {
int result = super.hashCode();
result = 31 * result + (verifyHost ? 1 : 0);
result = 31 * result + maxPoolSize;
result = 31 * result + http2MaxStreams;
result = 31 * result + http2MultiplexingLimit;
result = 31 * result + (keepAlive ? 1 : 0);
result = 31 * result + (pipelining ? 1 : 0);
result = 31 * result + (tryUseCompression ? 1 : 0);
Expand Down
Expand Up @@ -524,7 +524,7 @@ public synchronized void endRequest() {
metrics.requestEnd(currentRequest.metric());
}
currentRequest = null;
pool.recycle(this);
pool.requestEnded(this);
}

@Override
Expand Down
Expand Up @@ -87,7 +87,7 @@ public class ConnectionManager {
this.keepAlive = client.getOptions().isKeepAlive();
this.pipelining = client.getOptions().isPipelining();
this.maxWaitQueueSize = client.getOptions().getMaxWaitQueueSize();
this.http2MaxConcurrency = options.getHttp2MaxStreams() < 1 ? Integer.MAX_VALUE : options.getHttp2MaxStreams();
this.http2MaxConcurrency = options.getHttp2MultiplexingLimit() < 1 ? Integer.MAX_VALUE : options.getHttp2MultiplexingLimit();
this.logEnabled = client.getOptions().getLogActivity();
this.connector = new ChannelConnector();
this.metrics = metrics;
Expand Down
47 changes: 25 additions & 22 deletions src/main/java/io/vertx/core/http/impl/Http1xPool.java
Expand Up @@ -41,6 +41,7 @@ public class Http1xPool implements ConnectionManager.Pool<ClientConnection> {
private final Map<Channel, HttpClientConnection> connectionMap;
private final boolean pipelining;
private final boolean keepAlive;
private final int pipeliningLimit;
private final boolean ssl;
private final HttpVersion version;
private final Set<ClientConnection> allConnections = new HashSet<>();
Expand All @@ -55,6 +56,7 @@ public Http1xPool(HttpClientImpl client, HttpClientMetrics metrics, HttpClientOp
this.metrics = metrics;
this.pipelining = options.isPipelining();
this.keepAlive = options.isKeepAlive();
this.pipeliningLimit = options.getPipeliningLimit();
this.ssl = options.isSsl();
this.connectionMap = connectionMap;
this.maxSockets = maxSockets;
Expand All @@ -81,39 +83,40 @@ public HttpClientStream createStream(ClientConnection conn) {
return conn;
}

// Called when the request has ended
public void recycle(ClientConnection conn) {
if (pipelining) {
doRecycle(conn);
}
}

// Called when the response has ended
public void responseEnded(ClientConnection conn, boolean close) {
if ((pipelining || keepAlive) && !close && conn.getCurrentRequest() == null) {
doRecycle(conn);
} else {
// Close it now
conn.close();
}
}

private void doRecycle(ClientConnection conn) {
synchronized (queue) {
Waiter waiter = queue.getNextWaiter();
if (waiter != null) {
Context context = waiter.context;
if (context == null) {
context = conn.getContext();
}
context.runOnContext(v -> queue.deliverStream(conn, waiter));
queue.deliverStream(conn, waiter);
} else if (conn.getOutstandingRequestCount() == 0) {
// Return to set of available from here to not return it several times
availableConnections.add(conn);
}
}
}

void requestEnded(ClientConnection conn) {
ContextImpl context = conn.getContext();
context.runOnContext(v -> {
if (pipelining && conn.getOutstandingRequestCount() < pipeliningLimit) {
recycle(conn);
}
});
}

void responseEnded(ClientConnection conn, boolean close) {
if (!keepAlive || close) {
conn.close();
} else {
ContextImpl ctx = conn.getContext();
ctx.runOnContext(v -> {
if (conn.getCurrentRequest() == null) {
recycle(conn);
}
});
}
}

void createConn(HttpVersion version, ContextImpl context, int port, String host, Channel ch, Waiter waiter) {
ClientConnection conn = new ClientConnection(version, client, queue.metric, ch,
ssl, host, port, context, this, metrics);
Expand Down
12 changes: 8 additions & 4 deletions src/main/java/io/vertx/core/http/package-info.java
Expand Up @@ -1206,24 +1206,28 @@
*
* When pipe-lining is enabled requests will be written to connections without waiting for previous responses to return.
*
* The number of pipe-lined requests over a single connection is limited by {@link io.vertx.core.http.HttpClientOptions#setPipeliningLimit}.
* This option defines the maximum number of http requests sent to the server awaiting for a response. This limit ensures the
* fairness of the distribution of the client requests over the connections to the same server.
*
* === HTTP/2 multiplexing
*
* HTTP/2 advocates to use a single connection to a server, by default the http client uses a single
* connection for each server, all the streams to the same server are multiplexed on the same connection.
* connection for each server, all the streams to the same server are multiplexed over the same connection.
*
* When the clients needs to use more than a single connection and use pooling, the {@link io.vertx.core.http.HttpClientOptions#setHttp2MaxPoolSize(int)}
* shall be used.
*
* When it is desirable to limit the number of concurrent streams per server and use a connection
* pool instead of a single connection, {@link io.vertx.core.http.HttpClientOptions#setHttp2MaxStreams(int)}
* When it is desirable to limit the number of multiplexed streams per connection and use a connection
* pool instead of a single connection, {@link io.vertx.core.http.HttpClientOptions#setHttp2MultiplexingLimit(int)}
* can be used.
*
* [source,$lang]
* ----
* {@link examples.HTTP2Examples#useMaxStreams}
* ----
*
* The maximum streams for a connection is a setting set on the client that limits the streams
* The multiplexing limit for a connection is a setting set on the client that limits the number of streams
* of a single connection. The effective value can be even lower if the server sets a lower limit
* with the {@link io.vertx.core.http.Http2Settings#setMaxConcurrentStreams SETTINGS_MAX_CONCURRENT_STREAMS} setting.
*
Expand Down

0 comments on commit 10dab41

Please sign in to comment.