Skip to content

Commit

Permalink
Rewrite the pool implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 13, 2017
1 parent 27bb6e4 commit a57fb6b
Show file tree
Hide file tree
Showing 15 changed files with 1,147 additions and 593 deletions.
Expand Up @@ -460,7 +460,7 @@ public void reset(long code) {
private void requestEnded() {
context.runOnContext(v -> {
if (pipelining && requests.size() < pipeliningLimit) {
listener.recycle(this);
listener.onRecycle(this);
}
});
}
Expand All @@ -471,7 +471,7 @@ private void responseEnded() {
} else {
context.runOnContext(v -> {
if (currentRequest == null) {
listener.recycle(this);
listener.onRecycle(this);
}
});
}
Expand Down Expand Up @@ -630,7 +630,7 @@ protected void handleMessage(NetSocketImpl connection, ContextImpl context, Chan
ByteBuf buf = (ByteBuf) msg;
connection.handleMessageReceived(buf);
}
}.removeHandler(sock -> listener.closed(this)));
}.removeHandler(sock -> listener.onClose(this)));
return socket;
}

Expand Down
Expand Up @@ -49,7 +49,6 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
private final HttpClientImpl client;
final HttpClientMetrics metrics;
final Object queueMetric;
int streamCount; // Exclusively used by the HTTP/2 connection pool

