Skip to content

Commit

Permalink
Merge the QueueManager with the ConnectionManager to simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 10, 2017
1 parent c7881d0 commit ddac413
Showing 1 changed file with 26 additions and 40 deletions.
66 changes: 26 additions & 40 deletions src/main/java/io/vertx/core/http/impl/pool/ConnectionManager.java
Expand Up @@ -34,19 +34,19 @@
import java.util.function.Function;

/**
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class ConnectionManager<C> {

public static final Logger log = LoggerFactory.getLogger(ConnectionManager.class);

private final QueueManager queueManager = new QueueManager();
private final VertxInternal vertx;
private final int maxWaitQueueSize;
private final HttpClientMetrics metrics; // Shall be removed later combining the PoolMetrics with HttpClientMetrics
private final ConnectionProvider<C> connector;
private final Function<SocketAddress, ConnectionPool<C>> poolFactory;
private final Map<Channel, C> connectionMap = new ConcurrentHashMap<>();
private final Map<ConnectionKey, ConnQueue> queueMap = new ConcurrentHashMap<>();

public ConnectionManager(VertxInternal vertx,
HttpClientMetrics metrics,
Expand Down Expand Up @@ -96,51 +96,34 @@ public int hashCode() {
}
}

/**
* The queue manager manages the connection queues for a given usage, the idea is to split
* queues for HTTP requests and websockets. A websocket uses a pool of connections
* usually ugpraded from HTTP/1.1, HTTP requests may ask for HTTP/2 connections but obtain
* only HTTP/1.1 connections.
*/
private class QueueManager {

private final Map<Channel, C> connectionMap = new ConcurrentHashMap<>();
private final Map<ConnectionKey, ConnQueue> queueMap = new ConcurrentHashMap<>();

ConnQueue getConnQueue(String peerHost, boolean ssl, int port, String host) {
ConnectionKey key = new ConnectionKey(ssl, port, peerHost);
return queueMap.computeIfAbsent(key, targetAddress -> {
ConnectionPool<C> pool = poolFactory.apply(SocketAddress.inetSocketAddress(port, host));
return new ConnQueue( this, connector, peerHost, host, port, ssl, key, pool);
});
}

public void close() {
for (ConnQueue queue: queueMap.values()) {
queue.closeAllConnections();
}
queueMap.clear();
for (C conn : connectionMap.values()) {
connector.close(conn);
}
}
private ConnQueue getConnQueue(String peerHost, boolean ssl, int port, String host) {
ConnectionKey key = new ConnectionKey(ssl, port, peerHost);
return queueMap.computeIfAbsent(key, targetAddress -> {
ConnectionPool<C> pool = poolFactory.apply(SocketAddress.inetSocketAddress(port, host));
return new ConnQueue( connector, peerHost, host, port, ssl, key, pool);
});
}

public void getConnection(String peerHost, boolean ssl, int port, String host, Waiter<C> waiter) {
ConnQueue connQueue = queueManager.getConnQueue(peerHost, ssl, port, host);
ConnQueue connQueue = getConnQueue(peerHost, ssl, port, host);
connQueue.getConnection(waiter);
}

public void close() {
queueManager.close();
for (ConnQueue queue: queueMap.values()) {
queue.closeAllConnections();
}
queueMap.clear();
for (C conn : connectionMap.values()) {
connector.close(conn);
}
}

/**
* The connection queue delegates to the connection pool, the pooling strategy.
*/
class ConnQueue implements ConnectionListener<C> {

private final QueueManager mgr;
private final String peerHost;
private final boolean ssl;
private final int port;
Expand All @@ -152,16 +135,19 @@ class ConnQueue implements ConnectionListener<C> {
private final ConnectionProvider<C> connector;
final Object metric;

ConnQueue(QueueManager mgr,
ConnectionProvider<C> connector,
String peerHost, String host, int port, boolean ssl, ConnectionKey key, ConnectionPool<C> pool) {
ConnQueue(ConnectionProvider<C> connector,
String peerHost,
String host,
int port,
boolean ssl,
ConnectionKey key,
ConnectionPool<C> pool) {
this.key = key;
this.host = host;
this.port = port;
this.ssl = ssl;
this.peerHost = peerHost;
this.connector = connector;
this.mgr = mgr;
this.pool = pool;
this.metric = metrics != null ? metrics.createEndpoint(host, port, pool.maxSize()) : null;
}
Expand Down Expand Up @@ -241,13 +227,13 @@ public synchronized void onRecycle(C conn) {

@Override
public synchronized void onClose(C conn, Channel channel) {
mgr.connectionMap.remove(channel);
connectionMap.remove(channel);
pool.evictConnection(conn);
closeConnection();
}

private synchronized void initConnection(Waiter<C> waiter, C conn) {
mgr.connectionMap.put(connector.channel(conn), conn);
connectionMap.put(connector.channel(conn), conn);
pool.initConnection(conn);
pool.getContext(conn).executeFromIO(() -> {
waiter.initConnection(conn);
Expand Down Expand Up @@ -287,7 +273,7 @@ synchronized void closeConnection() {
checkPending();
if (connCount == 0) {
// No waiters and no connections - remove the ConnQueue
mgr.queueMap.remove(key);
queueMap.remove(key);
if (metrics != null) {
metrics.closeEndpoint(host, port, metric);
}
Expand Down

0 comments on commit ddac413

Please sign in to comment.