diff --git a/src/main/asciidoc/dataobjects.adoc b/src/main/asciidoc/dataobjects.adoc
index 2c7f0c4c38a..c34fbdbbe50 100644
--- a/src/main/asciidoc/dataobjects.adoc
+++ b/src/main/asciidoc/dataobjects.adoc
@@ -673,6 +673,16 @@ Set the maximum HTTP chunk size
+++
Set the maximum pool size for connections
+++
+|[[maxStreams]]`maxStreams`|`Number (int)`|
++++
+Set the maximum of concurrent concurrency for an HTTP/2 connection, this limits the number
+ of streams the client will create for a connection. The effective number of stream for a
+ connection can be lower than this value when the server has limited this value lower than
+ this value.
+
+ Setting a maximum to means the client will not limit the concurrency and the client
+ will use a single connection. is the default value.
++++
|[[maxWaitQueueSize]]`maxWaitQueueSize`|`Number (int)`|
+++
Set the maximum requests allowed in the wait queue, any requests beyond the max size will result in
diff --git a/src/main/asciidoc/java/http.adoc b/src/main/asciidoc/java/http.adoc
index 79b18e6a62c..84eeea2f967 100644
--- a/src/main/asciidoc/java/http.adoc
+++ b/src/main/asciidoc/java/http.adoc
@@ -1538,8 +1538,24 @@ When pipe-lining is enabled requests will be written to connections without wait
=== HTTP/2 multiplexing
-For HTTP/2, the http client uses a single connection for each server, all the requests to the same server are
-multiplexed on the same connection.
+HTTP/2 advocates to use a single connection to a server, by default the http client uses a single
+connection for each server, all the streams to the same server are multiplexed on the same connection.
+
+When it is desirable to limit the number of concurrent streams per server and uses a connection
+pool instead of a single connection, `link:../../apidocs/io/vertx/core/http/HttpClientOptions.html#setMaxStreams-int-[setMaxStreams]`
+can be used.
+
+[source,java]
+----
+HttpClientOptions clientOptions = new HttpClientOptions().setMaxStreams(10).setMaxPoolSize(3);
+
+// Uses up to 3 connections and up to 10 streams per connection
+HttpClient client = vertx.createHttpClient(clientOptions);
+----
+
+The maximum streams for a connection is a setting set on the client that limits the streams
+of a single connection. The effective value can be even lower if the server sets a lower limit
+with the `link:../../apidocs/io/vertx/core/http/Http2Settings.html#setMaxConcurrentStreams-long-[SETTINGS_MAX_CONCURRENT_STREAMS]` setting.
HTTP/2 connections will not be closed by the client automatically. To close them you can call `link:../../apidocs/io/vertx/core/http/HttpConnection.html#close--[close]`
or close the client instance.
diff --git a/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java b/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
index 8728e24b9ac..3857b4091f7 100644
--- a/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
+++ b/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
@@ -56,6 +56,9 @@ public static void fromJson(JsonObject json, HttpClientOptions obj) {
if (json.getValue("maxPoolSize") instanceof Number) {
obj.setMaxPoolSize(((Number)json.getValue("maxPoolSize")).intValue());
}
+ if (json.getValue("maxStreams") instanceof Number) {
+ obj.setMaxStreams(((Number)json.getValue("maxStreams")).intValue());
+ }
if (json.getValue("maxWaitQueueSize") instanceof Number) {
obj.setMaxWaitQueueSize(((Number)json.getValue("maxWaitQueueSize")).intValue());
}
@@ -107,6 +110,7 @@ public static void toJson(HttpClientOptions obj, JsonObject json) {
json.put("keepAlive", obj.isKeepAlive());
json.put("maxChunkSize", obj.getMaxChunkSize());
json.put("maxPoolSize", obj.getMaxPoolSize());
+ json.put("maxStreams", obj.getMaxStreams());
json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize());
json.put("maxWebsocketFrameSize", obj.getMaxWebsocketFrameSize());
json.put("pipelining", obj.isPipelining());
diff --git a/src/main/java/examples/HTTP2Examples.java b/src/main/java/examples/HTTP2Examples.java
index ef2d0f4267b..0dcf96b7808 100644
--- a/src/main/java/examples/HTTP2Examples.java
+++ b/src/main/java/examples/HTTP2Examples.java
@@ -282,4 +282,12 @@ public void example28(HttpConnection connection) {
connection.close();
});
}
+
+ public void useMaxStreams(Vertx vertx) {
+
+ HttpClientOptions clientOptions = new HttpClientOptions().setMaxStreams(10).setMaxPoolSize(3);
+
+ // Uses up to 3 connections and up to 10 streams per connection
+ HttpClient client = vertx.createHttpClient(clientOptions);
+ }
}
diff --git a/src/main/java/io/vertx/core/http/HttpClientOptions.java b/src/main/java/io/vertx/core/http/HttpClientOptions.java
index 559777f6567..ca704df5698 100644
--- a/src/main/java/io/vertx/core/http/HttpClientOptions.java
+++ b/src/main/java/io/vertx/core/http/HttpClientOptions.java
@@ -25,7 +25,6 @@
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.SSLEngine;
-import io.vertx.core.net.TCPSSLOptions;
import java.util.ArrayList;
import java.util.Collections;
@@ -44,6 +43,11 @@ public class HttpClientOptions extends ClientOptionsBase {
*/
public static final int DEFAULT_MAX_POOL_SIZE = 5;
+ /**
+ * The default maximum number of concurrent stream per connection for HTTP/2 = -1
+ */
+ public static final int DEFAULT_MAX_STREAMS = -1;
+
/**
* Default value of whether keep-alive is enabled = true
*/
@@ -108,6 +112,8 @@ public class HttpClientOptions extends ClientOptionsBase {
private int maxPoolSize;
private boolean keepAlive;
private boolean pipelining;
+ private int maxStreams;
+
private boolean tryUseCompression;
private int maxWebsocketFrameSize;
private String defaultHost;
@@ -143,6 +149,7 @@ public HttpClientOptions(HttpClientOptions other) {
this.maxPoolSize = other.getMaxPoolSize();
this.keepAlive = other.isKeepAlive();
this.pipelining = other.isPipelining();
+ this.maxStreams = other.maxStreams;
this.tryUseCompression = other.isTryUseCompression();
this.maxWebsocketFrameSize = other.maxWebsocketFrameSize;
this.defaultHost = other.defaultHost;
@@ -175,6 +182,7 @@ private void init() {
maxPoolSize = DEFAULT_MAX_POOL_SIZE;
keepAlive = DEFAULT_KEEP_ALIVE;
pipelining = DEFAULT_PIPELINING;
+ maxStreams = DEFAULT_MAX_STREAMS;
tryUseCompression = DEFAULT_TRY_USE_COMPRESSION;
maxWebsocketFrameSize = DEFAULT_MAX_WEBSOCKET_FRAME_SIZE;
defaultHost = DEFAULT_DEFAULT_HOST;
@@ -340,6 +348,31 @@ public HttpClientOptions setMaxPoolSize(int maxPoolSize) {
return this;
}
+ /**
+ * @return the maximum number of concurrent streams for an HTTP/2 connection, {@literal -1} means
+ * no limit (default value)
+ */
+ public int getMaxStreams() {
+ return maxStreams;
+ }
+
+ /**
+ * Set the maximum of concurrent concurrency for an HTTP/2 connection, this limits the number
+ * of streams the client will create for a connection. The effective number of stream for a
+ * connection can be lower than this value when the server has limited this value lower than
+ * this value.
+ *
+ * Setting a maximum to {@literal -1} means the client will not limit the concurrency and the client
+ * will use a single connection. {@literal -1} is the default value.
+ *
+ * @param maxStreams the maximum concurrent for a connection
+ * @return a reference to this, so the API can be used fluently
+ */
+ public HttpClientOptions setMaxStreams(int maxStreams) {
+ this.maxStreams = maxStreams;
+ return this;
+ }
+
/**
* Is keep alive enabled on the client?
*
@@ -705,6 +738,7 @@ public boolean equals(Object o) {
if (defaultPort != that.defaultPort) return false;
if (keepAlive != that.keepAlive) return false;
if (maxPoolSize != that.maxPoolSize) return false;
+ if (maxStreams != that.maxStreams) return false;
if (maxWebsocketFrameSize != that.maxWebsocketFrameSize) return false;
if (pipelining != that.pipelining) return false;
if (tryUseCompression != that.tryUseCompression) return false;
@@ -729,6 +763,7 @@ public int hashCode() {
int result = super.hashCode();
result = 31 * result + (verifyHost ? 1 : 0);
result = 31 * result + maxPoolSize;
+ result = 31 * result + maxStreams;
result = 31 * result + (keepAlive ? 1 : 0);
result = 31 * result + (pipelining ? 1 : 0);
result = 31 * result + (tryUseCompression ? 1 : 0);
diff --git a/src/main/java/io/vertx/core/http/impl/ConnectionManager.java b/src/main/java/io/vertx/core/http/impl/ConnectionManager.java
index 777a85bbd3a..7d38ad97c9c 100644
--- a/src/main/java/io/vertx/core/http/impl/ConnectionManager.java
+++ b/src/main/java/io/vertx/core/http/impl/ConnectionManager.java
@@ -74,6 +74,8 @@ public class ConnectionManager {
private final boolean keepAlive;
private final boolean pipelining;
private final int maxWaitQueueSize;
+ private final int http2MaxSockets;
+ private final int http2MaxConcurrency;
ConnectionManager(HttpClientImpl client) {
this.client = client;
@@ -83,6 +85,9 @@ public class ConnectionManager {
this.keepAlive = client.getOptions().isKeepAlive();
this.pipelining = client.getOptions().isPipelining();
this.maxWaitQueueSize = client.getOptions().getMaxWaitQueueSize();
+ int maxStreams = options.getMaxStreams();
+ this.http2MaxSockets = maxStreams < 1 ? 1 : options.getMaxPoolSize();
+ this.http2MaxConcurrency = maxStreams < 1 ? Integer.MAX_VALUE : maxStreams;
}
/**
@@ -193,7 +198,7 @@ public class ConnQueue {
this.address = address;
this.mgr = mgr;
if (version == HttpVersion.HTTP_2) {
- pool = new Http2Pool(this, client, mgr.connectionMap);
+ pool = new Http2Pool(this, client, mgr.connectionMap, http2MaxSockets, http2MaxConcurrency);
} else {
pool = new Http1xPool(client, options, this, mgr.connectionMap, version);
}
@@ -502,7 +507,23 @@ static abstract class Pool {
abstract HttpVersion version();
- abstract boolean getConnection(Waiter waiter);
+ abstract C pollConnection();
+
+ boolean getConnection(Waiter waiter) {
+ C conn = pollConnection();
+ if (conn != null && conn.isValid()) {
+ ContextImpl context = waiter.context;
+ if (context == null) {
+ context = conn.getContext();
+ } else if (context != conn.getContext()) {
+ ConnectionManager.log.warn("Reusing a connection with a different context: an HttpClient is probably shared between different Verticles");
+ }
+ context.runOnContext(v -> deliverStream(conn, waiter));
+ return true;
+ } else {
+ return false;
+ }
+ }
abstract void closeAllConnections();
diff --git a/src/main/java/io/vertx/core/http/impl/Http1xPool.java b/src/main/java/io/vertx/core/http/impl/Http1xPool.java
index ebe1dd81714..9e0e63d0c9e 100644
--- a/src/main/java/io/vertx/core/http/impl/Http1xPool.java
+++ b/src/main/java/io/vertx/core/http/impl/Http1xPool.java
@@ -58,20 +58,9 @@ HttpVersion version() {
return version;
}
- public boolean getConnection(Waiter waiter) {
- ClientConnection conn = availableConnections.poll();
- if (conn != null && conn.isValid()) {
- ContextImpl context = waiter.context;
- if (context == null) {
- context = conn.getContext();
- } else if (context != conn.getContext()) {
- ConnectionManager.log.warn("Reusing a connection with a different context: an HttpClient is probably shared between different Verticles");
- }
- context.runOnContext(v -> deliverStream(conn, waiter));
- return true;
- } else {
- return false;
- }
+ @Override
+ ClientConnection pollConnection() {
+ return availableConnections.poll();
}
@Override
diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
index dbc4f642db2..820e76cebe6 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
@@ -54,7 +54,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
final Http2Pool http2Pool;
final HttpClientMetrics metrics;
final Object metric;
- long streamCount;
+ int streamCount;
public Http2ClientConnection(Http2Pool http2Pool,
ContextImpl context,
diff --git a/src/main/java/io/vertx/core/http/impl/Http2Pool.java b/src/main/java/io/vertx/core/http/impl/Http2Pool.java
index 7595c8412e1..c388af0588d 100644
--- a/src/main/java/io/vertx/core/http/impl/Http2Pool.java
+++ b/src/main/java/io/vertx/core/http/impl/Http2Pool.java
@@ -23,21 +23,30 @@
import io.vertx.core.http.HttpVersion;
import io.vertx.core.impl.ContextImpl;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
/**
* @author Julien Viet
*/
class Http2Pool extends ConnectionManager.Pool {
- private Http2ClientConnection connection;
+ private Queue availableConnections = new ArrayDeque<>();
+ private final Set allConnections = new HashSet<>();
private final Map connectionMap;
final HttpClientImpl client;
+ final int maxConcurrency;
- public Http2Pool(ConnectionManager.ConnQueue queue, HttpClientImpl client, Map connectionMap) {
- super(queue, 1);
+ public Http2Pool(ConnectionManager.ConnQueue queue, HttpClientImpl client, Map connectionMap, int maxSockets, int maxConcurrency) {
+ super(queue, maxSockets);
this.client = client;
this.connectionMap = connectionMap;
+ this.maxConcurrency = maxConcurrency;
}
@Override
@@ -45,24 +54,16 @@ HttpVersion version() {
return HttpVersion.HTTP_2;
}
- // Under sync when called
- public boolean getConnection(Waiter waiter) {
- Http2ClientConnection conn = this.connection;
- if (conn != null && canReserveStream(conn)) {
+ @Override
+ Http2ClientConnection pollConnection() {
+ Http2ClientConnection conn = availableConnections.peek();
+ if (conn != null) {
conn.streamCount++;
- ContextImpl context = waiter.context;
- if (context == null) {
- context = conn.getContext();
- } else if (context != conn.getContext()) {
- ConnectionManager.log.warn("Reusing a connection with a different context: an HttpClient is probably shared between different Verticles");
+ if (!canReserveStream(conn)) {
+ availableConnections.remove();
}
- context.runOnContext(v -> {
- deliverStream(conn, waiter);
- });
- return true;
- } else {
- return false;
}
+ return conn;
}
void createConn(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade) throws Http2Exception {
@@ -79,38 +80,40 @@ void createConn(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade)
handler.onHttpClientUpgrade();
}
Http2ClientConnection conn = handler.connection;
- connection = conn;
int idleTimeout = client.getOptions().getIdleTimeout();
if (idleTimeout > 0) {
p.addLast("idle", new IdleStateHandler(0, 0, idleTimeout));
}
p.addLast(handler);
+ allConnections.add(conn);
conn.streamCount++;
waiter.handleConnection(conn); // Should make same tests than in deliverRequest
deliverStream(conn, waiter);
checkPending(conn);
+ if (canReserveStream(conn)) {
+ availableConnections.add(conn);
+ }
}
}
private boolean canReserveStream(Http2ClientConnection handler) {
- int maxConcurrentStreams = handler.handler.connection().local().maxActiveStreams();
+ int maxConcurrentStreams = Math.min(handler.handler.connection().local().maxActiveStreams(), maxConcurrency);
return handler.streamCount < maxConcurrentStreams;
}
- void checkPending(Http2ClientConnection handler) {
+ void checkPending(Http2ClientConnection conn) {
synchronized (queue) {
Waiter waiter;
- while (canReserveStream(handler) && (waiter = queue.getNextWaiter()) != null) {
- handler.streamCount++;
- deliverStream(handler, waiter);
+ while (canReserveStream(conn) && (waiter = queue.getNextWaiter()) != null) {
+ conn.streamCount++;
+ deliverStream(conn, waiter);
}
}
}
void discard(Http2ClientConnection conn) {
synchronized (queue) {
- if (connection == conn) {
- connection = null;
+ if (allConnections.remove(conn)) {
queue.connectionClosed();
}
}
@@ -121,6 +124,9 @@ void recycle(Http2ClientConnection conn) {
synchronized (queue) {
conn.streamCount--;
checkPending(conn);
+ if (canReserveStream(conn)) {
+ availableConnections.add(conn);
+ }
}
}
@@ -131,13 +137,11 @@ HttpClientStream createStream(Http2ClientConnection conn) throws Exception {
@Override
void closeAllConnections() {
- Http2ClientConnection conn;
+ List toClose;
synchronized (queue) {
- conn = this.connection;
+ toClose = new ArrayList<>(allConnections);
}
// Close outside sync block to avoid deadlock
- if (conn != null) {
- conn.close();
- }
+ toClose.forEach(Http2ConnectionBase::close);
}
}
diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientConnection.java b/src/main/java/io/vertx/core/http/impl/HttpClientConnection.java
index 1985409f5aa..6e0389340a6 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpClientConnection.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpClientConnection.java
@@ -16,14 +16,14 @@
package io.vertx.core.http.impl;
-import io.vertx.core.Context;
+import io.vertx.core.impl.ContextImpl;
/**
* @author Julien Viet
*/
interface HttpClientConnection {
- Context getContext();
+ ContextImpl getContext();
void reportBytesWritten(long numberOfBytes);
diff --git a/src/main/java/io/vertx/core/http/package-info.java b/src/main/java/io/vertx/core/http/package-info.java
index fa0da526b83..4e62c2307d4 100644
--- a/src/main/java/io/vertx/core/http/package-info.java
+++ b/src/main/java/io/vertx/core/http/package-info.java
@@ -1176,8 +1176,21 @@
*
* === HTTP/2 multiplexing
*
- * For HTTP/2, the http client uses a single connection for each server, all the requests to the same server are
- * multiplexed on the same connection.
+ * HTTP/2 advocates to use a single connection to a server, by default the http client uses a single
+ * connection for each server, all the streams to the same server are multiplexed on the same connection.
+ *
+ * When it is desirable to limit the number of concurrent streams per server and uses a connection
+ * pool instead of a single connection, {@link io.vertx.core.http.HttpClientOptions#setMaxStreams(int)}
+ * can be used.
+ *
+ * [source,$lang]
+ * ----
+ * {@link examples.HTTP2Examples#useMaxStreams}
+ * ----
+ *
+ * The maximum streams for a connection is a setting set on the client that limits the streams
+ * of a single connection. The effective value can be even lower if the server sets a lower limit
+ * with the {@link io.vertx.core.http.Http2Settings#setMaxConcurrentStreams SETTINGS_MAX_CONCURRENT_STREAMS} setting.
*
* HTTP/2 connections will not be closed by the client automatically. To close them you can call {@link io.vertx.core.http.HttpConnection#close()}
* or close the client instance.
diff --git a/src/test/java/io/vertx/test/core/Http2ClientTest.java b/src/test/java/io/vertx/test/core/Http2ClientTest.java
index 7ca74b1691e..48a011708e2 100644
--- a/src/test/java/io/vertx/test/core/Http2ClientTest.java
+++ b/src/test/java/io/vertx/test/core/Http2ClientTest.java
@@ -55,6 +55,7 @@
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamResetException;
@@ -69,7 +70,9 @@
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -1657,4 +1660,52 @@ public void testReceivePing() throws Exception {
req.end();
await();
}
+
+ @Test
+ public void testMaxConcurrencySingleConnection() throws Exception {
+ testMaxConcurrency(1, 5);
+ }
+
+ @Test
+ public void testMaxConcurrencyMultipleConnections() throws Exception {
+ testMaxConcurrency(3, 5);
+ }
+
+ private void testMaxConcurrency(int poolSize, int maxConcurrency) throws Exception {
+ int maxRequests = poolSize * maxConcurrency;
+ int totalRequests = maxRequests + maxConcurrency;
+ Set serverConn = new HashSet<>();
+ server.connectionHandler(conn -> {
+ serverConn.add(conn);
+ assertTrue(serverConn.size() <= poolSize);
+ });
+ ArrayList requests = new ArrayList<>();
+ server.requestHandler(req -> {
+ if (requests.size() < maxRequests) {
+ requests.add(req);
+ if (requests.size() == maxRequests) {
+ vertx.setTimer(300, v -> {
+ assertEquals(maxRequests, requests.size());
+ requests.forEach(r -> r.response().end());
+ });
+ }
+ } else {
+ req.response().end();
+ }
+ });
+ startServer();
+ client.close();
+ client = vertx.createHttpClient(new HttpClientOptions(clientOptions).setMaxPoolSize(poolSize).setMaxStreams(maxConcurrency));
+ AtomicInteger respCount = new AtomicInteger();
+ for (int i = 0;i < maxRequests + maxConcurrency;i++) {
+ client.getNow(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
+ resp.endHandler(v -> {
+ if (respCount.incrementAndGet() == totalRequests) {
+ testComplete();
+ }
+ });
+ });
+ }
+ await();
+ }
}