Skip to content

Commit

Permalink
Improve connection pool to correctly handle concurrency connection up…
Browse files Browse the repository at this point in the history
…dates
  • Loading branch information
vietj committed Nov 11, 2017
1 parent 0e72898 commit 4c69f70
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 69 deletions.
Expand Up @@ -460,7 +460,7 @@ public void reset(long code) {
private void requestEnded() { private void requestEnded() {
context.runOnContext(v -> { context.runOnContext(v -> {
if (pipelining && requests.size() < pipeliningLimit) { if (pipelining && requests.size() < pipeliningLimit) {
listener.onRecycle(this); listener.recycle(this);
} }
}); });
} }
Expand All @@ -471,7 +471,7 @@ private void responseEnded() {
} else { } else {
context.runOnContext(v -> { context.runOnContext(v -> {
if (currentRequest == null) { if (currentRequest == null) {
listener.onRecycle(this); listener.recycle(this);
} }
}); });
} }
Expand Down Expand Up @@ -630,7 +630,7 @@ protected void handleMessage(NetSocketImpl connection, ContextImpl context, Chan
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
connection.handleMessageReceived(buf); connection.handleMessageReceived(buf);
} }
}.removeHandler(sock -> listener.onClose(this))); }.removeHandler(sock -> listener.closed(this)));
return socket; return socket;
} }


Expand Down
Expand Up @@ -70,8 +70,8 @@ public Channel channel() {
} }


@Override @Override
protected void onConcurrencyChange() { protected void concurrencyChanged() {
listener.onRecycle(this); listener.concurrencyChanged(this);
} }


@Override @Override
Expand All @@ -82,7 +82,7 @@ public HttpClientMetrics metrics() {
@Override @Override
void onStreamClosed(Http2Stream nettyStream) { void onStreamClosed(Http2Stream nettyStream) {
super.onStreamClosed(nettyStream); super.onStreamClosed(nettyStream);
listener.onRecycle(this); listener.recycle(this);
} }


public synchronized HttpClientStream createStream() throws Http2Exception { public synchronized HttpClientStream createStream() throws Http2Exception {
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
Expand Up @@ -91,12 +91,14 @@ static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) {
private boolean closed; private boolean closed;
private boolean goneAway; private boolean goneAway;
private int windowSize; private int windowSize;
private long maxConcurrentStreams;


public Http2ConnectionBase(ContextImpl context, VertxHttp2ConnectionHandler handler) { public Http2ConnectionBase(ContextImpl context, VertxHttp2ConnectionHandler handler) {
super(context.owner(), handler.context(), context); super(context.owner(), handler.context(), context);
this.handler = handler; this.handler = handler;
this.handlerContext = chctx; this.handlerContext = chctx;
this.windowSize = handler.connection().local().flowController().windowSize(handler.connection().connectionStream()); this.windowSize = handler.connection().local().flowController().windowSize(handler.connection().connectionStream());
this.maxConcurrentStreams = io.vertx.core.http.Http2Settings.DEFAULT_MAX_CONCURRENT_STREAMS;
} }


VertxInternal vertx() { VertxInternal vertx() {
Expand Down Expand Up @@ -210,12 +212,17 @@ public synchronized void onSettingsAckRead(ChannelHandlerContext ctx) {
protected void onConnect() { protected void onConnect() {
} }


protected void onConcurrencyChange() { protected void concurrencyChanged() {
} }


@Override @Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
boolean update = !Objects.equals(remoteSettings.maxConcurrentStreams(), settings.maxConcurrentStreams()); boolean changed = false;
if (settings.maxConcurrentStreams() != null) {
long val = settings.maxConcurrentStreams();
changed = val != maxConcurrentStreams;
maxConcurrentStreams = val;
}
remoteSettings = settings; remoteSettings = settings;
synchronized (this) { synchronized (this) {
Handler<io.vertx.core.http.Http2Settings> handler = remoteSettingsHandler; Handler<io.vertx.core.http.Http2Settings> handler = remoteSettingsHandler;
Expand All @@ -225,8 +232,8 @@ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
}); });
} }
} }
if (update) { if (changed) {
onConcurrencyChange(); concurrencyChanged();
} }
} }


