Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.asynchttpclient.filter.RequestFilter;
import org.asynchttpclient.handler.resumable.ResumableAsyncHandler;
import org.asynchttpclient.netty.channel.ChannelManager;
import org.asynchttpclient.netty.channel.ConnectionSemaphore;
import org.asynchttpclient.netty.request.NettyRequestSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,6 +44,7 @@ public class DefaultAsyncHttpClient implements AsyncHttpClient {
private final AsyncHttpClientConfig config;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ChannelManager channelManager;
private final ConnectionSemaphore connectionSemaphore;
private final NettyRequestSender requestSender;
private final boolean allowStopNettyTimer;
private final Timer nettyTimer;
Expand Down Expand Up @@ -84,7 +86,8 @@ public DefaultAsyncHttpClient(AsyncHttpClientConfig config) {
nettyTimer = allowStopNettyTimer ? newNettyTimer() : config.getNettyTimer();

channelManager = new ChannelManager(config, nettyTimer);
requestSender = new NettyRequestSender(config, channelManager, nettyTimer, new AsyncHttpClientState(closed));
connectionSemaphore = new ConnectionSemaphore(config);
requestSender = new NettyRequestSender(config, channelManager, connectionSemaphore, nettyTimer, new AsyncHttpClientState(closed));
channelManager.configureBootstraps(requestSender);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.asynchttpclient.util.DateUtils.unpreciseMillisTime;
import io.netty.channel.Channel;

import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -31,8 +32,10 @@
import org.asynchttpclient.Realm;
import org.asynchttpclient.Request;
import org.asynchttpclient.channel.ChannelPoolPartitioning;
import org.asynchttpclient.netty.channel.ChannelManager;
import org.asynchttpclient.netty.channel.ChannelState;
import org.asynchttpclient.netty.channel.Channels;
import org.asynchttpclient.netty.channel.ConnectionSemaphore;
import org.asynchttpclient.netty.request.NettyRequest;
import org.asynchttpclient.netty.timeout.TimeoutsHolder;
import org.asynchttpclient.proxy.ProxyServer;
Expand All @@ -56,6 +59,7 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {

private final long start = unpreciseMillisTime();
private final ChannelPoolPartitioning connectionPoolPartitioning;
private final ConnectionSemaphore connectionSemaphore;
private final ProxyServer proxyServer;
private final int maxRetry;
private final CompletableFuture<V> future = new CompletableFuture<>();
Expand All @@ -73,6 +77,8 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
private volatile int onThrowableCalled = 0;
@SuppressWarnings("unused")
private volatile TimeoutsHolder timeoutsHolder;
// partition key, when != null used to release lock in ChannelManager
private volatile Object partitionKeyLock;

@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isDoneField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
Expand All @@ -88,6 +94,7 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> onThrowableCalledField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> timeoutsHolderField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> partitionKeyLockField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");

// volatile where we need CAS ops
private volatile int redirectCount = 0;
Expand Down Expand Up @@ -118,16 +125,36 @@ public NettyResponseFuture(Request originalRequest,//
NettyRequest nettyRequest,//
int maxRetry,//
ChannelPoolPartitioning connectionPoolPartitioning,//
ConnectionSemaphore connectionSemaphore,//
ProxyServer proxyServer) {

this.asyncHandler = asyncHandler;
this.targetRequest = currentRequest = originalRequest;
this.nettyRequest = nettyRequest;
this.connectionPoolPartitioning = connectionPoolPartitioning;
this.connectionSemaphore = connectionSemaphore;
this.proxyServer = proxyServer;
this.maxRetry = maxRetry;
}

private void releasePartitionKeyLock() {
Object partitionKey = takePartitionKeyLock();
if (partitionKey != null) {
connectionSemaphore.releaseChannelLock(partitionKey);
}
}

// Take partition key lock object,
// but do not release channel lock.
public Object takePartitionKeyLock() {
// shortcut, much faster than getAndSet
if (partitionKeyLock == null) {
return null;
}

return partitionKeyLockField.getAndSet(this, null);
}

// java.util.concurrent.Future

@Override
Expand All @@ -142,6 +169,7 @@ public boolean isCancelled() {

@Override
public boolean cancel(boolean force) {
releasePartitionKeyLock();
cancelTimeouts();

if (isCancelledField.getAndSet(this, 1) != 0)
Expand Down Expand Up @@ -210,6 +238,7 @@ private V getContent() throws ExecutionException {
// org.asynchttpclient.ListenableFuture

private boolean terminateAndExit() {
releasePartitionKeyLock();
cancelTimeouts();
this.channel = null;
this.reuseChannel = false;
Expand Down Expand Up @@ -454,6 +483,29 @@ public Object getPartitionKey() {
return connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(), proxyServer);
}

public void acquirePartitionLockLazily() throws IOException {
if (partitionKeyLock != null) {
return;
}

Object partitionKey = getPartitionKey();
connectionSemaphore.acquireChannelLock(partitionKey);
Object prevKey = partitionKeyLockField.getAndSet(this, partitionKey);
if (prevKey != null) {
// self-check

connectionSemaphore.releaseChannelLock(prevKey);
releasePartitionKeyLock();

throw new IllegalStateException("Trying to acquire partition lock concurrently. Please report.");
}

if (isDone()) {
// may be cancelled while we acquired a lock
releasePartitionKeyLock();
}
}

public Realm getRealm() {
return realm;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package org.asynchttpclient.netty.channel;

import static io.netty.util.internal.ThrowableUtil.unknownStackTrace;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
Expand All @@ -35,16 +34,13 @@
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AttributeKey;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -61,9 +57,6 @@
import org.asynchttpclient.channel.ChannelPool;
import org.asynchttpclient.channel.ChannelPoolPartitioning;
import org.asynchttpclient.channel.NoopChannelPool;
import org.asynchttpclient.exception.PoolAlreadyClosedException;
import org.asynchttpclient.exception.TooManyConnectionsException;
import org.asynchttpclient.exception.TooManyConnectionsPerHostException;
import org.asynchttpclient.handler.AsyncHandlerExtensions;
import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.OnLastHttpContentCallback;
Expand Down Expand Up @@ -93,30 +86,23 @@ public class ChannelManager {
public static final String AHC_WS_HANDLER = "ahc-ws";
public static final String LOGGING_HANDLER = "logging";

private static final AttributeKey<Object> partitionKeyAttr = AttributeKey.valueOf(ChannelManager.class, "partitionKey");

private final AsyncHttpClientConfig config;
private final SslEngineFactory sslEngineFactory;
private final EventLoopGroup eventLoopGroup;
private final boolean allowReleaseEventLoopGroup;
private final Bootstrap httpBootstrap;
private final Bootstrap wsBootstrap;
private final long handshakeTimeout;
private final IOException tooManyConnections;
private final IOException tooManyConnectionsPerHost;

private final ChannelPool channelPool;
private final ChannelGroup openChannels;
private final boolean maxTotalConnectionsEnabled;
private final NonBlockingSemaphoreLike freeChannels;
private final boolean maxConnectionsPerHostEnabled;
private final ConcurrentHashMap<Object, NonBlockingSemaphore> freeChannelsPerHost = new ConcurrentHashMap<>();

private AsyncHttpClientHandler wsHandler;

public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {

this.config = config;

this.sslEngineFactory = config.getSslEngineFactory() != null ? config.getSslEngineFactory() : new DefaultSslEngineFactory();
try {
this.sslEngineFactory.init(config);
Expand All @@ -134,38 +120,7 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
}
this.channelPool = channelPool;


tooManyConnections = unknownStackTrace(new TooManyConnectionsException(config.getMaxConnections()), ChannelManager.class, "acquireChannelLock");
tooManyConnectionsPerHost = unknownStackTrace(new TooManyConnectionsPerHostException(config.getMaxConnectionsPerHost()), ChannelManager.class, "acquireChannelLock");
maxTotalConnectionsEnabled = config.getMaxConnections() > 0;
maxConnectionsPerHostEnabled = config.getMaxConnectionsPerHost() > 0;

freeChannels = maxTotalConnectionsEnabled ?
new NonBlockingSemaphore(config.getMaxConnections()) :
NonBlockingSemaphoreInfinite.INSTANCE;

if (maxTotalConnectionsEnabled || maxConnectionsPerHostEnabled) {
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE) {
@Override
public boolean remove(Object o) {
boolean removed = super.remove(o);
if (removed) {
freeChannels.release();
if (maxConnectionsPerHostEnabled) {
Object partitionKey = Channel.class.cast(o).attr(partitionKeyAttr).getAndSet(null);
if (partitionKey != null) {
NonBlockingSemaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
if (hostFreeChannels != null)
hostFreeChannels.release();
}
}
}
return removed;
}
};
} else {
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
}
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);

handshakeTimeout = config.getHandshakeTimeout();

Expand Down Expand Up @@ -315,10 +270,7 @@ public final void tryToOfferChannelToPool(Channel channel, AsyncHandler<?> async
Channels.setDiscard(channel);
if (asyncHandler instanceof AsyncHandlerExtensions)
AsyncHandlerExtensions.class.cast(asyncHandler).onConnectionOffer(channel);
if (channelPool.offer(channel, partitionKey)) {
if (maxConnectionsPerHostEnabled)
channel.attr(partitionKeyAttr).setIfAbsent(partitionKey);
} else {
if (!channelPool.offer(channel, partitionKey)) {
// rejected by pool
closeChannel(channel);
}
Expand All @@ -337,32 +289,6 @@ public boolean removeAll(Channel connection) {
return channelPool.removeAll(connection);
}

private boolean tryAcquireGlobal() {
return freeChannels.tryAcquire();
}

private NonBlockingSemaphoreLike getFreeConnectionsForHost(Object partitionKey) {
return maxConnectionsPerHostEnabled ?
freeChannelsPerHost.computeIfAbsent(partitionKey, pk -> new NonBlockingSemaphore(config.getMaxConnectionsPerHost())) :
NonBlockingSemaphoreInfinite.INSTANCE;
}

private boolean tryAcquirePerHost(Object partitionKey) {
return getFreeConnectionsForHost(partitionKey).tryAcquire();
}

public void acquireChannelLock(Object partitionKey) throws IOException {
if (!channelPool.isOpen())
throw PoolAlreadyClosedException.INSTANCE;
if (!tryAcquireGlobal())
throw tooManyConnections;
if (!tryAcquirePerHost(partitionKey)) {
freeChannels.release();

throw tooManyConnectionsPerHost;
}
}

private void doClose() {
openChannels.close();
channelPool.destroy();
Expand All @@ -383,16 +309,8 @@ public void closeChannel(Channel channel) {
Channels.silentlyCloseChannel(channel);
}

public void releaseChannelLock(Object partitionKey) {
freeChannels.release();
getFreeConnectionsForHost(partitionKey).release();
}

public void registerOpenChannel(Channel channel, Object partitionKey) {
openChannels.add(channel);
if (maxConnectionsPerHostEnabled) {
channel.attr(partitionKeyAttr).set(partitionKey);
}
}

private HttpClientCodec newHttpClientCodec() {
Expand Down Expand Up @@ -520,4 +438,8 @@ public ClientStats getClientStats() {
));
return new ClientStats(statsPerHost);
}

public boolean isOpen() {
return channelPool.isOpen();
}
}
Loading