Skip to content

Commit

Permalink
Remove code and state from Pool and make it an interface
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 25, 2016
1 parent 7cd4ed8 commit 40f4f43
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 82 deletions.
115 changes: 51 additions & 64 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -194,23 +194,34 @@ public class ConnQueue {
private final QueueManager mgr; private final QueueManager mgr;
private final TargetAddress address; private final TargetAddress address;
private final Queue<Waiter> waiters = new ArrayDeque<>(); private final Queue<Waiter> waiters = new ArrayDeque<>();
private Pool<? extends HttpClientConnection> pool; private Pool<HttpClientConnection> pool;
private int connCount; private int connCount;
private final int maxSize;


ConnQueue(HttpVersion version, QueueManager mgr, TargetAddress address) { ConnQueue(HttpVersion version, QueueManager mgr, TargetAddress address) {
this.address = address; this.address = address;
this.mgr = mgr; this.mgr = mgr;
if (version == HttpVersion.HTTP_2) { if (version == HttpVersion.HTTP_2) {
pool = new Http2Pool(this, client, mgr.connectionMap, http2MaxSockets, http2MaxConcurrency, logEnabled); maxSize = http2MaxSockets;
pool = (Pool)new Http2Pool(this, client, mgr.connectionMap, http2MaxSockets, http2MaxConcurrency, logEnabled);
} else { } else {
pool = new Http1xPool(client, options, this, mgr.connectionMap, version); maxSize = client.getOptions().getMaxPoolSize();
pool = (Pool)new Http1xPool(client, options, this, mgr.connectionMap, version);
} }
} }


public synchronized void getConnection(Waiter waiter) { public synchronized void getConnection(Waiter waiter) {
boolean served = pool.getConnection(waiter); HttpClientConnection conn = pool.pollConnection();
if (!served) { if (conn != null && conn.isValid()) {
if (connCount == pool.maxSockets) { ContextImpl context = waiter.context;
if (context == null) {
context = conn.getContext();
} else if (context != conn.getContext()) {
ConnectionManager.log.warn("Reusing a connection with a different context: an HttpClient is probably shared between different Verticles");
}
context.runOnContext(v -> deliverStream(conn, waiter));
} else {
if (connCount == maxSize) {
// Wait in queue // Wait in queue
if (maxWaitQueueSize < 0 || waiters.size() < maxWaitQueueSize) { if (maxWaitQueueSize < 0 || waiters.size() < maxWaitQueueSize) {
waiters.add(waiter); waiters.add(waiter);
Expand All @@ -224,6 +235,31 @@ public synchronized void getConnection(Waiter waiter) {
} }
} }


/**
* Handle the connection if the waiter is not cancelled, otherwise recycle the connection.
*
* @param conn the connection
*/
void deliverStream(HttpClientConnection conn, Waiter waiter) {
if (!conn.isValid()) {
// The connection has been closed - closed connections can be in the pool
// Get another connection - Note that we DO NOT call connectionClosed() on the pool at this point
// that is done asynchronously in the connection closeHandler()
getConnection(waiter);
} else if (waiter.isCancelled()) {
pool.recycle(conn);
} else {
HttpClientStream stream;
try {
stream = pool.createStream(conn);
} catch (Exception e) {
getConnection(waiter);
return;
}
waiter.handleStream(stream);
}
}

void closeAllConnections() { void closeAllConnections() {
pool.closeAllConnections(); pool.closeAllConnections();
} }
Expand Down Expand Up @@ -419,22 +455,22 @@ private void handshakeFailure(ContextImpl context, Channel ch, Throwable cause,
private void fallbackToHttp1x(Channel ch, ContextImpl context, HttpVersion fallbackVersion, int port, String host, Waiter waiter) { private void fallbackToHttp1x(Channel ch, ContextImpl context, HttpVersion fallbackVersion, int port, String host, Waiter waiter) {
// change the pool to Http1xPool // change the pool to Http1xPool
synchronized (this) { synchronized (this) {
pool = new Http1xPool(client, options, this, mgr.connectionMap, fallbackVersion); pool = (Pool)new Http1xPool(client, options, this, mgr.connectionMap, fallbackVersion);
} }
applyHttp1xConnectionOptions(ch.pipeline(), context); applyHttp1xConnectionOptions(ch.pipeline(), context);
http1xConnected(fallbackVersion, context, port, host, ch, waiter); http1xConnected(fallbackVersion, context, port, host, ch, waiter);
} }


private void http1xConnected(HttpVersion version, ContextImpl context, int port, String host, Channel ch, Waiter waiter) { private void http1xConnected(HttpVersion version, ContextImpl context, int port, String host, Channel ch, Waiter waiter) {
context.executeFromIO(() -> context.executeFromIO(() ->
((Http1xPool)pool).createConn(version, context, port, host, ch, waiter) ((Http1xPool)(Pool)pool).createConn(version, context, port, host, ch, waiter)
); );
} }


private void http2Connected(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade) { private void http2Connected(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade) {
context.executeFromIO(() -> { context.executeFromIO(() -> {
try { try {
((Http2Pool)pool).createConn(context, ch, waiter, upgrade); ((Http2Pool)(Pool)pool).createConn(context, ch, waiter, upgrade);
} catch (Http2Exception e) { } catch (Http2Exception e) {
connectionFailed(context, ch, waiter::handleFailure, e); connectionFailed(context, ch, waiter::handleFailure, e);
} }
Expand Down Expand Up @@ -500,66 +536,17 @@ void applyHttp1xConnectionOptions(ChannelPipeline pipeline, ContextImpl context)
} }
} }


static abstract class Pool<C extends HttpClientConnection> { interface Pool<C extends HttpClientConnection> {


// Pools must locks on the queue object to keep a single lock HttpVersion version();
final ConnQueue queue;
final int maxSockets;


Pool(ConnQueue queue, int maxSockets) { C pollConnection();
this.queue = queue;
this.maxSockets = maxSockets;
}


abstract HttpVersion version(); void closeAllConnections();


abstract C pollConnection(); void recycle(C conn);


boolean getConnection(Waiter waiter) { HttpClientStream createStream(C conn) throws Exception;
C conn = pollConnection();
if (conn != null && conn.isValid()) {
ContextImpl context = waiter.context;
if (context == null) {
context = conn.getContext();
} else if (context != conn.getContext()) {
ConnectionManager.log.warn("Reusing a connection with a different context: an HttpClient is probably shared between different Verticles");
}
context.runOnContext(v -> deliverStream(conn, waiter));
return true;
} else {
return false;
}
}

abstract void closeAllConnections();

abstract void recycle(C conn);

abstract HttpClientStream createStream(C conn) throws Exception;


/**
* Handle the connection if the waiter is not cancelled, otherwise recycle the connection.
*
* @param conn the connection
*/
void deliverStream(C conn, Waiter waiter) {
if (!conn.isValid()) {
// The connection has been closed - closed connections can be in the pool
// Get another connection - Note that we DO NOT call connectionClosed() on the pool at this point
// that is done asynchronously in the connection closeHandler()
queue.getConnection(waiter);
} else if (waiter.isCancelled()) {
recycle(conn);
} else {
HttpClientStream stream;
try {
stream = createStream(conn);
} catch (Exception e) {
queue.getConnection(waiter);
return;
}
waiter.handleStream(stream);
}
}
} }
} }
20 changes: 11 additions & 9 deletions src/main/java/io/vertx/core/http/impl/Http1xPool.java
Expand Up @@ -31,8 +31,10 @@
/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/ */
public class Http1xPool extends ConnectionManager.Pool<ClientConnection> { public class Http1xPool implements ConnectionManager.Pool<ClientConnection> {


// Pools must locks on the queue object to keep a single lock
private final ConnectionManager.ConnQueue queue;
private final HttpClientImpl client; private final HttpClientImpl client;
private final Map<Channel, HttpClientConnection> connectionMap; private final Map<Channel, HttpClientConnection> connectionMap;
private final boolean pipelining; private final boolean pipelining;
Expand All @@ -43,7 +45,7 @@ public class Http1xPool extends ConnectionManager.Pool<ClientConnection> {
private final Queue<ClientConnection> availableConnections = new ArrayDeque<>(); private final Queue<ClientConnection> availableConnections = new ArrayDeque<>();


public Http1xPool(HttpClientImpl client, HttpClientOptions options, ConnectionManager.ConnQueue queue, Map<Channel, HttpClientConnection> connectionMap, HttpVersion version) { public Http1xPool(HttpClientImpl client, HttpClientOptions options, ConnectionManager.ConnQueue queue, Map<Channel, HttpClientConnection> connectionMap, HttpVersion version) {
super(queue, client.getOptions().getMaxPoolSize()); this.queue = queue;
this.version = version; this.version = version;
this.client = client; this.client = client;
this.pipelining = options.isPipelining(); this.pipelining = options.isPipelining();
Expand All @@ -53,23 +55,23 @@ public Http1xPool(HttpClientImpl client, HttpClientOptions options, ConnectionMa
} }


@Override @Override
HttpVersion version() { public HttpVersion version() {
// Correct this // Correct this
return version; return version;
} }


@Override @Override
ClientConnection pollConnection() { public ClientConnection pollConnection() {
return availableConnections.poll(); return availableConnections.poll();
} }


@Override @Override
HttpClientStream createStream(ClientConnection conn) { public HttpClientStream createStream(ClientConnection conn) {
return conn; return conn;
} }


// Called when the request has ended // Called when the request has ended
void recycle(ClientConnection conn) { public void recycle(ClientConnection conn) {
synchronized (queue) { synchronized (queue) {
if (pipelining) { if (pipelining) {
doRecycle(conn); doRecycle(conn);
Expand Down Expand Up @@ -98,7 +100,7 @@ private void doRecycle(ClientConnection conn) {
if (context == null) { if (context == null) {
context = conn.getContext(); context = conn.getContext();
} }
context.runOnContext(v -> deliverStream(conn, waiter)); context.runOnContext(v -> queue.deliverStream(conn, waiter));
} else if (conn.getOutstandingRequestCount() == 0) { } else if (conn.getOutstandingRequestCount() == 0) {
// Return to set of available from here to not return it several times // Return to set of available from here to not return it several times
availableConnections.add(conn); availableConnections.add(conn);
Expand All @@ -116,7 +118,7 @@ void createConn(HttpVersion version, ContextImpl context, int port, String host,
} }
connectionMap.put(ch, conn); connectionMap.put(ch, conn);
waiter.handleConnection(conn); waiter.handleConnection(conn);
deliverStream(conn, waiter); queue.deliverStream(conn, waiter);
} }


// Called if the connection is actually closed, OR the connection attempt failed - in the latter case // Called if the connection is actually closed, OR the connection attempt failed - in the latter case
Expand All @@ -129,7 +131,7 @@ public synchronized void connectionClosed(ClientConnection conn) {
} }
} }


void closeAllConnections() { public void closeAllConnections() {
Set<ClientConnection> copy; Set<ClientConnection> copy;
synchronized (this) { synchronized (this) {
copy = new HashSet<>(allConnections); copy = new HashSet<>(allConnections);
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -34,8 +34,10 @@
/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/ */
class Http2Pool extends ConnectionManager.Pool<Http2ClientConnection> { class Http2Pool implements ConnectionManager.Pool<Http2ClientConnection> {


// Pools must locks on the queue object to keep a single lock
private final ConnectionManager.ConnQueue queue;
private Queue<Http2ClientConnection> availableConnections = new ArrayDeque<>(); private Queue<Http2ClientConnection> availableConnections = new ArrayDeque<>();
private final Set<Http2ClientConnection> allConnections = new HashSet<>(); private final Set<Http2ClientConnection> allConnections = new HashSet<>();
private final Map<Channel, ? super Http2ClientConnection> connectionMap; private final Map<Channel, ? super Http2ClientConnection> connectionMap;
Expand All @@ -46,20 +48,20 @@ class Http2Pool extends ConnectionManager.Pool<Http2ClientConnection> {
public Http2Pool(ConnectionManager.ConnQueue queue, HttpClientImpl client, public Http2Pool(ConnectionManager.ConnQueue queue, HttpClientImpl client,
Map<Channel, ? super Http2ClientConnection> connectionMap, int maxSockets, Map<Channel, ? super Http2ClientConnection> connectionMap, int maxSockets,
int maxConcurrency, boolean logEnabled) { int maxConcurrency, boolean logEnabled) {
super(queue, maxSockets); this.queue = queue;
this.client = client; this.client = client;
this.connectionMap = connectionMap; this.connectionMap = connectionMap;
this.maxConcurrency = maxConcurrency; this.maxConcurrency = maxConcurrency;
this.logEnabled = logEnabled; this.logEnabled = logEnabled;
} }


@Override @Override
HttpVersion version() { public HttpVersion version() {
return HttpVersion.HTTP_2; return HttpVersion.HTTP_2;
} }


@Override @Override
Http2ClientConnection pollConnection() { public Http2ClientConnection pollConnection() {
Http2ClientConnection conn = availableConnections.peek(); Http2ClientConnection conn = availableConnections.peek();
if (conn != null) { if (conn != null) {
conn.streamCount++; conn.streamCount++;
Expand Down Expand Up @@ -93,7 +95,7 @@ void createConn(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade)
allConnections.add(conn); allConnections.add(conn);
conn.streamCount++; conn.streamCount++;
waiter.handleConnection(conn); // Should make same tests than in deliverRequest waiter.handleConnection(conn); // Should make same tests than in deliverRequest
deliverStream(conn, waiter); queue.deliverStream(conn, waiter);
checkPending(conn); checkPending(conn);
if (canReserveStream(conn)) { if (canReserveStream(conn)) {
availableConnections.add(conn); availableConnections.add(conn);
Expand All @@ -111,7 +113,7 @@ void checkPending(Http2ClientConnection conn) {
Waiter waiter; Waiter waiter;
while (canReserveStream(conn) && (waiter = queue.getNextWaiter()) != null) { while (canReserveStream(conn) && (waiter = queue.getNextWaiter()) != null) {
conn.streamCount++; conn.streamCount++;
deliverStream(conn, waiter); queue.deliverStream(conn, waiter);
} }
} }
} }
Expand All @@ -125,7 +127,7 @@ void discard(Http2ClientConnection conn) {
} }


@Override @Override
void recycle(Http2ClientConnection conn) { public void recycle(Http2ClientConnection conn) {
synchronized (queue) { synchronized (queue) {
conn.streamCount--; conn.streamCount--;
checkPending(conn); checkPending(conn);
Expand All @@ -136,12 +138,12 @@ void recycle(Http2ClientConnection conn) {
} }


@Override @Override
HttpClientStream createStream(Http2ClientConnection conn) throws Exception { public HttpClientStream createStream(Http2ClientConnection conn) throws Exception {
return conn.createStream(); return conn.createStream();
} }


@Override @Override
void closeAllConnections() { public void closeAllConnections() {
List<Http2ClientConnection> toClose; List<Http2ClientConnection> toClose;
synchronized (queue) { synchronized (queue) {
toClose = new ArrayList<>(allConnections); toClose = new ArrayList<>(allConnections);
Expand Down

0 comments on commit 40f4f43

Please sign in to comment.