Skip to content

Commit

Permalink
Improve remote address handling in TimeoutHolder
Browse files Browse the repository at this point in the history
Motivation:

d86d481 reduced GC with only
generating String from InetSocketAddress.

There’s still room for improvement:
* Less String concatenations when generating message
* Stop reporting « not-connected » when timeout happens before channel
gets connected

Modifications:

 * Use a pooled StringBuilder to concatenate timeout message part
(still, IP String is not optimized as algorithm is very complex)
* Set an unresolved InetSocketAddress when creating TimeoutHolder

Result:

More informative timeout message and less allocations
  • Loading branch information
slandelle committed Apr 24, 2017
1 parent 1b35dbe commit 2ea7c06
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 49 deletions.
Expand Up @@ -114,7 +114,7 @@ public void operationComplete(Future<? super Void> future) throws Exception {
Request request = future.getTargetRequest(); Request request = future.getTargetRequest();
Uri uri = request.getUri(); Uri uri = request.getUri();


timeoutsHolder.initRemoteAddress(remoteAddress); timeoutsHolder.setResolvedRemoteAddress(remoteAddress);


// in case of proxy tunneling, we'll add the SslHandler later, after the CONNECT request // in case of proxy tunneling, we'll add the SslHandler later, after the CONNECT request
if (future.getProxyServer() == null && uri.isSecured()) { if (future.getProxyServer() == null && uri.isSecured()) {
Expand Down
Expand Up @@ -14,6 +14,7 @@
package org.asynchttpclient.netty.request; package org.asynchttpclient.netty.request;


import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT; import static io.netty.handler.codec.http.HttpHeaderNames.EXPECT;
import static java.util.Collections.singletonList;
import static org.asynchttpclient.handler.AsyncHandlerExtensionsUtils.toAsyncHandlerExtensions; import static org.asynchttpclient.handler.AsyncHandlerExtensionsUtils.toAsyncHandlerExtensions;
import static org.asynchttpclient.util.Assertions.assertNotNull; import static org.asynchttpclient.util.Assertions.assertNotNull;
import static org.asynchttpclient.util.AuthenticatorUtils.*; import static org.asynchttpclient.util.AuthenticatorUtils.*;
Expand All @@ -31,6 +32,9 @@
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.Timer; import io.netty.util.Timer;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;


import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
Expand Down Expand Up @@ -230,8 +234,7 @@ private <T> ListenableFuture<T> sendRequestWithOpenChannel(Request request, Prox
} }
} }


TimeoutsHolder timeoutsHolder = scheduleRequestTimeout(future); scheduleRequestTimeout(future, (InetSocketAddress) channel.remoteAddress());
timeoutsHolder.initRemoteAddress((InetSocketAddress) channel.remoteAddress());
future.setChannelState(ChannelState.POOLED); future.setChannelState(ChannelState.POOLED);
future.attachChannel(channel, false); future.attachChannel(channel, false);


Expand Down Expand Up @@ -293,9 +296,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
return future; return future;
} }


scheduleRequestTimeout(future); resolveAddresses(request, proxy, future, asyncHandler)//

RequestHostnameResolver.INSTANCE.resolve(request, proxy, asyncHandler)//
.addListener(new SimpleFutureListener<List<InetSocketAddress>>() { .addListener(new SimpleFutureListener<List<InetSocketAddress>>() {


@Override @Override
Expand All @@ -315,6 +316,37 @@ protected void onFailure(Throwable cause) {


return future; return future;
} }

private <T> Future<List<InetSocketAddress>> resolveAddresses(
Request request,//
ProxyServer proxy,//
NettyResponseFuture<T> future,//
AsyncHandler<T> asyncHandler) {

Uri uri = request.getUri();
final Promise<List<InetSocketAddress>> promise = ImmediateEventExecutor.INSTANCE.newPromise();

if (proxy != null && !proxy.isIgnoredForHost(uri.getHost())) {
int port = uri.isSecured() ? proxy.getSecuredPort() : proxy.getPort();
InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(proxy.getHost(), port);
scheduleRequestTimeout(future, unresolvedRemoteAddress);
return RequestHostnameResolver.INSTANCE.resolve(request.getNameResolver(), unresolvedRemoteAddress, toAsyncHandlerExtensions(asyncHandler));

} else {
int port = uri.getExplicitPort();

if (request.getAddress() != null) {
// bypass resolution
InetSocketAddress inetSocketAddress = new InetSocketAddress(request.getAddress(), port);
return promise.setSuccess(singletonList(inetSocketAddress));

} else {
InetSocketAddress unresolvedRemoteAddress = InetSocketAddress.createUnresolved(uri.getHost(), port);
scheduleRequestTimeout(future, unresolvedRemoteAddress);
return RequestHostnameResolver.INSTANCE.resolve(request.getNameResolver(), unresolvedRemoteAddress, toAsyncHandlerExtensions(asyncHandler));
}
}
}


private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request, AsyncHandler<T> asyncHandler, NettyRequest nettyRequest, ProxyServer proxyServer) { private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request, AsyncHandler<T> asyncHandler, NettyRequest nettyRequest, ProxyServer proxyServer) {


Expand Down Expand Up @@ -396,11 +428,10 @@ private void configureTransferAdapter(AsyncHandler<?> handler, HttpRequest httpR
TransferCompletionHandler.class.cast(handler).headers(h); TransferCompletionHandler.class.cast(handler).headers(h);
} }


private TimeoutsHolder scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture) { private void scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture, InetSocketAddress originalRemoteAddress) {
nettyResponseFuture.touch(); nettyResponseFuture.touch();
TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config); TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config, originalRemoteAddress);
nettyResponseFuture.setTimeoutsHolder(timeoutsHolder); nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
return timeoutsHolder;
} }


