Skip to content

Commit

Permalink
Rework a bit connection pools internals based on feedback:
Browse files Browse the repository at this point in the history
- waiter aren't be added to the waiters list when there is no waiter
- extract the connection acquisition from checkPending() and use it in getConnection
- remove the all Set<Holder> and use the weight instead
- set a removed boolean on Holder to tolerate incorrect usage of ConnectionListener
- removed the validity check on the connection in favor of a pool eviction
- calling HttpConnection#goAway(code) uses the last known stream instead of max int value
  • Loading branch information
vietj committed Dec 4, 2017
1 parent 52e7a1b commit 9554c87
Show file tree
Hide file tree
Showing 14 changed files with 156 additions and 177 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/HttpConnection.java
Expand Up @@ -68,11 +68,11 @@ default HttpConnection setWindowSize(int windowSize) {
} }


/** /**
* Like {@link #goAway(long, int)} with a last stream id {@code 2^31-1}. * Like {@link #goAway(long, int)} with a last stream id {@code -1} which means to disallow any new stream creation.
*/ */
@Fluent @Fluent
default HttpConnection goAway(long errorCode) { default HttpConnection goAway(long errorCode) {
return goAway(errorCode, Integer.MAX_VALUE); return goAway(errorCode, -1);
} }


/** /**
Expand Down
Expand Up @@ -70,15 +70,16 @@ class Http1xClientConnection extends Http1xConnectionBase implements HttpClientC
private final String host; private final String host;
private final int port; private final int port;
private final Object endpointMetric; private final Object endpointMetric;
private final Deque<StreamImpl> pending = new ArrayDeque<>();
private final Deque<StreamImpl> inflight = new ArrayDeque<>();
private final HttpClientMetrics metrics; private final HttpClientMetrics metrics;
private final HttpVersion version; private final HttpVersion version;


private WebSocketClientHandshaker handshaker; private WebSocketClientHandshaker handshaker;
private WebSocketImpl ws;

private final Deque<StreamImpl> pending = new ArrayDeque<>();
private final Deque<StreamImpl> inflight = new ArrayDeque<>();
private StreamImpl currentRequest; private StreamImpl currentRequest;
private StreamImpl currentResponse; private StreamImpl currentResponse;
private WebSocketImpl ws;


private boolean paused; private boolean paused;
private Buffer pausedChunk; private Buffer pausedChunk;
Expand Down Expand Up @@ -329,7 +330,7 @@ protected void handleMessage(NetSocketImpl connection, ContextImpl context, Chan
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
connection.handleMessageReceived(buf); connection.handleMessageReceived(buf);
} }
}.removeHandler(sock -> conn.listener.onClose())); }.removeHandler(sock -> conn.listener.onDiscard()));


return socket; return socket;
} }
Expand Down Expand Up @@ -611,10 +612,6 @@ private void handshakeComplete(ChannelHandlerContext ctx, FullHttpResponse respo
} }
} }


public boolean isValid() {
return chctx.channel().isOpen();
}

@Override @Override
public synchronized void handleInterestedOpsChanged() { public synchronized void handleInterestedOpsChanged() {
if (!isNotWritable()) { if (!isNotWritable()) {
Expand Down Expand Up @@ -712,6 +709,7 @@ protected synchronized void handleException(Throwable e) {


@Override @Override
public synchronized void close() { public synchronized void close() {
listener.onDiscard();
if (handshaker == null) { if (handshaker == null) {
super.close(); super.close();
} else { } else {
Expand Down
17 changes: 12 additions & 5 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -63,6 +63,18 @@ public Http2ClientConnection(ConnectionListener<HttpClientConnection> listener,
this.listener = listener; this.listener = listener;
} }


@Override
synchronized void onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
listener.onDiscard();
super.onGoAwaySent(lastStreamId, errorCode, debugData);
}

@Override
synchronized void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
listener.onDiscard();
super.onGoAwayReceived(lastStreamId, errorCode, debugData);
}

@Override @Override
public Channel channel() { public Channel channel() {
return chctx.channel(); return chctx.channel();
Expand Down Expand Up @@ -105,11 +117,6 @@ public void createStream(HttpClientRequestImpl req, Handler<AsyncResult<HttpClie
completionHandler.handle(fut); completionHandler.handle(fut);
} }


@Override
public boolean isValid() {
return !isClosed() && !isGoneAway();
}

@Override @Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception { public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId); Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
Expand Down
Expand Up @@ -131,10 +131,6 @@ synchronized boolean isClosed() {
return closed; return closed;
} }


synchronized boolean isGoneAway() {
return goneAway;
}

synchronized void onConnectionError(Throwable cause) { synchronized void onConnectionError(Throwable cause) {
synchronized (this) { synchronized (this) {
for (VertxHttp2Stream stream : streams.values()) { for (VertxHttp2Stream stream : streams.values()) {
Expand Down Expand Up @@ -344,7 +340,7 @@ public synchronized HttpConnection goAway(long errorCode, int lastStreamId, Buff
throw new IllegalArgumentException(); throw new IllegalArgumentException();
} }
if (lastStreamId < 0) { if (lastStreamId < 0) {
throw new IllegalArgumentException(); lastStreamId = handler.connection().remote().lastStreamCreated();
} }
handler.writeGoAway(errorCode, lastStreamId, debugData != null ? debugData.getByteBuf() : Unpooled.EMPTY_BUFFER); handler.writeGoAway(errorCode, lastStreamId, debugData != null ? debugData.getByteBuf() : Unpooled.EMPTY_BUFFER);
return this; return this;
Expand Down
Expand Up @@ -87,11 +87,6 @@ class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
this.port = port; this.port = port;
} }


@Override
public boolean isValid(HttpClientConnection conn) {
return conn.isValid();
}

@Override @Override
public void close(HttpClientConnection conn) { public void close(HttpClientConnection conn) {
conn.close(); conn.close();
Expand Down Expand Up @@ -295,7 +290,7 @@ private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
listener.onConnectSuccess(conn, http1MaxConcurrency, ch, context, weight, http1Weight); listener.onConnectSuccess(conn, http1MaxConcurrency, ch, context, weight, http1Weight);
}); });
clientHandler.removeHandler(conn -> { clientHandler.removeHandler(conn -> {
listener.onClose(); listener.onDiscard();
}); });
ch.pipeline().addLast("handler", clientHandler); ch.pipeline().addLast("handler", clientHandler);
} }
Expand Down Expand Up @@ -332,7 +327,7 @@ private void http2Connected(ConnectionListener<HttpClientConnection> listener,
if (metrics != null) { if (metrics != null) {
metrics.endpointDisconnected(metric, conn.metric()); metrics.endpointDisconnected(metric, conn.metric());
} }
listener.onClose(); listener.onDiscard();
}); });
} catch (Exception e) { } catch (Exception e) {
connectFailed(context, ch, listener, e); connectFailed(context, ch, listener, e);
Expand Down
Expand Up @@ -35,12 +35,6 @@ interface HttpClientConnection extends HttpConnection {


void close(); void close();


/**
* Check if the connection is valid for creating streams. The connection might be closed or a {@literal GOAWAY}
* frame could have been sent or received.
*/
boolean isValid();

