diff --git a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java index 1e7164caf9e..41cd7aa759f 100644 --- a/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java @@ -460,7 +460,7 @@ public void reset(long code) { private void requestEnded() { context.runOnContext(v -> { if (pipelining && requests.size() < pipeliningLimit) { - listener.recycle(this); + listener.onRecycle(this); } }); } @@ -471,7 +471,7 @@ private void responseEnded() { } else { context.runOnContext(v -> { if (currentRequest == null) { - listener.recycle(this); + listener.onRecycle(this); } }); } @@ -630,7 +630,7 @@ protected void handleMessage(NetSocketImpl connection, ContextImpl context, Chan ByteBuf buf = (ByteBuf) msg; connection.handleMessageReceived(buf); } - }.removeHandler(sock -> listener.closed(this))); + }.removeHandler(sock -> listener.onClose(this))); return socket; } diff --git a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java index 8fb106644d5..caa4dd10ac0 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java @@ -49,7 +49,6 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon private final HttpClientImpl client; final HttpClientMetrics metrics; final Object queueMetric; - int streamCount; // Exclusively used by the HTTP/2 connection pool public Http2ClientConnection(ConnectionListener listener, Object queueMetric, @@ -70,8 +69,8 @@ public Channel channel() { } @Override - protected void concurrencyChanged() { - listener.concurrencyChanged(this); + protected void concurrencyChanged(long concurrency) { + listener.onConcurrencyChange(this, concurrency); } @Override @@ -82,7 +81,7 @@ public HttpClientMetrics metrics() { @Override void onStreamClosed(Http2Stream nettyStream) { super.onStreamClosed(nettyStream); - listener.recycle(this); + listener.onRecycle(this); } public synchronized HttpClientStream createStream() throws Http2Exception { diff --git a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java index 6cb20ab7dc0..70e65a2c796 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java @@ -84,7 +84,7 @@ static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) { private final ArrayDeque updateSettingsHandlers = new ArrayDeque<>(4); private final ArrayDeque>> pongHandlers = new ArrayDeque<>(); private Http2Settings localSettings = new Http2Settings(); - private Http2Settings remoteSettings = new Http2Settings(); + private Http2Settings remoteSettings; private Handler goAwayHandler; private Handler shutdownHandler; private Handler pingHandler; @@ -212,16 +212,22 @@ public synchronized void onSettingsAckRead(ChannelHandlerContext ctx) { protected void onConnect() { } - protected void concurrencyChanged() { + protected void concurrencyChanged(long concurrency) { } @Override public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { - boolean changed = false; - if (settings.maxConcurrentStreams() != null) { - long val = settings.maxConcurrentStreams(); - changed = val != maxConcurrentStreams; + boolean changed; + Long val = settings.maxConcurrentStreams(); + if (val != null) { + if (remoteSettings != null) { + changed = val != maxConcurrentStreams; + } else { + changed = false; + } maxConcurrentStreams = val; + } else { + changed = false; } remoteSettings = settings; synchronized (this) { @@ -233,7 +239,7 @@ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { } } if (changed) { - concurrencyChanged(); + concurrencyChanged(maxConcurrentStreams); } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java index 4561dc27c89..7370f2d39fc 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java +++ b/src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java @@ -25,7 +25,6 @@ import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.IdleStateHandler; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpVersion; @@ -51,6 +50,10 @@ class HttpChannelConnector implements ConnectionProvider { private final HttpClientMetrics metrics; private final SSLHelper sslHelper; private final HttpVersion version; + private final long weight; + private final long http1Weight; + private final long http2Weight; + private final long http2MaxConcurrency; HttpChannelConnector(HttpClientImpl client) { this.client = client; @@ -58,11 +61,15 @@ class HttpChannelConnector implements ConnectionProvider { this.metrics = client.metrics(); this.sslHelper = client.getSslHelper(); this.version = options.getProtocolVersion(); + this.http1Weight = client.getOptions().getHttp2MaxPoolSize(); + this.http2Weight = client.getOptions().getMaxPoolSize(); + this.weight = version == HttpVersion.HTTP_2 ? http2Weight : http1Weight; + this.http2MaxConcurrency = client.getOptions().getHttp2MultiplexingLimit() <= 0 ? Long.MAX_VALUE : client.getOptions().getHttp2MultiplexingLimit(); } @Override - public Channel channel(HttpClientConnection conn) { - return conn.channel(); + public boolean isValid(HttpClientConnection conn) { + return conn.isValid(); } @Override @@ -70,15 +77,14 @@ public void close(HttpClientConnection conn) { conn.close(); } - public void connect( + public long connect( ConnectionListener listener, Object endpointMetric, ContextImpl context, String peerHost, boolean ssl, String host, - int port, - Handler> handler) { + int port) { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(context.nettyEventLoop()); @@ -109,19 +115,19 @@ public void connect( if (useAlpn) { if ("h2".equals(protocol)) { applyHttp2ConnectionOptions(ch.pipeline()); - http2Connected(listener, endpointMetric, context, ch, handler); + http2Connected(listener, endpointMetric, context, ch); } else { applyHttp1xConnectionOptions(ch.pipeline()); HttpVersion fallbackProtocol = "http/1.0".equals(protocol) ? HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1; - http1xConnected(listener, fallbackProtocol, host, port, true, endpointMetric, context, ch, handler); + http1xConnected(listener, fallbackProtocol, host, port, true, endpointMetric, context, ch); } } else { applyHttp1xConnectionOptions(ch.pipeline()); - http1xConnected(listener, version, host, port, true, endpointMetric, context, ch, handler); + http1xConnected(listener, version, host, port, true, endpointMetric, context, ch); } } else { - handshakeFailure(ch, fut.cause(), handler); + handshakeFailure(ch, fut.cause(), listener); } }); } else { @@ -149,7 +155,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception p.remove(this); // Upgrade handler will remove itself applyHttp1xConnectionOptions(ch.pipeline()); - http1xConnected(listener, HttpVersion.HTTP_1_1, host, port, false, endpointMetric, context, ch, handler); + http1xConnected(listener, HttpVersion.HTTP_1_1, host, port, false, endpointMetric, context, ch); } } @Override @@ -165,7 +171,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc @Override public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception { applyHttp2ConnectionOptions(pipeline); - http2Connected(listener, endpointMetric, context, ch, handler); + http2Connected(listener, endpointMetric, context, ch); } }; HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536); @@ -188,18 +194,20 @@ public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeRespons // Upgrade handler do nothing } else { if (version == HttpVersion.HTTP_2 && !client.getOptions().isHttp2ClearTextUpgrade()) { - http2Connected(listener, endpointMetric, context, ch, handler); + http2Connected(listener, endpointMetric, context, ch); } else { - http1xConnected(listener, version, host, port, false, endpointMetric, context, ch, handler); + http1xConnected(listener, version, host, port, false, endpointMetric, context, ch); } } } } else { - connectFailed(null, handler, res.cause()); + connectFailed(null, listener, res.cause()); } }; channelProvider.connect(client.getVertx(), bootstrap, client.getOptions().getProxyOptions(), SocketAddress.inetSocketAddress(port, host), channelInitializer, channelHandler); + + return weight; } private void applyConnectionOptions(Bootstrap bootstrap) { @@ -231,12 +239,12 @@ private void applyHttp1xConnectionOptions(ChannelPipeline pipeline) { } } - private void handshakeFailure(Channel ch, Throwable cause, Handler> handler) { + private void handshakeFailure(Channel ch, Throwable cause, ConnectionListener listener) { SSLHandshakeException sslException = new SSLHandshakeException("Failed to create SSL connection"); if (cause != null) { sslException.initCause(cause); } - connectFailed(ch, handler, sslException); + connectFailed(ch, listener, sslException); } private void http1xConnected(ConnectionListener listener, @@ -246,8 +254,7 @@ private void http1xConnected(ConnectionListener listener, boolean ssl, Object endpointMetric, ContextImpl context, - Channel ch, - Handler> handler) { + Channel ch) { Http1xClientHandler clientHandler = new Http1xClientHandler( listener, context, @@ -259,10 +266,10 @@ private void http1xConnected(ConnectionListener listener, endpointMetric, client.metrics()); clientHandler.addHandler(conn -> { - handler.handle(Future.succeededFuture(conn)); + listener.onConnectSuccess(conn, 1, ch, context, weight, http1Weight, Long.MAX_VALUE); }); clientHandler.removeHandler(conn -> { - listener.closed(conn); + listener.onClose(conn); }); ch.pipeline().addLast("handler", clientHandler); } @@ -270,8 +277,7 @@ private void http1xConnected(ConnectionListener listener, private void http2Connected(ConnectionListener listener, Object endpointMetric, ContextImpl context, - Channel ch, - Handler> resultHandler) { + Channel ch) { try { boolean upgrade; upgrade = ch.pipeline().get(SslHandler.class) == null && options.isHttp2ClearTextUpgrade(); @@ -291,26 +297,27 @@ private void http2Connected(ConnectionListener listener, Object metric = metrics.connected(conn.remoteAddress(), conn.remoteName()); conn.metric(metric); } - resultHandler.handle(Future.succeededFuture(conn)); + long concurrency = conn.remoteSettings().getMaxConcurrentStreams(); + listener.onConnectSuccess(conn, concurrency, ch, context, weight, http2Weight, http2MaxConcurrency); }); handler.removeHandler(conn -> { if (metrics != null) { metrics.endpointDisconnected(endpointMetric, conn.metric()); } - listener.closed(conn); + listener.onClose(conn); }); } catch (Exception e) { - connectFailed(ch, resultHandler, e); + connectFailed(ch, listener, e); } } - private void connectFailed(Channel ch, Handler> connectionExceptionHandler, Throwable t) { + private void connectFailed(Channel ch, ConnectionListener listener, Throwable t) { if (ch != null) { try { ch.close(); } catch (Exception ignore) { } } - connectionExceptionHandler.handle(Future.failedFuture(t)); + listener.onConnectFailure(t, weight); } } diff --git a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java index fdd4127f0f7..0cab0e3549b 100644 --- a/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java +++ b/src/main/java/io/vertx/core/http/impl/HttpClientImpl.java @@ -33,8 +33,8 @@ import io.vertx.core.http.WebSocket; import io.vertx.core.http.WebsocketVersion; import io.vertx.core.http.impl.pool.ConnectionManager; -import io.vertx.core.http.impl.pool.ConnectionPool; import io.vertx.core.http.impl.pool.ConnectionProvider; +import io.vertx.core.http.impl.pool.PoolOptions; import io.vertx.core.http.impl.pool.Waiter; import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextInternal; @@ -160,7 +160,10 @@ public HttpClientImpl(VertxInternal vertx, HttpClientOptions options) { } ConnectionProvider connector = new HttpChannelConnector(this); - Function> poolFactory = sa -> new HttpConnectionPool(options.getProtocolVersion(), options); + Function poolFactory = sa -> { + long ms = options.getMaxPoolSize() * options.getHttp2MaxPoolSize(); + return new PoolOptions().setMaxSize(ms); + }; websocketCM = new ConnectionManager<>(metrics, connector, poolFactory, options.getMaxWaitQueueSize()); httpCM = new ConnectionManager<>(metrics, connector, poolFactory, options.getMaxWaitQueueSize()); diff --git a/src/main/java/io/vertx/core/http/impl/HttpConnectionPool.java b/src/main/java/io/vertx/core/http/impl/HttpConnectionPool.java deleted file mode 100644 index 122d6faf036..00000000000 --- a/src/main/java/io/vertx/core/http/impl/HttpConnectionPool.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Copyright (c) 2011-2013 The original author or authors - * ------------------------------------------------------ - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ -package io.vertx.core.http.impl; - -import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.http.HttpVersion; -import io.vertx.core.http.impl.pool.ConnectionManager; -import io.vertx.core.http.impl.pool.ConnectionPool; -import io.vertx.core.impl.ContextImpl; - -import java.util.*; - -/** - * The logic for the connection pool for HTTP: - * - * - HTTP/1.x pools several connections - * - HTTP/2 uses can multiplex on a single connections but can handle several connections - *

