Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support get applied SubscriptionDispatchRate #9827

Merged
merged 2 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,6 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S
policies.topicDispatchRate.put(cluster, dispatchRate());
}

if (policies.subscriptionDispatchRate.isEmpty()) {
policies.subscriptionDispatchRate.put(cluster, subscriptionDispatchRate());
}

if (policies.clusterSubscribeRate.isEmpty()) {
policies.clusterSubscribeRate.put(cluster, subscribeRate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1198,8 +1198,6 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);

Entry<Policies, Stat> policiesNode = null;

try {
final String path = path(POLICIES, namespaceName.toString());
updatePolicies(path, (policies) -> {
Expand All @@ -1215,19 +1213,29 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
}
}

protected void internalDeleteSubscriptionDispatchRate() {
validateSuperUserAccess();

try {
final String path = path(POLICIES, namespaceName.toString());
updatePolicies(path, (policies) -> {
policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName());
return policies;
});
log.info("[{}] Successfully delete the subscriptionDispatchRate for cluster on namespace {}",
clientAppId(), namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to delete the subscriptionDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected DispatchRate internalGetSubscriptionDispatchRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
DispatchRate dispatchRate =
policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
if (dispatchRate != null) {
return dispatchRate;
} else {
throw new RestException(Status.NOT_FOUND,
"Subscription-Dispatch-rate is not configured for cluster "
+ pulsar().getConfiguration().getClusterName());
}
return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
}

protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3668,8 +3668,18 @@ protected CompletableFuture<Void> internalRemoveDispatchRate() {

}

protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied) {
DispatchRate dispatchRate = getTopicPolicies(topicName)
.map(TopicPolicies::getSubscriptionDispatchRate)
.orElseGet(() -> {
if (applied) {
DispatchRate namespacePolicy = getNamespacePolicies(namespaceName)
.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
return namespacePolicy == null ? subscriptionDispatchRate() : namespacePolicy;
}
return null;
});
return CompletableFuture.completedFuture(dispatchRate);
}

protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,16 @@ public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tena
return internalGetSubscriptionDispatchRate();
}

@DELETE
@Path("/{tenant}/{namespace}/subscriptionDispatchRate")
@ApiOperation(value = "Delete Subscription dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void deleteSubscriptionDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalDeleteSubscriptionDispatchRate();
}

@POST
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Set subscribe-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2647,21 +2647,21 @@ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<DispatchRate> dispatchRate = internalGetSubscriptionDispatchRate();
if (!dispatchRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
internalGetSubscriptionDispatchRate(applied).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed get subscription dispatchRate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed get subscription dispatchRate", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(dispatchRate.get());
asyncResponse.resume(res);
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
// set default quotas on namespace
Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
policies.max_unacked_messages_per_subscription = 200000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,38 @@ public void testRetentionAppliedApi() throws Exception {
-> assertEquals(admin.topics().getRetention(topic, true), brokerPolicies));
}

@Test(timeOut = 20000)
public void testGetSubDispatchRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getSubscriptionDispatchRate(topic));
assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace));
DispatchRate brokerDispatchRate = new DispatchRate(
pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(),
pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(),
1
);
assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate);
DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12);

admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)));
assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), namespaceDispatchRate);

DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22);
admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscriptionDispatchRate(topic)));
assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), topicDispatchRate);

admin.namespaces().removeSubscriptionDispatchRate(myNamespace);
admin.topics().removeSubscriptionDispatchRate(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getSubscriptionDispatchRate(topic)));
assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate);
}

@Test(timeOut = 20000)
public void testRetentionPriority() throws Exception {
final String topic = testTopic + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
// set default quotas on namespace
Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));

policies.max_unacked_messages_per_subscription = 200000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,20 @@ CompletableFuture<Void> splitNamespaceBundleAsync(
*/
CompletableFuture<SubscribeRate> getSubscribeRateAsync(String namespace);

/**
* Remove subscription-message-dispatch-rate.
* @param namespace
* @throws PulsarAdminException
*/
void removeSubscriptionDispatchRate(String namespace) throws PulsarAdminException;

/**
* Remove subscription-message-dispatch-rate asynchronously.
* @param namespace
* @return
*/
CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String namespace);

/**
* Set subscription-message-dispatch-rate.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2329,6 +2329,30 @@ void setInactiveTopicPolicies(String topic
*/
CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate);

/**
* Get applied subscription-message-dispatch-rate.
* <p/>
* Subscriptions under this namespace can dispatch this many messages per second.
*
* @param namespace
* @returns DispatchRate
* number of messages per second
* @throws PulsarAdminException
* Unexpected error
*/
DispatchRate getSubscriptionDispatchRate(String namespace, boolean applied) throws PulsarAdminException;

/**
* Get applied subscription-message-dispatch-rate asynchronously.
* <p/>
* Subscriptions under this namespace can dispatch this many messages per second.
*
* @param namespace
* @returns DispatchRate
* number of messages per second
*/
CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String namespace, boolean applied);

/**
* Get subscription-message-dispatch-rate for the topic.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,29 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public void removeSubscriptionDispatchRate(String namespace) throws PulsarAdminException {
try {
removeSubscriptionDispatchRateAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "subscriptionDispatchRate");
return asyncDeleteRequest(path);
}


@Override
public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2438,9 +2438,9 @@ public CompletableFuture<Void> removeDispatchRateAsync(String topic) {
}

@Override
public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException {
public DispatchRate getSubscriptionDispatchRate(String topic, boolean applied) throws PulsarAdminException {
try {
return getSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getSubscriptionDispatchRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -2452,9 +2452,10 @@ public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdmin
}

@Override
public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic) {
public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
path = path.queryParam("applied", applied);
final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<DispatchRate>() {
Expand All @@ -2471,6 +2472,16 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException {
return getSubscriptionDispatchRate(topic, false);
}

@Override
public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic) {
return getSubscriptionDispatchRateAsync(topic, false);
}

@Override
public void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-subscription-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).getSubscriptionDispatchRate("myprop/clust/ns1");

namespaces.run(split("remove-subscription-dispatch-rate myprop/clust/ns1"));
verify(mockNamespaces).removeSubscriptionDispatchRate("myprop/clust/ns1");

namespaces.run(split("get-compaction-threshold myprop/clust/ns1"));
verify(mockNamespaces).getCompactionThreshold("myprop/clust/ns1");

Expand Down Expand Up @@ -779,6 +782,13 @@ public void topics() throws Exception {
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
verify(mockTopics).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false);

cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2"));
verify(mockTopics).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", new DispatchRate(-1, -1, 2));
cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");

cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,19 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove subscription configured message-dispatch-rate " +
"for all topics of the namespace")
private class RemoveSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeSubscriptionDispatchRate(namespace);
}
}

@Parameters(commandDescription = "Get subscription configured message-dispatch-rate for all topics of the namespace (Disabled if value < 0)")
private class GetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
Expand Down Expand Up @@ -2066,6 +2079,7 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {

jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());

jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1817,10 +1817,13 @@ private class GetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getSubscriptionDispatchRate(persistentTopic));
print(getAdmin().topics().getSubscriptionDispatchRate(persistentTopic, applied));
}
}

Expand Down