public Http2ClientConnection(ConnectionListener<HttpClientConnection> listener,
Object queueMetric,
Expand All @@ -70,8 +69,8 @@ public Channel channel() {
}

@Override
protected void concurrencyChanged() {
listener.concurrencyChanged(this);
protected void concurrencyChanged(long concurrency) {
listener.onConcurrencyChange(this, concurrency);
}

@Override
Expand All @@ -82,7 +81,7 @@ public HttpClientMetrics metrics() {
@Override
void onStreamClosed(Http2Stream nettyStream) {
super.onStreamClosed(nettyStream);
listener.recycle(this);
listener.onRecycle(this);
}

public synchronized HttpClientStream createStream() throws Http2Exception {
Expand Down
20 changes: 13 additions & 7 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
Expand Up @@ -84,7 +84,7 @@ static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) {
private final ArrayDeque<Runnable> updateSettingsHandlers = new ArrayDeque<>(4);
private final ArrayDeque<Handler<AsyncResult<Buffer>>> pongHandlers = new ArrayDeque<>();
private Http2Settings localSettings = new Http2Settings();
private Http2Settings remoteSettings = new Http2Settings();
private Http2Settings remoteSettings;
private Handler<GoAway> goAwayHandler;
private Handler<Void> shutdownHandler;
private Handler<Buffer> pingHandler;
Expand Down Expand Up @@ -212,16 +212,22 @@ public synchronized void onSettingsAckRead(ChannelHandlerContext ctx) {
protected void onConnect() {
}

protected void concurrencyChanged() {
protected void concurrencyChanged(long concurrency) {
}

@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
boolean changed = false;
if (settings.maxConcurrentStreams() != null) {
long val = settings.maxConcurrentStreams();
changed = val != maxConcurrentStreams;
boolean changed;
Long val = settings.maxConcurrentStreams();
if (val != null) {
if (remoteSettings != null) {
changed = val != maxConcurrentStreams;
} else {
changed = false;
}
maxConcurrentStreams = val;
} else {
changed = false;
}
remoteSettings = settings;
synchronized (this) {
Expand All @@ -233,7 +239,7 @@ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
}
}
if (changed) {
concurrencyChanged();
concurrencyChanged(maxConcurrentStreams);
}
}

Expand Down
63 changes: 35 additions & 28 deletions src/main/java/io/vertx/core/http/impl/HttpChannelConnector.java
Expand Up @@ -25,7 +25,6 @@
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion;
Expand All @@ -51,34 +50,41 @@ class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
private final HttpClientMetrics metrics;
private final SSLHelper sslHelper;
private final HttpVersion version;
private final long weight;
private final long http1Weight;
private final long http2Weight;
private final long http2MaxConcurrency;

HttpChannelConnector(HttpClientImpl client) {
this.client = client;
this.options = client.getOptions();
this.metrics = client.metrics();
this.sslHelper = client.getSslHelper();
this.version = options.getProtocolVersion();
this.http1Weight = client.getOptions().getHttp2MaxPoolSize();
this.http2Weight = client.getOptions().getMaxPoolSize();
this.weight = version == HttpVersion.HTTP_2 ? http2Weight : http1Weight;
this.http2MaxConcurrency = client.getOptions().getHttp2MultiplexingLimit() <= 0 ? Long.MAX_VALUE : client.getOptions().getHttp2MultiplexingLimit();
}

@Override
public Channel channel(HttpClientConnection conn) {
return conn.channel();
public boolean isValid(HttpClientConnection conn) {
return conn.isValid();
}

@Override
public void close(HttpClientConnection conn) {
conn.close();
}

public void connect(
public long connect(
ConnectionListener<HttpClientConnection> listener,
Object endpointMetric,
ContextImpl context,
String peerHost,
boolean ssl,
String host,
int port,
Handler<AsyncResult<HttpClientConnection>> handler) {
int port) {

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.nettyEventLoop());
Expand Down Expand Up @@ -109,19 +115,19 @@ public void connect(
if (useAlpn) {
if ("h2".equals(protocol)) {
applyHttp2ConnectionOptions(ch.pipeline());
http2Connected(listener, endpointMetric, context, ch, handler);
http2Connected(listener, endpointMetric, context, ch);
} else {
applyHttp1xConnectionOptions(ch.pipeline());
HttpVersion fallbackProtocol = "http/1.0".equals(protocol) ?
HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
http1xConnected(listener, fallbackProtocol, host, port, true, endpointMetric, context, ch, handler);
http1xConnected(listener, fallbackProtocol, host, port, true, endpointMetric, context, ch);
}
} else {
applyHttp1xConnectionOptions(ch.pipeline());
http1xConnected(listener, version, host, port, true, endpointMetric, context, ch, handler);
http1xConnected(listener, version, host, port, true, endpointMetric, context, ch);
}
} else {
handshakeFailure(ch, fut.cause(), handler);
handshakeFailure(ch, fut.cause(), listener);
}
});
} else {
Expand Down Expand Up @@ -149,7 +155,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
p.remove(this);
// Upgrade handler will remove itself
applyHttp1xConnectionOptions(ch.pipeline());
http1xConnected(listener, HttpVersion.HTTP_1_1, host, port, false, endpointMetric, context, ch, handler);
http1xConnected(listener, HttpVersion.HTTP_1_1, host, port, false, endpointMetric, context, ch);
}
}
@Override
Expand All @@ -165,7 +171,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
@Override
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception {
applyHttp2ConnectionOptions(pipeline);
http2Connected(listener, endpointMetric, context, ch, handler);
http2Connected(listener, endpointMetric, context, ch);
}
};
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536);
Expand All @@ -188,18 +194,20 @@ public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeRespons
// Upgrade handler do nothing
} else {
if (version == HttpVersion.HTTP_2 && !client.getOptions().isHttp2ClearTextUpgrade()) {
http2Connected(listener, endpointMetric, context, ch, handler);
http2Connected(listener, endpointMetric, context, ch);
} else {
http1xConnected(listener, version, host, port, false, endpointMetric, context, ch, handler);
http1xConnected(listener, version, host, port, false, endpointMetric, context, ch);
}
}
}
} else {
connectFailed(null, handler, res.cause());
connectFailed(null, listener, res.cause());
}
};

channelProvider.connect(client.getVertx(), bootstrap, client.getOptions().getProxyOptions(), SocketAddress.inetSocketAddress(port, host), channelInitializer, channelHandler);

return weight;
}

private void applyConnectionOptions(Bootstrap bootstrap) {
Expand Down Expand Up @@ -231,12 +239,12 @@ private void applyHttp1xConnectionOptions(ChannelPipeline pipeline) {
}
}

