Skip to content

Commit

Permalink
Rework to manage the waiters list outside of the pool implementation …
Browse files Browse the repository at this point in the history
…in the queue manager
  • Loading branch information
vietj committed Nov 10, 2017
1 parent d8199d1 commit 8d19a38
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 67 deletions.
6 changes: 6 additions & 0 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -86,6 +86,7 @@ class ClientConnection extends Http1xConnectionBase implements HttpClientConnect
private HttpClientRequestImpl requestForResponse; private HttpClientRequestImpl requestForResponse;
private WebSocketImpl ws; private WebSocketImpl ws;


private int count;
private boolean reset; private boolean reset;
private boolean paused; private boolean paused;
private Buffer pausedChunk; private Buffer pausedChunk;
Expand All @@ -106,6 +107,11 @@ class ClientConnection extends Http1xConnectionBase implements HttpClientConnect
this.pipeliningLimit = client.getOptions().getPipeliningLimit(); this.pipeliningLimit = client.getOptions().getPipeliningLimit();
} }


@Override
public int use() {
return count++;
}

public HttpClientMetrics metrics() { public HttpClientMetrics metrics() {
return metrics; return metrics;
} }
Expand Down
85 changes: 64 additions & 21 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -33,14 +33,12 @@
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.http.ConnectionPoolTooBusyException; import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
Expand Down Expand Up @@ -192,7 +190,7 @@ public void close() {
* pool if the server does not support HTTP/2 or after negotiation. In this situation * pool if the server does not support HTTP/2 or after negotiation. In this situation
* all waiters on this queue will use HTTP/1.1 connections. * all waiters on this queue will use HTTP/1.1 connections.
*/ */
public class ConnQueue { class ConnQueue {


private final QueueManager mgr; private final QueueManager mgr;
private final String peerHost; private final String peerHost;
Expand Down Expand Up @@ -223,16 +221,10 @@ public class ConnQueue {
this.metric = metrics != null ? metrics.createEndpoint(host, port, maxSize) : null; this.metric = metrics != null ? metrics.createEndpoint(host, port, maxSize) : null;
} }


public synchronized void getConnection(Waiter waiter) { synchronized void getConnection(Waiter waiter) {
HttpClientConnection conn = pool.pollConnection(); HttpClientConnection conn = pool.pollConnection();
if (conn != null && conn.isValid()) { if (conn != null) {
ContextImpl context = waiter.context; conn.getContext().runOnContext(v -> deliver(conn, waiter));
if (context == null) {
context = conn.getContext();
} else if (context != conn.getContext()) {
ConnectionManager.log.warn("Reusing a connection with a different context: an HttpClient is probably shared between different Verticles");
}
context.runOnContext(v -> deliverStream(conn, waiter));
} else { } else {
if (pool.canCreateConnection(connCount)) { if (pool.canCreateConnection(connCount)) {
// Create a new connection // Create a new connection
Expand All @@ -251,11 +243,7 @@ public synchronized void getConnection(Waiter waiter) {
} }
} }


/** /*
* Handle the connection if the waiter is not cancelled, otherwise recycle the connection.
*
* @param conn the connection
*/
void deliverStream(HttpClientConnection conn, Waiter waiter) { void deliverStream(HttpClientConnection conn, Waiter waiter) {
if (!conn.isValid()) { if (!conn.isValid()) {
// The connection has been closed - closed connections can be in the pool // The connection has been closed - closed connections can be in the pool
Expand All @@ -275,8 +263,9 @@ void deliverStream(HttpClientConnection conn, Waiter waiter) {
waiter.handleStream(stream); waiter.handleStream(stream);
} }
} }
*/


void closeAllConnections() { private void closeAllConnections() {
pool.closeAllConnections(); pool.closeAllConnections();
} }


Expand All @@ -290,7 +279,7 @@ private void createNewConnection(Waiter waiter) {
} }
createNewConnection(context, ar -> { createNewConnection(context, ar -> {
if (ar.succeeded()) { if (ar.succeeded()) {
pool.init(ar.result(), waiter); deliver(ar.result(), waiter);
} else { } else {


// If no specific exception handler is provided, fall back to the HttpClient's exception handler. // If no specific exception handler is provided, fall back to the HttpClient's exception handler.
Expand All @@ -315,6 +304,60 @@ private void createNewConnection(ContextImpl context, Handler<AsyncResult<HttpCl
connector.connect(this, bootstrap, context, peerHost, ssl, pool.version(), host, port, handler); connector.connect(this, bootstrap, context, peerHost, ssl, pool.version(), host, port, handler);
} }


void recycle(HttpClientConnection conn) {
pool.doRecycle(conn);
checkPending();
}

private void deliver(HttpClientConnection conn, Waiter waiter) {
if (conn.isValid()) {
if (!waiter.isCancelled()) {
deliverInternal(conn, waiter);
} else {
// Should check connection is still VALID
recycle(conn);
}
checkPending();
} else {
getConnection(waiter);
}
}

private void deliverInternal(HttpClientConnection conn, Waiter waiter) {
conn.getContext().executeFromIO(() -> {
HttpClientStream stream;
try {
stream = pool.createStream(conn);
} catch (Exception e) {
getConnection(waiter);
return;
}
if (conn.use() == 0) {
waiter.handleConnection(conn);
}
waiter.handleStream(stream);
});
}

private void checkPending() {
while (true) {
Waiter waiter = waiters.peek();
if (waiter == null) {
break;
}
if (waiter.isCancelled()) {
waiters.poll();
} else {
HttpClientConnection conn = pool.pollConnection();
if (conn == null) {
break;
}
waiters.poll();
deliverInternal(conn, waiter);
}
}
}

/** /**
* @return the next non-canceled waiters in the queue * @return the next non-canceled waiters in the queue
*/ */
Expand Down Expand Up @@ -411,12 +454,12 @@ interface Pool<C extends HttpClientConnection> {
*/ */
boolean canCreateConnection(int connCount); boolean canCreateConnection(int connCount);


void init(C conn, Waiter waiter);

void closeAllConnections(); void closeAllConnections();


void recycle(C conn); void recycle(C conn);


void doRecycle(C conn);

HttpClientStream createStream(C conn) throws Exception; HttpClientStream createStream(C conn) throws Exception;


} }
Expand Down
25 changes: 10 additions & 15 deletions src/main/java/io/vertx/core/http/impl/Http1xPool.java
Expand Up @@ -89,7 +89,10 @@ public HttpVersion version() {


@Override @Override
public ClientConnection pollConnection() { public ClientConnection pollConnection() {
return availableConnections.poll(); ClientConnection conn;
while ((conn = availableConnections.poll()) != null && !conn.isValid()) {
}
return conn;
} }


@Override @Override
Expand All @@ -103,23 +106,15 @@ public HttpClientStream createStream(ClientConnection conn) {
} }


@Override @Override
public void init(ClientConnection conn, Waiter waiter) { public void doRecycle(ClientConnection conn) {
conn.getContext().executeFromIO(() -> { synchronized (queue) {
waiter.handleConnection(conn); // Return to set of available from here to not return it several times
queue.deliverStream(conn, waiter); availableConnections.add(conn);
}); }
} }


public void recycle(ClientConnection conn) { public void recycle(ClientConnection conn) {
synchronized (queue) { queue.recycle(conn);
Waiter waiter = queue.getNextWaiter();
if (waiter != null) {
queue.deliverStream(conn, waiter);
} else {
// Return to set of available from here to not return it several times
availableConnections.add(conn);
}
}
} }


@Override @Override
Expand Down
Expand Up @@ -51,6 +51,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
final HttpClientMetrics metrics; final HttpClientMetrics metrics;
final Object queueMetric; final Object queueMetric;
int streamCount; int streamCount;
private int count;
final Handler<AsyncResult<HttpClientConnection>> resultHandler; final Handler<AsyncResult<HttpClientConnection>> resultHandler;


public Http2ClientConnection(Http2Pool http2Pool, public Http2ClientConnection(Http2Pool http2Pool,
Expand All @@ -66,6 +67,11 @@ public Http2ClientConnection(Http2Pool http2Pool,
this.resultHandler = resultHandler; this.resultHandler = resultHandler;
} }


@Override
public int use() {
return count++;
}

@Override @Override
protected void handleConnection() { protected void handleConnection() {
resultHandler.handle(Future.succeededFuture(this)); resultHandler.handle(Future.succeededFuture(this));
Expand Down
29 changes: 6 additions & 23 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -79,6 +79,7 @@ public boolean canCreateConnection(int connCount) {
@Override @Override
public Http2ClientConnection pollConnection() { public Http2ClientConnection pollConnection() {
for (Http2ClientConnection conn : allConnections) { for (Http2ClientConnection conn : allConnections) {
// Julien : check conn is valid ?
if (canReserveStream(conn)) { if (canReserveStream(conn)) {
conn.streamCount++; conn.streamCount++;
return conn; return conn;
Expand Down Expand Up @@ -119,33 +120,11 @@ public void createConn(ContextImpl context, Channel ch, Handler<AsyncResult<Http
} }
} }


@Override
public void init(Http2ClientConnection conn, Waiter waiter) {
conn.getContext().executeFromIO(() -> {
synchronized (queue) {
conn.streamCount++;
waiter.handleConnection(conn); // Should make same tests than in deliverRequest
queue.deliverStream(conn, waiter);
checkPending(conn);
}
});
}

private boolean canReserveStream(Http2ClientConnection handler) { private boolean canReserveStream(Http2ClientConnection handler) {
int maxConcurrentStreams = Math.min(handler.handler.connection().local().maxActiveStreams(), maxConcurrency); int maxConcurrentStreams = Math.min(handler.handler.connection().local().maxActiveStreams(), maxConcurrency);
return handler.streamCount < maxConcurrentStreams; return handler.streamCount < maxConcurrentStreams;
} }


void checkPending(Http2ClientConnection conn) {
synchronized (queue) {
Waiter waiter;
while (canReserveStream(conn) && (waiter = queue.getNextWaiter()) != null) {
conn.streamCount++;
queue.deliverStream(conn, waiter);
}
}
}

void discard(Http2ClientConnection conn) { void discard(Http2ClientConnection conn) {
synchronized (queue) { synchronized (queue) {
if (allConnections.remove(conn)) { if (allConnections.remove(conn)) {
Expand All @@ -159,9 +138,13 @@ void discard(Http2ClientConnection conn) {


@Override @Override
public void recycle(Http2ClientConnection conn) { public void recycle(Http2ClientConnection conn) {
queue.recycle(conn);
}

@Override
public void doRecycle(Http2ClientConnection conn) {
synchronized (queue) { synchronized (queue) {
conn.streamCount--; conn.streamCount--;
checkPending(conn);
} }
} }


Expand Down
Expand Up @@ -30,6 +30,8 @@ interface HttpClientConnection extends HttpConnection {


void reportBytesRead(long s); void reportBytesRead(long s);


int use();

/** /**
* Check if the connection is valid for creating streams. The connection might be closed or a {@literal GOAWAY} * Check if the connection is valid for creating streams. The connection might be closed or a {@literal GOAWAY}
* frame could have been sent or received. * frame could have been sent or received.
Expand Down
Expand Up @@ -640,7 +640,7 @@ private synchronized void connect(Handler<HttpVersion> headersCompletionHandler)
throw new IllegalStateException("You must provide a rawMethod when using an HttpMethod.OTHER method"); throw new IllegalStateException("You must provide a rawMethod when using an HttpMethod.OTHER method");
} }


Waiter waiter = new Waiter(this, vertx.getContext()) { Waiter waiter = new Waiter(this, vertx.getOrCreateContext()) {


@Override @Override
void handleFailure(Throwable failure) { void handleFailure(Throwable failure) {
Expand Down
11 changes: 4 additions & 7 deletions src/test/java/io/vertx/test/core/Http2ClientTest.java
Expand Up @@ -944,6 +944,7 @@ public void testConnectionHandler() throws Exception {


@Test @Test
public void testConnectionShutdownInConnectionHandler() throws Exception { public void testConnectionShutdownInConnectionHandler() throws Exception {
waitFor(2);
AtomicInteger serverStatus = new AtomicInteger(); AtomicInteger serverStatus = new AtomicInteger();
server.connectionHandler(conn -> { server.connectionHandler(conn -> {
if (serverStatus.getAndIncrement() == 0) { if (serverStatus.getAndIncrement() == 0) {
Expand Down Expand Up @@ -971,20 +972,16 @@ public void testConnectionShutdownInConnectionHandler() throws Exception {
conn.shutdownHandler(v -> { conn.shutdownHandler(v -> {
assertOnIOContext(ctx); assertOnIOContext(ctx);
clientStatus.compareAndSet(1, 2); clientStatus.compareAndSet(1, 2);
complete();
}); });
if (clientStatus.getAndIncrement() == 0) { if (clientStatus.getAndIncrement() == 0) {
conn.shutdown(); conn.shutdown();
} }
}); });
req1.exceptionHandler(err -> { req1.exceptionHandler(err -> {
fail(); complete();
});
req1.handler(resp -> {
assertEquals(2, clientStatus.getAndIncrement());
resp.endHandler(v -> {
testComplete();
});
}); });
req1.handler(resp -> fail());
req1.end(); req1.end();
await(); await();
} }
Expand Down

0 comments on commit 8d19a38

Please sign in to comment.