Skip to content

Commit

Permalink
Simplify the pool and now let the connection waiter the task to initi…
Browse files Browse the repository at this point in the history
…alize the conneection and recycle the connection if the waiter is not interested anymore by the connection
  • Loading branch information
vietj committed Apr 9, 2018
1 parent 5de0b15 commit 7fd5454
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 107 deletions.
29 changes: 17 additions & 12 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -12,8 +12,6 @@
package io.vertx.core.http.impl;

import io.netty.channel.Channel;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.Pool;
import io.vertx.core.http.impl.pool.Waiter;
Expand Down Expand Up @@ -103,7 +101,6 @@ public Endpoint(Pool<HttpClientConnection> pool, Object metric) {
}

void getConnection(String peerHost, boolean ssl, int port, String host,
Handler<HttpConnection> connectionHandler,
BiFunction<ContextInternal, HttpClientConnection, Boolean> onSuccess,
BiConsumer<ContextInternal, Throwable> onFailure) {
EndpointKey key = new EndpointKey(ssl, port, peerHost, host);
Expand Down Expand Up @@ -131,26 +128,34 @@ void getConnection(String peerHost, boolean ssl, int port, String host,
}
if (endpoint.pool.getConnection(new Waiter<HttpClientConnection>(client.getVertx().getOrCreateContext()) {
@Override
public void handleFailure(ContextInternal ctx, Throwable failure) {
if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);
}
onFailure.accept(ctx, failure);
}
@Override
public void handleConnection(ContextInternal ctx, HttpClientConnection conn) {

/*
@Override
public void initConnection(ContextInternal ctx, HttpClientConnection conn) {
if (connectionHandler != null) {
ctx.executeFromIO(v -> {
connectionHandler.handle(conn);
});
}
}
@Override
public void handleFailure(ContextInternal ctx, Throwable failure) {
*/

if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);
}
onFailure.accept(ctx, failure);
}
@Override
public boolean handleConnection(ContextInternal ctx, HttpClientConnection conn) throws Exception {
if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);

boolean claimed = onSuccess.apply(ctx, conn);
if (!claimed) {
conn.recycle();
}
return onSuccess.apply(ctx, conn);
}
})) {
break;
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java
Expand Up @@ -78,6 +78,7 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC

private boolean paused;
private Buffer pausedChunk;
private boolean initialized;

Http1xClientConnection(ConnectionListener<HttpClientConnection> listener,
HttpVersion version,
Expand Down Expand Up @@ -735,4 +736,16 @@ public void createStream(HttpClientRequestImpl req, Handler<AsyncResult<HttpClie
}
handler.handle(Future.succeededFuture(currentRequest));
}

@Override
public void recycle() {
listener.onRecycle(true);
}

@Override
public synchronized boolean checkInitialized() {
boolean ret = initialized;
initialized = true;
return ret;
}
}
16 changes: 16 additions & 0 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -42,6 +42,7 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon

private final ConnectionListener<HttpClientConnection> listener;
private final HttpClientImpl client;
private boolean initialized;
final HttpClientMetrics metrics;
final Object queueMetric;

Expand All @@ -58,6 +59,13 @@ public Http2ClientConnection(ConnectionListener<HttpClientConnection> listener,
this.listener = listener;
}

@Override
public synchronized boolean checkInitialized() {
boolean ret = initialized;
initialized = true;
return ret;
}