private void scheduleReadTimeout(NettyResponseFuture<?> nettyResponseFuture) { private void scheduleReadTimeout(NettyResponseFuture<?> nettyResponseFuture) {
Expand Down
Expand Up @@ -18,6 +18,7 @@


import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.request.NettyRequestSender; import org.asynchttpclient.netty.request.NettyRequestSender;
import org.asynchttpclient.util.StringBuilderPool;


public class ReadTimeoutTimerTask extends TimeoutTimerTask { public class ReadTimeoutTimerTask extends TimeoutTimerTask {


Expand Down Expand Up @@ -49,7 +50,9 @@ public void run(Timeout timeout) throws Exception {


if (durationBeforeCurrentReadTimeout <= 0L) { if (durationBeforeCurrentReadTimeout <= 0L) {
// idleConnectTimeout reached // idleConnectTimeout reached
String message = "Read timeout to " + timeoutsHolder.remoteAddress() + " after " + readTimeout + " ms"; StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
appendRemoteAddress(sb);
String message = sb.append(" after ").append(readTimeout).append(" ms").toString();
long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch(); long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
expire(message, durationSinceLastTouch); expire(message, durationSinceLastTouch);
// cancel request timeout sibling // cancel request timeout sibling
Expand Down
Expand Up @@ -18,6 +18,7 @@


import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.request.NettyRequestSender; import org.asynchttpclient.netty.request.NettyRequestSender;
import org.asynchttpclient.util.StringBuilderPool;


public class RequestTimeoutTimerTask extends TimeoutTimerTask { public class RequestTimeoutTimerTask extends TimeoutTimerTask {


Expand All @@ -43,7 +44,9 @@ public void run(Timeout timeout) throws Exception {
if (nettyResponseFuture.isDone()) if (nettyResponseFuture.isDone())
return; return;


String message = "Request timeout to " + timeoutsHolder.remoteAddress() + " after " + requestTimeout + " ms"; StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Request timeout to ");
appendRemoteAddress(sb);
String message = sb.append(" after ").append(requestTimeout).append(" ms").toString();
long age = unpreciseMillisTime() - nettyResponseFuture.getStart(); long age = unpreciseMillisTime() - nettyResponseFuture.getStart();
expire(message, age); expire(message, age);
} }
Expand Down
Expand Up @@ -15,6 +15,7 @@


import io.netty.util.TimerTask; import io.netty.util.TimerTask;


import java.net.InetSocketAddress;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


Expand Down Expand Up @@ -44,12 +45,21 @@ protected void expire(String message, long time) {
} }


/** /**
* When the timeout is cancelled, it could still be referenced for quite some time in the Timer. * When the timeout is cancelled, it could still be referenced for quite some time in the Timer. Holding a reference to the future might mean holding a reference to the
* Holding a reference to the future might mean holding a reference to the channel, and heavy objects such as SslEngines * channel, and heavy objects such as SslEngines
*/ */
public void clean() { public void clean() {
if (done.compareAndSet(false, true)) { if (done.compareAndSet(false, true)) {
nettyResponseFuture = null; nettyResponseFuture = null;
} }
} }

protected void appendRemoteAddress(StringBuilder sb) {
InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress();
sb.append(remoteAddress.getHostName());
if (!remoteAddress.isUnresolved()) {
sb.append('/').append(remoteAddress.getAddress().getHostAddress());
}
sb.append(':').append(remoteAddress.getPort());
}
} }
Expand Up @@ -41,7 +41,7 @@ public class TimeoutsHolder {
public volatile Timeout readTimeout; public volatile Timeout readTimeout;
private volatile InetSocketAddress remoteAddress; private volatile InetSocketAddress remoteAddress;


public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config) { public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) {
this.nettyTimer = nettyTimer; this.nettyTimer = nettyTimer;
this.nettyResponseFuture = nettyResponseFuture; this.nettyResponseFuture = nettyResponseFuture;
this.requestSender = requestSender; this.requestSender = requestSender;
Expand All @@ -65,10 +65,14 @@ public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFutu
} }
} }


public void initRemoteAddress(InetSocketAddress address) { public void setResolvedRemoteAddress(InetSocketAddress address) {
remoteAddress = address; remoteAddress = address;
} }


InetSocketAddress remoteAddress() {
return remoteAddress;
}

public void startReadTimeout() { public void startReadTimeout() {
if (readTimeoutValue != -1) { if (readTimeoutValue != -1) {
startReadTimeout(null); startReadTimeout(null);
Expand Down Expand Up @@ -106,8 +110,4 @@ public void cancel() {
private Timeout newTimeout(TimerTask task, long delay) { private Timeout newTimeout(TimerTask task, long delay) {
return requestSender.isClosed() ? null : nettyTimer.newTimeout(task, delay, TimeUnit.MILLISECONDS); return requestSender.isClosed() ? null : nettyTimer.newTimeout(task, delay, TimeUnit.MILLISECONDS);
} }

String remoteAddress() {
return remoteAddress == null ? "not-connected" : remoteAddress.toString();
}
} }
Expand Up @@ -13,64 +13,42 @@
*/ */
package org.asynchttpclient.resolver; package org.asynchttpclient.resolver;


import static org.asynchttpclient.handler.AsyncHandlerExtensionsUtils.toAsyncHandlerExtensions; import io.netty.resolver.NameResolver;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;


import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;


import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.Request;
import org.asynchttpclient.handler.AsyncHandlerExtensions; import org.asynchttpclient.handler.AsyncHandlerExtensions;
import org.asynchttpclient.netty.SimpleFutureListener; import org.asynchttpclient.netty.SimpleFutureListener;
import org.asynchttpclient.proxy.ProxyServer;
import org.asynchttpclient.uri.Uri;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


public enum RequestHostnameResolver { public enum RequestHostnameResolver {


INSTANCE; INSTANCE;


public Future<List<InetSocketAddress>> resolve(Request request, ProxyServer proxy, AsyncHandler<?> asyncHandler) { public Future<List<InetSocketAddress>> resolve(NameResolver<InetAddress> nameResolver, InetSocketAddress unresolvedAddress, AsyncHandlerExtensions asyncHandlerExtensions) {


Uri uri = request.getUri(); final String hostname = unresolvedAddress.getHostName();
final int port = unresolvedAddress.getPort();
final Promise<List<InetSocketAddress>> promise = ImmediateEventExecutor.INSTANCE.newPromise(); final Promise<List<InetSocketAddress>> promise = ImmediateEventExecutor.INSTANCE.newPromise();


if (request.getAddress() != null) {
List<InetSocketAddress> resolved = Collections.singletonList(new InetSocketAddress(request.getAddress(), uri.getExplicitPort()));
return promise.setSuccess(resolved);
}

// don't notify on explicit address
final AsyncHandlerExtensions asyncHandlerExtensions = request.getAddress() == null ? toAsyncHandlerExtensions(asyncHandler) : null;
final String name;
final int port;

if (proxy != null && !proxy.isIgnoredForHost(uri.getHost())) {
name = proxy.getHost();
port = uri.isSecured() ? proxy.getSecuredPort() : proxy.getPort();
} else {
name = uri.getHost();
port = uri.getExplicitPort();
}

if (asyncHandlerExtensions != null) { if (asyncHandlerExtensions != null) {
try { try {
asyncHandlerExtensions.onHostnameResolutionAttempt(name); asyncHandlerExtensions.onHostnameResolutionAttempt(hostname);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("onHostnameResolutionAttempt crashed", e); LOGGER.error("onHostnameResolutionAttempt crashed", e);
promise.tryFailure(e); promise.tryFailure(e);
return promise; return promise;
} }
} }


final Future<List<InetAddress>> whenResolved = request.getNameResolver().resolveAll(name); final Future<List<InetAddress>> whenResolved = nameResolver.resolveAll(hostname);


whenResolved.addListener(new SimpleFutureListener<List<InetAddress>>() { whenResolved.addListener(new SimpleFutureListener<List<InetAddress>>() {


Expand All @@ -82,7 +60,7 @@ protected void onSuccess(List<InetAddress> value) throws Exception {
} }
if (asyncHandlerExtensions != null) { if (asyncHandlerExtensions != null) {
try { try {
asyncHandlerExtensions.onHostnameResolutionSuccess(name, socketAddresses); asyncHandlerExtensions.onHostnameResolutionSuccess(hostname, socketAddresses);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("onHostnameResolutionSuccess crashed", e); LOGGER.error("onHostnameResolutionSuccess crashed", e);
promise.tryFailure(e); promise.tryFailure(e);
Expand All @@ -96,7 +74,7 @@ protected void onSuccess(List<InetAddress> value) throws Exception {
protected void onFailure(Throwable t) throws Exception { protected void onFailure(Throwable t) throws Exception {
if (asyncHandlerExtensions != null) { if (asyncHandlerExtensions != null) {
try { try {
asyncHandlerExtensions.onHostnameResolutionFailure(name, t); asyncHandlerExtensions.onHostnameResolutionFailure(hostname, t);
} catch (Exception e) { } catch (Exception e) {
LOGGER.error("onHostnameResolutionFailure crashed", e); LOGGER.error("onHostnameResolutionFailure crashed", e);
promise.tryFailure(e); promise.tryFailure(e);
Expand Down

0 comments on commit 2ea7c06

Please sign in to comment.