Skip to content

Commit

Permalink
Revert "Move the pipeline configuration in the pool themselves and do…
Browse files Browse the repository at this point in the history
… it when the connection is provided to the pool"

This reverts commit b9397e0.
  • Loading branch information
vietj committed Oct 7, 2016
1 parent b6ace9d commit ca1032a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 39 deletions.
35 changes: 34 additions & 1 deletion src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -28,11 +28,14 @@
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler; import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.Http2Exception; import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult; import io.vertx.core.AsyncResult;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.http.ConnectionPoolTooBusyException; import io.vertx.core.http.ConnectionPoolTooBusyException;
Expand Down Expand Up @@ -75,6 +78,7 @@ public class ConnectionManager {
private final boolean pipelining; private final boolean pipelining;
private final int maxWaitQueueSize; private final int maxWaitQueueSize;
private final int http2MaxConcurrency; private final int http2MaxConcurrency;
private final boolean logEnabled;
private final ChannelConnector connector; private final ChannelConnector connector;
private final HttpClientMetrics metrics; private final HttpClientMetrics metrics;


Expand All @@ -87,6 +91,7 @@ public class ConnectionManager {
this.pipelining = client.getOptions().isPipelining(); this.pipelining = client.getOptions().isPipelining();
this.maxWaitQueueSize = client.getOptions().getMaxWaitQueueSize(); this.maxWaitQueueSize = client.getOptions().getMaxWaitQueueSize();
this.http2MaxConcurrency = options.getHttp2MultiplexingLimit() < 1 ? Integer.MAX_VALUE : options.getHttp2MultiplexingLimit(); this.http2MaxConcurrency = options.getHttp2MultiplexingLimit() < 1 ? Integer.MAX_VALUE : options.getHttp2MultiplexingLimit();
this.logEnabled = client.getOptions().getLogActivity();
this.connector = new ChannelConnector(); this.connector = new ChannelConnector();
this.metrics = metrics; this.metrics = metrics;
} }
Expand Down Expand Up @@ -168,7 +173,7 @@ public class ConnQueue {
this.mgr = mgr; this.mgr = mgr;
if (version == HttpVersion.HTTP_2) { if (version == HttpVersion.HTTP_2) {
maxSize = options.getHttp2MaxPoolSize(); maxSize = options.getHttp2MaxPoolSize();
pool = (Pool)new Http2Pool(this, client, ConnectionManager.this.metrics, mgr.connectionMap, http2MaxConcurrency, options.getHttp2MaxPoolSize(), options.getHttp2ConnectionWindowSize()); pool = (Pool)new Http2Pool(this, client, ConnectionManager.this.metrics, mgr.connectionMap, http2MaxConcurrency, logEnabled, options.getHttp2MaxPoolSize(), options.getHttp2ConnectionWindowSize());
} else { } else {
maxSize = options.getMaxPoolSize(); maxSize = options.getMaxPoolSize();
pool = (Pool)new Http1xPool(client, ConnectionManager.this.metrics, options, this, mgr.connectionMap, version, options.getMaxPoolSize()); pool = (Pool)new Http1xPool(client, ConnectionManager.this.metrics, options, this, mgr.connectionMap, version, options.getMaxPoolSize());
Expand Down Expand Up @@ -396,8 +401,10 @@ protected void connect(
@Override @Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) { if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
applyHttp2ConnectionOptions(pipeline);
queue.http2Connected(context, ch, waiter, false); queue.http2Connected(context, ch, waiter, false);
} else { } else {
applyHttp1xConnectionOptions(queue, ch.pipeline(), context);
HttpVersion fallbackProtocol = ApplicationProtocolNames.HTTP_1_1.equals(protocol) ? HttpVersion fallbackProtocol = ApplicationProtocolNames.HTTP_1_1.equals(protocol) ?
HttpVersion.HTTP_1_1 : HttpVersion.HTTP_1_0; HttpVersion.HTTP_1_1 : HttpVersion.HTTP_1_0;
queue.fallbackToHttp1x(ch, context, fallbackProtocol, port, host, waiter); queue.fallbackToHttp1x(ch, context, fallbackProtocol, port, host, waiter);
Expand Down Expand Up @@ -430,19 +437,25 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
p.remove(httpCodec); p.remove(httpCodec);
p.remove(this); p.remove(this);
// Upgrade handler will remove itself // Upgrade handler will remove itself
applyHttp1xConnectionOptions(queue, ch.pipeline(), context);
queue.fallbackToHttp1x(ch, context, HttpVersion.HTTP_1_1, port, host, waiter); queue.fallbackToHttp1x(ch, context, HttpVersion.HTTP_1_1, port, host, waiter);
} }
} }
} }
VertxHttp2ClientUpgradeCodec upgradeCodec = new VertxHttp2ClientUpgradeCodec(client.getOptions().getInitialSettings()) { VertxHttp2ClientUpgradeCodec upgradeCodec = new VertxHttp2ClientUpgradeCodec(client.getOptions().getInitialSettings()) {
@Override @Override
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception { public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception {
applyHttp2ConnectionOptions(pipeline);
queue.http2Connected(context, ch, waiter, true); queue.http2Connected(context, ch, waiter, true);
} }
}; };
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536); HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536);
ch.pipeline().addLast(httpCodec, upgradeHandler, new UpgradeRequestHandler()); ch.pipeline().addLast(httpCodec, upgradeHandler, new UpgradeRequestHandler());
} else {
applyHttp2ConnectionOptions(pipeline);
} }
} else {
applyHttp1xConnectionOptions(queue, pipeline, context);
} }
} }
}; };
Expand Down Expand Up @@ -508,5 +521,25 @@ void applyConnectionOptions(HttpClientOptions options, Bootstrap bootstrap) {
bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
bootstrap.option(ChannelOption.SO_REUSEADDR, options.isReuseAddress()); bootstrap.option(ChannelOption.SO_REUSEADDR, options.isReuseAddress());
} }