@Override
synchronized void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
listener.onDiscard();
Expand Down Expand Up @@ -95,6 +103,9 @@ public void createStream(HttpClientRequestImpl req, Handler<AsyncResult<HttpClie
synchronized (this) {
try {
Http2Connection conn = handler.connection();



Http2Stream stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false);
boolean writable = handler.encoder().flowController().isWritable(stream);
Http2ClientStream clientStream = new Http2ClientStream(this, req, stream, writable);
Expand All @@ -107,6 +118,11 @@ public void createStream(HttpClientRequestImpl req, Handler<AsyncResult<HttpClie
completionHandler.handle(fut);
}

@Override
public void recycle() {
listener.onRecycle(false);
}

@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
Expand Down
Expand Up @@ -34,4 +34,8 @@ interface HttpClientConnection extends HttpConnection {

ContextInternal getContext();

boolean checkInitialized();

void recycle();

}
6 changes: 2 additions & 4 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -21,7 +21,6 @@
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
Expand Down Expand Up @@ -946,7 +945,7 @@ private void getConnectionForWebsocket(boolean ssl,
String host,
Handler<Http1xClientConnection> handler,
Handler<Throwable> connectionExceptionHandler) {
websocketCM.getConnection(host, ssl, port, host, null, (ctx, conn) -> {
websocketCM.getConnection(host, ssl, port, host, (ctx, conn) -> {
ctx.executeFromIO(v -> {
handler.handle((Http1xClientConnection) conn);
});
Expand All @@ -959,10 +958,9 @@ private void getConnectionForWebsocket(boolean ssl,
}

void getConnectionForRequest(String peerHost, boolean ssl, int port, String host,
Handler<HttpConnection> connectionHandler,
BiFunction<ContextInternal, HttpClientConnection, Boolean> onSuccess,
BiConsumer<ContextInternal, Throwable> onFailure) {
httpCM.getConnection(peerHost, ssl, port, host, connectionHandler, onSuccess, onFailure);
httpCM.getConnection(peerHost, ssl, port, host, onSuccess, onFailure);
}

/**
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
Expand Up @@ -632,16 +632,27 @@ private synchronized void connect() {
peerHost = host;
}

// Capture the handler
Handler<HttpConnection> initializer = connectionHandler;

// We defer actual connection until the first part of body is written or end is called
// This gives the user an opportunity to set an exception handler before connecting so
// they can capture any exceptions on connection
client.getConnectionForRequest(peerHost, ssl, port, host, connectionHandler, (ctx, conn) -> {
client.getConnectionForRequest(peerHost, ssl, port, host, (ctx, conn) -> {
// No need to synchronize as the thread is the same that set exceptionOccurred to true
// exceptionOccurred=true getting the connection => it's a TimeoutException
if (exceptionOccurred != null || reset != null) {
return false;
}
// checkContext(ctx);

//
if (!conn.checkInitialized() && initializer != null) {
ctx.executeFromIO(() -> {
initializer.handle(conn);
});
}

//
conn.createStream(HttpClientRequestImpl.this, ar -> {
if (ar.succeeded()) {
HttpClientStream stream = ar.result();
Expand Down
27 changes: 2 additions & 25 deletions src/main/java/io/vertx/core/http/impl/pool/Pool.java
Expand Up @@ -172,16 +172,10 @@ private boolean acquireConnection(Waiter<C> waiter) {
}
ContextInternal ctx = conn.context;
ctx.nettyEventLoop().execute(() -> {
boolean handled = deliverToWaiter(conn, waiter);
synchronized (Pool.this) {
waitersCount--;
if (!handled) {
synchronized (Pool.this) {
recycleConnection(conn, 1,false);
checkPending();
}
}
}
waiter.handleConnection(conn.context, conn.connection);
});
return true;
} else if (weight < maxWeight) {
Expand Down Expand Up @@ -216,7 +210,6 @@ public void onConnectSuccess(C conn, long concurrency, Channel channel, ContextI
initConnection(holder, context, concurrency, conn, channel, actualWeight);
}
// Init connection - state might change (i.e init could close the connection)
waiter.initConnection(context, conn);
synchronized (Pool.this) {
if (holder.capacity == 0) {
waitersQueue.add(waiter);
Expand All @@ -229,11 +222,8 @@ public void onConnectSuccess(C conn, long concurrency, Channel channel, ContextI
available.add(holder);
}
}
boolean consumed = deliverToWaiter(holder, waiter);
waiter.handleConnection(holder.context, holder.connection);
synchronized (Pool.this) {
if (!consumed) {
recycleConnection(holder, 1,false);
}
checkPending();
}
}
Expand Down Expand Up @@ -311,19 +301,6 @@ private void closeConnection(Holder<C> holder) {
weight -= holder.weight;
}

/**
* Should not be called under the pool lock.
*/
private boolean deliverToWaiter(Holder<C> conn, Waiter<C> waiter) {
try {
return waiter.handleConnection(conn.context, conn.connection);
} catch (Exception e) {
// Handle this case gracefully
e.printStackTrace();
return true;
}
}

// These methods assume to be called under synchronization

private void recycleConnection(Holder<C> conn, int c, boolean closeable) {
Expand Down
11 changes: 1 addition & 10 deletions src/main/java/io/vertx/core/http/impl/pool/Waiter.java
Expand Up @@ -32,21 +32,12 @@ protected Waiter(ContextInternal context) {
*/
public abstract void handleFailure(ContextInternal ctx, Throwable failure);

/**
* Init connection, this callback is on an event loop thread.
*
* @param ctx the context used to create the connection
* @param conn the connection
*/
public abstract void initConnection(ContextInternal ctx, C conn);

/**
* Handle connection success, this callback is on an event loop thread.
*
* @param ctx the context used to create the connection
* @param conn the connection
* @return whether the waiter uses the connection
*/
public abstract boolean handleConnection(ContextInternal ctx, C conn) throws Exception;
public abstract void handleConnection(ContextInternal ctx, C conn);

}
9 changes: 2 additions & 7 deletions src/test/java/io/vertx/test/core/Http2ClientTest.java
Expand Up @@ -969,13 +969,8 @@ public void testConnectionShutdownInConnectionHandler() throws Exception {
conn.shutdown();
}
});
req1.exceptionHandler(err -> fail());
req1.handler(resp -> {
resp.bodyHandler(body -> {
assertEquals("6", body.toString());
complete();
});
});
req1.exceptionHandler(err -> complete());
req1.handler(resp -> fail("Was not expecting the response to complete"));
req1.end();
await();
}
Expand Down
7 changes: 2 additions & 5 deletions src/test/java/io/vertx/test/core/Http2Test.java
Expand Up @@ -379,11 +379,9 @@ public void testServePendingRequests() throws Exception {

@Test
public void testInitialMaxConcurrentStreamZero() throws Exception {
AtomicLong concurrency = new AtomicLong();
server.close();
server = vertx.createHttpServer(createBaseServerOptions().setInitialSettings(new Http2Settings().setMaxConcurrentStreams(0)));
server.requestHandler(req -> {
assertEquals(10, concurrency.get());
req.response().end();
});
server.connectionHandler(conn -> {
Expand All @@ -395,9 +393,8 @@ public void testInitialMaxConcurrentStreamZero() throws Exception {
client.get(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, DEFAULT_TEST_URI, resp -> {
testComplete();
}).connectionHandler(conn -> {
assertEquals(0, conn.remoteSettings().getMaxConcurrentStreams());
conn.remoteSettingsHandler(settings -> concurrency.set(settings.getMaxConcurrentStreams()));
}).setTimeout(10000).exceptionHandler(err -> fail(err)).end();
assertEquals(10, conn.remoteSettings().getMaxConcurrentStreams());
}).setTimeout(10000).exceptionHandler(this::fail).end();
await();
}

Expand Down

0 comments on commit 7fd5454

Please sign in to comment.