Skip to content

Commit

Permalink
Issue 1071: add ratelimiter for subscription (#1358)
Browse files Browse the repository at this point in the history
* add ratelimiter for subscription

* rebase master, change following comments
  • Loading branch information
zhaijack authored and merlimat committed Apr 2, 2018
1 parent f4bb185 commit 9a257c2
Show file tree
Hide file tree
Showing 15 changed files with 948 additions and 60 deletions.
Expand Up @@ -146,6 +146,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
// default message-byte dispatch-throttling
@FieldContext(dynamic = true)
private long dispatchThrottlingRatePerTopicInByte = 0;
// Default number of message dispatching throttling-limit for a subscription.
// Using a value of 0, is disabling.
@FieldContext(dynamic = true)
private int dispatchThrottlingRatePerSubscriptionInMsg = 0;
// Default number of message-bytes dispatching throttling-limit for a subscription.
// Using a value of 0, is disabling.
@FieldContext(dynamic = true)
private long dispatchThrottlingRatePerSubscribeInByte = 0;
// Default dispatch-throttling is disabled for consumers which already caught-up with published messages and
// don't have backlog. This enables dispatch-throttling for non-backlog consumers as well.
@FieldContext(dynamic = true)
Expand Down Expand Up @@ -705,6 +713,22 @@ public void setDispatchThrottlingRatePerTopicInByte(long dispatchThrottlingRateP
this.dispatchThrottlingRatePerTopicInByte = dispatchThrottlingRatePerTopicInByte;
}

public int getDispatchThrottlingRatePerSubscriptionInMsg() {
return dispatchThrottlingRatePerSubscriptionInMsg;
}

public void setDispatchThrottlingRatePerSubscriptionInMsg(int dispatchThrottlingRatePerSubscriptionInMsg) {
this.dispatchThrottlingRatePerSubscriptionInMsg = dispatchThrottlingRatePerSubscriptionInMsg;
}

public long getDispatchThrottlingRatePerSubscribeInByte() {
return dispatchThrottlingRatePerSubscribeInByte;
}

public void setDispatchThrottlingRatePerSubscribeInByte(long dispatchThrottlingRatePerSubscribeInByte) {
this.dispatchThrottlingRatePerSubscribeInByte = dispatchThrottlingRatePerSubscribeInByte;
}

public boolean isDispatchThrottlingOnNonBacklogConsumerEnabled() {
return dispatchThrottlingOnNonBacklogConsumerEnabled;
}
Expand Down Expand Up @@ -1503,7 +1527,7 @@ public Set<String> getTlsCiphers() {
public void setTlsCiphers(Set<String> tlsCiphers) {
this.tlsCiphers = tlsCiphers;
}

public boolean getTlsRequireTrustedClientCertOnConnect() {
return tlsRequireTrustedClientCertOnConnect;
}
Expand Down
Expand Up @@ -633,6 +633,56 @@ protected DispatchRate internalGetDispatchRate() {
}
}

protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
validateSuperUserAccess();

Entry<Policies, Stat> policiesNode = null;

try {
final String path = path(POLICIES, namespaceName.toString());
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().subscriptionDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);

// Write back the new policies into zookeeper
globalZk().setData(path, jsonMapper().writeValueAsBytes(policiesNode.getKey()),
policiesNode.getValue().getVersion());
policiesCache().invalidate(path);

log.info("[{}] Successfully updated the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}: does not exist",
clientAppId(), namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected DispatchRate internalGetSubscriptionDispatchRate() {
validateAdminAccessOnProperty(namespaceName.getProperty());

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate = policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
throw new RestException(Status.NOT_FOUND,
"Subscription-Dispatch-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
}
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateAdminAccessOnProperty(namespaceName.getProperty());
validatePoliciesReadOnlyAccess();
Expand Down
Expand Up @@ -518,6 +518,30 @@ public DispatchRate getDispatchRate(@PathParam("property") String property, @Pat
return internalGetDispatchRate();
}

@POST
@Path("/{property}/{cluster}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Set Subscription dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void setSubscriptionDispatchRate(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
DispatchRate dispatchRate) {
validateNamespaceName(property, cluster, namespace);
internalSetSubscriptionDispatchRate(dispatchRate);
}

@GET
@Path("/{property}/{cluster}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Get Subscription dispatch-rate configured for the namespace, -1 represents not configured yet")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public DispatchRate getSubscriptionDispatchRate(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetSubscriptionDispatchRate();
}

@GET
@Path("/{property}/{cluster}/{namespace}/backlogQuotaMap")
@ApiOperation(hidden = true, value = "Get backlog quota map on a namespace.")
Expand Down
Expand Up @@ -319,6 +319,28 @@ public DispatchRate getDispatchRate(@PathParam("property") String property,
return internalGetDispatchRate();
}

@POST
@Path("/{property}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Set Subscription dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void setSubscriptionDispatchRate(@PathParam("property") String property,
@PathParam("namespace") String namespace,
DispatchRate dispatchRate) {
validateNamespaceName(property, namespace);
internalSetSubscriptionDispatchRate(dispatchRate);
}

@GET
@Path("/{property}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Get Subscription dispatch-rate configured for the namespace, -1 represents not configured yet")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public DispatchRate getSubscriptionDispatchRate(@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
return internalGetSubscriptionDispatchRate();
}

@GET
@Path("/{property}/{namespace}/backlogQuotaMap")
@ApiOperation(value = "Get backlog quota map on a namespace.")
Expand Down
Expand Up @@ -72,7 +72,9 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.web.PulsarWebResource;
Expand Down Expand Up @@ -1090,6 +1092,14 @@ private void updateConfigurationAndRegisterListeners() {
registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> {
updateManagedLedgerConfig();
});
// add listener to update message-dispatch-rate in msg
registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", (dispatchRatePerTopicInMsg) -> {
updateSubscriptionMessageDispatchRate();
});
// add listener to update message-dispatch-rate in byte
registerConfigurationListener("dispatchThrottlingRatePerSubscribeInByte", (dispatchRatePerTopicInByte) -> {
updateSubscriptionMessageDispatchRate();
});
// add more listeners here
}

Expand All @@ -1114,6 +1124,31 @@ private void updateTopicMessageDispatchRate() {
});
}

private void updateSubscriptionMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic subscription
topics.forEach((name, topicFuture) -> {
if (topicFuture.isDone()) {
try {
topicFuture.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
if (persistentSubscription
.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
((PersistentDispatcherMultipleConsumers) persistentSubscription
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
} else if (persistentSubscription
.getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
((PersistentDispatcherSingleActiveConsumer) persistentSubscription
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
}
});
} catch (Exception e) {
log.warn("Failed to get topic from future while update subscription dispatch rate ", e);
}
}
});
});
}

private void updateManagedLedgerConfig() {
this.pulsar().getExecutor().submit(() -> {
// update managed-ledger config of each topic
Expand Down

0 comments on commit 9a257c2

Please sign in to comment.