- * When this pool is initialized with an HTTP/2 pool, this pool can be changed to an HTTP/1/1 - * pool if the remote server does not support HTTP/2 or after ALPN negotiation. - *

- * In this case further connections will be HTTP/1.1 connections, until the pool is closed. - */ -class HttpConnectionPool implements ConnectionPool { - - private ConnectionPool current; - private final HttpClientOptions options; - private HttpVersion version; - - HttpConnectionPool(HttpVersion version, HttpClientOptions options) { - this.version = version; - this.options = options; - if (version == HttpVersion.HTTP_2) { - current = new Http2(); - } else { - current = new Http1x(); - } - } - - private void fallbackToHttp1x(HttpVersion version) { - this.current = new Http1x(); - this.version = version; - } - - public HttpVersion version() { - return version; - } - - @Override - public boolean isValid(HttpClientConnection conn) { - return conn.isValid(); - } - - @Override - public ContextImpl getContext(HttpClientConnection conn) { - return conn.getContext(); - } - - @Override - public boolean canBorrow(int connCount) { - return current.canBorrow(connCount); - } - - @Override - public HttpClientConnection pollConnection() { - return current.pollConnection(); - } - - @Override - public boolean canCreateConnection(int connCount) { - return current.canCreateConnection(connCount); - } - - @Override - public int maxSize() { - return current.maxSize(); - } - - @Override - public boolean initConnection(HttpClientConnection conn) { - if (conn instanceof Http1xClientConnection && current instanceof Http2) { - fallbackToHttp1x(((Http1xClientConnection) conn).version()); - } - return ((ConnectionPool) current).initConnection(conn); - } - - @Override - public void recycleConnection(HttpClientConnection conn) { - ((ConnectionPool) current).recycleConnection(conn); - } - - @Override - public void evictConnection(HttpClientConnection conn) { - ((ConnectionPool) current).evictConnection(conn); - } - - @Override - public void close() { - } - - class Http2 implements ConnectionPool { - - // Pools must locks on the queue object to keep a single lock - private final Set allConnections = new HashSet<>(); - private final int maxConcurrency; - private final int maxSockets; - - Http2() { - this.maxConcurrency = options.getHttp2MultiplexingLimit() < 1 ? Integer.MAX_VALUE : options.getHttp2MultiplexingLimit(); - this.maxSockets = options.getHttp2MaxPoolSize(); - } - - @Override - public int maxSize() { - return maxSockets; - } - - @Override - public boolean canCreateConnection(int connCount) { - // We create at most one connection concurrently when all others when - // all others are busy - return connCount == allConnections.size() && connCount < maxSockets; - } - - @Override - public boolean canBorrow(int connCount) { - for (Http2ClientConnection conn : allConnections) { - if (canReserveStream(conn)) { - return true; - } - } - return canCreateConnection(connCount); - } - - @Override - public Http2ClientConnection pollConnection() { - for (Http2ClientConnection conn : allConnections) { - // Julien : check conn is valid ? - if (canReserveStream(conn)) { - conn.streamCount++; - return conn; - } - } - return null; - } - - private boolean canReserveStream(Http2ClientConnection handler) { - int maxConcurrentStreams = Math.min(handler.handler.connection().local().maxActiveStreams(), maxConcurrency); - return handler.streamCount < maxConcurrentStreams; - } - - @Override - public void evictConnection(Http2ClientConnection conn) { - allConnections.remove(conn); - } - - @Override - public boolean initConnection(Http2ClientConnection conn) { - allConnections.add(conn); - if (canReserveStream(conn)) { - conn.streamCount++; - return true; - } else { - return false; - } - } - - @Override - public void recycleConnection(Http2ClientConnection conn) { - conn.streamCount--; - } - - @Override - public boolean isValid(Http2ClientConnection conn) { - return conn.isValid(); - } - - @Override - public ContextImpl getContext(Http2ClientConnection conn) { - return conn.getContext(); - } - - @Override - public void close() { - } - } - - private class Http1x implements ConnectionPool { - - private final Set allConnections = new HashSet<>(); - private final Queue availableConnections = new ArrayDeque<>(); - private final int maxSockets; - - Http1x() { - this.maxSockets = options.getMaxPoolSize(); - } - - @Override - public int maxSize() { - return maxSockets; - } - - @Override - public boolean canBorrow(int connCount) { - return connCount < maxSockets; - } - - @Override - public Http1xClientConnection pollConnection() { - Http1xClientConnection conn; - while ((conn = availableConnections.poll()) != null && !conn.isValid()) { - } - return conn; - } - - @Override - public boolean canCreateConnection(int connCount) { - return connCount < maxSockets; - } - - @Override - public boolean initConnection(Http1xClientConnection conn) { - allConnections.add(conn); - return true; - } - - @Override - public void recycleConnection(Http1xClientConnection conn) { - availableConnections.add(conn); - } - - public void closeAllConnections() { - Set copy = new HashSet<>(allConnections); - allConnections.clear(); - // Close outside sync block to avoid potential deadlock - for (Http1xClientConnection conn : copy) { - try { - conn.close(); - } catch (Throwable t) { - ConnectionManager.log.error("Failed to close connection", t); - } - } - } - - @Override - public void evictConnection(Http1xClientConnection conn) { - allConnections.remove(conn); - availableConnections.remove(conn); - } - - @Override - public boolean isValid(Http1xClientConnection conn) { - return conn.isValid(); - } - - @Override - public ContextImpl getContext(Http1xClientConnection conn) { - return conn.getContext(); - } - - @Override - public void close() { - } - } -} diff --git a/src/main/java/io/vertx/core/http/impl/pool/ConnectionHolder.java b/src/main/java/io/vertx/core/http/impl/pool/ConnectionHolder.java new file mode 100644 index 00000000000..60050eba707 --- /dev/null +++ b/src/main/java/io/vertx/core/http/impl/pool/ConnectionHolder.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2011-2014 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.core.http.impl.pool; + +import io.netty.channel.Channel; +import io.vertx.core.impl.ContextImpl; + +public class ConnectionHolder { + + volatile C connection; + long concurrency; + int inflight; + Channel channel; + ContextImpl context; + long weight; + long maxConcurrency; + + public C connection() { + return connection; + } + +} diff --git a/src/main/java/io/vertx/core/http/impl/pool/ConnectionListener.java b/src/main/java/io/vertx/core/http/impl/pool/ConnectionListener.java index f9eba3b87f5..581c8c11d7c 100644 --- a/src/main/java/io/vertx/core/http/impl/pool/ConnectionListener.java +++ b/src/main/java/io/vertx/core/http/impl/pool/ConnectionListener.java @@ -15,30 +15,28 @@ */ package io.vertx.core.http.impl.pool; +import io.netty.channel.Channel; +import io.vertx.core.impl.ContextImpl; + /** - * The listener is used by the {@link ConnectionProvider} to interact with the connection manager. + * The listener is used by the {@link ConnectionProvider} to interact with the connection pool. */ public interface ConnectionListener { - /** - * The connection concurrency changed. - * - * @param conn the connection - */ - void concurrencyChanged(C conn); - - /** - * The connection can be recycled. - * - * @param conn the connection - */ - void recycle(C conn); - - /** - * The connection is closed. - * - * @param conn the connection - */ - void closed(C conn); + void onConnectSuccess(C conn, + long concurrency, + Channel channel, + ContextImpl context, + long oldWeight, + long newWeight, + long maxConcurrency); + + void onConnectFailure(Throwable err, long weight); + + void onConcurrencyChange(C conn, long concurrency); + + void onRecycle(C conn); + + void onClose(C conn); } diff --git a/src/main/java/io/vertx/core/http/impl/pool/ConnectionManager.java b/src/main/java/io/vertx/core/http/impl/pool/ConnectionManager.java index 09305965bc9..b723f1b8ecd 100644 --- a/src/main/java/io/vertx/core/http/impl/pool/ConnectionManager.java +++ b/src/main/java/io/vertx/core/http/impl/pool/ConnectionManager.java @@ -17,17 +17,12 @@ package io.vertx.core.http.impl.pool; import io.netty.channel.Channel; -import io.vertx.core.http.ConnectionPoolTooBusyException; -import io.vertx.core.impl.ContextImpl; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.core.net.SocketAddress; import io.vertx.core.spi.metrics.HttpClientMetrics; -import java.util.ArrayDeque; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -41,27 +36,27 @@ public class ConnectionManager { private final int maxWaitQueueSize; private final HttpClientMetrics metrics; // Shall be removed later combining the PoolMetrics with HttpClientMetrics private final ConnectionProvider connector; - private final Function> poolFactory; + private final Function optionsProvider; private final Map connectionMap = new ConcurrentHashMap<>(); - private final Map endpointMap = new ConcurrentHashMap<>(); + private final Map> endpointMap = new ConcurrentHashMap<>(); public ConnectionManager(HttpClientMetrics metrics, ConnectionProvider connector, - Function> poolFactory, + Function optionsProvider, int maxWaitQueueSize) { this.maxWaitQueueSize = maxWaitQueueSize; this.metrics = metrics; this.connector = connector; - this.poolFactory = poolFactory; + this.optionsProvider = optionsProvider; } - private static final class ConnectionKey { + private static final class EndpointKey { private final boolean ssl; private final int port; private final String host; - ConnectionKey(boolean ssl, int port, String host) { + EndpointKey(boolean ssl, int port, String host) { this.ssl = ssl; this.host = host; this.port = port; @@ -72,7 +67,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - ConnectionKey that = (ConnectionKey) o; + EndpointKey that = (EndpointKey) o; if (ssl != that.ssl) return false; if (port != that.port) return false; @@ -90,17 +85,25 @@ public int hashCode() { } } - private Endpoint getConnQueue(String peerHost, boolean ssl, int port, String host) { - ConnectionKey key = new ConnectionKey(ssl, port, peerHost); - return endpointMap.computeIfAbsent(key, targetAddress -> { - ConnectionPool pool = poolFactory.apply(SocketAddress.inetSocketAddress(port, host)); - return new Endpoint( connector, peerHost, host, port, ssl, key, pool); - }); + private Pool getConnQueue(String peerHost, boolean ssl, int port, String host) { + EndpointKey key = new EndpointKey(ssl, port, peerHost); + PoolOptions options = optionsProvider.apply(SocketAddress.inetSocketAddress(port, host)); + return endpointMap.computeIfAbsent(key, targetAddress -> new Pool<>( + connector, + metrics, + maxWaitQueueSize, + peerHost, + host, + port, + ssl, + options.getMaxSize(), v -> endpointMap.remove(key), + holder -> connectionMap.put(holder.channel, holder.connection), + holder -> connectionMap.remove(holder.channel))); } public void getConnection(String peerHost, boolean ssl, int port, String host, Waiter waiter) { while (true) { - Endpoint connQueue = getConnQueue(peerHost, ssl, port, host); + Pool connQueue = getConnQueue(peerHost, ssl, port, host); if (connQueue.getConnection(waiter)) { break; } @@ -113,172 +116,4 @@ public void close() { connector.close(conn); } } - - /** - * The endpoint is a queue of waiters and it delegates to the connection pool, the pooling strategy. - * - * An endpoint is synchronized and should be executed only from the event loop, the underlying pool - * relies and the synchronization performed by the endpoint. - */ - private class Endpoint implements ConnectionListener { - - private final String peerHost; - private final boolean ssl; - private final int port; - private final String host; - private final ConnectionKey key; - private final ConnectionPool pool; - private final ConnectionProvider connector; - private final Object metric; - - private int connCount; - private final Queue> waiters = new ArrayDeque<>(); - private boolean closed; - - - private Endpoint(ConnectionProvider connector, - String peerHost, - String host, - int port, - boolean ssl, - ConnectionKey key, - ConnectionPool pool) { - this.key = key; - this.host = host; - this.port = port; - this.ssl = ssl; - this.peerHost = peerHost; - this.connector = connector; - this.pool = pool; - this.metric = metrics != null ? metrics.createEndpoint(host, port, pool.maxSize()) : null; - } - - private synchronized boolean getConnection(Waiter waiter) { - if (closed) { - return false; - } - // Enqueue - if (maxWaitQueueSize < 0 || waiters.size() < maxWaitQueueSize || pool.canBorrow(connCount)) { - if (metrics != null) { - waiter.metric = metrics.enqueueRequest(metric); - } - waiters.add(waiter); - checkPending(); - } else { - waiter.handleFailure(null, new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + maxWaitQueueSize)); - } - return true; - } - - private synchronized void checkPending() { - while (true) { - Waiter waiter = waiters.peek(); - if (waiter == null) { - break; - } - if (metric != null) { - metrics.dequeueRequest(metric, waiter.metric); - } - C conn = pool.pollConnection(); - if (conn != null) { - waiters.poll(); - ContextImpl ctx = pool.getContext(conn); - ctx.nettyEventLoop().execute(() -> { - if (pool.isValid(conn)) { - boolean handled = deliverInternal(conn, waiter); - if (!handled) { - pool.recycleConnection(conn); - checkPending(); - } - } else { - waiters.add(waiter); - closed(conn); - } - }); - } else if (pool.canCreateConnection(connCount)) { - waiters.poll(); - createConnection(waiter); - } else { - break; - } - } - } - - private void createConnection(Waiter waiter) { - connCount++; - connector.connect(this, metric, waiter.context, peerHost, ssl, host, port, ar -> { - if (ar.succeeded()) { - initConnection(waiter, ar.result()); - checkPending(); - } else { - synchronized (Endpoint.this) { - waiter.handleFailure(waiter.context, ar.cause()); - connCount--; - } - checkPending(); - checkClose(); - } - }); - } - - @Override - public void concurrencyChanged(C conn) { - checkPending(); - } - - @Override - public void recycle(C conn) { - pool.recycleConnection(conn); - checkPending(); - } - - @Override - public synchronized void closed(C conn) { - closeConnection(conn); - checkPending(); - checkClose(); - } - - private synchronized void closeConnection(C conn) { - Channel channel = connector.channel(conn); - connectionMap.remove(channel); - pool.evictConnection(conn); - connCount--; - } - - private synchronized void initConnection(Waiter waiter, C conn) { - connectionMap.put(connector.channel(conn), conn); - boolean ok = pool.initConnection(conn); - waiter.initConnection(conn); - if (ok) { - boolean handled = deliverInternal(conn, waiter); - if (!handled) { - pool.recycleConnection(conn); - } - } else { - waiters.add(waiter); - } - } - - private boolean deliverInternal(C conn, Waiter waiter) { - try { - return waiter.handleConnection(conn); - } catch (Exception e) { - // e.printStackTrace(); - return true; - } - } - - synchronized void checkClose() { - if (connCount == 0) { - // No waiters and no connections - remove the ConnQueue - endpointMap.remove(key); - if (metrics != null) { - metrics.closeEndpoint(host, port, metric); - } - pool.close(); - closed = true; - } - } - } } diff --git a/src/main/java/io/vertx/core/http/impl/pool/ConnectionPool.java b/src/main/java/io/vertx/core/http/impl/pool/ConnectionPool.java deleted file mode 100644 index f231f539715..00000000000 --- a/src/main/java/io/vertx/core/http/impl/pool/ConnectionPool.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2011-2013 The original author or authors - * ------------------------------------------------------ - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * and Apache License v2.0 which accompanies this distribution. - * - * The Eclipse Public License is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * The Apache License v2.0 is available at - * http://www.opensource.org/licenses/apache2.0.php - * - * You may elect to redistribute this code under either of these licenses. - */ -package io.vertx.core.http.impl.pool; - -import io.vertx.core.impl.ContextImpl; - -/** - * The logic for the connection pool. - */ -public interface ConnectionPool { - - int maxSize(); - - boolean canBorrow(int connCount); - - C pollConnection(); - - /** - * Determine when a new connection should be created - * - * @param connCount the actual connection count including the one being created - * @return true whether or not a new connection can be created - */ - boolean canCreateConnection(int connCount); - - boolean initConnection(C conn); - - void recycleConnection(C conn); - - void evictConnection(C conn); - - boolean isValid(C conn); - - ContextImpl getContext(C conn); - - void close(); - -} diff --git a/src/main/java/io/vertx/core/http/impl/pool/ConnectionProvider.java b/src/main/java/io/vertx/core/http/impl/pool/ConnectionProvider.java index 4c6d03eb590..9467df968ca 100644 --- a/src/main/java/io/vertx/core/http/impl/pool/ConnectionProvider.java +++ b/src/main/java/io/vertx/core/http/impl/pool/ConnectionProvider.java @@ -15,10 +15,6 @@ */ package io.vertx.core.http.impl.pool; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; import io.vertx.core.impl.ContextImpl; /** @@ -26,17 +22,16 @@ */ public interface ConnectionProvider { - void connect( + long connect( ConnectionListener listener, Object endpointMetric, ContextImpl context, String peerHost, boolean ssl, String host, - int port, - Handler> handler); + int port); - Channel channel(C conn); + boolean isValid(C conn); void close(C conn); diff --git a/src/main/java/io/vertx/core/http/impl/pool/Pool.java b/src/main/java/io/vertx/core/http/impl/pool/Pool.java new file mode 100644 index 00000000000..a91dc3a9dc1 --- /dev/null +++ b/src/main/java/io/vertx/core/http/impl/pool/Pool.java @@ -0,0 +1,270 @@ +/* + * Copyright (c) 2011-2014 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.core.http.impl.pool; + +import io.netty.channel.Channel; +import io.vertx.core.Handler; +import io.vertx.core.http.ConnectionPoolTooBusyException; +import io.vertx.core.impl.ContextImpl; +import io.vertx.core.spi.metrics.HttpClientMetrics; + +import java.util.*; + +/** + * The endpoint is a queue of waiters and it delegates to the connection pool, the pooling strategy. + * + * An endpoint is synchronized and should be executed only from the event loop, the underlying pool + * relies and the synchronization performed by the endpoint. + */ +public class Pool { + + private final HttpClientMetrics metrics; + private final String peerHost; + private final boolean ssl; + private final int port; + private final String host; + private final ConnectionProvider connector; + private final Object metric; + private final long maxWeight; + private final int queueMaxSize; + + private final Queue> waiters = new ArrayDeque<>(); + private final Set> all; + private final Deque> available; + private boolean closed; + private long capacity; + private long weight; + private final Handler poolClosed; + private final Handler> connectionAdded; + private final Handler> connectionRemoved; + + public Pool(ConnectionProvider connector, + HttpClientMetrics metrics, + int queueMaxSize, + String peerHost, + String host, + int port, + boolean ssl, + long maxWeight, + Handler poolClosed, + Handler> connectionAdded, + Handler> connectionRemoved) { + this.maxWeight = maxWeight; + this.host = host; + this.port = port; + this.ssl = ssl; + this.peerHost = peerHost; + this.connector = connector; + this.queueMaxSize = queueMaxSize; + this.metrics = metrics; + this.metric = metrics != null ? metrics.createEndpoint(host, port, 10 /* fixme */) : null; + this.poolClosed = poolClosed; + this.all = new HashSet<>(); + this.available = new ArrayDeque<>(); + this.connectionAdded = connectionAdded; + this.connectionRemoved = connectionRemoved; + } + + public synchronized boolean getConnection(Waiter waiter) { + if (closed) { + return false; + } + // Enqueue + if (capacity > 0 || weight < maxWeight || (queueMaxSize < 0 || waiters.size() < queueMaxSize)) { + if (metrics != null) { + waiter.metric = metrics.enqueueRequest(metric); + } + waiters.add(waiter); + checkPending(); + } else { + waiter.handleFailure(null, new ConnectionPoolTooBusyException("Connection pool reached max wait queue size of " + queueMaxSize)); + } + return true; + } + + private void checkPending() { + while (true) { + Waiter waiter = waiters.peek(); + if (waiter == null) { + break; + } + if (metric != null) { + metrics.dequeueRequest(metric, waiter.metric); + } + if (capacity > 0) { + capacity--; + ConnectionHolder conn = available.peek(); + if (++conn.inflight == conn.concurrency) { + available.poll(); + } + waiters.poll(); + ContextImpl ctx = conn.context; + ctx.nettyEventLoop().execute(() -> { + if (connector.isValid(conn.connection)) { + boolean handled = deliverInternal(conn, waiter); + if (!handled) { + synchronized (Pool.this) { + recycleConnection(conn); + checkPending(); + } + } + } else { + synchronized (Pool.this) { + waiters.add(waiter); + closed(conn); + } + } + }); + } else if (weight < maxWeight) { + waiters.poll(); + weight += createConnection(waiter); + } else { + break; + } + } + } + + private long createConnection(Waiter waiter) { + ConnectionHolder holder = new ConnectionHolder<>(); + ConnectionListener listener = new ConnectionListener() { + @Override + public void onConnectSuccess(C conn, long concurrency, Channel channel, ContextImpl context, long oldWeight, long newWeight, long maxConcurrency) { + synchronized (Pool.this) { + weight += newWeight - oldWeight; + holder.context = context; + holder.concurrency = Math.min(concurrency, maxConcurrency); + holder.connection = conn; + holder.channel = channel; + holder.weight = newWeight; + holder.maxConcurrency = maxConcurrency; + initConnection(waiter, holder); + checkPending(); + } + } + @Override + public void onConnectFailure(Throwable err, long weight) { + waiter.handleFailure(waiter.context, err); + synchronized (Pool.this) { + Pool.this.weight -= weight; + all.remove(holder); + checkPending(); + checkClose(); + } + } + @Override + public void onConcurrencyChange(C conn, long concurrency) { + synchronized (Pool.this) { + concurrency = Math.min(concurrency, holder.maxConcurrency); + if (holder.concurrency < concurrency) { + long diff = concurrency - holder.concurrency; + capacity += diff; + if (holder.inflight == holder.concurrency) { + available.add(holder); + } + holder.concurrency = concurrency; + checkPending(); + } else { + throw new UnsupportedOperationException("Not yet implemented"); + } + } + } + @Override + public void onRecycle(C conn) { + Pool.this.recycle(holder); + } + @Override + public void onClose(C conn) { + Pool.this.closed(holder); + } + }; + all.add(holder); + return connector.connect(listener, metric, waiter.context, peerHost, ssl, host, port); + } + + private synchronized void recycle(ConnectionHolder conn) { + recycleConnection(conn); + checkPending(); + } + + private void recycleConnection(ConnectionHolder conn) { + // if (conn.inflight == 0) { + // throw new IllegalStateException("Attempt to recycle a connection more than permitted"); + //} + capacity++; + if (conn.inflight == conn.concurrency) { + available.add(conn); + } + conn.inflight--; + } + + private synchronized void closed(ConnectionHolder conn) { + closeConnection(conn); + checkPending(); + checkClose(); + } + + private void closeConnection(ConnectionHolder conn) { + connectionRemoved.handle(conn); + all.remove(conn); + long remaining = conn.concurrency - conn.inflight; + if (remaining > 0) { + available.remove(conn); + capacity -= remaining; + } + weight -= conn.weight; + // Remove if capacity > 0 + } + + private synchronized void initConnection(Waiter waiter, ConnectionHolder holder) { + connectionAdded.handle(holder); + all.add(holder); + waiter.initConnection(holder.connection); + if (holder.concurrency > 0) { + holder.inflight++; + if (holder.inflight < holder.concurrency) { + capacity += holder.concurrency - holder.inflight; + available.add(holder); + } + boolean consumed = deliverInternal(holder, waiter); + if (!consumed) { + recycleConnection(holder); + } + } else { + waiters.add(waiter); + } + } + + private boolean deliverInternal(ConnectionHolder conn, Waiter waiter) { + try { + return waiter.handleConnection(conn.connection); + } catch (Exception e) { + // Handle this case gracefully + e.printStackTrace(); + return true; + } + } + + private void checkClose() { + if (all.isEmpty()) { + // No waiters and no connections - remove the ConnQueue + if (metrics != null) { + metrics.closeEndpoint(host, port, metric); + } + closed = true; + poolClosed.handle(null); + } + } +} diff --git a/src/main/java/io/vertx/core/http/impl/pool/PoolOptions.java b/src/main/java/io/vertx/core/http/impl/pool/PoolOptions.java new file mode 100644 index 00000000000..945985bca8a --- /dev/null +++ b/src/main/java/io/vertx/core/http/impl/pool/PoolOptions.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2011-2014 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.core.http.impl.pool; + +public class PoolOptions { + + private long maxSize; + + public long getMaxSize() { + return maxSize; + } + + public PoolOptions setMaxSize(long maxSize) { + this.maxSize = maxSize; + return this; + } +} diff --git a/src/test/java/io/vertx/test/core/Http2ClientTest.java b/src/test/java/io/vertx/test/core/Http2ClientTest.java index 20da0fcf882..e6921d5de39 100644 --- a/src/test/java/io/vertx/test/core/Http2ClientTest.java +++ b/src/test/java/io/vertx/test/core/Http2ClientTest.java @@ -1746,14 +1746,13 @@ public void testMaxConcurrencySingleConnection() throws Exception { @Test public void testMaxConcurrencyMultipleConnections() throws Exception { - testMaxConcurrency(3, 5); + testMaxConcurrency(2, 1); } private void testMaxConcurrency(int poolSize, int maxConcurrency) throws Exception { int rounds = 1 + poolSize; int maxRequests = poolSize * maxConcurrency; int totalRequests = maxRequests + maxConcurrency; - Set serverConns = new HashSet<>(); server.connectionHandler(conn -> { serverConns.add(conn); @@ -1779,7 +1778,6 @@ private void testMaxConcurrency(int poolSize, int maxConcurrency) throws Excepti setHttp2MaxPoolSize(poolSize). setHttp2MultiplexingLimit(maxConcurrency)); AtomicInteger respCount = new AtomicInteger(); - Set clientConnections = Collections.synchronizedSet(new HashSet<>()); for (int j = 0;j < rounds;j++) { for (int i = 0;i < maxConcurrency;i++) { diff --git a/src/test/java/io/vertx/test/core/net/ConnectionManagerTest.java b/src/test/java/io/vertx/test/core/net/ConnectionManagerTest.java new file mode 100644 index 00000000000..8aebb5fdd26 --- /dev/null +++ b/src/test/java/io/vertx/test/core/net/ConnectionManagerTest.java @@ -0,0 +1,707 @@ +/* + * Copyright (c) 2011-2014 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + */ +package io.vertx.test.core.net; + +import io.netty.channel.Channel; +import io.netty.channel.embedded.EmbeddedChannel; +import io.vertx.core.http.impl.pool.*; +import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.ContextInternal; +import io.vertx.core.net.SocketAddress; +import io.vertx.test.core.VertxTestBase; +import org.junit.Test; + +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +public class ConnectionManagerTest extends VertxTestBase { + + private static final SocketAddress TEST_ADDRESS = SocketAddress.inetSocketAddress(8080, "localhost"); + + class FakeConnectionManager { + + private final FakeConnectionProvider connector; + private final int queueMaxSize; + private final int maxPoolSize; + private Pool delegate; + private int size; + private Set active = new HashSet<>(); + private boolean closed = true; + private int seq; + + FakeConnectionManager(int queueMaxSize, int maxPoolSize, FakeConnectionProvider connector) { + this.queueMaxSize = queueMaxSize; + this.maxPoolSize = maxPoolSize; + this.connector = connector; + } + + synchronized int sequence() { + return seq; + } + + synchronized boolean closed() { + return closed; + } + + synchronized boolean contains(FakeConnection conn) { + return active.contains(conn); + } + + synchronized int size() { + return size; + } + + void getConnection(Waiter waiter) { + synchronized (this) { + if (closed) { + seq++; + closed = false; + delegate = new Pool<>( + connector, + null, + queueMaxSize, + "localhost", + "localhost", + 8080, + false, + maxPoolSize, + v -> { + synchronized (FakeConnectionManager.this) { + closed = true; + } + }, conn -> { + synchronized (FakeConnectionManager.this) { + active.add(conn.connection()); + size++; + } + }, conn -> { + synchronized (FakeConnectionManager.this) { + size--; + active.remove(conn.connection()); + } + } + ); + } + } + delegate.getConnection(waiter); + } + } + + @Test + public void testConnectPoolEmpty() { + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(3, 4, connector); + FakeWaiter waiter = new FakeWaiter(); + mgr.getConnection(waiter); + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + conn.connect(); + assertWaitUntil(waiter::isComplete); + waiter.assertInitialized(conn); + waiter.assertSuccess(conn); + } + + @Test + public void testConnectPoolEmptyWaiterCancelledAfterConnectRequest() { + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(3, 3, connector); + FakeWaiter waiter = new FakeWaiter(); + mgr.getConnection(waiter); + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + waiter.cancel(); + conn.connect(); + waitUntil(() -> mgr.size() == 1); + waiter.assertInitialized(conn); + assertTrue(waiter.isComplete()); + assertFalse(waiter.isSuccess()); + assertFalse(waiter.isFailure()); + assertTrue(mgr.contains(conn)); + } + + @Test + public void testConnectionFailure() { + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(3, 3, connector); + FakeWaiter waiter = new FakeWaiter(); + mgr.getConnection(waiter); + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + Exception expected = new Exception(); + conn.fail(expected); + assertWaitUntil(waiter::isComplete); + waiter.assertFailure(expected); + assertTrue(waiter.isFailure()); + } + + @Test + public void testRecycleConnection() { + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(3, 1, connector); + FakeWaiter waiter1 = new FakeWaiter(); + mgr.getConnection(waiter1); + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + conn.connect(); + assertWaitUntil(waiter1::isComplete); + FakeWaiter waiter2 = new FakeWaiter(); + mgr.getConnection(waiter2); + connector.assertRequests(TEST_ADDRESS, 0); + waiter1.recycle(); + assertWaitUntil(waiter2::isComplete); + waiter2.assertSuccess(conn); + } + + /* + @Test + public void testRecycleClosedConnection() { + FakeConnnectionPool pool = new FakeConnnectionPool(1); + FakeConnectionProvider provider = new FakeConnectionProvider(); + ConnectionManager mgr = new ConnectionManager<>(null, provider, pool, 3); + FakeWaiter waiter1 = new FakeWaiter(); + mgr.getConnection("localhost", false, 8080, "localhost", waiter1); + FakeConnection conn = provider.assertRequest(TEST_ADDRESS); + conn.connect(); + assertWaitUntil(waiter1::isComplete); + FakeWaiter waiter2 = new FakeWaiter(); + mgr.getConnection("localhost", false, 8080, "localhost", waiter2); + provider.assertRequests(TEST_ADDRESS, 0); + conn.close(); + conn.recycle(); + // assertWaitUntil(waiter2::isComplete); + // waiter2.assertSuccess(conn); + } + */ + @Test + public void testRecycleInvalidConnection() { + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(3, 1, connector); + FakeWaiter waiter1 = new FakeWaiter(); + mgr.getConnection(waiter1); + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + conn.connect(); + waitUntil(waiter1::isComplete); + FakeWaiter waiter2 = new FakeWaiter(); + mgr.getConnection(waiter2); + conn.invalidate(); + waiter1.recycle(); + waitUntil(() -> connector.requests(TEST_ADDRESS) == 1); + assertFalse(mgr.closed()); + FakeConnection conn2 = connector.assertRequest(TEST_ADDRESS); + conn2.connect(); + waitUntil(waiter2::isSuccess); + } + + @Test + public void testWaiterThrowsException() { + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(3, 1, connector); + Exception failure = new Exception(); + FakeWaiter waiter = new FakeWaiter() { + @Override + public synchronized boolean handleConnection(FakeConnection conn) throws Exception { + throw failure; + } + }; + mgr.getConnection(waiter); + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + conn.connect(); + assertEquals(0, mgr.size()); + } + + @Test + public void testEndpointLifecycle() { + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(3, 1, connector); + FakeWaiter waiter1 = new FakeWaiter(); + mgr.getConnection(waiter1); + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + conn.connect(); + waitUntil(waiter1::isSuccess); + conn.close(); + waitUntil(mgr::closed); + FakeWaiter waiter2 = new FakeWaiter(); + mgr.getConnection(waiter2); + assertEquals(2, mgr.sequence()); + } + + @Test + public void testDontCloseEndpointWithInflightRequest() { + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(3, 2, connector); + FakeWaiter waiter1 = new FakeWaiter(); + mgr.getConnection(waiter1); + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + conn.connect(); + waitUntil(waiter1::isComplete); + FakeWaiter waiter2 = new FakeWaiter(); + mgr.getConnection(waiter2); + conn.close(); + waitUntil(() -> !mgr.contains(conn)); + assertFalse(mgr.closed()); + } + + @Test + public void testInitialConcurrency() { + int n = 10; + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(-1, 1, connector); + List waiters = new ArrayList<>(); + for (int i = 0; i < n; i++) { + FakeWaiter waiter = new FakeWaiter(); + mgr.getConnection(waiter); + waiters.add(waiter); + } + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + conn.concurrency(n).connect(); + waiters.forEach(waiter -> { + waitUntil(waiter::isSuccess); + }); + waiters.forEach(FakeWaiter::recycle); + FakeWaiter waiter = new FakeWaiter(); + mgr.getConnection(waiter); + waitUntil(waiter::isComplete); + } + + @Test + public void testInitialNoConcurrency() { + int n = 10; + FakeConnectionProvider connector = new FakeConnectionProvider(); + FakeConnectionManager mgr = new FakeConnectionManager(-1, 1, connector); + List waiters = new ArrayList<>(); + for (int i = 0; i < n; i++) { + FakeWaiter waiter = new FakeWaiter(); + mgr.getConnection(waiter); + waiters.add(waiter); + } + FakeConnection conn = connector.assertRequest(TEST_ADDRESS); + conn.concurrency(0).connect().awaitConnected(); + conn.concurrency(n - 1); + waitUntil(() -> waiters.stream().filter(FakeWaiter::isSuccess).count() == n - 1); + waiters.stream().filter(FakeWaiter::isSuccess).findFirst().get().recycle(); + waiters.forEach(waiter -> { + waitUntil(waiter::isSuccess); + }); + } + + @Test + public void testStress() { + int numActors = 16; + int numConnections = 1000; + + FakeConnectionProvider connector = new FakeConnectionProvider() { + @Override + public long connect(ConnectionListener listener, Object endpointMetric, ContextImpl context, String peerHost, boolean ssl, String host, int port) { + int i = ThreadLocalRandom.current().nextInt(100); + FakeConnection conn = new FakeConnection(context, listener); + if (i < 10) { + conn.fail(new Exception("Could not connect")); + } else { + conn.connect(); + } + return 1; + } + }; + FakeConnectionManager mgr = new FakeConnectionManager(-1, 16, connector); + + Thread[] actors = new Thread[numActors]; + for (int i = 0; i < numActors; i++) { + actors[i] = new Thread(() -> { + CountDownLatch latch = new CountDownLatch(numConnections); + for (int i1 = 0; i1 < numConnections; i1++) { + mgr.getConnection(new Waiter((ContextImpl) vertx.getOrCreateContext()) { + @Override + public void handleFailure(ContextInternal ctx, Throwable failure) { + latch.countDown(); + } + + @Override + public void initConnection(FakeConnection conn) { + } + + @Override + public boolean handleConnection(FakeConnection conn) throws Exception { + int action = ThreadLocalRandom.current().nextInt(100); + if (action < -1) { + latch.countDown(); + return false; + } /* else if (i < 30) { + latch.countDown(); + throw new Exception(); + } */ else { + vertx.setTimer(10, id -> { + latch.countDown(); + if (action < 15) { + conn.close(); + } else { + conn.recycle(); + } + }); + return true; + } + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("DONE"); + }); + actors[i].start(); + } + + for (int i = 0; i < actors.length; i++) { + try { + actors[i].join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + class FakeWaiter extends Waiter { + + private FakeConnection init; + private boolean cancelled; + private boolean completed; + private Object result; + + FakeWaiter() { + super((ContextImpl) vertx.getOrCreateContext()); + } + + synchronized boolean cancel() { + if (completed) { + return false; + } else { + cancelled = true; + return true; + } + } + + synchronized void assertInitialized(FakeConnection conn) { + assertSame(conn, init); + } + + synchronized void assertSuccess(FakeConnection conn) { + assertSame(conn, result); + } + + synchronized void assertFailure(Throwable failure) { + assertSame(failure, result); + } + + synchronized boolean isComplete() { + return completed; + } + + synchronized boolean isSuccess() { + return completed && result instanceof FakeConnection; + } + + synchronized boolean isFailure() { + return completed && result instanceof Throwable; + } + + @Override + public synchronized void handleFailure(ContextInternal ctx, Throwable failure) { + assertFalse(completed); + completed = true; + result = failure; + } + + @Override + public synchronized void initConnection(FakeConnection conn) { + assertNull(init); + init = conn; + } + + @Override + public synchronized boolean handleConnection(FakeConnection conn) throws Exception { + assertFalse(completed); + completed = true; + if (cancelled) { + return false; + } else { + synchronized (conn) { + conn.inflight++; + } + result = conn; + return true; + } + } + + long recycle() { + FakeConnection conn = (FakeConnection) result; + return conn.recycle(); + } + } + + /* + class FakeConnnectionPool implements ConnectionPool, Function> { + + private final SocketAddress address = SocketAddress.inetSocketAddress(8080, "localhost"); + private final int maxSize; + private final ArrayDeque available = new ArrayDeque<>(); + private final Set all = new HashSet<>(); + private boolean closed = true; + private int sequence; + + FakeConnnectionPool(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public Deque available() { + return available; + } + + @Override + public Set all() { + return all; + } + + synchronized int size() { + return available.size(); + } + + synchronized boolean contains(FakeConnection conn) { + Deque> a = (Deque>)(Deque) available; + for (ConnectionHolder b : a) { + if (b.connection() == conn) { + return true; + } + } + return false; + } + + synchronized int sequence() { + return sequence; + } + + @Override + public synchronized FakeConnnectionPool apply(SocketAddress socketAddress) { + if (!socketAddress.equals(address)) { + throw new AssertionError(); + } + if (!closed) { + throw new AssertionError(); + } + closed = false; + sequence++; + return this; + } + + @Override + public synchronized int maxSize() { + if (closed) { + throw new AssertionError(); + } + return maxSize; + } + + @Override + public synchronized boolean canBorrow(int connCount) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized FakeConnection pollConnection() { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized boolean canCreateConnection(int connCount) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized boolean initConnection(FakeConnection conn) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized void recycleConnection(FakeConnection conn) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized void evictConnection(FakeConnection conn) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized boolean isValid(FakeConnection conn) { + throw new UnsupportedOperationException(); + } + + @Override + public synchronized ContextImpl getContext(FakeConnection conn) { + throw new UnsupportedOperationException(); + } + + public synchronized void close() { + if (closed) { + throw new AssertionError(); + } + closed = true; + available.clear(); + all.clear(); + } + + synchronized boolean isClosed() { + return closed; + } + } + */ + class FakeConnection { + + private static final int DISCONNECTED = 0; + private static final int CONNECTING = 1; + private static final int CONNECTED = 2; + private static final int CLOSED = 3; + + private final ContextImpl context; + private final ConnectionListener listener; + private final Channel channel = new EmbeddedChannel(); + + private long inflight; + private long concurrency = 1; + private int status = DISCONNECTED; + private boolean valid = true; + + FakeConnection(ContextImpl context, ConnectionListener listener) { + this.context = context; + this.listener = listener; + } + + synchronized FakeConnection invalidate() { + valid = false; + return this; + } + + synchronized void close() { + if (status != CONNECTED) { + throw new IllegalStateException(); + } + status = CLOSED; + listener.onClose(this); + } + + synchronized long recycle() { + long i = inflight--; + listener.onRecycle(this); + return i; + } + + synchronized FakeConnection concurrency(long value) { + if (value < 0) { + throw new IllegalArgumentException("Invalid concurrency"); + } + if (status == CONNECTED) { + if (concurrency != value) { + concurrency = value; + listener.onConcurrencyChange(this, value); + } + } else { + concurrency = value; + } + return this; + } + + FakeConnection awaitConnected() { + waitUntil(() -> { + synchronized (FakeConnection.this) { + return status == CONNECTED; + } + }); + return this; + } + + synchronized FakeConnection connect() { + if (status != DISCONNECTED) { + throw new IllegalStateException(); + } + status = CONNECTING; + context.nettyEventLoop().execute(() -> { + synchronized (FakeConnection.this) { + listener.onConnectSuccess(this, concurrency, channel, context, 1, 1, Long.MAX_VALUE); + status = CONNECTED; + } + }); + return this; + } + + void fail(Throwable err) { + context.nettyEventLoop().execute(() -> listener.onConnectFailure(err, 1)); + } + + synchronized boolean isValid() { + return valid; + } + } + + class FakeConnectionProvider implements ConnectionProvider { + + private final Map> requestMap = new HashMap<>(); + + void assertRequests(SocketAddress address, int expectedSize) { + ArrayDeque requests = requestMap.get(address); + if (expectedSize == 0) { + assertTrue(requests == null || requests.size() == 0); + } else { + assertNotNull(requests); + assertEquals(expectedSize, requests.size()); + } + } + + @Override + public boolean isValid(FakeConnection conn) { + return conn.isValid(); + } + + int requests(SocketAddress address) { + ArrayDeque requests = requestMap.get(address); + return requests == null ? 0 : requests.size(); + } + + FakeConnection assertRequest(SocketAddress address) { + ArrayDeque requests = requestMap.get(address); + assertNotNull(requests); + assertTrue(requests.size() > 0); + FakeConnection request = requests.poll(); + assertNotNull(request); + return request; + } + + @Override + public long connect(ConnectionListener listener, + Object endpointMetric, + ContextImpl context, + String peerHost, + boolean ssl, + String host, + int port) { + ArrayDeque list = requestMap.computeIfAbsent(SocketAddress.inetSocketAddress(port, host), address -> new ArrayDeque<>()); + list.add(new FakeConnection(context, listener)); + return 1; + } + + @Override + public void close(FakeConnection conn) { + + } + } +}