PIP-3 : Introduce message-dispatch rate limiting #634

Merged
merged 2 commits into from
Aug 14, 2017

Conversation

rdhabalia
Contributor

Motivation

As discussed in PIP, adding message-rate limiting to throttle message-dispatching.

Modifications

Result

Broker can throttle message-dispatching at topic level.

@rdhabalia rdhabalia added type/enhancement type/feature labels Aug 3, 2017
@rdhabalia rdhabalia added this to the 1.20.0-incubating milestone Aug 3, 2017
@rdhabalia rdhabalia self-assigned this Aug 3, 2017
@rdhabalia rdhabalia changed the title Introduce message-dispatch rate limiting PIP-3 : Introduce message-dispatch rate limiting Aug 3, 2017
@rdhabalia
Contributor Author

@merlimat as per your suggestion in #402, I have added rate-limiting by msg-rate and byte-rate. I will update the doc once you get a chance to review it.

@rdhabalia rdhabalia requested a review from merlimat August 8, 2017 00:36
@rdhabalia rdhabalia force-pushed the dispatch branch 2 times, most recently from 6100afd to 1c1c031 Compare August 8, 2017 06:22
Contributor

@merlimat merlimat left a comment

Overall approach looks good to me

conf/broker.conf Outdated
@@ -106,6 +106,14 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
dispatchRatePerTopicInMsg=0
Contributor

Can we include "throttling" in the var name? Like dispatchThrottlingRatePerTopicInMsg

@@ -245,6 +245,19 @@ protected Policies getNamespacePolicies(String property, String cluster, String
}
}

protected LocalPolicies getNamespaceLocalPolicies(String property, String cluster, String namespace) {
Contributor

Shouldn't we be configuring the throttling policies on the global zk?

Contributor

At least from an API perspective, it should be very clear what the scope of the configuration is.

Contributor Author

Shouldn't we be configuring the throttling policies on the global zk?

Actually, every cluster has different traffic for the namespace, and we may want to enable throttling or set the throttling limit per cluster. Therefore, I kept it in local-policies and not in global-zk.

Contributor

@merlimat merlimat Aug 8, 2017

That's fine, but it can be quite confusing for a user given that other methods are changing the settings globally.

I think it could be set explicitly either for a particular cluster or for all the clusters, in order to clarify the scope. E.g.:

{
  "rateLimit" : {
    "dispatchRatePerTopicInMsg" : 1000.0,
    "dispatchRatePerTopicInBytes" : 1000000.0,
    "clusters" : {
      "small-cluster" : {
        "dispatchRatePerTopicInMsg" : 10.0,
        "dispatchRatePerTopicInBytes" : 1000.0
      },
      "large-cluster" : {
        "dispatchRatePerTopicInMsg" : 1000000.0,
        "dispatchRatePerTopicInBytes" : 1000000000.0
      }
    }
  }
}

The advantage is that you can also see all the limits in a single place.
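
A minimal sketch of how a broker could resolve the effective limits from a policy shaped like the JSON above: a cluster-specific entry wins, and otherwise the namespace-wide default applies. The class names `RateLimitSettings` and `NamespaceRateLimitPolicy` and the method `effectiveFor` are hypothetical and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical pair of per-topic dispatch limits. */
class RateLimitSettings {
    final double dispatchRatePerTopicInMsg;
    final double dispatchRatePerTopicInBytes;

    RateLimitSettings(double msgRate, double byteRate) {
        this.dispatchRatePerTopicInMsg = msgRate;
        this.dispatchRatePerTopicInBytes = byteRate;
    }
}

/** Hypothetical namespace policy: one default plus optional per-cluster overrides. */
class NamespaceRateLimitPolicy {
    private final RateLimitSettings defaults;
    private final Map<String, RateLimitSettings> clusters = new HashMap<>();

    NamespaceRateLimitPolicy(RateLimitSettings defaults) {
        this.defaults = defaults;
    }

    /** Registers an override for one cluster and returns this policy for chaining. */
    NamespaceRateLimitPolicy override(String cluster, RateLimitSettings settings) {
        clusters.put(cluster, settings);
        return this;
    }

    /** Returns the cluster-specific settings if present, otherwise the default. */
    RateLimitSettings effectiveFor(String cluster) {
        return clusters.getOrDefault(cluster, defaults);
    }
}
```

With this shape, the scope of every limit is visible in one object, which is the point the comment makes about the single JSON document.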

Contributor Author

Ok. But I think we introduced local-policies to keep cluster-level policy information such as BundleData and it keeps global-policies clean.

Contributor

Though BundleData is different. It is not configuration. It is written back, by the brokers themselves, into the local ZK because we want each cluster to be able to split the bundles independently.

For rate limiting, we are talking about proper configuration.

@@ -810,6 +811,42 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa
}
}

@PUT
Contributor

Should this be POST rather than PUT?

Contributor Author

I have updated it to POST, but the REST convention suggests PUT for updating a resource and POST for creating one.

multiLayerTopicsMap.forEach((namespace, bundle) -> {
bundle.forEach((name, topics) -> {
topics.forEach((topicName, topic) -> {
if (topic instanceof PersistentTopic) {
Contributor

Can't we just iterate on the flat topics map?

@@ -575,5 +578,31 @@ private void clearUnAckedMsgs(Consumer consumer) {
subscription.addUnAckedMessages(-unaAckedMsgs);
}

public static class SendMessageInfo {
Contributor

How is this information used?

Contributor Author

The dispatcher needs to know how many messages and bytes were actually sent in order to manage the permits. The consumer knows exactly how many messages and bytes it sent, so it returns that to the dispatcher. Previously, consumer.sendMessages() returned Pair<ChannelPromise, Integer>; since we wanted to add one more value, I created the SendMessageInfo entity to combine all of this information.
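
A rough sketch of the idea described above, not the PR's actual class: the consumer returns one object carrying the write handle together with the message and byte counts, and the dispatcher deducts those counts from its permits. `CompletableFuture<Void>` stands in for Netty's `ChannelPromise` so the example stays self-contained, and the field names and `DispatcherPermits` are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Result of one send: the write handle plus what was actually written. */
class SendMessageInfo {
    final CompletableFuture<Void> writeFuture; // stand-in for Netty's ChannelPromise
    final int totalSentMessages;
    final long totalSentMessageBytes;

    SendMessageInfo(CompletableFuture<Void> writeFuture, int messages, long bytes) {
        this.writeFuture = writeFuture;
        this.totalSentMessages = messages;
        this.totalSentMessageBytes = bytes;
    }
}

/** Hypothetical dispatcher-side bookkeeping of the remaining permits. */
class DispatcherPermits {
    private long messagePermits;
    private long bytePermits;

    DispatcherPermits(long messagePermits, long bytePermits) {
        this.messagePermits = messagePermits;
        this.bytePermits = bytePermits;
    }

    /** Deducts what the consumer reports as actually sent. */
    void onSent(SendMessageInfo info) {
        messagePermits -= info.totalSentMessages;
        bytePermits -= info.totalSentMessageBytes;
    }

    long remainingMessagePermits() {
        return messagePermits;
    }

    long remainingBytePermits() {
        return bytePermits;
    }
}
```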

} catch (Exception e) {
log.warn("Failed to get message-rate for {}", this.topic, e);
}
DispatchRate dispatchRate = brokerService.pulsar().getConfiguration().getDispatchRatePerTopicInMsg() > 0
Contributor

Ideally, if both limits are set, both of them should be applied.

Contributor

Actually, that could just be done inside DispatchRate class

Contributor Author

Actually, that could just be done inside DispatchRate class

Actually, through the admin API we always define the DispatchRateType (msgRate or byteRate) and the throttling value in the DispatchRate class.
However, dynamic configuration can't store a complex type, so I created two variables, dispatchRatePerTopicInMsg and dispatchRatePerTopicInByte. We therefore need this logic for the case where both configuration values are set.

Ideally, if both limits are set, both of them should be applied.

This will require 2 RateLimiter objects for every topic which we use across all the subscriptions. Do you think considering both limits would be useful?

Contributor

It would require 2 rate limiters only if both limits are set. Otherwise one of them, or both, will be null.
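
One way to read this suggestion, as a sketch under stated assumptions rather than the PR's implementation: keep one limiter per configured limit, leave it null when that limit is unset, and allow a dispatch only if every non-null limiter has room. `PermitBucket` and `CombinedDispatchLimiter` are hypothetical names, and refilling permits each period is deliberately left out.

```java
/** Hypothetical permit counter for a single period; refilling is out of scope here. */
class PermitBucket {
    private long available;

    PermitBucket(long permits) {
        this.available = permits;
    }

    boolean has(long permits) {
        return available >= permits;
    }

    void take(long permits) {
        available -= permits;
    }
}

/** Applies a message limit, a byte limit, both, or neither. */
class CombinedDispatchLimiter {
    private final PermitBucket onMessage; // null when no message-rate limit is set
    private final PermitBucket onByte;    // null when no byte-rate limit is set

    CombinedDispatchLimiter(long msgRate, long byteRate) {
        this.onMessage = msgRate > 0 ? new PermitBucket(msgRate) : null;
        this.onByte = byteRate > 0 ? new PermitBucket(byteRate) : null;
    }

    /** Checks every configured limit first, then consumes, so a rejection takes no permits. */
    synchronized boolean tryDispatch(long messages, long bytes) {
        if (onMessage != null && !onMessage.has(messages)) {
            return false;
        }
        if (onByte != null && !onByte.has(bytes)) {
            return false;
        }
        if (onMessage != null) {
            onMessage.take(messages);
        }
        if (onByte != null) {
            onByte.take(bytes);
        }
        return true;
    }
}
```

A topic with neither limit configured allocates no bucket at all, which matches the point that two limiters are only needed when both limits are set.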

}

@Override
public int hashCode() {
Contributor

Unless hashCode() is used in a very critical place, just use Objects.hashCode(var1, var2)

}

@Override
public boolean equals(Object obj) {
Contributor

return Objects.equals(a, obj.a) && Objects.equals(b, obj.b)

}

@Override
public String toString() {
Contributor

return Objects.toString(this);

😄
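
For reference, a small sketch of the three one-liner styles suggested in these comments, written against `java.util.Objects` (which offers `Objects.hash(...)`; the `Objects.hashCode(var1, var2)` form exists in Guava's `Objects`). The class and field names are hypothetical. The `toString()` is spelled out by hand, because calling `java.util.Objects.toString(this)` from inside `toString()` would call back into the same method and never terminate.

```java
import java.util.Objects;

/** Hypothetical value class with two throttling fields. */
class DispatchRateExample {
    final int dispatchThrottlingRateInMsg;
    final long dispatchThrottlingRateInByte;

    DispatchRateExample(int msgRate, long byteRate) {
        this.dispatchThrottlingRateInMsg = msgRate;
        this.dispatchThrottlingRateInByte = byteRate;
    }

    @Override
    public int hashCode() {
        // Combines both fields; adequate unless hashCode() sits on a hot path.
        return Objects.hash(dispatchThrottlingRateInMsg, dispatchThrottlingRateInByte);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof DispatchRateExample)) {
            return false;
        }
        DispatchRateExample other = (DispatchRateExample) obj;
        // Primitive fields compare with ==; Objects.equals(a, b) is the null-safe form for references.
        return dispatchThrottlingRateInMsg == other.dispatchThrottlingRateInMsg
                && dispatchThrottlingRateInByte == other.dispatchThrottlingRateInByte;
    }

    @Override
    public String toString() {
        return "DispatchRateExample{msgRate=" + dispatchThrottlingRateInMsg
                + ", byteRate=" + dispatchThrottlingRateInByte + "}";
    }
}
```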

* acquire 10 permits at any time with in that 1 second.
*
*/
public class RateLimiter {
Contributor

How is this class different from Guava rate limiter?

Contributor Author

I started with Guava's RateLimiter but ended up with a custom one for two main reasons:
(1) to achieve per-second rate limiting, and (2) the custom one is faster than Guava's. I have added an explanation of both here

Contributor

OK, I found the gist with the explanation and it makes sense. Can you include the full explanation here as well? Also, would it make sense to make the 1-second period configurable as well? E.g., only rate limit if the rate is exceeded over 10 seconds instead of 1.
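
A hedged sketch of what a configurable period could look like. This is a plain fixed-window limiter, not the PR's RateLimiter: the caller passes the current time explicitly so the behaviour is deterministic, whereas the class under review appears to renew permits from a scheduled task. The name `WindowedRateLimiter` and its signatures are assumptions.

```java
import java.util.concurrent.TimeUnit;

/** Fixed-window limiter: at most permitsPerPeriod permits within each period. */
class WindowedRateLimiter {
    private final long permitsPerPeriod;
    private final long periodNanos;
    private long windowStartNanos;
    private long acquired;

    WindowedRateLimiter(long permitsPerPeriod, long period, TimeUnit unit, long nowNanos) {
        this.permitsPerPeriod = permitsPerPeriod;
        this.periodNanos = unit.toNanos(period);
        this.windowStartNanos = nowNanos;
    }

    /** Tries to take permits at the given time; starts a fresh window once the period has elapsed. */
    synchronized boolean tryAcquire(long permits, long nowNanos) {
        if (nowNanos - windowStartNanos >= periodNanos) {
            windowStartNanos = nowNanos;
            acquired = 0;
        }
        if (acquired + permits > permitsPerPeriod) {
            return false;
        }
        acquired += permits;
        return true;
    }
}
```

Under this reading, a 10-second period with ten times the permits tolerates short bursts that a 1-second period at the same average rate would reject.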

@rdhabalia rdhabalia force-pushed the dispatch branch 7 times, most recently from dbd93c9 to bda64ac Compare August 10, 2017 04:51
read messages based on available-message-rate-permits

add admin command

add message-rate + byte-rate options for dispatch-rate limiting

policies on global-zk + combined msg/byte rate + reset rateLimiter-timeUnit
@rdhabalia
Contributor Author

@merlimat I addressed all the comments.

Contributor

@merlimat merlimat left a comment

Looks good. Just a few final comments.


}

public synchronized void shutdown() {
Contributor

Can you implement AutoCloseable and change this into close() for consistency?
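
A minimal illustration of the requested shape, assuming the limiter owns a scheduled executor for renewing permits (the class name `RenewingLimiter` and its fields are hypothetical). Implementing `AutoCloseable` lets callers use try-with-resources instead of remembering to call `shutdown()`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical limiter that owns a background renew task and releases it in close(). */
class RenewingLimiter implements AutoCloseable {
    private final ScheduledExecutorService renewExecutor =
            Executors.newSingleThreadScheduledExecutor();
    private boolean closed;

    /** Idempotent: a second call is a no-op. */
    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            renewExecutor.shutdownNow();
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}
```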

conf/broker.conf Outdated
@@ -106,6 +106,14 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# Default number of message dispatching throttling-limit for every topic. Using a value of 0, is disabling default
Contributor

This should say the "messages per second" rate

conf/broker.conf Outdated
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0

# Default number of message-bytes dispatching throttling-limit for every topic. Using a value of 0, is disabling
Contributor

Same here, make it clear the meaning is bytes/sec

// throttle only if: (1) cursor is not active bcz active-cursor reads message from cache rather from
// bookkeeper (2) if topic has reached message-rate threshold: then schedule the read after
// MESSAGE_RATE_BACKOFF_MS
if (!cursor.isActive()) {
Contributor

Eventually, we could later add a config option to also rate limit the delivery from cache (that might help for topics with many subscriptions)

Contributor Author

Eventually, we could later add a config option to also rate limit the delivery from cache (that might help for topics with many subscriptions)

Yes, we also need this configuration to bound the actual number of dispatched messages; it also helps to throttle network bandwidth consumption by each client. I will create a separate PR to introduce this config.

dispatchRate);
validateSuperUserAccess();

if (!cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
Contributor

It is not necessary to check the cluster here; the change will be stored in global-zk and the right cluster will pick it up.

CompletableFuture<Optional<LocalPolicies>> future = new CompletableFuture<>();
if (path == null || !path.startsWith(LOCAL_POLICIES_ROOT)) {
Contributor

Is this still needed since we're not using local policies anymore?

Contributor Author

Actually, this method returns a CompletableFuture<>, so we should fail the future rather than throw a runtime exception.
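
The pattern described here can be sketched as follows. The lookup is faked, `LocalPoliciesLookup` is a hypothetical name, and the value given to `LOCAL_POLICIES_ROOT` is an assumption for illustration; only the shape of the error handling is the point.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class LocalPoliciesLookup {
    // Assumed value, used only so the example is self-contained.
    static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";

    /** Reports an invalid path through the returned future instead of throwing to the caller. */
    static CompletableFuture<Optional<String>> getAsync(String path) {
        CompletableFuture<Optional<String>> future = new CompletableFuture<>();
        if (path == null || !path.startsWith(LOCAL_POLICIES_ROOT)) {
            future.completeExceptionally(new IllegalArgumentException("Invalid path " + path));
            return future;
        }
        // Placeholder for the real asynchronous read.
        future.complete(Optional.of("policies-for:" + path));
        return future;
    }
}
```

Callers that chain `thenApply` or `exceptionally` then see the failure on the same path as any other asynchronous error, rather than having to wrap the call in a try/catch as well.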

Contributor

@merlimat merlimat left a comment

👍

Labels
type/enhancement type/feature
2 participants