Skip to content

Commit

Permalink
cleanerPeriod should be half
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Feb 9, 2016
1 parent f47fa7b commit 91ebeba
Showing 1 changed file with 65 additions and 59 deletions.
Expand Up @@ -13,7 +13,7 @@
*/
package org.asynchttpclient.netty.channel;

import static org.asynchttpclient.util.Assertions.*;
import static org.asynchttpclient.util.Assertions.assertNotNull;
import static org.asynchttpclient.util.DateUtils.millisTime;
import io.netty.channel.Channel;
import io.netty.util.Timeout;
Expand All @@ -25,7 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -37,22 +37,20 @@
import org.slf4j.LoggerFactory;

/**
* A simple implementation of
* {@link ChannelPool} based on a
* {@link java.util.concurrent.ConcurrentHashMap}
* A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap}
*/
public final class DefaultChannelPool implements ChannelPool {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);

private final ConcurrentHashMap<Object, ConcurrentLinkedQueue<IdleChannel>> partitions = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, ChannelCreation> channelId2Creation = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, ChannelCreation> channelId2Creation;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final Timer nettyTimer;
private final int maxConnectionTtl;
private final boolean maxConnectionTtlDisabled;
private final long maxIdleTime;
private final boolean maxIdleTimeDisabled;
private final boolean maxConnectionTtlEnabled;
private final int maxIdleTime;
private final boolean maxIdleTimeEnabled;
private final long cleanerPeriod;

