Skip to content

Commit

Permalink
Add server-side lookup throttling (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Mar 6, 2017
1 parent 92b79d0 commit 3345d24
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 38 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Expand Up @@ -102,6 +102,9 @@ tlsAllowInsecureConnection=false
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=10000

### --- Authentication --- ###

# Enable authentication
Expand Down
Expand Up @@ -88,6 +88,9 @@ public class ServiceConfiguration implements PulsarConfiguration{
// messages to consumer once, this limit reaches until consumer starts acknowledging messages back
// Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction
private int maxUnackedMessagesPerConsumer = 50000;
// Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
@FieldContext(dynamic = true)
private int maxConcurrentLookupRequest = 10000;

/***** --- TLS --- ****/
// Enable TLS
Expand Down Expand Up @@ -415,6 +418,14 @@ public void setMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer)
this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer;
}

public int getMaxConcurrentLookupRequest() {
return maxConcurrentLookupRequest;
}

public void setMaxConcurrentLookupRequest(int maxConcurrentLookupRequest) {
this.maxConcurrentLookupRequest = maxConcurrentLookupRequest;
}

public boolean isTlsEnabled() {
return tlsEnabled;
}
Expand Down
Expand Up @@ -67,6 +67,12 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
@Suspended AsyncResponse asyncResponse) {
dest = Codec.decode(dest);
DestinationName topic = DestinationName.get("persistent", property, cluster, namespace, dest);

if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topic);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}

try {
validateClusterOwnership(topic.getCluster());
Expand All @@ -75,12 +81,12 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
} catch (WebApplicationException we) {
// Validation checks failed
log.error("Validation check failed: {}", we.getMessage());
asyncResponse.resume(we);
completeLookupResponseExceptionally(asyncResponse, we);
return;
} catch (Throwable t) {
// Validation checks failed with unknown error
log.error("Validation check failed: {}", t.getMessage(), t);
asyncResponse.resume(new RestException(t));
completeLookupResponseExceptionally(asyncResponse, new RestException(t));
return;
}

Expand All @@ -90,7 +96,7 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
lookupFuture.thenAccept(result -> {
if (result == null) {
log.warn("No broker was found available for topic {}", topic);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}

Expand All @@ -105,24 +111,24 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
topic.getLookupName(), newAuthoritative));
} catch (URISyntaxException e) {
log.error("Error in preparing redirect url for {}: {}", topic, e.getMessage(), e);
asyncResponse.resume(e);
completeLookupResponseExceptionally(asyncResponse, e);
return;
}
if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topic, redirect);
}
asyncResponse.resume(new WebApplicationException(Response.temporaryRedirect(redirect).build()));
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.temporaryRedirect(redirect).build()));

} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
log.debug("Lookup succeeded for topic {} -- broker: {}", topic, result.getLookupData());
}
asyncResponse.resume(result.getLookupData());
completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
}
}).exceptionally(exception -> {
log.warn("Failed to lookup broker for topic {}: {}", topic, exception.getMessage(), exception);
asyncResponse.resume(exception);
completeLookupResponseExceptionally(asyncResponse, exception);
return null;
});

Expand Down Expand Up @@ -236,6 +242,16 @@ public static CompletableFuture<ByteBuf> lookupDestinationAsync(PulsarService pu

return lookupfuture;
}

private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(t);
}

private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
asyncResponse.resume(lookupData);
}

private static final Logger log = LoggerFactory.getLogger(DestinationLookup.class);
}
Expand Up @@ -34,7 +34,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.apache.bookkeeper.client.BookKeeper.DigestType;
Expand Down Expand Up @@ -128,6 +130,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private AuthorizationManager authorizationManager = null;
private final ScheduledExecutorService statsUpdater;
private final ScheduledExecutorService backlogQuotaChecker;

protected final AtomicReference<Semaphore> lookupRequestSemaphore;

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
Expand Down Expand Up @@ -206,7 +210,10 @@ public Map<String, String> deserialize(String key, byte[] content) throws Except
return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
}
};
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), true));

PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}
Expand Down Expand Up @@ -619,6 +626,10 @@ public List<Metrics> getDestinationMetrics() {
public Map<String, NamespaceBundleStats> getBundleStats() {
return pulsarStats.getBundleStats();
}

public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}

