Skip to content

Commit

Permalink
Add client-side lookup throttling (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Feb 7, 2017
1 parent 9f42936 commit 295577c
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 11 deletions.
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.fail;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -30,13 +31,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.admin.BrokerStats;
import com.yahoo.pulsar.client.api.Authentication;
Expand Down Expand Up @@ -705,4 +706,27 @@ public void testTlsAuthUseTrustCert() throws Exception {
}
}

/**
* Verifies: client side throttling.
*
* @throws Exception
*/
@Test
public void testLookupThrottlingForClientByClient() throws Exception {
final String topicName = "persistent://prop/usw/my-ns/newTopic";

com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setConcurrentLookupRequest(0);
String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf);

try {
Consumer consumer = pulsarClient.subscribe(topicName, "mysub", new ConsumerConfiguration());
fail("It should fail as throttling should not receive any request");
} catch (com.yahoo.pulsar.client.api.PulsarClientException.TooManyLookupRequestException e) {
// ok as throttling set to 0
}
}

}
Expand Up @@ -48,6 +48,7 @@ public class ClientConfiguration implements Serializable {
private boolean useTls = false;
private String tlsTrustCertsFilePath = "";
private boolean tlsAllowInsecureConnection = false;
private int concurrentLookupRequest = 5000;

/**
* @return the authentication provider to be used
Expand Down Expand Up @@ -309,4 +310,24 @@ public long getStatsIntervalSeconds() {
public void setStatsInterval(long statsInterval, TimeUnit unit) {
this.statsIntervalSeconds = unit.toSeconds(statsInterval);
}

/**
* Get configured total allowed concurrent lookup-request.
*
* @return
*/
public int getConcurrentLookupRequest() {
return concurrentLookupRequest;
}

/**
* Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
* <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe on
* thousands of topic using created {@link PulsarClient}
*
* @param concurrentLookupRequest
*/
public void setConcurrentLookupRequest(int concurrentLookupRequest) {
this.concurrentLookupRequest = concurrentLookupRequest;
}
}
Expand Up @@ -60,6 +60,12 @@ public LookupException(String msg) {
}
}

public static class TooManyLookupRequestException extends LookupException {
public TooManyLookupRequestException(String msg) {
super(msg);
}
}

public static class ConnectException extends PulsarClientException {
public ConnectException(String msg) {
super(msg);
Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
Expand Down Expand Up @@ -59,13 +60,16 @@ public class ClientCnx extends PulsarHandler {
private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1);

private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final Semaphore pendingLookupRequestSemaphore;

enum State {
None, SentConnectFrame, Ready
}

public ClientCnx(PulsarClientImpl pulsarClient) {
super(30, TimeUnit.SECONDS);
this.pendingLookupRequestSemaphore = new Semaphore(pulsarClient.getConfiguration().getConcurrentLookupRequest(),
true);
authentication = pulsarClient.getConfiguration().getAuthentication();
state = State.None;
}
Expand Down Expand Up @@ -105,7 +109,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

// Fail out all the pending ops
pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
pendingLookupRequests.forEach((key, future) -> getAndRemovePendingLookupRequest(key).completeExceptionally(e));

// Notify all attached producers/consumers so they have a chance to reconnect
producers.forEach((id, producer) -> producer.connectionClosed(this));
Expand Down Expand Up @@ -202,7 +206,7 @@ protected void handleLookupResponse(CommandLookupTopicResponse lookupResult) {
log.info("Received Broker lookup response: {}", lookupResult.getResponse());

long requestId = lookupResult.getRequestId();
CompletableFuture<LookupDataResult> requestFuture = pendingLookupRequests.remove(requestId);
CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);

if (requestFuture != null) {
// Complete future with exception if : Result.response=fail/null
Expand Down Expand Up @@ -230,7 +234,7 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l
log.info("Received Broker Partition response: {}", lookupResult.getPartitions());

long requestId = lookupResult.getRequestId();
CompletableFuture<LookupDataResult> requestFuture = pendingLookupRequests.remove(requestId);
CompletableFuture<LookupDataResult> requestFuture = getAndRemovePendingLookupRequest(requestId);

if (requestFuture != null) {
// Complete future with exception if : Result.response=fail/null
Expand All @@ -251,6 +255,22 @@ protected void handlePartitionResponse(CommandPartitionedTopicMetadataResponse l
}
}

private boolean addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
if (pendingLookupRequestSemaphore.tryAcquire()) {
pendingLookupRequests.put(requestId, future);
return true;
}
return false;
}

private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
CompletableFuture<LookupDataResult> result = pendingLookupRequests.remove(requestId);
if (result != null) {
pendingLookupRequestSemaphore.release();
}
return result;
}

@Override
protected void handleSendError(CommandSendError sendError) {
log.warn("{} Received send error from server: {}", ctx.channel(), sendError);
Expand Down Expand Up @@ -312,13 +332,22 @@ protected boolean isHandshakeCompleted() {

CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long requestId) {
CompletableFuture<LookupDataResult> future = new CompletableFuture<>();
pendingLookupRequests.put(requestId, future);
ctx.writeAndFlush(request).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
log.warn("{} Failed to send request to broker: {}", ctx.channel(), writeFuture.cause().getMessage());
future.completeExceptionally(writeFuture.cause());

if (addPendingLookupRequests(requestId, future)) {
ctx.writeAndFlush(request).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId,
writeFuture.cause().getMessage());
future.completeExceptionally(writeFuture.cause());
}
});
} else {
if (log.isDebugEnabled()) {
log.debug("{} Failed to add lookup-request into pending queue", requestId);
}
});
future.completeExceptionally(new PulsarClientException.TooManyLookupRequestException(
"Failed due to too many pending lookup requests"));
}
return future;
}

Expand Down

0 comments on commit 295577c

Please sign in to comment.