Skip to content

Commit

Permalink
Support dispatch rate policy at the topic level (#9175)
Browse files Browse the repository at this point in the history
Fixes #9143

### Motivation
The dispatch rate police is supported at the namespace level
But does not support at the topic level since we supported topic level policy at 2.7.0

### Modifications
add API

### Verifying this change
ReplicatorRateLimiterTest#testReplicatorRatePriority
TopicPoliciesTest#testReplicatorRateApi
  • Loading branch information
315157973 committed Jan 14, 2021
1 parent 4c60262 commit 7b65fab
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2725,6 +2725,18 @@ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer ma
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected Optional<DispatchRate> internalGetReplicatorDispatchRate() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getReplicatorDispatchRate);
}

protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setReplicatorDispatchRate(dispatchRate);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

private void preValidation() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,90 @@ public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncR
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
@ApiOperation(value = "Get replicatorDispatchRate config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
Optional<DispatchRate> dispatchRate = internalGetReplicatorDispatchRate();
if (dispatchRate.isPresent()) {
asyncResponse.resume(dispatchRate.get());
} else {
asyncResponse.resume(Response.noContent().build());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
@ApiOperation(value = "Set replicatorDispatchRate config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")})
public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Replicator dispatch rate of the topic")
DispatchRate dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetReplicatorDispatchRate(dispatchRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Updating replicatorDispatchRate failed", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Updating replicatorDispatchRate failed", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
+ ", replicatorDispatchRate={}"
, clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate);
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate")
@ApiOperation(value = "Remove replicatorDispatchRate config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetReplicatorDispatchRate(null).whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove replicatorDispatchRate", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
clientAppId(), namespaceName, topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Get maxProducers config for specified topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ private DispatchRate createDispatchRate() {
}

/**
* Update dispatch-throttling-rate. gives first priority to namespace-policy configured dispatch rate else applies
* default broker dispatch-throttling-rate
* Update dispatch-throttling-rate.
* Topic-level has the highest priority, then namespace-level, and finally use dispatch-throttling-rate in
* broker-level
*/
public void updateDispatchRate() {
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
Expand Down Expand Up @@ -189,6 +190,13 @@ public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService br
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getSubscriptionDispatchRate);
break;
case REPLICATOR:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getReplicatorDispatchRate);
break;
default:
break;
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies cache have not init.", topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,13 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer,
});
}

@Override
protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch) {
long newEpoch = currentEpoch.orElse(-1L) + 1;
return setTopicEpoch(newEpoch);
}

@Override
protected CompletableFuture<Long> setTopicEpoch(long newEpoch) {
CompletableFuture<Long> future = new CompletableFuture<>();
ledger.asyncSetProperty(TOPIC_EPOCH_PROPERTY_NAME, String.valueOf(newEpoch), new UpdatePropertiesCallback() {
Expand Down Expand Up @@ -2059,7 +2061,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
}
});
replicators.forEach((name, replicator) ->
replicator.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data))
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
);
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
Expand Down Expand Up @@ -2662,10 +2664,12 @@ public void onUpdate(TopicPolicies policies) {
}

initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent() && policies != null) {
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
}
replicators.forEach((name, replicator) -> replicator.getRateLimiter()
.ifPresent(DispatchRateLimiter::updateDispatchRate));
}

private Optional<Policies> getNamespacePolicies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1387,4 +1387,25 @@ public void testMaxSubscriptionsPerTopic() throws Exception {
c.close();
}
}

@Test(timeOut = 30000)
public void testReplicatorRateApi() throws Exception {
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));

assertNull(admin.topics().getReplicatorDispatchRate(topic));

DispatchRate dispatchRate = new DispatchRate(100,200L,10);
admin.topics().setReplicatorDispatchRate(topic, dispatchRate);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> assertEquals(admin.topics().getReplicatorDispatchRate(topic), dispatchRate));