void applyHttp2ConnectionOptions(ChannelPipeline pipeline) {
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout()));
}
}

void applyHttp1xConnectionOptions(ConnQueue queue, ChannelPipeline pipeline, ContextImpl context) {
if (logEnabled) {
pipeline.addLast("logging", new LoggingHandler());
}
pipeline.addLast("codec", new HttpClientCodec(4096, 8192, options.getMaxChunkSize(), false, false));
if (options.isTryUseCompression()) {
pipeline.addLast("inflater", new HttpContentDecompressor(true));
}
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout()));
}
pipeline.addLast("handler", new ClientHandler(pipeline.channel(), context, (Map)queue.mgr.connectionMap));
}
} }
} }
32 changes: 1 addition & 31 deletions src/main/java/io/vertx/core/http/impl/Http1xPool.java
Expand Up @@ -17,11 +17,6 @@
package io.vertx.core.http.impl; package io.vertx.core.http.impl;


import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
Expand All @@ -48,18 +43,12 @@ public class Http1xPool implements ConnectionManager.Pool<ClientConnection> {
private final boolean keepAlive; private final boolean keepAlive;
private final int pipeliningLimit; private final int pipeliningLimit;
private final boolean ssl; private final boolean ssl;
private final boolean logEnabled;
private final int maxChunkSize;
private final boolean tryUseCompression;
private final int idleTimeout;
private final HttpVersion version; private final HttpVersion version;
private final Set<ClientConnection> allConnections = new HashSet<>(); private final Set<ClientConnection> allConnections = new HashSet<>();
private final Queue<ClientConnection> availableConnections = new ArrayDeque<>(); private final Queue<ClientConnection> availableConnections = new ArrayDeque<>();
private final int maxSockets; private final int maxSockets;


public Http1xPool(HttpClientImpl client, public Http1xPool(HttpClientImpl client, HttpClientMetrics metrics, HttpClientOptions options, ConnectionManager.ConnQueue queue,
HttpClientMetrics metrics,
HttpClientOptions options, ConnectionManager.ConnQueue queue,
Map<Channel, HttpClientConnection> connectionMap, HttpVersion version, int maxSockets) { Map<Channel, HttpClientConnection> connectionMap, HttpVersion version, int maxSockets) {
this.queue = queue; this.queue = queue;
this.version = version; this.version = version;
Expand All @@ -71,10 +60,6 @@ public Http1xPool(HttpClientImpl client,
this.ssl = options.isSsl(); this.ssl = options.isSsl();
this.connectionMap = connectionMap; this.connectionMap = connectionMap;
this.maxSockets = maxSockets; this.maxSockets = maxSockets;
this.logEnabled = client.getOptions().getLogActivity();
this.maxChunkSize = client.getOptions().getMaxChunkSize();
this.tryUseCompression = client.getOptions().isTryUseCompression();
this.idleTimeout = client.getOptions().getIdleTimeout();
} }


@Override @Override
Expand Down Expand Up @@ -132,22 +117,7 @@ void responseEnded(ClientConnection conn, boolean close) {
} }
} }