private void handshakeFailure(Channel ch, Throwable cause, Handler<AsyncResult<HttpClientConnection>> handler) {
private void handshakeFailure(Channel ch, Throwable cause, ConnectionListener<HttpClientConnection> listener) {
SSLHandshakeException sslException = new SSLHandshakeException("Failed to create SSL connection");
if (cause != null) {
sslException.initCause(cause);
}
connectFailed(ch, handler, sslException);
connectFailed(ch, listener, sslException);
}

private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
Expand All @@ -246,8 +254,7 @@ private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
boolean ssl,
Object endpointMetric,
ContextImpl context,
Channel ch,
Handler<AsyncResult<HttpClientConnection>> handler) {
Channel ch) {
Http1xClientHandler clientHandler = new Http1xClientHandler(
listener,
context,
Expand All @@ -259,19 +266,18 @@ private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
endpointMetric,
client.metrics());
clientHandler.addHandler(conn -> {
handler.handle(Future.succeededFuture(conn));
listener.onConnectSuccess(conn, 1, ch, context, weight, http1Weight, Long.MAX_VALUE);
});
clientHandler.removeHandler(conn -> {
listener.closed(conn);
listener.onClose(conn);
});
ch.pipeline().addLast("handler", clientHandler);
}

private void http2Connected(ConnectionListener<HttpClientConnection> listener,
Object endpointMetric,
ContextImpl context,
Channel ch,
Handler<AsyncResult<HttpClientConnection>> resultHandler) {
Channel ch) {
try {
boolean upgrade;
upgrade = ch.pipeline().get(SslHandler.class) == null && options.isHttp2ClearTextUpgrade();
Expand All @@ -291,26 +297,27 @@ private void http2Connected(ConnectionListener<HttpClientConnection> listener,
Object metric = metrics.connected(conn.remoteAddress(), conn.remoteName());
conn.metric(metric);
}
resultHandler.handle(Future.succeededFuture(conn));
long concurrency = conn.remoteSettings().getMaxConcurrentStreams();
listener.onConnectSuccess(conn, concurrency, ch, context, weight, http2Weight, http2MaxConcurrency);
});
handler.removeHandler(conn -> {
if (metrics != null) {
metrics.endpointDisconnected(endpointMetric, conn.metric());
}
listener.closed(conn);
listener.onClose(conn);
});
} catch (Exception e) {
connectFailed(ch, resultHandler, e);
connectFailed(ch, listener, e);
}
}

private void connectFailed(Channel ch, Handler<AsyncResult<HttpClientConnection>> connectionExceptionHandler, Throwable t) {
private void connectFailed(Channel ch, ConnectionListener<HttpClientConnection> listener, Throwable t) {
if (ch != null) {
try {
ch.close();
} catch (Exception ignore) {
}
}
connectionExceptionHandler.handle(Future.failedFuture(t));
listener.onConnectFailure(t, weight);
}
}
7 changes: 5 additions & 2 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -33,8 +33,8 @@
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebsocketVersion;
import io.vertx.core.http.impl.pool.ConnectionManager;
import io.vertx.core.http.impl.pool.ConnectionPool;
import io.vertx.core.http.impl.pool.ConnectionProvider;
import io.vertx.core.http.impl.pool.PoolOptions;
import io.vertx.core.http.impl.pool.Waiter;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.ContextInternal;
Expand Down Expand Up @@ -160,7 +160,10 @@ public HttpClientImpl(VertxInternal vertx, HttpClientOptions options) {
}

ConnectionProvider<HttpClientConnection> connector = new HttpChannelConnector(this);
Function<SocketAddress, ConnectionPool<HttpClientConnection>> poolFactory = sa -> new HttpConnectionPool(options.getProtocolVersion(), options);
Function<SocketAddress, PoolOptions> poolFactory = sa -> {
long ms = options.getMaxPoolSize() * options.getHttp2MaxPoolSize();
return new PoolOptions().setMaxSize(ms);
};

websocketCM = new ConnectionManager<>(metrics, connector, poolFactory, options.getMaxWaitQueueSize());
httpCM = new ConnectionManager<>(metrics, connector, poolFactory, options.getMaxWaitQueueSize());
Expand Down

0 comments on commit a57fb6b

Please sign in to comment.