diff --git a/src/main/asciidoc/dataobjects.adoc b/src/main/asciidoc/dataobjects.adoc
index 5d2854569d7..b7c7ab9089e 100644
--- a/src/main/asciidoc/dataobjects.adoc
+++ b/src/main/asciidoc/dataobjects.adoc
@@ -758,6 +758,10 @@ Set the list of protocol versions to provide to the server during the Applicatio
+++
Set the connect timeout
+++
+|[[connectionRecyclePolicy]]`connectionRecyclePolicy`|`link:enums.html#ConnectionRecyclePolicy[ConnectionRecyclePolicy]`|
++++
+set to connectionRecyclePolicy
the policy in which connections are recycled
++++
|[[crlPaths]]`crlPaths`|`Array of String`|
+++
Add a CRL path
diff --git a/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java b/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
index 75d6daedd4c..6ae1230a241 100644
--- a/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
+++ b/src/main/generated/io/vertx/core/http/HttpClientOptionsConverter.java
@@ -35,6 +35,9 @@ static void fromJson(JsonObject json, HttpClientOptions obj) {
});
obj.setAlpnVersions(list);
}
+ if (json.getValue("connectionRecyclePolicy") instanceof String) {
+ obj.setConnectionRecyclePolicy(io.vertx.core.http.impl.pool.Pool.ConnectionRecyclePolicy.valueOf((String)json.getValue("connectionRecyclePolicy")));
+ }
if (json.getValue("decoderInitialBufferSize") instanceof Number) {
obj.setDecoderInitialBufferSize(((Number)json.getValue("decoderInitialBufferSize")).intValue());
}
@@ -121,6 +124,9 @@ static void toJson(HttpClientOptions obj, JsonObject json) {
obj.getAlpnVersions().forEach(item -> array.add(item.name()));
json.put("alpnVersions", array);
}
+ if (obj.getConnectionRecyclePolicy() != null) {
+ json.put("connectionRecyclePolicy", obj.getConnectionRecyclePolicy().name());
+ }
json.put("decoderInitialBufferSize", obj.getDecoderInitialBufferSize());
if (obj.getDefaultHost() != null) {
json.put("defaultHost", obj.getDefaultHost());
diff --git a/src/main/java/io/vertx/core/http/HttpClientOptions.java b/src/main/java/io/vertx/core/http/HttpClientOptions.java
index e50dbe547b0..4f30899ab04 100755
--- a/src/main/java/io/vertx/core/http/HttpClientOptions.java
+++ b/src/main/java/io/vertx/core/http/HttpClientOptions.java
@@ -13,6 +13,7 @@
import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.impl.pool.Pool;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.*;
@@ -160,6 +161,11 @@ public class HttpClientOptions extends ClientOptionsBase {
*/
public static final int DEFAULT_DECODER_INITIAL_BUFFER_SIZE = 128;
+ /**
+ * Default policy on how to recycle HTTP connections in the connection POOL
+ */
+ public static final Pool.ConnectionRecyclePolicy DEFAULT_CONNECTION_RECYCLE_POLICY = Pool.ConnectionRecyclePolicy.FIFO;
+
private boolean verifyHost = true;
private int maxPoolSize;
private boolean keepAlive;
@@ -188,6 +194,7 @@ public class HttpClientOptions extends ClientOptionsBase {
private int maxRedirects;
private boolean forceSni;
private int decoderInitialBufferSize;
+ private Pool.ConnectionRecyclePolicy connectionRecyclePolicy;
/**
* Default constructor
@@ -231,6 +238,7 @@ public HttpClientOptions(HttpClientOptions other) {
this.maxRedirects = other.maxRedirects;
this.forceSni = other.forceSni;
this.decoderInitialBufferSize = other.getDecoderInitialBufferSize();
+ this.connectionRecyclePolicy = other.getConnectionRecyclePolicy();
}
/**
@@ -283,6 +291,7 @@ private void init() {
maxRedirects = DEFAULT_MAX_REDIRECTS;
forceSni = DEFAULT_FORCE_SNI;
decoderInitialBufferSize = DEFAULT_DECODER_INITIAL_BUFFER_SIZE;
+ connectionRecyclePolicy = DEFAULT_CONNECTION_RECYCLE_POLICY;
}
@Override
@@ -1041,6 +1050,23 @@ public HttpClientOptions setDecoderInitialBufferSize(int decoderInitialBufferSiz
return this;
}
+ /**
+ * set to {@code connectionRecyclePolicy} the policy in which connections are recycled
+ * @param connectionRecyclePolicy the connection recycle policy
+ * @returna reference to this, so the API can be used fluently
+ */
+ public HttpClientOptions setConnectionRecyclePolicy(Pool.ConnectionRecyclePolicy connectionRecyclePolicy) {
+ this.connectionRecyclePolicy = connectionRecyclePolicy;
+ return this;
+ }
+
+ /**
+ * @return the current connections recycle policy
+ */
+ public Pool.ConnectionRecyclePolicy getConnectionRecyclePolicy() {
+ return connectionRecyclePolicy;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/src/main/java/io/vertx/core/http/impl/ConnectionManager.java b/src/main/java/io/vertx/core/http/impl/ConnectionManager.java
index db7a1019677..ccc6984c9c2 100644
--- a/src/main/java/io/vertx/core/http/impl/ConnectionManager.java
+++ b/src/main/java/io/vertx/core/http/impl/ConnectionManager.java
@@ -37,18 +37,21 @@ class ConnectionManager {
private final Map endpointMap = new ConcurrentHashMap<>();
private final HttpVersion version;
private final long maxSize;
+ private final Pool.ConnectionRecyclePolicy connectionPoolRecyclePolicy;
private long timerID;
ConnectionManager(HttpClientImpl client,
HttpClientMetrics metrics,
HttpVersion version,
long maxSize,
- int maxWaitQueueSize) {
+ int maxWaitQueueSize,
+ Pool.ConnectionRecyclePolicy connectionRecyclePolicy) {
this.client = client;
this.maxWaitQueueSize = maxWaitQueueSize;
this.metrics = metrics;
this.maxSize = maxSize;
this.version = version;
+ this.connectionPoolRecyclePolicy = connectionRecyclePolicy;
}
synchronized void start(boolean checkExpired) {
@@ -127,7 +130,8 @@ void getConnection(String peerHost, boolean ssl, int port, String host,
endpointMap.remove(key);
},
connectionMap::put,
- connectionMap::remove);
+ connectionMap::remove,
+ connectionPoolRecyclePolicy);
return new Endpoint(pool, metric);
});
Object metric;
diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
index bc0f92ae234..51cbdfb8c51 100644
--- a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
+++ b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
@@ -155,8 +155,9 @@ public HttpClientImpl(VertxInternal vertx, HttpClientOptions options) {
throw new IllegalStateException("Cannot have pipelining with no keep alive");
}
long maxWeight = options.getMaxPoolSize() * options.getHttp2MaxPoolSize();
- websocketCM = new ConnectionManager(this, metrics, HttpVersion.HTTP_1_1, maxWeight, options.getMaxWaitQueueSize());
- httpCM = new ConnectionManager(this, metrics, options.getProtocolVersion(), maxWeight, options.getMaxWaitQueueSize());
+ websocketCM = new ConnectionManager(this, metrics, HttpVersion.HTTP_1_1, maxWeight, options.getMaxWaitQueueSize(), options.getConnectionRecyclePolicy());
+
+ httpCM = new ConnectionManager(this, metrics, options.getProtocolVersion(), maxWeight, options.getMaxWaitQueueSize(), options.getConnectionRecyclePolicy());
proxyType = options.getProxyOptions() != null ? options.getProxyOptions().getType() : null;
httpCM.start(options.getKeepAliveTimeout() > 0 || options.getHttp2KeepAliveTimeout() > 0);
websocketCM.start(options.getKeepAliveTimeout() > 0 || options.getHttp2KeepAliveTimeout() > 0);
diff --git a/src/main/java/io/vertx/core/http/impl/pool/Pool.java b/src/main/java/io/vertx/core/http/impl/pool/Pool.java
index 7fbf205b3f2..a12db9160c2 100644
--- a/src/main/java/io/vertx/core/http/impl/pool/Pool.java
+++ b/src/main/java/io/vertx/core/http/impl/pool/Pool.java
@@ -45,9 +45,9 @@
* can mix connections with different concurrency (HTTP/1 and HTTP/2) and this flexibility is necessary.
*
* When a connection is created an {@link #initialWeight} is added to the current weight.
- * When the channel is connected the {@link ConnectionListener#onConnectSuccess} callback
+ * When the channel is connected, the {@link ConnectionListener#onConnectSuccess} callback
* provides the initial weight returned by the connect method and the actual connection weight so it can be used to
- * correct the current weight. When the channel fails to connect the {@link ConnectionListener#onConnectFailure} failure
+ * correct the current weight. When the channel fails to connect, the {@link ConnectionListener#onConnectFailure} failure
* provides the initial weight so it can be used to correct the current weight.
*
* When a connection is recycled and reaches its full capacity (i.e {@code Holder#concurrency == Holder#capacity},
@@ -59,7 +59,7 @@
* When a waiter asks for a connection, it is either added to the queue (when it's not empty) or attempted to be
* served (from the pool or by creating a new connection) or failed. The {@link #waitersCount} is the number
* of total waiters (the waiters in {@link #waitersQueue} but also the inflight) so we know if we can close the pool
- * or not. The {@link #waitersCount} is incremented when a waiter wants to acquire a connection succesfully (i.e
+ * or not. The {@link #waitersCount} is incremented when a waiter wants to acquire a connection successfully (i.e
* it is either added to the queue or served from the pool) and decremented when the it gets a reply (either with
* a connection or with a failure).
*
@@ -89,6 +89,16 @@ public String toString() {
return "Holder[removed=" + removed + ",capacity=" + capacity + ",concurrency=" + concurrency + ",expirationTimestamp=" + expirationTimestamp + "]";
}
}
+
+ /**
+ * Defines the policy in which connections will be reused in the pool
+ *
+ */
+ public enum ConnectionRecyclePolicy {
+ FIFO, // Maintains the available connection as a queue, keep connections for longer
+ LIFO // Maintains the available connection as a stack, allowing more connections to expire
+ }
+
private static final Logger log = LoggerFactory.getLogger(Pool.class);
private final ConnectionProvider connector;
@@ -108,13 +118,16 @@ public String toString() {
private boolean closed;
private final Handler poolClosed;
+ private final ConnectionRecyclePolicy connectionRecyclePolicy;
+
public Pool(ConnectionProvider connector,
int queueMaxSize,
long initialWeight,
long maxWeight,
Handler poolClosed,
BiConsumer connectionAdded,
- BiConsumer connectionRemoved) {
+ BiConsumer connectionRemoved,
+ ConnectionRecyclePolicy connectionRecyclePolicy) {
this.maxWeight = maxWeight;
this.initialWeight = initialWeight;
this.connector = connector;
@@ -123,6 +136,7 @@ public Pool(ConnectionProvider connector,
this.available = new ArrayDeque<>();
this.connectionAdded = connectionAdded;
this.connectionRemoved = connectionRemoved;
+ this.connectionRecyclePolicy = connectionRecyclePolicy;
}
public synchronized int waitersInQueue() {
@@ -350,13 +364,23 @@ private void recycleConnection(Holder conn, int c, long timestamp) {
connector.close(conn.connection);
} else {
if (conn.capacity == 0) {
- available.add(conn);
+ returnConnectionToPool(conn);
}
conn.expirationTimestamp = timestamp;
conn.capacity = newCapacity;
}
}
+ private void returnConnectionToPool(Holder conn) {
+ switch (this.connectionRecyclePolicy) {
+ case FIFO:
+ available.add(conn);
+ break;
+ case LIFO:
+ available.offerFirst(conn);
+ }
+ }
+
private void initConnection(Holder holder, ContextInternal context, long concurrency, C conn, Channel channel, long weight) {
this.weight += initialWeight - weight;
holder.context = context;