diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java index 784f65b89d..c21cd59af1 100755 --- a/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java +++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClient.java @@ -266,4 +266,11 @@ public interface AsyncHttpClient extends Closeable { * @return a {@link Future} of type Response */ ListenableFuture executeRequest(RequestBuilder requestBuilder); + + /*** + * Return details about pooled connections. + * + * @return a {@link ClientStats} + */ + ClientStats getClientStats(); } diff --git a/client/src/main/java/org/asynchttpclient/ClientStats.java b/client/src/main/java/org/asynchttpclient/ClientStats.java new file mode 100644 index 0000000000..4bafd7b4c1 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/ClientStats.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2014 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * A record class representing the state of an (@link org.asynchttpclient.AsyncHttpClient). + */ +public class ClientStats { + + private final Map statsPerHost; + + public ClientStats(Map statsPerHost) { + this.statsPerHost = Collections.unmodifiableMap(statsPerHost); + } + + /** + * @return A map from hostname to statistics on that host's connections. + * The returned map is an {@link java.util.Collections.UnmodifiableMap}. + */ + public Map getStatsPerHost() { + return statsPerHost; + } + + /** + * @return The sum of {@link #getTotalActiveConnectionCount()} and {@link #getTotalIdleConnectionCount()}, + * a long representing the total number of connections in the connection pool. + */ + public long getTotalConnectionCount() { + return statsPerHost + .values() + .stream() + .mapToLong(HostStats::getHostConnectionCount) + .sum(); + } + + /** + * @return A long representing the number of active connections in the connection pool. + */ + public long getTotalActiveConnectionCount() { + return statsPerHost + .values() + .stream() + .mapToLong(HostStats::getHostActiveConnectionCount) + .sum(); + } + + /** + * @return A long representing the number of idle connections in the connection pool. + */ + public long getTotalIdleConnectionCount() { + return statsPerHost + .values() + .stream() + .mapToLong(HostStats::getHostIdleConnectionCount) + .sum(); + } + + @Override + public String toString() { + return "There are " + getTotalConnectionCount() + + " total connections, " + getTotalActiveConnectionCount() + + " are active and " + getTotalIdleConnectionCount() + " are idle."; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final ClientStats that = (ClientStats) o; + return Objects.equals(statsPerHost, that.statsPerHost); + } + + @Override + public int hashCode() { + return Objects.hashCode(statsPerHost); + } +} diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java index db884e55ad..74fd6d26ab 100644 --- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java +++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClient.java @@ -255,6 +255,11 @@ public EventLoopGroup getEventLoopGroup() { return channelManager.getEventLoopGroup(); } + @Override + public ClientStats getClientStats() { + return channelManager.getClientStats(); + } + protected BoundRequestBuilder requestBuilder(String method, String url) { return new BoundRequestBuilder(this, method, config.isDisableUrlEncodingForBoundRequests()).setUrl(url).setSignatureCalculator(signatureCalculator); } diff --git a/client/src/main/java/org/asynchttpclient/HostStats.java b/client/src/main/java/org/asynchttpclient/HostStats.java new file mode 100644 index 0000000000..87d9278820 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/HostStats.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2014 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient; + +import java.util.Objects; + +/** + * A record class representing the status of connections to some host. + */ +public class HostStats { + + private final long activeConnectionCount; + private final long idleConnectionCount; + + public HostStats(long activeConnectionCount, + long idleConnectionCount) { + this.activeConnectionCount = activeConnectionCount; + this.idleConnectionCount = idleConnectionCount; + } + + /** + * @return The sum of {@link #getHostActiveConnectionCount()} and {@link #getHostIdleConnectionCount()}, + * a long representing the total number of connections to this host. + */ + public long getHostConnectionCount() { + return activeConnectionCount + idleConnectionCount; + } + + /** + * @return A long representing the number of active connections to the host. + */ + public long getHostActiveConnectionCount() { + return activeConnectionCount; + } + + /** + * @return A long representing the number of idle connections in the connection pool. + */ + public long getHostIdleConnectionCount() { + return idleConnectionCount; + } + + @Override + public String toString() { + return "There are " + getHostConnectionCount() + + " total connections, " + getHostActiveConnectionCount() + + " are active and " + getHostIdleConnectionCount() + " are idle."; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final HostStats hostStats = (HostStats) o; + return activeConnectionCount == hostStats.activeConnectionCount && + idleConnectionCount == hostStats.idleConnectionCount; + } + + @Override + public int hashCode() { + return Objects.hash(activeConnectionCount, idleConnectionCount); + } +} diff --git a/client/src/main/java/org/asynchttpclient/channel/ChannelPool.java b/client/src/main/java/org/asynchttpclient/channel/ChannelPool.java index f8cea67fe6..0d20df349d 100755 --- a/client/src/main/java/org/asynchttpclient/channel/ChannelPool.java +++ b/client/src/main/java/org/asynchttpclient/channel/ChannelPool.java @@ -13,6 +13,8 @@ */ package org.asynchttpclient.channel; +import java.util.Map; + import io.netty.channel.Channel; public interface ChannelPool { @@ -70,4 +72,9 @@ public interface ChannelPool { * @param selector the selector */ void flushPartitions(ChannelPoolPartitionSelector selector); + + /** + * @return The number of idle channels per host. + */ + Map getIdleChannelCountPerHost(); } diff --git a/client/src/main/java/org/asynchttpclient/channel/NoopChannelPool.java b/client/src/main/java/org/asynchttpclient/channel/NoopChannelPool.java index f5b59fab6a..30ec3875e2 100644 --- a/client/src/main/java/org/asynchttpclient/channel/NoopChannelPool.java +++ b/client/src/main/java/org/asynchttpclient/channel/NoopChannelPool.java @@ -13,6 +13,9 @@ */ package org.asynchttpclient.channel; +import java.util.Collections; +import java.util.Map; + import io.netty.channel.Channel; public enum NoopChannelPool implements ChannelPool { @@ -50,4 +53,9 @@ public void flushPartition(Object partitionKey) { @Override public void flushPartitions(ChannelPoolPartitionSelector selector) { } + + @Override + public Map getIdleChannelCountPerHost() { + return Collections.emptyMap(); + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index bdd559ef44..8b45754559 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -40,18 +40,23 @@ import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; -import org.asynchttpclient.AsyncHandler; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.SslEngineFactory; +import org.asynchttpclient.*; import org.asynchttpclient.channel.ChannelPool; import org.asynchttpclient.channel.ChannelPoolPartitioning; import org.asynchttpclient.channel.NoopChannelPool; @@ -488,4 +493,28 @@ public ChannelPool getChannelPool() { public EventLoopGroup getEventLoopGroup() { return eventLoopGroup; } + + public ClientStats getClientStats() { + final Map totalConnectionsPerHost = openChannels + .stream() + .map(Channel::remoteAddress) + .filter(a -> a.getClass() == InetSocketAddress.class) + .map(a -> (InetSocketAddress) a) + .map(InetSocketAddress::getHostName) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + final Map idleConnectionsPerHost = channelPool.getIdleChannelCountPerHost(); + final Map statsPerHost = totalConnectionsPerHost + .entrySet() + .stream() + .collect(Collectors.toMap( + Entry::getKey, + entry -> { + final long totalConnectionCount = entry.getValue(); + final long idleConnectionCount = idleConnectionsPerHost.getOrDefault(entry.getKey(), 0L); + final long activeConnectionCount = totalConnectionCount - idleConnectionCount; + return new HostStats(activeConnectionCount, idleConnectionCount); + } + )); + return new ClientStats(statsPerHost); + } } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java index 878db3a290..0a8312bae1 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/DefaultChannelPool.java @@ -21,11 +21,14 @@ import io.netty.util.Timer; import io.netty.util.TimerTask; +import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.channel.ChannelPool; @@ -120,6 +123,10 @@ public boolean takeOwnership() { return owned.compareAndSet(false, true); } + public Channel getChannel() { + return channel; + } + @Override // only depends on channel public boolean equals(Object o) { @@ -356,6 +363,19 @@ public void flushPartitions(ChannelPoolPartitionSelector selector) { } } + @Override + public Map getIdleChannelCountPerHost() { + return partitions + .values() + .stream() + .flatMap(ConcurrentLinkedDeque::stream) + .map(idle -> idle.getChannel().remoteAddress()) + .filter(a -> a.getClass() == InetSocketAddress.class) + .map(a -> (InetSocketAddress) a) + .map(InetSocketAddress::getHostName) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + } + public enum PoolLeaseStrategy { LIFO { public E lease(Deque d) { diff --git a/client/src/test/java/org/asynchttpclient/ClientStatsTest.java b/client/src/test/java/org/asynchttpclient/ClientStatsTest.java new file mode 100644 index 0000000000..10c04d10d1 --- /dev/null +++ b/client/src/test/java/org/asynchttpclient/ClientStatsTest.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2014 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient; + +import static org.asynchttpclient.Dsl.asyncHttpClient; +import static org.asynchttpclient.Dsl.config; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.testng.annotations.Test; + +/** + * Created by grenville on 9/25/16. + */ +public class ClientStatsTest extends AbstractBasicTest { + + private final static String hostname = "localhost"; + + @Test(groups = "standalone") + public void testClientStatus() throws Throwable { + try (final AsyncHttpClient client = asyncHttpClient(config().setKeepAlive(true).setPooledConnectionIdleTimeout(5000))) { + final String url = getTargetUrl(); + + final ClientStats emptyStats = client.getClientStats(); + + assertEquals(emptyStats.toString(), "There are 0 total connections, 0 are active and 0 are idle."); + assertEquals(emptyStats.getTotalActiveConnectionCount(), 0); + assertEquals(emptyStats.getTotalIdleConnectionCount(), 0); + assertEquals(emptyStats.getTotalConnectionCount(), 0); + assertNull(emptyStats.getStatsPerHost().get(hostname)); + + final List> futures = + Stream.generate(() -> client.prepareGet(url).setHeader("LockThread","6").execute()) + .limit(5) + .collect(Collectors.toList()); + + Thread.sleep(2000); + + final ClientStats activeStats = client.getClientStats(); + + assertEquals(activeStats.toString(), "There are 5 total connections, 5 are active and 0 are idle."); + assertEquals(activeStats.getTotalActiveConnectionCount(), 5); + assertEquals(activeStats.getTotalIdleConnectionCount(), 0); + assertEquals(activeStats.getTotalConnectionCount(), 5); + assertEquals(activeStats.getStatsPerHost().get(hostname).getHostConnectionCount(), 5); + + futures.forEach(future -> future.toCompletableFuture().join()); + + Thread.sleep(1000); + + final ClientStats idleStats = client.getClientStats(); + + assertEquals(idleStats.toString(), "There are 5 total connections, 0 are active and 5 are idle."); + assertEquals(idleStats.getTotalActiveConnectionCount(), 0); + assertEquals(idleStats.getTotalIdleConnectionCount(), 5); + assertEquals(idleStats.getTotalConnectionCount(), 5); + assertEquals(idleStats.getStatsPerHost().get(hostname).getHostConnectionCount(), 5); + + // Let's make sure the active count is correct when reusing cached connections. + + final List> repeatedFutures = + Stream.generate(() -> client.prepareGet(url).setHeader("LockThread","6").execute()) + .limit(3) + .collect(Collectors.toList()); + + Thread.sleep(2000); + + final ClientStats activeCachedStats = client.getClientStats(); + + assertEquals(activeCachedStats.toString(), "There are 5 total connections, 3 are active and 2 are idle."); + assertEquals(activeCachedStats.getTotalActiveConnectionCount(), 3); + assertEquals(activeCachedStats.getTotalIdleConnectionCount(), 2); + assertEquals(activeCachedStats.getTotalConnectionCount(), 5); + assertEquals(activeCachedStats.getStatsPerHost().get(hostname).getHostConnectionCount(), 5); + + repeatedFutures.forEach(future -> future.toCompletableFuture().join()); + + Thread.sleep(1000); + + final ClientStats idleCachedStats = client.getClientStats(); + + assertEquals(idleCachedStats.toString(), "There are 3 total connections, 0 are active and 3 are idle."); + assertEquals(idleCachedStats.getTotalActiveConnectionCount(), 0); + assertEquals(idleCachedStats.getTotalIdleConnectionCount(), 3); + assertEquals(idleCachedStats.getTotalConnectionCount(), 3); + assertEquals(idleCachedStats.getStatsPerHost().get(hostname).getHostConnectionCount(), 3); + + Thread.sleep(5000); + + final ClientStats timeoutStats = client.getClientStats(); + + assertEquals(timeoutStats.toString(), "There are 0 total connections, 0 are active and 0 are idle."); + assertEquals(timeoutStats.getTotalActiveConnectionCount(), 0); + assertEquals(timeoutStats.getTotalIdleConnectionCount(), 0); + assertEquals(timeoutStats.getTotalConnectionCount(), 0); + assertNull(timeoutStats.getStatsPerHost().get(hostname)); + } + } + + @Test(groups = "standalone") + public void testClientStatusNoKeepalive() throws Throwable { + try (final AsyncHttpClient client = asyncHttpClient(config().setKeepAlive(false))) { + final String url = getTargetUrl(); + + final ClientStats emptyStats = client.getClientStats(); + + assertEquals(emptyStats.toString(), "There are 0 total connections, 0 are active and 0 are idle."); + assertEquals(emptyStats.getTotalActiveConnectionCount(), 0); + assertEquals(emptyStats.getTotalIdleConnectionCount(), 0); + assertEquals(emptyStats.getTotalConnectionCount(), 0); + assertNull(emptyStats.getStatsPerHost().get(hostname)); + + final List> futures = + Stream.generate(() -> client.prepareGet(url).setHeader("LockThread","6").execute()) + .limit(5) + .collect(Collectors.toList()); + + Thread.sleep(2000); + + final ClientStats activeStats = client.getClientStats(); + + assertEquals(activeStats.toString(), "There are 5 total connections, 5 are active and 0 are idle."); + assertEquals(activeStats.getTotalActiveConnectionCount(), 5); + assertEquals(activeStats.getTotalIdleConnectionCount(), 0); + assertEquals(activeStats.getTotalConnectionCount(), 5); + assertEquals(activeStats.getStatsPerHost().get(hostname).getHostConnectionCount(), 5); + + futures.forEach(future -> future.toCompletableFuture().join()); + + Thread.sleep(1000); + + final ClientStats idleStats = client.getClientStats(); + + assertEquals(idleStats.toString(), "There are 0 total connections, 0 are active and 0 are idle."); + assertEquals(idleStats.getTotalActiveConnectionCount(), 0); + assertEquals(idleStats.getTotalIdleConnectionCount(), 0); + assertEquals(idleStats.getTotalConnectionCount(), 0); + assertNull(idleStats.getStatsPerHost().get(hostname)); + + // Let's make sure the active count is correct when reusing cached connections. + + final List> repeatedFutures = + Stream.generate(() -> client.prepareGet(url).setHeader("LockThread","6").execute()) + .limit(3) + .collect(Collectors.toList()); + + Thread.sleep(2000); + + final ClientStats activeCachedStats = client.getClientStats(); + + assertEquals(activeCachedStats.toString(), "There are 3 total connections, 3 are active and 0 are idle."); + assertEquals(activeCachedStats.getTotalActiveConnectionCount(), 3); + assertEquals(activeCachedStats.getTotalIdleConnectionCount(), 0); + assertEquals(activeCachedStats.getTotalConnectionCount(), 3); + assertEquals(activeCachedStats.getStatsPerHost().get(hostname).getHostConnectionCount(), 3); + + repeatedFutures.forEach(future -> future.toCompletableFuture().join()); + + Thread.sleep(1000); + + final ClientStats idleCachedStats = client.getClientStats(); + + assertEquals(idleCachedStats.toString(), "There are 0 total connections, 0 are active and 0 are idle."); + assertEquals(idleCachedStats.getTotalActiveConnectionCount(), 0); + assertEquals(idleCachedStats.getTotalIdleConnectionCount(), 0); + assertEquals(idleCachedStats.getTotalConnectionCount(), 0); + assertNull(idleCachedStats.getStatsPerHost().get(hostname)); + } + } +} diff --git a/client/src/test/java/org/asynchttpclient/test/EchoHandler.java b/client/src/test/java/org/asynchttpclient/test/EchoHandler.java index ec707ad334..71bb57561c 100644 --- a/client/src/test/java/org/asynchttpclient/test/EchoHandler.java +++ b/client/src/test/java/org/asynchttpclient/test/EchoHandler.java @@ -55,8 +55,9 @@ public void handle(String pathInContext, Request request, HttpServletRequest htt param = e.nextElement().toString(); if (param.startsWith("LockThread")) { + final int sleepTime = httpRequest.getIntHeader(param); try { - Thread.sleep(40 * 1000); + Thread.sleep(sleepTime == -1 ? 40 : sleepTime * 1000); } catch (InterruptedException ex) { } } diff --git a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java index 41c083fe51..05ecd3f779 100644 --- a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java +++ b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/BadAsyncHttpClient.java @@ -15,6 +15,7 @@ import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.ClientStats; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; @@ -125,4 +126,9 @@ public ListenableFuture executeRequest(RequestBuilder requestBuilder, Asy public ListenableFuture executeRequest(RequestBuilder requestBuilder) { return null; } + + @Override + public ClientStats getClientStats() { + throw new UnsupportedOperationException(); + } } diff --git a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java index febee33bc3..fc2d5eae94 100644 --- a/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java +++ b/extras/registry/src/test/java/org/asynchttpclient/extras/registry/TestAsyncHttpClient.java @@ -15,6 +15,7 @@ import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.ClientStats; import org.asynchttpclient.AsyncHttpClientConfig; import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.Request; @@ -122,4 +123,8 @@ public ListenableFuture executeRequest(RequestBuilder requestBuilder) return null; } + @Override + public ClientStats getClientStats() { + throw new UnsupportedOperationException(); + } }