Skip to content

Commit

Permalink
The HTTP client pool should manage per host connections with a config…
Browse files Browse the repository at this point in the history
…urable LIFO or FIFO strategy

Signed-off-by: Michael Pogrebinsky <smichpog2@gmail.com>
  • Loading branch information
michaelpog committed Apr 11, 2018
1 parent 9bed0f6 commit a3a81aa
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 9 deletions.
4 changes: 4 additions & 0 deletions src/main/asciidoc/dataobjects.adoc
Expand Up @@ -758,6 +758,10 @@ Set the list of protocol versions to provide to the server during the Applicatio
+++
Set the connect timeout
+++
|[[connectionRecyclePolicy]]`connectionRecyclePolicy`|`link:enums.html#ConnectionRecyclePolicy[ConnectionRecyclePolicy]`|
+++
set to <code>connectionRecyclePolicy</code> the policy in which connections are recycled
+++
|[[crlPaths]]`crlPaths`|`Array of String`|
+++
Add a CRL path
Expand Down
Expand Up @@ -35,6 +35,9 @@ static void fromJson(JsonObject json, HttpClientOptions obj) {
});
obj.setAlpnVersions(list);
}
if (json.getValue("connectionRecyclePolicy") instanceof String) {
obj.setConnectionRecyclePolicy(io.vertx.core.http.impl.pool.Pool.ConnectionRecyclePolicy.valueOf((String)json.getValue("connectionRecyclePolicy")));
}
if (json.getValue("decoderInitialBufferSize") instanceof Number) {
obj.setDecoderInitialBufferSize(((Number)json.getValue("decoderInitialBufferSize")).intValue());
}
Expand Down Expand Up @@ -121,6 +124,9 @@ static void toJson(HttpClientOptions obj, JsonObject json) {
obj.getAlpnVersions().forEach(item -> array.add(item.name()));
json.put("alpnVersions", array);
}
if (obj.getConnectionRecyclePolicy() != null) {
json.put("connectionRecyclePolicy", obj.getConnectionRecyclePolicy().name());
}
json.put("decoderInitialBufferSize", obj.getDecoderInitialBufferSize());
if (obj.getDefaultHost() != null) {
json.put("defaultHost", obj.getDefaultHost());
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/io/vertx/core/http/HttpClientOptions.java
Expand Up @@ -13,6 +13,7 @@

import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.pool.Pool;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.*;
Expand Down Expand Up @@ -160,6 +161,11 @@ public class HttpClientOptions extends ClientOptionsBase {
*/
public static final int DEFAULT_DECODER_INITIAL_BUFFER_SIZE = 128;

/**
* Default policy on how to recycle HTTP connections in the connection POOL
*/
public static final Pool.ConnectionRecyclePolicy DEFAULT_CONNECTION_RECYCLE_POLICY = Pool.ConnectionRecyclePolicy.FIFO;

private boolean verifyHost = true;
private int maxPoolSize;
private boolean keepAlive;
Expand Down Expand Up @@ -188,6 +194,7 @@ public class HttpClientOptions extends ClientOptionsBase {
private int maxRedirects;
private boolean forceSni;
private int decoderInitialBufferSize;
private Pool.ConnectionRecyclePolicy connectionRecyclePolicy;

/**
* Default constructor
Expand Down Expand Up @@ -231,6 +238,7 @@ public HttpClientOptions(HttpClientOptions other) {
this.maxRedirects = other.maxRedirects;
this.forceSni = other.forceSni;
this.decoderInitialBufferSize = other.getDecoderInitialBufferSize();
this.connectionRecyclePolicy = other.getConnectionRecyclePolicy();
}

/**
Expand Down Expand Up @@ -283,6 +291,7 @@ private void init() {
maxRedirects = DEFAULT_MAX_REDIRECTS;
forceSni = DEFAULT_FORCE_SNI;
decoderInitialBufferSize = DEFAULT_DECODER_INITIAL_BUFFER_SIZE;
connectionRecyclePolicy = DEFAULT_CONNECTION_RECYCLE_POLICY;
}

@Override
Expand Down Expand Up @@ -1041,6 +1050,23 @@ public HttpClientOptions setDecoderInitialBufferSize(int decoderInitialBufferSiz
return this;
}

/**
* set to {@code connectionRecyclePolicy} the policy in which connections are recycled
* @param connectionRecyclePolicy the connection recycle policy
* @returna reference to this, so the API can be used fluently
*/
public HttpClientOptions setConnectionRecyclePolicy(Pool.ConnectionRecyclePolicy connectionRecyclePolicy) {
this.connectionRecyclePolicy = connectionRecyclePolicy;
return this;
}

/**
* @return the current connections recycle policy
*/
public Pool.ConnectionRecyclePolicy getConnectionRecyclePolicy() {
return connectionRecyclePolicy;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -37,18 +37,21 @@ class ConnectionManager {
private final Map<EndpointKey, Endpoint> endpointMap = new ConcurrentHashMap<>();
private final HttpVersion version;
private final long maxSize;
private final Pool.ConnectionRecyclePolicy connectionPoolRecyclePolicy;
private long timerID;

ConnectionManager(HttpClientImpl client,
HttpClientMetrics metrics,
HttpVersion version,
long maxSize,
int maxWaitQueueSize) {
int maxWaitQueueSize,
Pool.ConnectionRecyclePolicy connectionRecyclePolicy) {
this.client = client;
this.maxWaitQueueSize = maxWaitQueueSize;
this.metrics = metrics;
this.maxSize = maxSize;
this.version = version;
this.connectionPoolRecyclePolicy = connectionRecyclePolicy;
}

synchronized void start(boolean checkExpired) {
Expand Down Expand Up @@ -127,7 +130,8 @@ void getConnection(String peerHost, boolean ssl, int port, String host,
endpointMap.remove(key);
},
connectionMap::put,
connectionMap::remove);
connectionMap::remove,
connectionPoolRecyclePolicy);
return new Endpoint(pool, metric);
});
Object metric;
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -155,8 +155,9 @@ public HttpClientImpl(VertxInternal vertx, HttpClientOptions options) {
throw new IllegalStateException("Cannot have pipelining with no keep alive");
}
long maxWeight = options.getMaxPoolSize() * options.getHttp2MaxPoolSize();
websocketCM = new ConnectionManager(this, metrics, HttpVersion.HTTP_1_1, maxWeight, options.getMaxWaitQueueSize());
httpCM = new ConnectionManager(this, metrics, options.getProtocolVersion(), maxWeight, options.getMaxWaitQueueSize());
websocketCM = new ConnectionManager(this, metrics, HttpVersion.HTTP_1_1, maxWeight, options.getMaxWaitQueueSize(), options.getConnectionRecyclePolicy());

httpCM = new ConnectionManager(this, metrics, options.getProtocolVersion(), maxWeight, options.getMaxWaitQueueSize(), options.getConnectionRecyclePolicy());
proxyType = options.getProxyOptions() != null ? options.getProxyOptions().getType() : null;
httpCM.start(options.getKeepAliveTimeout() > 0 || options.getHttp2KeepAliveTimeout() > 0);
websocketCM.start(options.getKeepAliveTimeout() > 0 || options.getHttp2KeepAliveTimeout() > 0);
Expand Down
34 changes: 29 additions & 5 deletions src/main/java/io/vertx/core/http/impl/pool/Pool.java
Expand Up @@ -45,9 +45,9 @@
* can mix connections with different concurrency (HTTP/1 and HTTP/2) and this flexibility is necessary.
*
* When a connection is created an {@link #initialWeight} is added to the current weight.
* When the channel is connected the {@link ConnectionListener#onConnectSuccess} callback
* When the channel is connected, the {@link ConnectionListener#onConnectSuccess} callback
* provides the initial weight returned by the connect method and the actual connection weight so it can be used to
* correct the current weight. When the channel fails to connect the {@link ConnectionListener#onConnectFailure} failure
* correct the current weight. When the channel fails to connect, the {@link ConnectionListener#onConnectFailure} failure
* provides the initial weight so it can be used to correct the current weight.
*
* When a connection is recycled and reaches its full capacity (i.e {@code Holder#concurrency == Holder#capacity},
Expand All @@ -59,7 +59,7 @@
* When a waiter asks for a connection, it is either added to the queue (when it's not empty) or attempted to be
* served (from the pool or by creating a new connection) or failed. The {@link #waitersCount} is the number
* of total waiters (the waiters in {@link #waitersQueue} but also the inflight) so we know if we can close the pool
* or not. The {@link #waitersCount} is incremented when a waiter wants to acquire a connection succesfully (i.e
* or not. The {@link #waitersCount} is incremented when a waiter wants to acquire a connection successfully (i.e
* it is either added to the queue or served from the pool) and decremented when the it gets a reply (either with
* a connection or with a failure).
*
Expand Down Expand Up @@ -89,6 +89,16 @@ public String toString() {
return "Holder[removed=" + removed + ",capacity=" + capacity + ",concurrency=" + concurrency + ",expirationTimestamp=" + expirationTimestamp + "]";
}
}

/**
* Defines the policy in which connections will be reused in the pool
*
*/
public enum ConnectionRecyclePolicy {
FIFO, // Maintains the available connection as a queue, keep connections for longer
LIFO // Maintains the available connection as a stack, allowing more connections to expire
}

private static final Logger log = LoggerFactory.getLogger(Pool.class);

private final ConnectionProvider<C> connector;
Expand All @@ -108,13 +118,16 @@ public String toString() {
private boolean closed;
private final Handler<Void> poolClosed;

private final ConnectionRecyclePolicy connectionRecyclePolicy;

public Pool(ConnectionProvider<C> connector,
int queueMaxSize,
long initialWeight,
long maxWeight,
Handler<Void> poolClosed,
BiConsumer<Channel, C> connectionAdded,
BiConsumer<Channel, C> connectionRemoved) {
BiConsumer<Channel, C> connectionRemoved,
ConnectionRecyclePolicy connectionRecyclePolicy) {
this.maxWeight = maxWeight;
this.initialWeight = initialWeight;
this.connector = connector;
Expand All @@ -123,6 +136,7 @@ public Pool(ConnectionProvider<C> connector,
this.available = new ArrayDeque<>();
this.connectionAdded = connectionAdded;
this.connectionRemoved = connectionRemoved;
this.connectionRecyclePolicy = connectionRecyclePolicy;
}

public synchronized int waitersInQueue() {
Expand Down Expand Up @@ -350,13 +364,23 @@ private void recycleConnection(Holder<C> conn, int c, long timestamp) {
connector.close(conn.connection);
} else {
if (conn.capacity == 0) {
available.add(conn);
returnConnectionToPool(conn);
}
conn.expirationTimestamp = timestamp;
conn.capacity = newCapacity;
}
}

private void returnConnectionToPool(Holder<C> conn) {
switch (this.connectionRecyclePolicy) {
case FIFO:
available.add(conn);
break;
case LIFO:
available.offerFirst(conn);
}
}

private void initConnection(Holder<C> holder, ContextInternal context, long concurrency, C conn, Channel channel, long weight) {
this.weight += initialWeight - weight;
holder.context = context;
Expand Down

0 comments on commit a3a81aa

Please sign in to comment.