public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) {
Expand All @@ -65,18 +63,24 @@ private int channelId(Channel channel) {
return channel.hashCode();
}

public DefaultChannelPool(long maxIdleTime,//
private int cleanerPeriod(int ttl) {
return (int) Math.ceil(ttl / 2.0);
}

public DefaultChannelPool(int maxIdleTime,//
int maxConnectionTtl,//
Timer nettyTimer) {
this.maxIdleTime = maxIdleTime;
this.maxIdleTime = (int) maxIdleTime;
this.maxConnectionTtl = maxConnectionTtl;
maxConnectionTtlDisabled = maxConnectionTtl <= 0;
maxConnectionTtlEnabled = maxConnectionTtl > 0;
channelId2Creation = maxConnectionTtlEnabled ? new ConcurrentHashMap<>() : null;
this.nettyTimer = nettyTimer;
maxIdleTimeDisabled = maxIdleTime <= 0;
maxIdleTimeEnabled = maxIdleTime > 0;

cleanerPeriod = Math.min(maxConnectionTtlDisabled ? Long.MAX_VALUE : maxConnectionTtl, maxIdleTimeDisabled ? Long.MAX_VALUE : maxIdleTime);
// period is half
cleanerPeriod = Math.min(maxConnectionTtlEnabled ? cleanerPeriod(maxConnectionTtl) : Integer.MAX_VALUE, maxIdleTimeEnabled ? cleanerPeriod(maxIdleTime) : Long.MAX_VALUE);

if (!maxConnectionTtlDisabled || !maxIdleTimeDisabled)
if (maxConnectionTtlEnabled || maxIdleTimeEnabled)
scheduleNewIdleChannelDetector(new IdleChannelDetector());
}

Expand Down Expand Up @@ -116,7 +120,7 @@ public int hashCode() {
}

private boolean isTtlExpired(Channel channel, long now) {
if (maxConnectionTtlDisabled)
if (!maxConnectionTtlEnabled)
return false;

ChannelCreation creation = channelId2Creation.get(channelId(channel));
Expand All @@ -130,14 +134,14 @@ private boolean isRemotelyClosed(Channel channel) {
private final class IdleChannelDetector implements TimerTask {

private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) {
return !maxIdleTimeDisabled && now - idleChannel.start >= maxIdleTime;
return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime;
}

private List<IdleChannel> expiredChannels(ConcurrentLinkedQueue<IdleChannel> partition, long now) {
private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) {
// lazy create
List<IdleChannel> idleTimeoutChannels = null;
for (IdleChannel idleChannel : partition) {
if (isTtlExpired(idleChannel.channel, now) || isIdleTimeoutExpired(idleChannel, now) || isRemotelyClosed(idleChannel.channel)) {
if (isIdleTimeoutExpired(idleChannel, now) || isRemotelyClosed(idleChannel.channel) || isTtlExpired(idleChannel.channel, now)) {
LOGGER.debug("Adding Candidate expired Channel {}", idleChannel.channel);
if (idleTimeoutChannels == null)
idleTimeoutChannels = new ArrayList<>();
Expand All @@ -153,7 +157,7 @@ private boolean isChannelCloseable(Channel channel) {
if (attribute instanceof NettyResponseFuture) {
NettyResponseFuture<?> future = (NettyResponseFuture<?>) attribute;
if (!future.isDone()) {
LOGGER.error("Future not in appropriate state %s, not closing", future);
LOGGER.error("Future not in appropriate state {}, not closing", future);
return false;
}
}
Expand Down Expand Up @@ -190,40 +194,38 @@ public void run(Timeout timeout) throws Exception {
if (isClosed.get())
return;

try {
if (LOGGER.isDebugEnabled())
for (Object key: partitions.keySet()) {
LOGGER.debug("Entry count for : {} : {}", key, partitions.get(key).size());
}
if (LOGGER.isDebugEnabled())
for (Object key : partitions.keySet()) {
LOGGER.debug("Entry count for : {} : {}", key, partitions.get(key).size());
}

long start = millisTime();
int closedCount = 0;
int totalCount = 0;
long start = millisTime();
int closedCount = 0;
int totalCount = 0;

for (ConcurrentLinkedQueue<IdleChannel> partition : partitions.values()) {
for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) {

// store in intermediate unsynchronized lists to minimize
// the impact on the ConcurrentLinkedQueue
if (LOGGER.isDebugEnabled())
totalCount += partition.size();
// store in intermediate unsynchronized lists to minimize
// the impact on the ConcurrentLinkedDeque
if (LOGGER.isDebugEnabled())
totalCount += partition.size();

List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));
List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));

if (!closedChannels.isEmpty()) {
if (!closedChannels.isEmpty()) {
if (maxConnectionTtlEnabled) {
for (IdleChannel closedChannel : closedChannels)
channelId2Creation.remove(channelId(closedChannel.channel));

partition.removeAll(closedChannels);
closedCount += closedChannels.size();
}

partition.removeAll(closedChannels);
closedCount += closedChannels.size();
}
}

if (LOGGER.isDebugEnabled()) {
long duration = millisTime() - start;

LOGGER.debug("Closed {} connections out of {} in {}ms", closedCount, totalCount, duration);

} catch (Throwable t) {
LOGGER.error("uncaught exception!", t);
}

scheduleNewIdleChannelDetector(timeout.task());
Expand All @@ -242,38 +244,38 @@ public boolean offer(Channel channel, Object partitionKey) {
if (isTtlExpired(channel, now))
return false;

boolean offered = offer0(channel, partitionKey,now);
if (offered) {
boolean offered = offer0(channel, partitionKey, now);
if (maxConnectionTtlEnabled && offered) {
registerChannelCreation(channel, partitionKey, now);
}

return offered;
}

private boolean offer0(Channel channel, Object partitionKey, long now) {
ConcurrentLinkedQueue<IdleChannel> partition = partitions.get(partitionKey);
ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);
if (partition == null) {
partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedQueue<>());
partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>());
}
return partition.add(new IdleChannel(channel, now));
return partition.offerFirst(new IdleChannel(channel, now));
}

private void registerChannelCreation(Channel channel, Object partitionKey, long now) {
if (channelId2Creation.containsKey(partitionKey)) {
channelId2Creation.putIfAbsent(channelId(channel), new ChannelCreation(now, partitionKey));
}
}

/**
* {@inheritDoc}
*/
public Channel poll(Object partitionKey) {

IdleChannel idleChannel = null;
ConcurrentLinkedQueue<IdleChannel> partition = partitions.get(partitionKey);
ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);
if (partition != null) {
while (idleChannel == null) {
idleChannel = partition.poll();
idleChannel = partition.pollFirst();

if (idleChannel == null)
// pool is empty
Expand All @@ -291,7 +293,7 @@ else if (isRemotelyClosed(idleChannel.channel)) {
* {@inheritDoc}
*/
public boolean removeAll(Channel channel) {
ChannelCreation creation = channelId2Creation.remove(channelId(channel));
ChannelCreation creation = maxConnectionTtlEnabled ? channelId2Creation.remove(channelId(channel)) : null;
return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(channel);
}

Expand All @@ -309,23 +311,27 @@ public void destroy() {
if (isClosed.getAndSet(true))
return;

for (ConcurrentLinkedQueue<IdleChannel> partition : partitions.values()) {
for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) {
for (IdleChannel idleChannel : partition)
close(idleChannel.channel);
}

partitions.clear();
channelId2Creation.clear();
if (maxConnectionTtlEnabled) {
channelId2Creation.clear();
}
}

private void close(Channel channel) {
// FIXME pity to have to do this here
Channels.setDiscard(channel);
channelId2Creation.remove(channelId(channel));
if (maxConnectionTtlEnabled) {
channelId2Creation.remove(channelId(channel));
}
Channels.silentlyCloseChannel(channel);
}

private void flushPartition(Object partitionKey, ConcurrentLinkedQueue<IdleChannel> partition) {
private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) {
if (partition != null) {
partitions.remove(partitionKey);
for (IdleChannel idleChannel : partition)
Expand All @@ -341,7 +347,7 @@ public void flushPartition(Object partitionKey) {
@Override
public void flushPartitions(ChannelPoolPartitionSelector selector) {

for (Map.Entry<Object, ConcurrentLinkedQueue<IdleChannel>> partitionsEntry : partitions.entrySet()) {
for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) {
Object partitionKey = partitionsEntry.getKey();
if (selector.select(partitionKey))
flushPartition(partitionKey, partitionsEntry.getValue());
Expand Down

0 comments on commit 91ebeba

Please sign in to comment.