private void applyHttp1xConnectionOptions(ChannelPipeline pipeline, ContextImpl context) {
if (logEnabled) {
pipeline.addLast("logging", new LoggingHandler());
}
pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize, false, false));
if (tryUseCompression) {
pipeline.addLast("inflater", new HttpContentDecompressor(true));
}
if (idleTimeout > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, idleTimeout));
}
pipeline.addLast("handler", new ClientHandler(pipeline.channel(), context, (Map)connectionMap));
}

void createConn(HttpVersion version, ContextImpl context, int port, String host, Channel ch, Waiter waiter) { void createConn(HttpVersion version, ContextImpl context, int port, String host, Channel ch, Waiter waiter) {
applyHttp1xConnectionOptions(ch.pipeline(), context);
ClientConnection conn = new ClientConnection(version, client, queue.metric, ch, ClientConnection conn = new ClientConnection(version, client, queue.metric, ch,
ssl, host, port, context, this, metrics); ssl, host, port, context, this, metrics);
metrics.endpointConnected(queue.metric, conn.metric()); metrics.endpointConnected(queue.metric, conn.metric());
Expand Down
9 changes: 2 additions & 7 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -47,20 +47,18 @@ class Http2Pool implements ConnectionManager.Pool<Http2ClientConnection> {
final boolean logEnabled; final boolean logEnabled;
final int maxSockets; final int maxSockets;
final int windowSize; final int windowSize;
int idleTimeout;


public Http2Pool(ConnectionManager.ConnQueue queue, HttpClientImpl client, HttpClientMetrics metrics, public Http2Pool(ConnectionManager.ConnQueue queue, HttpClientImpl client, HttpClientMetrics metrics,
Map<Channel, ? super Http2ClientConnection> connectionMap, Map<Channel, ? super Http2ClientConnection> connectionMap,
int maxConcurrency, int maxSize, int windowSize) { int maxConcurrency, boolean logEnabled, int maxSize, int windowSize) {
this.queue = queue; this.queue = queue;
this.client = client; this.client = client;
this.metrics = metrics; this.metrics = metrics;
this.connectionMap = connectionMap; this.connectionMap = connectionMap;
this.maxConcurrency = maxConcurrency; this.maxConcurrency = maxConcurrency;
this.logEnabled = client.getOptions().getLogActivity(); this.logEnabled = logEnabled;
this.maxSockets = maxSize; this.maxSockets = maxSize;
this.windowSize = windowSize; this.windowSize = windowSize;
this.idleTimeout = client.getOptions().getIdleTimeout();
} }


@Override @Override
Expand Down Expand Up @@ -89,9 +87,6 @@ public Http2ClientConnection pollConnection() {
void createConn(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade) throws Http2Exception { void createConn(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade) throws Http2Exception {
ChannelPipeline p = ch.pipeline(); ChannelPipeline p = ch.pipeline();
synchronized (queue) { synchronized (queue) {
if (idleTimeout > 0) {
p.addLast("idle", new IdleStateHandler(0, 0, idleTimeout));
}
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>() VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>()
.connectionMap(connectionMap) .connectionMap(connectionMap)
.server(false) .server(false)
Expand Down

0 comments on commit ca1032a

Please sign in to comment.