Skip to content

Commit

Permalink
Optimize topic policy with HierarchyTopicPolicies about publishRate
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed Feb 14, 2022
1 parent a3c8525 commit 3bf6552
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ public AbstractTopic(String topic, BrokerService brokerService) {

this.lastActive = System.nanoTime();
this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(Optional.empty());
}

public SchemaCompatibilityStrategy getSchemaCompatibilityStrategy() {
Expand Down Expand Up @@ -186,11 +185,22 @@ protected void updateTopicPolicy(TopicPolicies data) {
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
topicPolicies.getPublishRate().updateTopicValue(normalize(data.getPublishRate()));
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
}

private PublishRate normalize(PublishRate publishRate) {
if (publishRate != null
&& (publishRate.publishThrottlingRateInMsg > 0
|| publishRate.publishThrottlingRateInByte > 0)) {
return publishRate;
} else {
return null;
}
}

protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
if (log.isDebugEnabled()) {
log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies);
Expand All @@ -216,6 +226,7 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
namespacePolicies.deduplicationSnapshotIntervalSeconds);
updateNamespacePublishRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue(
Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
.map(DelayedDeliveryPolicies::isActive).orElse(null));
Expand All @@ -230,6 +241,14 @@ protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
}

private void updateNamespacePublishRate(Policies namespacePolicies, String cluster) {
topicPolicies.getPublishRate().updateNamespaceValue(
normalize(
namespacePolicies.publishMaxMessageRate != null
? namespacePolicies.publishMaxMessageRate.get(cluster)
: null));
}

private void updateSchemaCompatibilityStrategyNamespaceValue(Policies namespacePolicies){
if (isSystemTopic()) {
return;
Expand Down Expand Up @@ -275,6 +294,7 @@ private void updateTopicPolicyByBrokerConfig() {

topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
topicPolicies.getPublishRate().updateBrokerValue(publishRateInBroker(config));
topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
topicPolicies.getCompactionThreshold().updateBrokerValue(config.getBrokerServiceCompactionThresholdInBytes());
Expand Down Expand Up @@ -304,6 +324,10 @@ private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionType
}
}

private PublishRate publishRateInBroker(ServiceConfiguration config) {
return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes());
}

