Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support get applied clusterSubscribeRate #9832

Merged
merged 5 commits into from
Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,6 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S
policies.subscriptionDispatchRate.put(cluster, subscriptionDispatchRate());
}

if (policies.clusterSubscribeRate.isEmpty()) {
policies.clusterSubscribeRate.put(cluster, subscribeRate());
}
}

protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1248,16 +1248,27 @@ protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
}
}

protected void internalDeleteSubscribeRate() {
validateSuperUserAccess();
try {
final String path = path(POLICIES, namespaceName.toString());
updatePolicies(path, (policies) -> {
policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
return policies;
});
log.info("[{}] Successfully delete the subscribeRate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to delete the subscribeRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected SubscribeRate internalGetSubscribeRate() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
SubscribeRate subscribeRate = policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
if (subscribeRate != null) {
return subscribeRate;
} else {
throw new RestException(Status.NOT_FOUND,
"Subscribe-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
}
return policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
315157973 marked this conversation as resolved.
Show resolved Hide resolved
}

protected void internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3778,8 +3778,18 @@ protected CompletableFuture<Void> internalRemovePublishRate() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected Optional<SubscribeRate> internalGetSubscribeRate() {
return getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean applied) {
SubscribeRate subscribeRate = getTopicPolicies(topicName)
.map(TopicPolicies::getSubscribeRate)
.orElseGet(() -> {
if (applied) {
SubscribeRate namespacePolicy = getNamespacePolicies(namespaceName)
.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
return namespacePolicy == null ? subscribeRate() : namespacePolicy;
}
return null;
});
return CompletableFuture.completedFuture(subscribeRate);
}

protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,15 @@ public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tena
return internalGetSubscriptionDispatchRate();
}

@DELETE
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Delete subscribe-rate throttling for all topics of the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void deleteSubscribeRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalDeleteSubscribeRate();
}

@POST
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Set subscribe-rate throttling for all topics of the namespace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3096,21 +3096,21 @@ public void removePublishRate(@Suspended final AsyncResponse asyncResponse,
public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<SubscribeRate> subscribeRate = internalGetSubscribeRate();
if (!subscribeRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
internalGetSubscribeRate(applied).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed get subscribe rate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed get subscribe rate", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(subscribeRate.get());
asyncResponse.resume(res);
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));
policies.max_unacked_messages_per_subscription = 200000;

assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,37 @@ public void testGetSetSubscribeRate() throws Exception {
admin.topics().deletePartitionedTopic(persistenceTopic, true);
}

@Test(timeOut = 20000)
public void testGetSubscribeRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
pulsarClient.newProducer().topic(topic).create().close();
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
assertNull(admin.topics().getSubscribeRate(topic));
assertNull(admin.namespaces().getSubscribeRate(myNamespace));
SubscribeRate brokerPolicy = new SubscribeRate(
pulsar.getConfiguration().getSubscribeThrottlingRatePerConsumer(),
pulsar.getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
);
assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy);
SubscribeRate namespacePolicy = new SubscribeRate(10, 11);

admin.namespaces().setSubscribeRate(myNamespace, namespacePolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscribeRate(myNamespace)));
assertEquals(admin.topics().getSubscribeRate(topic, true), namespacePolicy);

SubscribeRate topicPolicy = new SubscribeRate(20, 21);
admin.topics().setSubscribeRate(topic, topicPolicy);
Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscribeRate(topic)));
assertEquals(admin.topics().getSubscribeRate(topic, true), topicPolicy);

admin.namespaces().removeSubscribeRate(myNamespace);
admin.topics().removeSubscribeRate(topic);
Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscribeRate(myNamespace)));
Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getSubscribeRate(topic)));
assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy);
}

@Test
public void testRemoveSubscribeRate() throws Exception {
admin.topics().createPartitionedTopic(persistenceTopic, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf));
policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf));
policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf));
policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf));

policies.max_unacked_messages_per_subscription = 200000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2067,6 +2067,21 @@ CompletableFuture<Void> splitNamespaceBundleAsync(
*/
CompletableFuture<Void> setSubscribeRateAsync(String namespace, SubscribeRate subscribeRate);

/**
* Remove namespace-subscribe-rate (topics under this namespace will limit by subscribeRate).
*
* @param namespace
* @throws PulsarAdminException
*/
void removeSubscribeRate(String namespace) throws PulsarAdminException;

/**
* Remove namespace-subscribe-rate (topics under this namespace will limit by subscribeRate) asynchronously.
*
* @param namespace
*/
CompletableFuture<Void> removeSubscribeRateAsync(String namespace);

/**
* Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3043,6 +3043,24 @@ CompletableFuture<Void> setSubscriptionTypesEnabledAsync(String topic,
*/
CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic);

