Skip to content

Commit

Permalink
Expose flushPartition on AsyncHttpClient, close #1375
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Mar 28, 2017
1 parent eff9e93 commit fc7024c
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 23 deletions.
8 changes: 8 additions & 0 deletions client/src/main/java/org/asynchttpclient/AsyncHttpClient.java
Expand Up @@ -18,6 +18,7 @@

import java.io.Closeable;
import java.util.concurrent.Future;
import java.util.function.Predicate;

/**
* This class support asynchronous and synchronous HTTP request.
Expand Down Expand Up @@ -273,4 +274,11 @@ public interface AsyncHttpClient extends Closeable {
* @return a {@link ClientStats}
*/
ClientStats getClientStats();

/**
* Flush ChannelPool partitions based on a predicate
*
* @param predicate the predicate
*/
void flushChannelPoolPartitions(Predicate<Object> predicate);
}
Expand Up @@ -22,6 +22,7 @@
import io.netty.util.Timer;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import org.asynchttpclient.channel.ChannelPool;
import org.asynchttpclient.filter.FilterContext;
Expand Down Expand Up @@ -259,6 +260,11 @@ public EventLoopGroup getEventLoopGroup() {
public ClientStats getClientStats() {
return channelManager.getClientStats();
}

@Override
public void flushChannelPoolPartitions(Predicate<Object> predicate) {
channelManager.flushChannelPoolPartitions(predicate);
}

protected BoundRequestBuilder requestBuilder(String method, String url) {
return new BoundRequestBuilder(this, method, config.isDisableUrlEncodingForBoundRequests()).setUrl(url).setSignatureCalculator(signatureCalculator);
Expand Down
13 changes: 3 additions & 10 deletions client/src/main/java/org/asynchttpclient/channel/ChannelPool.java
Expand Up @@ -61,18 +61,11 @@ public interface ChannelPool {
void destroy();

/**
* Flush a partition
* Flush partitions based on a predicate
*
* @param partitionKey the partition
* @param predicate the predicate
*/
void flushPartition(Object partitionKey);

/**
* Flush partitions based on a selector
*
* @param selector the selector
*/
void flushPartitions(Predicate<Object> selector);
void flushPartitions(Predicate<Object> predicate);

/**
* @return The number of idle channels per host.
Expand Down
Expand Up @@ -48,11 +48,7 @@ public void destroy() {
}

@Override
public void flushPartition(Object partitionKey) {
}

@Override
public void flushPartitions(Predicate<Object> selector) {
public void flushPartitions(Predicate<Object> predicate) {
}

@Override
Expand Down
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import javax.net.ssl.SSLEngine;
Expand Down Expand Up @@ -521,4 +522,8 @@ public ClientStats getClientStats() {
));
return new ClientStats(statsPerHost);
}

public void flushChannelPoolPartitions(Predicate<Object> predicate) {
channelPool.flushPartitions(predicate);
}
}
Expand Up @@ -353,16 +353,10 @@ private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChann
}

@Override
public void flushPartition(Object partitionKey) {
flushPartition(partitionKey, partitions.get(partitionKey));
}

@Override
public void flushPartitions(Predicate<Object> selector) {

public void flushPartitions(Predicate<Object> predicate) {
for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) {
Object partitionKey = partitionsEntry.getKey();
if (selector.test(partitionKey))
if (predicate.test(partitionKey))
flushPartition(partitionKey, partitionsEntry.getValue());
}
}
Expand Down
Expand Up @@ -12,6 +12,8 @@
*/
package org.asynchttpclient.extras.registry;

import java.util.function.Predicate;

import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
Expand Down Expand Up @@ -131,4 +133,9 @@ public ListenableFuture<Response> executeRequest(RequestBuilder requestBuilder)
public ClientStats getClientStats() {
throw new UnsupportedOperationException();
}

@Override
public void flushChannelPoolPartitions(Predicate<Object> predicate) {
throw new UnsupportedOperationException();
}
}
Expand Up @@ -12,6 +12,8 @@
*/
package org.asynchttpclient.extras.registry;

import java.util.function.Predicate;

import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
Expand Down Expand Up @@ -127,4 +129,9 @@ public ListenableFuture<Response> executeRequest(RequestBuilder requestBuilder)
public ClientStats getClientStats() {
throw new UnsupportedOperationException();
}

@Override
public void flushChannelPoolPartitions(Predicate<Object> predicate) {
throw new UnsupportedOperationException();
}
}

0 comments on commit fc7024c

Please sign in to comment.