Expand Down
Expand Up @@ -262,7 +262,7 @@ private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
handler.handle(Future.succeededFuture(conn)); handler.handle(Future.succeededFuture(conn));
}); });
clientHandler.removeHandler(conn -> { clientHandler.removeHandler(conn -> {
listener.onClose(conn); listener.closed(conn);
}); });
ch.pipeline().addLast("handler", clientHandler); ch.pipeline().addLast("handler", clientHandler);
} }
Expand Down Expand Up @@ -297,7 +297,7 @@ private void http2Connected(ConnectionListener<HttpClientConnection> listener,
if (metrics != null) { if (metrics != null) {
metrics.endpointDisconnected(endpointMetric, conn.metric()); metrics.endpointDisconnected(endpointMetric, conn.metric());
} }
listener.onClose(conn); listener.closed(conn);
}); });
} catch (Exception e) { } catch (Exception e) {
connectFailed(ch, resultHandler, e); connectFailed(ch, resultHandler, e);
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -957,10 +957,11 @@ private void getConnectionForWebsocket(boolean ssl,
public void initConnection(HttpClientConnection conn) { public void initConnection(HttpClientConnection conn) {
} }
@Override @Override
public void handleConnection(HttpClientConnection conn) { public boolean handleConnection(HttpClientConnection conn) {
conn.getContext().executeFromIO(() -> { conn.getContext().executeFromIO(() -> {
handler.handle((Http1xClientConnection) conn); handler.handle((Http1xClientConnection) conn);
}); });
return true;
} }
@Override @Override
public void handleFailure(ContextInternal ctx, Throwable failure) { public void handleFailure(ContextInternal ctx, Throwable failure) {
Expand All @@ -972,10 +973,6 @@ public void handleFailure(ContextInternal ctx, Throwable failure) {
connectionExceptionHandler.handle(failure); connectionExceptionHandler.handle(failure);
} }
} }
@Override
public boolean isCancelled() {
return false;
}
}); });
} }


Expand Down
15 changes: 7 additions & 8 deletions src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
Expand Up @@ -667,20 +667,19 @@ public void initConnection(HttpClientConnection conn) {
} }


@Override @Override
public void handleConnection(HttpClientConnection conn) throws Exception { public boolean handleConnection(HttpClientConnection conn) throws Exception {
// No need to synchronize as the thread is the same that set exceptionOccurred to true
// exceptionOccurred=true getting the connection => it's a TimeoutException
if (exceptionOccurred != null || reset != null) {
return false;
}
HttpClientStream stream; HttpClientStream stream;
stream = conn.createStream(); stream = conn.createStream();
ContextImpl context = conn.getContext(); ContextImpl context = conn.getContext();
context.executeFromIO(() -> { context.executeFromIO(() -> {
connected(stream, headersCompletionHandler); connected(stream, headersCompletionHandler);
}); });
} return true;

@Override
public boolean isCancelled() {
// No need to synchronize as the thread is the same that set exceptionOccurred to true
// exceptionOccurred=true getting the connection => it's a TimeoutException
return exceptionOccurred != null || reset != null;
} }
}; };


Expand Down
27 changes: 23 additions & 4 deletions src/main/java/io/vertx/core/http/impl/HttpConnectionPool.java
Expand Up @@ -90,11 +90,11 @@ public int maxSize() {
} }


@Override @Override
public void initConnection(HttpClientConnection conn) { public boolean initConnection(HttpClientConnection conn) {
if (conn instanceof Http1xClientConnection && current instanceof Http2) { if (conn instanceof Http1xClientConnection && current instanceof Http2) {
fallbackToHttp1x(((Http1xClientConnection) conn).version()); fallbackToHttp1x(((Http1xClientConnection) conn).version());
} }
((ConnectionPool<HttpClientConnection>) current).initConnection(conn); return ((ConnectionPool<HttpClientConnection>) current).initConnection(conn);
} }


@Override @Override
Expand All @@ -107,6 +107,10 @@ public void evictConnection(HttpClientConnection conn) {
((ConnectionPool<HttpClientConnection>) current).evictConnection(conn); ((ConnectionPool<HttpClientConnection>) current).evictConnection(conn);
} }


@Override
public void close() {
}

class Http2 implements ConnectionPool<Http2ClientConnection> { class Http2 implements ConnectionPool<Http2ClientConnection> {


// Pools must locks on the queue object to keep a single lock // Pools must locks on the queue object to keep a single lock
Expand Down Expand Up @@ -164,8 +168,14 @@ public void evictConnection(Http2ClientConnection conn) {
} }


@Override @Override
public void initConnection(Http2ClientConnection conn) { public boolean initConnection(Http2ClientConnection conn) {
allConnections.add(conn); allConnections.add(conn);
if (canReserveStream(conn)) {
conn.streamCount++;
return true;
} else {
return false;
}
} }


@Override @Override
Expand All @@ -182,6 +192,10 @@ public boolean isValid(Http2ClientConnection conn) {
public ContextImpl getContext(Http2ClientConnection conn) { public ContextImpl getContext(Http2ClientConnection conn) {
return conn.getContext(); return conn.getContext();
} }

@Override
public void close() {
}
} }


