Skip to content

Commit

Permalink
Use the proxy again as it's simpler and could be decoupled this way
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 2, 2016
1 parent 7983fb1 commit 540df78
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 47 deletions.
63 changes: 20 additions & 43 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -20,12 +20,14 @@
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
Expand Down Expand Up @@ -270,67 +272,42 @@ private void doConnect(EventLoop eventLoop, String host, int port, Handler<Async
bootstrap.group(eventLoop); bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class); bootstrap.channel(NioSocketChannel.class);
applyConnectionOptions(options, bootstrap); applyConnectionOptions(options, bootstrap);

if (options.isSsl() && !options.isUseAlpn() && options.getProxyHost() != null) { if (options.isSsl() && !options.isUseAlpn() && options.getProxyHost() != null) {
bootstrap.handler(new ChannelInitializer<Channel>() { bootstrap.handler(new ChannelInitializer<Channel>() {
@Override @Override
protected void initChannel(Channel ch) throws Exception { protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
String proxyHost = options.getProxyHost(); String proxyHost = options.getProxyHost();
int proxyPort = options.getProxyPort(); int proxyPort = options.getProxyPort();
log.debug("using proxy: " + proxyHost); String proxyUsername = options.getProxyUsername();
String proxyPassword = options.getProxyPassword();
InetSocketAddress proxyAddr = new InetSocketAddress(proxyHost, proxyPort); InetSocketAddress proxyAddr = new InetSocketAddress(proxyHost, proxyPort);
pipeline.addLast("codec", new HttpClientCodec(4096, 8192, options.getMaxChunkSize(), false, false)); HttpProxyHandler proxy;
if (proxyUsername != null && proxyPassword != null) {
proxy = new HttpProxyHandler(proxyAddr, proxyUsername, proxyPassword);
} else {
proxy = new HttpProxyHandler(proxyAddr);
}
HttpClientCodec codec = new HttpClientCodec(4096, 8192, options.getMaxChunkSize(), false, false);
pipeline.addLast("proxy", proxy);
pipeline.addLast("codec", codec);
pipeline.addLast(new ChannelInboundHandlerAdapter() { pipeline.addLast(new ChannelInboundHandlerAdapter() {
HttpResponseStatus status;
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (msg instanceof HttpResponse) { if (evt instanceof ProxyConnectionEvent) {
status = ((HttpResponse) msg).status(); pipeline.remove(proxy);
} pipeline.remove(codec);
if (msg instanceof LastHttpContent) {
pipeline.remove("codec");
pipeline.remove(this); pipeline.remove(this);
if (status == null) { channelHandler.handle(Future.succeededFuture(ch));
throw new VertxException("");
} else {
int sc = status.code();
if (sc == 200) {
channelHandler.handle(Future.succeededFuture(ch));
} else {
throw new VertxException("Could not connect " + sc);
}
}
} }
} }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
channelHandler.handle(Future.failedFuture(cause));
}
}); });
} }
}); });
AsyncResolveBindConnectHelper<ChannelFuture> future = AsyncResolveBindConnectHelper.doConnect(vertx, options.getProxyPort(), options.getProxyHost(), bootstrap); AsyncResolveBindConnectHelper<ChannelFuture> future = AsyncResolveBindConnectHelper.doConnect(vertx, port, host, bootstrap);
future.addListener(res -> { future.addListener(res -> {
if (res.failed()) { if (res.failed()) {
channelHandler.handle(Future.failedFuture(res.cause())); channelHandler.handle(Future.failedFuture(res.cause()));
} else {
FullHttpRequest req = new DefaultFullHttpRequest(
io.netty.handler.codec.http.HttpVersion.HTTP_1_0, HttpMethod.CONNECT,
host + ':' + port,
Unpooled.EMPTY_BUFFER, false);
req.headers().set(HttpHeaderNames.HOST, options.getProxyHost() + ':' + options.getProxyPort());
String proxyUsername = options.getProxyUsername();
String proxyPassword = options.getProxyPassword();
if (proxyUsername != null && proxyPassword != null) {
String authorization = "Basic " + new String(Base64.getEncoder().encode((proxyUsername + ':' + proxyPassword).getBytes(StandardCharsets.UTF_8)), StandardCharsets.US_ASCII);
req.headers().set(HttpHeaderNames.PROXY_AUTHORIZATION, authorization);
}
res.result().channel().writeAndFlush(req).addListener(f -> {
if (!f.isSuccess()) {
// Handle this case
}
});
} }
}); });
} else { } else {
Expand Down Expand Up @@ -427,7 +404,7 @@ public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeRespons
} }
} else { } else {
applyHttp1xConnectionOptions(pipeline, context); applyHttp1xConnectionOptions(pipeline, context);
} }
} }


// //
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/vertx/test/core/ConnectHttpProxy.java
Expand Up @@ -85,13 +85,13 @@ public void start(Vertx vertx, Handler<Void> finishedHandler) {
NetSocket serverSocket = request.netSocket(); NetSocket serverSocket = request.netSocket();
NetClientOptions netOptions = new NetClientOptions(); NetClientOptions netOptions = new NetClientOptions();
NetClient netClient = vertx.createNetClient(netOptions); NetClient netClient = vertx.createNetClient(netOptions);
log.debug("connecting to " + host + ":" + port); System.out.println("connecting to " + host + ":" + port);
netClient.connect(port, host, result -> { netClient.connect(port, host, result -> {
if (result.succeeded()) { if (result.succeeded()) {
log.debug("connected"); System.out.println("connected");
NetSocket clientSocket = result.result(); NetSocket clientSocket = result.result();
serverSocket.write("HTTP/1.0 200 Connection established\n\n"); serverSocket.write("HTTP/1.0 200 Connection established\n\n");
log.debug("starting pumps"); System.out.println("starting pumps");
serverSocket.closeHandler(v -> clientSocket.close()); serverSocket.closeHandler(v -> clientSocket.close());
clientSocket.closeHandler(v -> serverSocket.close()); clientSocket.closeHandler(v -> serverSocket.close());
Pump.pump(serverSocket, clientSocket).start(); Pump.pump(serverSocket, clientSocket).start();
Expand All @@ -103,7 +103,7 @@ public void start(Vertx vertx, Handler<Void> finishedHandler) {
} }
}); });
server.listen(server -> { server.listen(server -> {
log.debug("proxy server started"); System.out.println("proxy server started");
finishedHandler.handle(null); finishedHandler.handle(null);
}); });
} }
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/vertx/test/core/HttpTLSTest.java
Expand Up @@ -471,6 +471,7 @@ void run(boolean shouldPass) {
}); });
req.exceptionHandler(t -> { req.exceptionHandler(t -> {
if (shouldPass) { if (shouldPass) {
t.printStackTrace();
HttpTLSTest.this.fail("Should not throw exception"); HttpTLSTest.this.fail("Should not throw exception");
} else { } else {
testComplete(); testComplete();
Expand Down

0 comments on commit 540df78

Please sign in to comment.