diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/Channels.java b/client/src/main/java/org/asynchttpclient/netty/channel/Channels.java index 67adaa08f0..5be13db179 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/Channels.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/Channels.java @@ -14,6 +14,8 @@ package org.asynchttpclient.netty.channel; import io.netty.channel.Channel; +import io.netty.channel.ChannelId; +import io.netty.channel.DefaultChannelId; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -26,6 +28,7 @@ public class Channels { private static final Logger LOGGER = LoggerFactory.getLogger(Channels.class); private static final AttributeKey DEFAULT_ATTRIBUTE = AttributeKey.valueOf("default"); + private static final AttributeKey CHANNEL_ID_ATTRIBUTE = AttributeKey.valueOf("channelId"); public static Object getAttribute(Channel channel) { Attribute attr = channel.attr(DEFAULT_ATTRIBUTE); @@ -44,6 +47,15 @@ public static boolean isChannelValid(Channel channel) { return channel != null && channel.isActive(); } + public static ChannelId getChannelId(Channel channel) { + Attribute attr = channel.attr(CHANNEL_ID_ATTRIBUTE); + return attr != null ? attr.get() : null; + } + + public static void initChannelId(Channel channel) { + channel.attr(CHANNEL_ID_ATTRIBUTE).set(new DefaultChannelId()); + } + public static void silentlyCloseChannel(Channel channel) { try { if (channel != null && channel.isActive()) diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java index 186a583a16..a36a388bfd 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java @@ -16,6 +16,7 @@ import static org.asynchttpclient.util.Assertions.assertNotNull; import static org.asynchttpclient.util.DateUtils.millisTime; import io.netty.channel.Channel; +import io.netty.channel.ChannelId; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; @@ -44,11 +45,11 @@ public final class DefaultChannelPool implements ChannelPool { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class); private final ConcurrentHashMap> partitions = new ConcurrentHashMap<>(); - private final ConcurrentHashMap channelId2Creation; + private final ConcurrentHashMap channelId2Creation; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Timer nettyTimer; - private final int maxConnectionTtl; - private final boolean maxConnectionTtlEnabled; + private final int connectionTtl; + private final boolean connectionTtlEnabled; private final int maxIdleTime; private final boolean maxIdleTimeEnabled; private final long cleanerPeriod; @@ -59,8 +60,8 @@ public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) hashedWheelTimer); } - private int channelId(Channel channel) { - return channel.hashCode(); + private ChannelId channelId(Channel channel) { + return Channels.getChannelId(channel); } private int cleanerPeriod(int ttl) { @@ -68,19 +69,19 @@ private int cleanerPeriod(int ttl) { } public DefaultChannelPool(int maxIdleTime,// - int maxConnectionTtl,// + int connectionTtl,// Timer nettyTimer) { this.maxIdleTime = (int) maxIdleTime; - this.maxConnectionTtl = maxConnectionTtl; - maxConnectionTtlEnabled = maxConnectionTtl > 0; - channelId2Creation = maxConnectionTtlEnabled ? new ConcurrentHashMap<>() : null; + this.connectionTtl = connectionTtl; + connectionTtlEnabled = connectionTtl > 0; + channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null; this.nettyTimer = nettyTimer; maxIdleTimeEnabled = maxIdleTime > 0; // period is half - cleanerPeriod = Math.min(maxConnectionTtlEnabled ? cleanerPeriod(maxConnectionTtl) : Integer.MAX_VALUE, maxIdleTimeEnabled ? cleanerPeriod(maxIdleTime) : Long.MAX_VALUE); + cleanerPeriod = Math.min(connectionTtlEnabled ? cleanerPeriod(connectionTtl) : Integer.MAX_VALUE, maxIdleTimeEnabled ? cleanerPeriod(maxIdleTime) : Long.MAX_VALUE); - if (maxConnectionTtlEnabled || maxIdleTimeEnabled) + if (connectionTtlEnabled || maxIdleTimeEnabled) scheduleNewIdleChannelDetector(new IdleChannelDetector()); } @@ -120,11 +121,11 @@ public int hashCode() { } private boolean isTtlExpired(Channel channel, long now) { - if (!maxConnectionTtlEnabled) + if (!connectionTtlEnabled) return false; ChannelCreation creation = channelId2Creation.get(channelId(channel)); - return creation != null && now - creation.creationTime >= maxConnectionTtl; + return creation != null && now - creation.creationTime >= connectionTtl; } private boolean isRemotelyClosed(Channel channel) { @@ -213,7 +214,7 @@ public void run(Timeout timeout) throws Exception { List closedChannels = closeChannels(expiredChannels(partition, start)); if (!closedChannels.isEmpty()) { - if (maxConnectionTtlEnabled) { + if (connectionTtlEnabled) { for (IdleChannel closedChannel : closedChannels) channelId2Creation.remove(channelId(closedChannel.channel)); } @@ -245,7 +246,7 @@ public boolean offer(Channel channel, Object partitionKey) { return false; boolean offered = offer0(channel, partitionKey, now); - if (maxConnectionTtlEnabled && offered) { + if (connectionTtlEnabled && offered) { registerChannelCreation(channel, partitionKey, now); } @@ -293,7 +294,7 @@ else if (isRemotelyClosed(idleChannel.channel)) { * {@inheritDoc} */ public boolean removeAll(Channel channel) { - ChannelCreation creation = maxConnectionTtlEnabled ? channelId2Creation.remove(channelId(channel)) : null; + ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channelId(channel)) : null; return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(channel); } @@ -317,7 +318,7 @@ public void destroy() { } partitions.clear(); - if (maxConnectionTtlEnabled) { + if (connectionTtlEnabled) { channelId2Creation.clear(); } } @@ -325,7 +326,7 @@ public void destroy() { private void close(Channel channel) { // FIXME pity to have to do this here Channels.setDiscard(channel); - if (maxConnectionTtlEnabled) { + if (connectionTtlEnabled) { channelId2Creation.remove(channelId(channel)); } Channels.silentlyCloseChannel(channel); diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java index db1fb829e9..e2292fcf0c 100644 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyChannelConnector.java @@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.asynchttpclient.AsyncHandler; +import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.handler.AsyncHandlerExtensions; import org.asynchttpclient.netty.SimpleChannelFutureListener; +import org.asynchttpclient.netty.channel.Channels; import org.asynchttpclient.netty.channel.NettyConnectListener; import org.asynchttpclient.netty.timeout.TimeoutsHolder; @@ -37,15 +39,21 @@ public class NettyChannelConnector { private final List remoteAddresses; private final TimeoutsHolder timeoutsHolder; private final AtomicBoolean closed; + private final boolean connectionTtlEnabled; private volatile int i = 0; - public NettyChannelConnector(InetAddress localAddress, List remoteAddresses, AsyncHandler asyncHandler, TimeoutsHolder timeoutsHolder, - AtomicBoolean closed) { + public NettyChannelConnector(InetAddress localAddress,// + List remoteAddresses,// + AsyncHandler asyncHandler,// + TimeoutsHolder timeoutsHolder,// + AtomicBoolean closed,// + AsyncHttpClientConfig config) { this.localAddress = localAddress != null ? new InetSocketAddress(localAddress, 0) : null; this.remoteAddresses = remoteAddresses; this.asyncHandlerExtensions = toAsyncHandlerExtensions(asyncHandler); this.timeoutsHolder = assertNotNull(timeoutsHolder, "timeoutsHolder"); this.closed = closed; + this.connectionTtlEnabled = config.getConnectionTtl() > 0; } private boolean pickNextRemoteAddress() { @@ -81,6 +89,9 @@ public void onSuccess(Channel channel) { asyncHandlerExtensions.onTcpConnectSuccess(remoteAddress, future.channel()); } timeoutsHolder.initRemoteAddress(remoteAddress); + if (connectionTtlEnabled) { + Channels.initChannelId(channel); + } connectListener.onSuccess(channel); } diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java index 0c3d2068a6..6cbb9ee98b 100755 --- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java +++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java @@ -277,7 +277,7 @@ private ListenableFuture sendRequestWithNewChannel(// @Override protected void onSuccess(List addresses) { NettyConnectListener connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey); - new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed).connect(bootstrap, connectListener); + new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed, config).connect(bootstrap, connectListener); } @Override