admin.topics().removeReplicatorDispatchRate(topic);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(()
-> assertNull(admin.topics().getReplicatorDispatchRate(topic)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -41,6 +43,10 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

/**
* Starts 3 brokers that are in 3 different clusters
*/
Expand Down Expand Up @@ -74,6 +80,73 @@ public Object[][] dispatchRateProvider() {
return new Object[][] { { DispatchRateType.messageRate }, { DispatchRateType.byteRate } };
}

@Test
public void testReplicatorRatePriority() throws Exception {
shutdown();
config1.setSystemTopicEnabled(true);
config1.setTopicLevelPoliciesEnabled(true);
config1.setDispatchThrottlingRatePerReplicatorInMsg(100);
config1.setDispatchThrottlingRatePerReplicatorInByte(200L);
setup();

final String namespace = "pulsar/replicatorchange-" + System.currentTimeMillis();
final String topicName = "persistent://" + namespace + "/ratechange";

admin1.namespaces().createNamespace(namespace);
// set 2 clusters, there will be 1 replicator in each topic
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS).build();
client1.newProducer().topic(topicName).create().close();
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();
Awaitility.await().atMost(3, TimeUnit.SECONDS)
.until(() -> pulsar1.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topicName)));

//use broker-level by default
assertTrue(topic.getReplicators().values().get(0).getRateLimiter().isPresent());
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100);
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 200L);

//set namespace-level policy, which should take effect
DispatchRate nsDispatchRate = new DispatchRate(50, 60L, 70);
admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 50);
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 60L);

//set topic-level policy, which should take effect
DispatchRate topicRate = new DispatchRate(10, 20L, 30);
admin1.topics().setReplicatorDispatchRate(topicName, topicRate);
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(admin1.topics().getReplicatorDispatchRate(topicName), topicRate));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 10);
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);

//Set the namespace-level policy, which should not take effect
DispatchRate nsDispatchRate2 = new DispatchRate(500, 600L, 700);
admin1.namespaces().setReplicatorDispatchRate(namespace, nsDispatchRate2);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertEquals(admin1.namespaces().getReplicatorDispatchRate(namespace), nsDispatchRate2));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(), 20L);

//remove topic-level policy, namespace-level should take effect
admin1.topics().removeReplicatorDispatchRate(topicName);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertNull(admin1.topics().getReplicatorDispatchRate(topicName)));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 500);
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
600L);

//remove namespace-level policy, broker-level should take effect
admin1.namespaces().setReplicatorDispatchRate(namespace, null);
Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnMsg(), 100));
assertEquals(topic.getReplicators().values().get(0).getRateLimiter().get().getDispatchRateOnByte(),
200L);
}

/**
* verifies dispatch rate for replicators get changed once namespace policies changed.
*
Expand Down Expand Up @@ -103,7 +176,6 @@ public void testReplicatorRateLimiterDynamicallyChange() throws Exception {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.close();

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName).get();

// 1. default replicator throttling not configured
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,70 @@ void setInactiveTopicPolicies(String topic
*/
CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic);

/**
* Set replicatorDispatchRate for the topic.
* <p/>
* Replicator dispatch rate under this topic can dispatch this many messages per second
*
* @param topic
* @param dispatchRate
* number of messages per second
* @throws PulsarAdminException
* Unexpected error
*/
void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException;

/**
* Set replicatorDispatchRate for the topic asynchronously.
* <p/>
* Replicator dispatch rate under this topic can dispatch this many messages per second.
*
* @param topic
* @param dispatchRate
* number of messages per second
*/
CompletableFuture<Void> setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate);

/**
* Get replicatorDispatchRate for the topic.
* <p/>
* Replicator dispatch rate under this topic can dispatch this many messages per second.
*
* @param topic
* @returns DispatchRate
* number of messages per second
* @throws PulsarAdminException
* Unexpected error
*/
DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException;

/**
* Get replicatorDispatchRate asynchronously.
* <p/>
* Replicator dispatch rate under this topic can dispatch this many messages per second.
*
* @param topic
* @returns DispatchRate
* number of messages per second
*/
CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic);

/**
* Remove replicatorDispatchRate for a topic.
* @param topic
* Topic name
* @throws PulsarAdminException
* Unexpected error
*/
void removeReplicatorDispatchRate(String topic) throws PulsarAdminException;

/**
* Remove replicatorDispatchRate for a topic asynchronously.
* @param topic
* Topic name
*/
CompletableFuture<Void> removeReplicatorDispatchRateAsync(String topic);

/**
* Get the compactionThreshold for a topic. The maximum number of bytes
* can have before compaction is triggered. 0 disables.
Expand Down

0 comments on commit 7b65fab

Please sign in to comment.