private class Http1x implements ConnectionPool<Http1xClientConnection> { private class Http1x implements ConnectionPool<Http1xClientConnection> {
Expand Down Expand Up @@ -218,8 +232,9 @@ public boolean canCreateConnection(int connCount) {
} }


@Override @Override
public void initConnection(Http1xClientConnection conn) { public boolean initConnection(Http1xClientConnection conn) {
allConnections.add(conn); allConnections.add(conn);
return true;
} }


@Override @Override
Expand Down Expand Up @@ -255,5 +270,9 @@ public boolean isValid(Http1xClientConnection conn) {
public ContextImpl getContext(Http1xClientConnection conn) { public ContextImpl getContext(Http1xClientConnection conn) {
return conn.getContext(); return conn.getContext();
} }

@Override
public void close() {
}
} }
} }
21 changes: 19 additions & 2 deletions src/main/java/io/vertx/core/http/impl/pool/ConnectionListener.java
Expand Up @@ -20,8 +20,25 @@
*/ */
public interface ConnectionListener<C> { public interface ConnectionListener<C> {


void onRecycle(C conn); /**
* The connection concurrency changed.
*
* @param conn the connection
*/
void concurrencyChanged(C conn);


void onClose(C conn); /**
* The connection can be recycled.
*
* @param conn the connection
*/
void recycle(C conn);

/**
* The connection is closed.
*
* @param conn the connection
*/
void closed(C conn);


} }
73 changes: 42 additions & 31 deletions src/main/java/io/vertx/core/http/impl/pool/ConnectionManager.java
Expand Up @@ -17,10 +17,8 @@
package io.vertx.core.http.impl.pool; package io.vertx.core.http.impl.pool;


import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.vertx.core.Vertx;
import io.vertx.core.http.ConnectionPoolTooBusyException; import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress; import io.vertx.core.net.SocketAddress;
Expand Down Expand Up @@ -171,27 +169,27 @@ private synchronized void checkPending() {
if (metric != null) { if (metric != null) {
metrics.dequeueRequest(metric, waiter.metric); metrics.dequeueRequest(metric, waiter.metric);
} }
if (waiter.isCancelled()) { C conn = pool.pollConnection();
if (conn != null) {
waiters.poll(); waiters.poll();
} else { ContextImpl ctx = pool.getContext(conn);
C conn = pool.pollConnection(); ctx.nettyEventLoop().execute(() -> {
if (conn != null) { if (pool.isValid(conn)) {
waiters.poll(); boolean handled = deliverInternal(conn, waiter);
ContextImpl ctx = pool.getContext(conn); if (!handled) {
ctx.nettyEventLoop().execute(() -> { pool.recycleConnection(conn);
if (pool.isValid(conn)) { checkPending();
deliverInternal(conn, waiter);
} else {
onClose(conn);
getConnection(waiter);
} }
}); } else {
} else if (pool.canCreateConnection(connCount)) { closed(conn);
waiters.poll(); getConnection(waiter);
createConnection(waiter); }
} else { });
break; } else if (pool.canCreateConnection(connCount)) {
} waiters.poll();
createConnection(waiter);
} else {
break;
} }
} }
} }
Expand All @@ -216,13 +214,18 @@ private void createConnection(Waiter<C> waiter) {
} }


@Override @Override
public synchronized void onRecycle(C conn) { public void concurrencyChanged(C conn) {
checkPending();
}

@Override
public void recycle(C conn) {
pool.recycleConnection(conn); pool.recycleConnection(conn);
checkPending(); checkPending();
} }


@Override @Override
public synchronized void onClose(C conn) { public synchronized void closed(C conn) {
Channel channel = connector.channel(conn); Channel channel = connector.channel(conn);
connectionMap.remove(channel); connectionMap.remove(channel);
pool.evictConnection(conn); pool.evictConnection(conn);
Expand All @@ -231,34 +234,42 @@ public synchronized void onClose(C conn) {


private synchronized void initConnection(Waiter<C> waiter, C conn) { private synchronized void initConnection(Waiter<C> waiter, C conn) {
connectionMap.put(connector.channel(conn), conn); connectionMap.put(connector.channel(conn), conn);
pool.initConnection(conn); boolean ok = pool.initConnection(conn);
waiter.initConnection(conn); waiter.initConnection(conn);
if (waiter.isCancelled()) { if (ok) {
pool.recycleConnection(conn); boolean handled = deliverInternal(conn, waiter);
if (!handled) {
pool.recycleConnection(conn);
}
checkPending();
} else { } else {
deliverInternal(conn, waiter); getConnection(waiter);
} }
checkPending();
} }


private void deliverInternal(C conn, Waiter<C> waiter) { private boolean deliverInternal(C conn, Waiter<C> waiter) {
try { try {
waiter.handleConnection(conn); return waiter.handleConnection(conn);
} catch (Exception e) { } catch (Exception e) {
getConnection(waiter); e.printStackTrace();
return true;
} }
} }


// Called if the connection is actually closed OR the connection attempt failed // Called if the connection is actually closed OR the connection attempt failed
synchronized void connectionClosed() { synchronized void connectionClosed() {
connCount--; connCount--;
checkPending(); checkPending();

// CHECK WAITERS

if (connCount == 0) { if (connCount == 0) {
// No waiters and no connections - remove the ConnQueue // No waiters and no connections - remove the ConnQueue
endpointMap.remove(key); endpointMap.remove(key);
if (metrics != null) { if (metrics != null) {
metrics.closeEndpoint(host, port, metric); metrics.closeEndpoint(host, port, metric);
} }
pool.close();
} }
} }
} }
Expand Down

0 comments on commit 4c69f70

Please sign in to comment.