Skip to content

Commit

Permalink
Use ChannelId instead of hashcode as key
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Feb 9, 2016
1 parent 376a454 commit 27161cc
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 21 deletions.
Expand Up @@ -14,6 +14,8 @@
package org.asynchttpclient.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.DefaultChannelId;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

Expand All @@ -26,6 +28,7 @@ public class Channels {
private static final Logger LOGGER = LoggerFactory.getLogger(Channels.class);

private static final AttributeKey<Object> DEFAULT_ATTRIBUTE = AttributeKey.valueOf("default");
private static final AttributeKey<ChannelId> CHANNEL_ID_ATTRIBUTE = AttributeKey.valueOf("channelId");

public static Object getAttribute(Channel channel) {
Attribute<Object> attr = channel.attr(DEFAULT_ATTRIBUTE);
Expand All @@ -44,6 +47,15 @@ public static boolean isChannelValid(Channel channel) {
return channel != null && channel.isActive();
}

public static ChannelId getChannelId(Channel channel) {
Attribute<ChannelId> attr = channel.attr(CHANNEL_ID_ATTRIBUTE);
return attr != null ? attr.get() : null;
}

public static void initChannelId(Channel channel) {
channel.attr(CHANNEL_ID_ATTRIBUTE).set(new DefaultChannelId());
}

public static void silentlyCloseChannel(Channel channel) {
try {
if (channel != null && channel.isActive())
Expand Down
Expand Up @@ -16,6 +16,7 @@
import static org.asynchttpclient.util.Assertions.assertNotNull;
import static org.asynchttpclient.util.DateUtils.millisTime;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
Expand Down Expand Up @@ -44,11 +45,11 @@ public final class DefaultChannelPool implements ChannelPool {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);

private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, ChannelCreation> channelId2Creation;
private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final Timer nettyTimer;
private final int maxConnectionTtl;
private final boolean maxConnectionTtlEnabled;
private final int connectionTtl;
private final boolean connectionTtlEnabled;
private final int maxIdleTime;
private final boolean maxIdleTimeEnabled;
private final long cleanerPeriod;
Expand All @@ -59,28 +60,28 @@ public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer)
hashedWheelTimer);
}

private int channelId(Channel channel) {
return channel.hashCode();
private ChannelId channelId(Channel channel) {
return Channels.getChannelId(channel);
}

private int cleanerPeriod(int ttl) {
return (int) Math.ceil(ttl / 2.0);
}

public DefaultChannelPool(int maxIdleTime,//
int maxConnectionTtl,//
int connectionTtl,//
Timer nettyTimer) {
this.maxIdleTime = (int) maxIdleTime;
this.maxConnectionTtl = maxConnectionTtl;
maxConnectionTtlEnabled = maxConnectionTtl > 0;
channelId2Creation = maxConnectionTtlEnabled ? new ConcurrentHashMap<>() : null;
this.connectionTtl = connectionTtl;
connectionTtlEnabled = connectionTtl > 0;
channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null;
this.nettyTimer = nettyTimer;
maxIdleTimeEnabled = maxIdleTime > 0;

// period is half
cleanerPeriod = Math.min(maxConnectionTtlEnabled ? cleanerPeriod(maxConnectionTtl) : Integer.MAX_VALUE, maxIdleTimeEnabled ? cleanerPeriod(maxIdleTime) : Long.MAX_VALUE);
cleanerPeriod = Math.min(connectionTtlEnabled ? cleanerPeriod(connectionTtl) : Integer.MAX_VALUE, maxIdleTimeEnabled ? cleanerPeriod(maxIdleTime) : Long.MAX_VALUE);

if (maxConnectionTtlEnabled || maxIdleTimeEnabled)
if (connectionTtlEnabled || maxIdleTimeEnabled)
scheduleNewIdleChannelDetector(new IdleChannelDetector());
}

Expand Down Expand Up @@ -120,11 +121,11 @@ public int hashCode() {
}

private boolean isTtlExpired(Channel channel, long now) {
if (!maxConnectionTtlEnabled)
if (!connectionTtlEnabled)
return false;

ChannelCreation creation = channelId2Creation.get(channelId(channel));
return creation != null && now - creation.creationTime >= maxConnectionTtl;
return creation != null && now - creation.creationTime >= connectionTtl;
}

private boolean isRemotelyClosed(Channel channel) {
Expand Down Expand Up @@ -213,7 +214,7 @@ public void run(Timeout timeout) throws Exception {
List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));

if (!closedChannels.isEmpty()) {
if (maxConnectionTtlEnabled) {
if (connectionTtlEnabled) {
for (IdleChannel closedChannel : closedChannels)
channelId2Creation.remove(channelId(closedChannel.channel));
}
Expand Down Expand Up @@ -245,7 +246,7 @@ public boolean offer(Channel channel, Object partitionKey) {
return false;

boolean offered = offer0(channel, partitionKey, now);
if (maxConnectionTtlEnabled && offered) {
if (connectionTtlEnabled && offered) {
registerChannelCreation(channel, partitionKey, now);
}

Expand Down Expand Up @@ -293,7 +294,7 @@ else if (isRemotelyClosed(idleChannel.channel)) {
* {@inheritDoc}
*/
public boolean removeAll(Channel channel) {
ChannelCreation creation = maxConnectionTtlEnabled ? channelId2Creation.remove(channelId(channel)) : null;
ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channelId(channel)) : null;
return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(channel);
}

Expand All @@ -317,15 +318,15 @@ public void destroy() {
}

partitions.clear();
if (maxConnectionTtlEnabled) {
if (connectionTtlEnabled) {
channelId2Creation.clear();
}
}

private void close(Channel channel) {
// FIXME pity to have to do this here
Channels.setDiscard(channel);
if (maxConnectionTtlEnabled) {
if (connectionTtlEnabled) {
channelId2Creation.remove(channelId(channel));
}
Channels.silentlyCloseChannel(channel);
Expand Down
Expand Up @@ -25,8 +25,10 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.handler.AsyncHandlerExtensions;
import org.asynchttpclient.netty.SimpleChannelFutureListener;
import org.asynchttpclient.netty.channel.Channels;
import org.asynchttpclient.netty.channel.NettyConnectListener;
import org.asynchttpclient.netty.timeout.TimeoutsHolder;

Expand All @@ -37,15 +39,21 @@ public class NettyChannelConnector {
private final List<InetSocketAddress> remoteAddresses;
private final TimeoutsHolder timeoutsHolder;
private final AtomicBoolean closed;
private final boolean connectionTtlEnabled;
private volatile int i = 0;

public NettyChannelConnector(InetAddress localAddress, List<InetSocketAddress> remoteAddresses, AsyncHandler<?> asyncHandler, TimeoutsHolder timeoutsHolder,
AtomicBoolean closed) {
public NettyChannelConnector(InetAddress localAddress,//
List<InetSocketAddress> remoteAddresses,//
AsyncHandler<?> asyncHandler,//
TimeoutsHolder timeoutsHolder,//
AtomicBoolean closed,//
AsyncHttpClientConfig config) {
this.localAddress = localAddress != null ? new InetSocketAddress(localAddress, 0) : null;
this.remoteAddresses = remoteAddresses;
this.asyncHandlerExtensions = toAsyncHandlerExtensions(asyncHandler);
this.timeoutsHolder = assertNotNull(timeoutsHolder, "timeoutsHolder");
this.closed = closed;
this.connectionTtlEnabled = config.getConnectionTtl() > 0;
}

private boolean pickNextRemoteAddress() {
Expand Down Expand Up @@ -81,6 +89,9 @@ public void onSuccess(Channel channel) {
asyncHandlerExtensions.onTcpConnectSuccess(remoteAddress, future.channel());
}
timeoutsHolder.initRemoteAddress(remoteAddress);
if (connectionTtlEnabled) {
Channels.initChannelId(channel);
}
connectListener.onSuccess(channel);
}

Expand Down
Expand Up @@ -277,7 +277,7 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
@Override
protected void onSuccess(List<InetSocketAddress> addresses) {
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, channelPreempted, partitionKey);
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed).connect(bootstrap, connectListener);
new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, future.getTimeoutsHolder(), closed, config).connect(bootstrap, connectListener);
}

@Override
Expand Down

0 comments on commit 27161cc

Please sign in to comment.