Skip to content

Commit

Permalink
Introduce connection factory for connection manager
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 2, 2016
1 parent 951133b commit 6547b21
Showing 1 changed file with 46 additions and 28 deletions.
74 changes: 46 additions & 28 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -24,6 +24,7 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
Expand All @@ -37,6 +38,8 @@
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.http.HttpClientOptions;
Expand Down Expand Up @@ -247,6 +250,27 @@ public synchronized void connectionClosed() {
}
}

private void doConnect(EventLoop eventLoop, String host, int port, Handler<AsyncResult<Channel>> channelHandler) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
applyConnectionOptions(options, bootstrap);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
// Nothing to do but as we handle this in the channelHandler callback
}
});
AsyncResolveBindConnectHelper<ChannelFuture> future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap);
future.addListener(res -> {
if (res.succeeded()) {
channelHandler.handle(Future.succeededFuture(res.result().channel()));
} else {
channelHandler.handle(Future.failedFuture(res.cause()));
}
});
}

protected void internalConnect(HttpVersion version, String host, int port, Waiter waiter) {
ContextImpl context;
if (waiter.context == null) {
Expand All @@ -255,13 +279,14 @@ protected void internalConnect(HttpVersion version, String host, int port, Waite
} else {
context = waiter.context;
}
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.nettyEventLoop());
bootstrap.channel(NioSocketChannel.class);
sslHelper.validate(vertx);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {

doConnect(context.nettyEventLoop(), host, port, res -> {

if (res.succeeded()) {
Channel ch = res.result();

// Configure pipeline
ChannelPipeline pipeline = ch.pipeline();
boolean useAlpn = options.isUseAlpn();
if (useAlpn) {
Expand All @@ -278,10 +303,6 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
fallbackToHttp1x(ch, context, HttpVersion.HTTP_1_0, port, host, waiter);
}
}
@Override
protected void handshakeFailure(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ConnQueue.this.handshakeFailure(context, ch, cause, waiter);
}
});
} else {
if (options.isSsl()) {
Expand Down Expand Up @@ -328,26 +349,23 @@ public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeRespons
applyHttp1xConnectionOptions(pipeline, context);
}
}
}
});
applyConnectionOptions(options, bootstrap);
AsyncResolveBindConnectHelper<ChannelFuture> future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap);
future.addListener(res -> {
if (res.succeeded()) {
Channel ch = res.result().channel();
if (!options.isUseAlpn()) {
if (options.isSsl()) {
// TCP connected, so now we must do the SSL handshake
SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
io.netty.util.concurrent.Future<Channel> fut = sslHandler.handshakeFuture();
fut.addListener(fut2 -> {
if (fut2.isSuccess()) {

//
if (options.isSsl()) {
// TCP connected, so now we must do the SSL handshake
SslHandler sslHandler = ch.pipeline().get(SslHandler.class);
io.netty.util.concurrent.Future<Channel> fut = sslHandler.handshakeFuture();
fut.addListener(fut2 -> {
if (fut2.isSuccess()) {
if (!options.isUseAlpn()) {
http1xConnected(version, context, port, host, ch, waiter);
} else {
handshakeFailure(context, ch, fut2.cause(), waiter);
}
});
} else {
} else {
handshakeFailure(context, ch, fut2.cause(), waiter);
}
});
} else {
if (!options.isUseAlpn()) {
if (ch.pipeline().get(HttpClientUpgradeHandler.class) != null) {
// Upgrade handler do nothing
} else {
Expand Down

0 comments on commit 6547b21

Please sign in to comment.