protected boolean isProducersExceeded() {
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
if (maxProducers > 0 && maxProducers <= producers.size()) {
Expand Down Expand Up @@ -922,51 +946,20 @@ public PublishRateLimiter getBrokerPublishRateLimiter() {
return brokerService.getBrokerPublishRateLimiter();
}

public void updateMaxPublishRate(Policies policies) {
updatePublishDispatcher(Optional.of(policies));
}

private void updatePublishDispatcher(Optional<Policies> optPolicies) {
//if topic-level policy exists, try to use topic-level publish rate policy
Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
log.info("Using topic policy publish rate instead of namespace level topic publish rate on topic {}",
this.topic);
updatePublishDispatcher(topicPublishRate.get());
return;
}

public void updateResourceGroupLimiter(Optional<Policies> optPolicies) {
Policies policies;
try {
if (optPolicies.isPresent()) {
policies = optPolicies.get();
} else {
policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
TopicName.get(topic).getNamespaceObject())
.orElseGet(() -> new Policies());
}
policies = optPolicies.orElseGet(() ->
brokerService.pulsar()
.getPulsarResources()
.getNamespaceResources()
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
.orElseGet(Policies::new));
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
policies = new Policies();
}

//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;

//both namespace-level and topic-level policy are not set, try to use broker-level policy
ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
if (publishRate != null) {
//publishRate is not null, use namespace-level policy
updatePublishDispatcher(publishRate);
} else {
PublishRate brokerPublishRate = new PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages()
, serviceConfiguration.getMaxPublishRatePerTopicInBytes());
updatePublishDispatcher(brokerPublishRate);
}

// attach the resource-group level rate limiters, if set
String rgName = policies.resource_group_name != null
? policies.resource_group_name
Expand Down Expand Up @@ -1061,7 +1054,8 @@ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishCon
/**
* update topic publish dispatcher for this topic.
*/
protected void updatePublishDispatcher(PublishRate publishRate) {
public void updatePublishDispatcher() {
PublishRate publishRate = topicPolicies.getPublishRate().get();
if (publishRate != null && (publishRate.publishThrottlingRateInByte > 0
|| publishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling publish rate limiting {} ", publishRate);
Expand Down Expand Up @@ -1098,4 +1092,9 @@ public void updateBrokerSubscriptionTypesEnabled() {
topicPolicies.getSubscriptionTypesEnabled().updateBrokerValue(
subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled()));
}

public void updateBrokerPublishRate() {
topicPolicies.getPublishRate()
.updateBrokerValue(publishRateInBroker(brokerService.pulsar().getConfiguration()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2206,6 +2206,12 @@ private void updateBrokerDispatchThrottlingMaxRate() {
}

private void updateBrokerPublisherThrottlingMaxRate() {
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerPublishRate();
((AbstractTopic) topic).updatePublishDispatcher();
}
});
int currentMaxMessageRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate();
long currentMaxByteRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate();
int brokerTickMs = pulsar.getConfiguration().getBrokerPublisherThrottlingTickTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ public CompletableFuture<Void> initialize() {
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
schemaValidationEnforced = policies.schema_validation_enforced;
}
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,19 @@ public CompletableFuture<Void> initialize() {
.thenAccept(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
return;
}

Policies policies = optPolicies.get();

this.updateTopicPolicyByNamespacePolicy(policies);

updatePublishDispatcher();

updateResourceGroupLimiter(optPolicies);

this.isEncryptionRequired = policies.encryption_required;

isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
Expand Down Expand Up @@ -2382,7 +2388,9 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

initializeRateLimiterIfNeeded(Optional.ofNullable(data));

this.updateMaxPublishRate(data);
updatePublishDispatcher();

this.updateResourceGroupLimiter(Optional.of(data));

List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
Expand Down Expand Up @@ -3033,12 +3041,7 @@ public void onUpdate(TopicPolicies policies) {
}));

FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
if (policies.getPublishRate() != null) {
updatePublishDispatcher(policies.getPublishRate());
} else {
updateMaxPublishRate(namespacePolicies.orElse(null));
}

updatePublishDispatcher();
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testPrecisTopicPublishRateLimitingDisabled() throws Exception {

Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).updateMaxPublishRate(policies);
topicRef.onPoliciesUpdate(policies);
MessageId messageId = null;
try {
// first will be success
Expand Down Expand Up @@ -98,7 +98,7 @@ public void testProducerBlockedByPrecisTopicPublishRateLimiting() throws Excepti

Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).updateMaxPublishRate(policies);
topicRef.onPoliciesUpdate(policies);
MessageId messageId = null;
try {
// first will be success, and will set auto read to false
Expand Down Expand Up @@ -130,7 +130,7 @@ public void testPrecisTopicPublishRateLimitingProduceRefresh() throws Exception

Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).updateMaxPublishRate(policies);
topicRef.onPoliciesUpdate(policies);
MessageId messageId = null;
try {
// first will be success, and will set auto read to false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class HierarchyTopicPolicies {
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
final PolicyHierarchyValue<Long> compactionThreshold;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
final PolicyHierarchyValue<PublishRate> publishRate;
final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
Expand All @@ -71,6 +72,7 @@ public HierarchyTopicPolicies() {
.build();
topicMaxMessageSize = new PolicyHierarchyValue<>();
messageTTLInSeconds = new PolicyHierarchyValue<>();
publishRate = new PolicyHierarchyValue<>();
delayedDeliveryEnabled = new PolicyHierarchyValue<>();
delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
compactionThreshold = new PolicyHierarchyValue<>();
Expand Down

0 comments on commit 3bf6552

Please sign in to comment.