Skip to content

Commit

Permalink
Refactor to use 2 connection queue map
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Apr 4, 2016
1 parent 50ef49e commit 05e4036
Showing 1 changed file with 38 additions and 13 deletions.
51 changes: 38 additions & 13 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -80,7 +80,8 @@ public class ConnectionManager {
private final boolean keepAlive;
private final boolean pipelining;
private final int maxWaitQueueSize;
private final Map<TargetAddress, ConnQueue> connQueues = new ConcurrentHashMap<>();
private final Map<TargetAddress, ConnQueue<ClientConnection>> connQueues = new ConcurrentHashMap<>();
private final Map<TargetAddress, ConnQueue<Http2ClientConnection>> connQueues2 = new ConcurrentHashMap<>();

ConnectionManager(HttpClientImpl client) {
this.client = client;
Expand All @@ -97,10 +98,21 @@ public void getConnection(int port, String host, Waiter waiter) {
waiter.handleFailure(new IllegalStateException("Cannot have pipelining with no keep alive"));
} else {
TargetAddress address = new TargetAddress(host, port);
ConnQueue connQueue = connQueues.get(address);
Map<TargetAddress, ConnQueue<HttpClientConnection>> abc;
ConnQueue connQueue;
if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
connQueue = connQueues2.get(address);
} else {
connQueue = connQueues.get(address);
}
if (connQueue == null) {
connQueue = new ConnQueue(address);
ConnQueue prev = connQueues.putIfAbsent(address, connQueue);
connQueue = new ConnQueue(options.getProtocolVersion(), address);
ConnQueue prev;
if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
prev = connQueues2.putIfAbsent(address, connQueue);
} else {
prev = connQueues.putIfAbsent(address, connQueue);
}
if (prev != null) {
connQueue = prev;
}
Expand All @@ -114,6 +126,10 @@ public void close() {
queue.closeAllConnections();
}
connQueues.clear();
for (ConnQueue queue: connQueues2.values()) {
queue.closeAllConnections();
}
connQueues2.clear();
for (ClientConnection conn : connectionMap.values()) {
conn.close();
}
Expand Down Expand Up @@ -153,17 +169,16 @@ public int hashCode() {
}
}

public class ConnQueue {
public class ConnQueue<C extends HttpClientConnection> {

private final TargetAddress address;
private final Queue<Waiter> waiters = new ArrayDeque<>();
private int connCount;
private Pool pool;
private final Pool pool;

ConnQueue(TargetAddress address) {
ConnQueue(HttpVersion version, TargetAddress address) {
this.address = address;

if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
if (version == HttpVersion.HTTP_2) {
pool = new Http2Pool(this, client, connectionMap2);
} else {
pool = new Http1xPool(this);
Expand Down Expand Up @@ -217,7 +232,11 @@ public synchronized void connectionClosed() {
createNewConnection(waiter);
} else if (connCount == 0) {
// No waiters and no connections - remove the ConnQueue
connQueues.remove(address);
if (options.getProtocolVersion() == HttpVersion.HTTP_2) {
connQueues2.remove(address);
} else {
connQueues.remove(address);
}
}
}

Expand Down Expand Up @@ -346,11 +365,17 @@ private void handshakeFailure(ContextImpl context, Channel ch, Throwable cause,
private void fallbackToHttp1x(Channel ch, ContextImpl context, HttpVersion fallbackVersion, int port, String host, Waiter waiter) {
// Fallback
// change the pool to Http1xPool
synchronized (ConnQueue.this) {
pool = new Http1xPool(ConnQueue.this);
ConnQueue<ClientConnection> http1Queue = connQueues.get(address);
if (http1Queue == null) {
http1Queue = new ConnQueue<>(HttpVersion.HTTP_1_1, address);
ConnQueue<ClientConnection> prev = connQueues.putIfAbsent(address, http1Queue);
if (prev != null) {
http1Queue = prev;
}
}
applyHttp1xConnectionOptions(ch.pipeline(), context);
http1xConnected(fallbackVersion, context, port, host, ch, waiter);
http1Queue.http1xConnected(fallbackVersion, context, port, host, ch, waiter);
// Should remove this queue as it may be empty ????
}

private void http1xConnected(HttpVersion version, ContextImpl context, int port, String host, Channel ch, Waiter waiter) {
Expand Down

0 comments on commit 05e4036

Please sign in to comment.