/**
* Get applied topic-subscribe-rate (topics allow subscribe times per consumer in a period).
*
* @param topic
* @returns subscribeRate
* @throws PulsarAdminException
* Unexpected error
*/
SubscribeRate getSubscribeRate(String topic, boolean applied) throws PulsarAdminException;

/**
* Get applied topic-subscribe-rate asynchronously.
*
* @param topic
* @returns subscribeRate
*/
CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic, boolean applied);

/**
* Remove topic-subscribe-rate.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,27 @@ public CompletableFuture<Void> setSubscribeRateAsync(String namespace, Subscribe
return asyncPostRequest(path, Entity.entity(subscribeRate, MediaType.APPLICATION_JSON));
}

@Override
public void removeSubscribeRate(String namespace) throws PulsarAdminException {
try {
removeSubscribeRateAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> removeSubscribeRateAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "subscribeRate");
return asyncDeleteRequest(path);
}

@Override
public SubscribeRate getSubscribeRate(String namespace) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3284,8 +3284,18 @@ public CompletableFuture<Void> removeReplicatorDispatchRateAsync(String topic) {

@Override
public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
return getSubscribeRate(topic, false);
}

@Override
public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic) {
return getSubscribeRateAsync(topic, false);
}

@Override
public SubscribeRate getSubscribeRate(String topic, boolean applied) throws PulsarAdminException {
try {
return getSubscribeRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getSubscribeRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -3297,9 +3307,10 @@ public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException
}

@Override
public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic) {
public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscribeRate");
path = path.queryParam("applied", applied);
final CompletableFuture<SubscribeRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<SubscribeRate>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-subscribe-rate myprop/clust/ns1"));
verify(mockNamespaces).getSubscribeRate("myprop/clust/ns1");

namespaces.run(split("remove-subscribe-rate myprop/clust/ns1"));
verify(mockNamespaces).removeSubscribeRate("myprop/clust/ns1");

namespaces.run(split("set-subscription-dispatch-rate myprop/clust/ns1 -md -1 -bd -1 -dt 2"));
verify(mockNamespaces).setSubscriptionDispatchRate("myprop/clust/ns1", new DispatchRate(-1, -1, 2));

Expand Down Expand Up @@ -740,6 +743,15 @@ public void topics() throws Exception {
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -t 100"));
verify(mockTopics).expireMessages("persistent://myprop/clust/ns1/ds1", "sub1", 100);

cmdTopics.run(split("get-subscribe-rate persistent://myprop/clust/ns1/ds1 -ap"));
verify(mockTopics).getSubscribeRate("persistent://myprop/clust/ns1/ds1", true);

cmdTopics.run(split("set-subscribe-rate persistent://myprop/clust/ns1/ds1 -sr 2 -st 60"));
verify(mockTopics).setSubscribeRate("persistent://myprop/clust/ns1/ds1", new SubscribeRate(2, 60));

cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");

//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopics(() -> admin);
cmdTopics.run(split("expire-messages persistent://myprop/clust/ns1/ds1 -s sub1 -p 1:1 -e"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,18 @@ void run() throws PulsarAdminException {
}
}

@Parameters(commandDescription = "Remove configured subscribe-rate per consumer for all topics of the namespace")
private class RemoveSubscribeRate extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
getAdmin().namespaces().removeSubscribeRate(namespace);
}
}


@Parameters(commandDescription = "Set subscription message-dispatch-rate for all subscription of the namespace")
private class SetSubscriptionDispatchRate extends CliCommand {
Expand Down Expand Up @@ -2063,6 +2075,7 @@ public CmdNamespaces(Supplier<PulsarAdmin> admin) {

jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());

jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2190,10 +2190,13 @@ private class GetSubscribeRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
print(getAdmin().topics().getSubscribeRate(persistentTopic));
print(getAdmin().topics().getSubscribeRate(persistentTopic, applied));
}
}

Expand Down