void createStream(HttpClientRequestImpl req, Handler<AsyncResult<HttpClientStream>> handler); void createStream(HttpClientRequestImpl req, Handler<AsyncResult<HttpClientStream>> handler);


ContextImpl getContext(); ContextImpl getContext();
Expand Down
6 changes: 1 addition & 5 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -1079,11 +1079,7 @@ public synchronized ReadStream<WebSocket> handler(Handler<WebSocket> handler) {
} }
getConnectionForWebsocket(ssl != null ? ssl : options.isSsl(), port, host, conn -> { getConnectionForWebsocket(ssl != null ? ssl : options.isSsl(), port, host, conn -> {
conn.exceptionHandler(connectionExceptionHandler); conn.exceptionHandler(connectionExceptionHandler);
if (conn.isValid()) { conn.toWebSocket(requestURI, headers, version, subProtocols, options.getMaxWebsocketFrameSize(), wsConnect);
conn.toWebSocket(requestURI, headers, version, subProtocols, options.getMaxWebsocketFrameSize(), wsConnect);
} else {
websocket(port, host, requestURI, headers, version, subProtocols, wsConnect);
}
}, connectionExceptionHandler, context); }, connectionExceptionHandler, context);
} }
return this; return this;
Expand Down
Expand Up @@ -682,7 +682,6 @@ private synchronized void connect() {
ctx.executeFromIO(() -> { ctx.executeFromIO(() -> {
handleException(failure); handleException(failure);
}); });

}); });
connecting = true; connecting = true;
} }
Expand Down
Expand Up @@ -66,8 +66,8 @@ void onConnectSuccess(C conn,
void onRecycle(int capacity, boolean disposable); void onRecycle(int capacity, boolean disposable);


/** /**
* Signals the connection is closed. * Signals the connection must not be used anymore by the pool.
*/ */
void onClose(); void onDiscard();


} }
Expand Up @@ -34,13 +34,6 @@ public interface ConnectionProvider<C> {
*/ */
long connect(ConnectionListener<C> listener, ContextImpl context); long connect(ConnectionListener<C> listener, ContextImpl context);


/**
* Check wether a connection is valid.
*
* @param conn the connection to check
*/
boolean isValid(C conn);

/** /**
* Close a connection. * Close a connection.
* *
Expand Down

0 comments on commit 9554c87

Please sign in to comment.