public void checkGC(int gcIntervalInSeconds) {
topics.forEach((n, t) -> {
Expand Down Expand Up @@ -841,7 +852,7 @@ public Map<String, PersistentTopicStats> getTopicStats() {
public AuthenticationService getAuthenticationService() {
return authenticationService;
}

public List<PersistentTopic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
return multiLayerTopicsMap.get(namespace).get(bundle).values();
}
Expand All @@ -857,7 +868,10 @@ public ZooKeeperDataCache<Map<String, String>> getDynamicConfigurationCache() {
private void updateConfigurationAndRegisterListeners() {
// update ServiceConfiguration value by reading zk-configuration-map
updateDynamicServiceConfiguration();
//add more listeners here
// add listener on "maxConcurrentLookupRequest" value change
registerConfigurationListener("maxConcurrentLookupRequest",
(pendingLookupRequest) -> lookupRequestSemaphore.set(new Semaphore((int) pendingLookupRequest, true)));
// add more listeners here
}

/**
Expand Down
Expand Up @@ -124,4 +124,4 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
return PulsarApi.ServerError.UnknownError;
}
}
}
}
Expand Up @@ -159,15 +159,28 @@ protected void handleLookup(CommandLookupTopic lookup) {
}
final long requestId = lookup.getRequestId();
final String topic = lookup.getTopic();
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
getRole(), lookup.getRequestId()).thenAccept(lookupResponse -> {
ctx.writeAndFlush(lookupResponse);
}).exceptionally(ex -> {
// it should never happen
log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
ctx.writeAndFlush(newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
if (service.getLookupRequestSemaphore().tryAcquire()) {
lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(),
getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
// it should never happen
log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex);
ctx.writeAndFlush(
newLookupResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
}
service.getLookupRequestSemaphore().release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed lookup due to too many lookup-requets {}", remoteAddress, topic);
}
ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}

}

@Override
Expand All @@ -177,24 +190,36 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
}
final long requestId = partitionMetadata.getRequestId();
final String topic = partitionMetadata.getTopic();
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
.thenAccept(metadata -> {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
}).exceptionally(ex -> {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, topic,
ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId));
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
ex.getMessage(), ex);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
}
return null;
});
if (service.getLookupRequestSemaphore().tryAcquire()) {
getPartitionedTopicMetadata(getBrokerService().pulsar(), getRole(), DestinationName.get(topic))
.handle((metadata, ex) -> {
if (ex == null) {
int partitions = metadata.partitions;
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
} else {
if (ex instanceof PulsarClientException) {
log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress,
topic, ex.getMessage());
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
ex.getMessage(), requestId));
} else {
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topic,
ex.getMessage(), ex);
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
ex.getMessage(), requestId));
}
}
service.getLookupRequestSemaphore().release();
return null;
});
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requets {}", remoteAddress,
topic);
}
ctx.writeAndFlush(newLookupResponse(ServerError.TooManyRequests,
"Failed due to too many pending lookup requests", requestId));
}
}

@Override
Expand Down Expand Up @@ -543,7 +568,6 @@ protected void handleProducer(final CommandProducer cmdProducer) {
});
}


@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
checkArgument(state == State.Connected);
Expand Down
Expand Up @@ -27,6 +27,8 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
Expand Down Expand Up @@ -108,6 +110,7 @@ public void setUp() throws Exception {
BrokerService brokerService = mock(BrokerService.class);
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(auth).when(brokerService).getAuthorizationManager();
doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore();
}

@Test
Expand All @@ -134,6 +137,35 @@ public void crossColoLookup() throws Exception {
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
}


@Test
public void testNotEnoughLookupPermits() throws Exception {

BrokerService brokerService = pulsar.getBrokerService();
doReturn(new Semaphore(0)).when(brokerService).getLookupRequestSemaphore();

DestinationLookup destLookup = spy(new DestinationLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
doReturn(true).when(config).isAuthorizationEnabled();

AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupDestinationAsync("myprop", "usc", "ns2", "topic1", false, asyncResponse1);

ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse1).resume(arg.capture());
assertEquals(arg.getValue().getClass(), WebApplicationException.class);
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.SERVICE_UNAVAILABLE.getStatusCode());
}

@Test
public void testValidateReplicationSettingsOnNamespace() throws Exception {
Expand Down

0 comments on commit 3345d24

Please sign in to comment.