Skip to content

Commit

Permalink
Don't try to write proxified request until SslHandler has handshaked, c…
Browse files Browse the repository at this point in the history
…lose #1559

Motivation:

We're trying to write the proxyfied request as soon as we've installed the SslHandler.
Behavior seems broken and handshake times out for large request payloads.

Modification:

Delay request until Sslhandler has handshaked.

Result:

No more handshake timeouts. Note that we can still have remotely closed exceptions if remote peer closed sockets while we're uploading.
  • Loading branch information
slandelle committed Jul 10, 2018
1 parent 6e4c1a6 commit 3737fb2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 6 deletions.
Expand Up @@ -334,13 +334,18 @@ private SslHandler createSslHandler(String peerHost, int peerPort) {
return sslHandler; return sslHandler;
} }


public void updatePipelineForHttpTunneling(ChannelPipeline pipeline, Uri requestUri) { public Future<Channel> updatePipelineForHttpTunneling(ChannelPipeline pipeline, Uri requestUri) {

Future<Channel> whenHanshaked = null;

if (pipeline.get(HTTP_CLIENT_CODEC) != null) if (pipeline.get(HTTP_CLIENT_CODEC) != null)
pipeline.remove(HTTP_CLIENT_CODEC); pipeline.remove(HTTP_CLIENT_CODEC);


if (requestUri.isSecured()) { if (requestUri.isSecured()) {
if (!isSslHandlerConfigured(pipeline)) { if (!isSslHandlerConfigured(pipeline)) {
pipeline.addBefore(AHC_HTTP_HANDLER, SSL_HANDLER, createSslHandler(requestUri.getHost(), requestUri.getExplicitPort())); SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort());
whenHanshaked = sslHandler.handshakeFuture();
pipeline.addBefore(AHC_HTTP_HANDLER, SSL_HANDLER, sslHandler);
} }
pipeline.addAfter(SSL_HANDLER, HTTP_CLIENT_CODEC, newHttpClientCodec()); pipeline.addAfter(SSL_HANDLER, HTTP_CLIENT_CODEC, newHttpClientCodec());


Expand All @@ -352,6 +357,7 @@ public void updatePipelineForHttpTunneling(ChannelPipeline pipeline, Uri request
pipeline.addAfter(AHC_HTTP_HANDLER, AHC_WS_HANDLER, wsHandler); pipeline.addAfter(AHC_HTTP_HANDLER, AHC_WS_HANDLER, wsHandler);
pipeline.remove(AHC_HTTP_HANDLER); pipeline.remove(AHC_HTTP_HANDLER);
} }
return whenHanshaked;
} }


public SslHandler addSslHandler(ChannelPipeline pipeline, Uri uri, String virtualHost, boolean hasSocksProxyHandler) { public SslHandler addSslHandler(ChannelPipeline pipeline, Uri uri, String virtualHost, boolean hasSocksProxyHandler) {
Expand Down
Expand Up @@ -14,6 +14,7 @@
package org.asynchttpclient.netty.handler.intercept; package org.asynchttpclient.netty.handler.intercept;


import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import org.asynchttpclient.Request; import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
Expand Down Expand Up @@ -47,10 +48,16 @@ public boolean exitAfterHandlingConnect(Channel channel,
Uri requestUri = request.getUri(); Uri requestUri = request.getUri();
LOGGER.debug("Connecting to proxy {} for scheme {}", proxyServer, requestUri.getScheme()); LOGGER.debug("Connecting to proxy {} for scheme {}", proxyServer, requestUri.getScheme());


channelManager.updatePipelineForHttpTunneling(channel.pipeline(), requestUri); Future<Channel> whenHandshaked = channelManager.updatePipelineForHttpTunneling(channel.pipeline(), requestUri);

future.setReuseChannel(true); future.setReuseChannel(true);
future.setConnectAllowed(false); future.setConnectAllowed(false);
requestSender.drainChannelAndExecuteNextRequest(channel, future, new RequestBuilder(future.getTargetRequest()).build()); Request targetRequest = new RequestBuilder(future.getTargetRequest()).build();
if (whenHandshaked == null) {
requestSender.drainChannelAndExecuteNextRequest(channel, future, targetRequest);
} else {
requestSender.drainChannelAndExecuteNextRequest(channel, future, targetRequest, whenHandshaked);
}


return true; return true;
} }
Expand Down
Expand Up @@ -460,7 +460,7 @@ private void scheduleReadTimeout(NettyResponseFuture<?> nettyResponseFuture) {


public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) { public void abort(Channel channel, NettyResponseFuture<?> future, Throwable t) {


if (channel != null) { if (channel != null && channel.isActive()) {
channelManager.closeChannel(channel); channelManager.closeChannel(channel);
} }


Expand Down Expand Up @@ -604,7 +604,8 @@ public boolean isClosed() {
return clientState.isClosed(); return clientState.isClosed();
} }


public void drainChannelAndExecuteNextRequest(final Channel channel, final NettyResponseFuture<?> future, public void drainChannelAndExecuteNextRequest(final Channel channel,
final NettyResponseFuture<?> future,
Request nextRequest) { Request nextRequest) {
Channels.setAttribute(channel, new OnLastHttpContentCallback(future) { Channels.setAttribute(channel, new OnLastHttpContentCallback(future) {
@Override @Override
Expand All @@ -613,4 +614,24 @@ public void call() {
} }
}); });
} }

public void drainChannelAndExecuteNextRequest(final Channel channel,
final NettyResponseFuture<?> future,
Request nextRequest,
Future<Channel> whenHandshaked) {
Channels.setAttribute(channel, new OnLastHttpContentCallback(future) {
@Override
public void call() {
whenHandshaked.addListener(f -> {
if (f.isSuccess()) {
sendNextRequest(nextRequest, future);
} else {
future.abort(f.cause());
}
}
);
}
});
}

} }

0 comments on commit 3737fb2

Please sign in to comment.