From 45f8b5ecb65b14a362d617d65e5d9ccdd3cc7f26 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Sat, 6 Mar 2021 19:55:04 +0800 Subject: [PATCH 1/2] Support get applied SubscriptionDispatchRate --- .../pulsar/broker/admin/AdminResource.java | 4 -- .../broker/admin/impl/NamespacesBase.java | 30 +++++++++------ .../admin/impl/PersistentTopicsBase.java | 14 ++++++- .../pulsar/broker/admin/v2/Namespaces.java | 10 +++++ .../broker/admin/v2/PersistentTopics.java | 22 +++++------ .../pulsar/broker/admin/AdminApiTest.java | 1 - .../broker/admin/TopicPoliciesTest.java | 32 ++++++++++++++++ .../broker/admin/v1/V1_AdminApiTest.java | 1 - .../pulsar/client/admin/Namespaces.java | 14 +++++++ .../apache/pulsar/client/admin/Topics.java | 24 ++++++++++++ .../client/admin/internal/NamespacesImpl.java | 23 ++++++++++++ .../client/admin/internal/TopicsImpl.java | 37 ++++++++++++------- .../pulsar/admin/cli/PulsarAdminToolTest.java | 10 +++++ .../pulsar/admin/cli/CmdNamespaces.java | 14 +++++++ .../apache/pulsar/admin/cli/CmdTopics.java | 5 ++- 15 files changed, 197 insertions(+), 44 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 55402859a6c61..3806de143286d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -402,10 +402,6 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S policies.topicDispatchRate.put(cluster, dispatchRate()); } - if (policies.subscriptionDispatchRate.isEmpty()) { - policies.subscriptionDispatchRate.put(cluster, subscriptionDispatchRate()); - } - if (policies.clusterSubscribeRate.isEmpty()) { policies.clusterSubscribeRate.put(cluster, subscribeRate()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 4ce30bfdaa8da..3637bff6268c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1198,8 +1198,6 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) { validateSuperUserAccess(); log.info("[{}] Set namespace subscription dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); - Entry policiesNode = null; - try { final String path = path(POLICIES, namespaceName.toString()); updatePolicies(path, (policies) -> { @@ -1215,19 +1213,29 @@ protected void internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) { } } + protected void internalDeleteSubscriptionDispatchRate() { + validateSuperUserAccess(); + + try { + final String path = path(POLICIES, namespaceName.toString()); + updatePolicies(path, (policies) -> { + policies.subscriptionDispatchRate.remove(pulsar().getConfiguration().getClusterName()); + return policies; + }); + log.info("[{}] Successfully delete the subscriptionDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName); + } catch (Exception e) { + log.error("[{}] Failed to delete the subscriptionDispatchRate for cluster on namespace {}", clientAppId(), + namespaceName, e); + throw new RestException(e); + } + } + protected DispatchRate internalGetSubscriptionDispatchRate() { validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ); Policies policies = getNamespacePolicies(namespaceName); - DispatchRate dispatchRate = - policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName()); - if (dispatchRate != null) { - return dispatchRate; - } else { - throw new RestException(Status.NOT_FOUND, - "Subscription-Dispatch-rate is not configured for cluster " - + pulsar().getConfiguration().getClusterName()); - } + return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName()); } protected void internalSetSubscribeRate(SubscribeRate subscribeRate) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ac82b11f61448..ea13f2fa17c71 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3668,8 +3668,18 @@ protected CompletableFuture internalRemoveDispatchRate() { } - protected Optional internalGetSubscriptionDispatchRate() { - return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate); + protected CompletableFuture internalGetSubscriptionDispatchRate(boolean applied) { + DispatchRate dispatchRate = getTopicPolicies(topicName) + .map(TopicPolicies::getSubscriptionDispatchRate) + .orElseGet(() -> { + if (applied) { + DispatchRate namespacePolicy = getNamespacePolicies(namespaceName) + .subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName()); + return namespacePolicy == null ? subscriptionDispatchRate() : namespacePolicy; + } + return null; + }); + return CompletableFuture.completedFuture(dispatchRate); } protected CompletableFuture internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 0ee95dec0f274..2fa0a8f5e3146 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -597,6 +597,16 @@ public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tena return internalGetSubscriptionDispatchRate(); } + @DELETE + @Path("/{tenant}/{namespace}/subscriptionDispatchRate") + @ApiOperation(value = "Delete Subscription dispatch-rate throttling for all topics of the namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) + public void deleteSubscriptionDispatchRate(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalDeleteSubscriptionDispatchRate(); + } + @POST @Path("/{tenant}/{namespace}/subscribeRate") @ApiOperation(value = "Set subscribe-rate throttling for all topics of the namespace") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 1e6e300bf1995..501f577cde9e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -2647,21 +2647,21 @@ public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse, public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied) { validateTopicName(tenant, namespace, encodedTopic); preValidation(); - try { - Optional dispatchRate = internalGetSubscriptionDispatchRate(); - if (!dispatchRate.isPresent()) { - asyncResponse.resume(Response.noContent().build()); + internalGetSubscriptionDispatchRate(applied).whenComplete((res, ex) -> { + if (ex instanceof RestException) { + log.error("Failed get subscription dispatchRate", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Failed get subscription dispatchRate", ex); + asyncResponse.resume(new RestException(ex)); } else { - asyncResponse.resume(dispatchRate.get()); + asyncResponse.resume(res); } - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + }); } @POST diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 6d3a662a04bc9..573a4e171278b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -710,7 +710,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc // set default quotas on namespace Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf)); policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf)); - policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf)); policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf)); policies.max_unacked_messages_per_subscription = 200000; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 2421efa93f3e0..9f4c6f22fc3b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -339,6 +339,38 @@ public void testRetentionAppliedApi() throws Exception { -> assertEquals(admin.topics().getRetention(topic, true), brokerPolicies)); } + @Test(timeOut = 20000) + public void testGetSubDispatchRateApplied() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + pulsarClient.newProducer().topic(topic).create().close(); + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic))); + assertNull(admin.topics().getSubscriptionDispatchRate(topic)); + assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace)); + DispatchRate brokerDispatchRate = new DispatchRate( + pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInMsg(), + pulsar.getConfiguration().getDispatchThrottlingRatePerSubscriptionInByte(), + 1 + ); + assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate); + DispatchRate namespaceDispatchRate = new DispatchRate(10, 11, 12); + + admin.namespaces().setSubscriptionDispatchRate(myNamespace, namespaceDispatchRate); + Awaitility.await().untilAsserted(() -> assertNotNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace))); + assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), namespaceDispatchRate); + + DispatchRate topicDispatchRate = new DispatchRate(20, 21, 22); + admin.topics().setSubscriptionDispatchRate(topic, topicDispatchRate); + Awaitility.await().untilAsserted(() -> assertNotNull(admin.topics().getSubscriptionDispatchRate(topic))); + assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), topicDispatchRate); + + admin.namespaces().removeSubscriptionDispatchRate(myNamespace); + admin.topics().removeSubscriptionDispatchRate(topic); + Awaitility.await().untilAsserted(() -> assertNull(admin.namespaces().getSubscriptionDispatchRate(myNamespace))); + Awaitility.await().untilAsserted(() -> assertNull(admin.topics().getSubscriptionDispatchRate(topic))); + assertEquals(admin.topics().getSubscriptionDispatchRate(topic, true), brokerDispatchRate); + } + @Test(timeOut = 20000) public void testRetentionPriority() throws Exception { final String topic = testTopic + UUID.randomUUID(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 9c4cbc3888743..2e0084a873ec9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -646,7 +646,6 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc // set default quotas on namespace Policies.setStorageQuota(policies, ConfigHelper.backlogQuota(conf)); policies.topicDispatchRate.put("test", ConfigHelper.topicDispatchRate(conf)); - policies.subscriptionDispatchRate.put("test", ConfigHelper.subscriptionDispatchRate(conf)); policies.clusterSubscribeRate.put("test", ConfigHelper.subscribeRate(conf)); policies.max_unacked_messages_per_subscription = 200000; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index faaffe5919d4d..6b84a4b08dbcc 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -2087,6 +2087,20 @@ CompletableFuture splitNamespaceBundleAsync( */ CompletableFuture getSubscribeRateAsync(String namespace); + /** + * Remove subscription-message-dispatch-rate. + * @param namespace + * @throws PulsarAdminException + */ + void removeSubscriptionDispatchRate(String namespace) throws PulsarAdminException; + + /** + * Remove subscription-message-dispatch-rate asynchronously. + * @param namespace + * @return + */ + CompletableFuture removeSubscriptionDispatchRateAsync(String namespace); + /** * Set subscription-message-dispatch-rate. *

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 11047b78ff821..f62c0ce9ca743 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2329,6 +2329,30 @@ void setInactiveTopicPolicies(String topic */ CompletableFuture setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate); + /** + * Get applied subscription-message-dispatch-rate. + *

+ * Subscriptions under this namespace can dispatch this many messages per second. + * + * @param namespace + * @returns DispatchRate + * number of messages per second + * @throws PulsarAdminException + * Unexpected error + */ + DispatchRate getSubscriptionDispatchRate(String namespace, boolean applied) throws PulsarAdminException; + + /** + * Get applied subscription-message-dispatch-rate asynchronously. + *

+ * Subscriptions under this namespace can dispatch this many messages per second. + * + * @param namespace + * @returns DispatchRate + * number of messages per second + */ + CompletableFuture getSubscriptionDispatchRateAsync(String namespace, boolean applied); + /** * Get subscription-message-dispatch-rate for the topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index b0177189a0829..e5ff0a33558ce 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1698,6 +1698,29 @@ public void failed(Throwable throwable) { return future; } + @Override + public void removeSubscriptionDispatchRate(String namespace) throws PulsarAdminException { + try { + removeSubscriptionDispatchRateAsync(namespace). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture removeSubscriptionDispatchRateAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "subscriptionDispatchRate"); + return asyncDeleteRequest(path); + } + + @Override public void setSubscriptionDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException { try { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index d1a18374f7706..7196e9db866c4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2438,9 +2438,9 @@ public CompletableFuture removeDispatchRateAsync(String topic) { } @Override - public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException { + public DispatchRate getSubscriptionDispatchRate(String topic, boolean applied) throws PulsarAdminException { try { - return getSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + return getSubscriptionDispatchRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); } catch (InterruptedException e) { @@ -2452,25 +2452,36 @@ public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdmin } @Override - public CompletableFuture getSubscriptionDispatchRateAsync(String topic) { + public CompletableFuture getSubscriptionDispatchRateAsync(String topic, boolean applied) { TopicName topicName = validateTopic(topic); WebTarget path = topicPath(topicName, "subscriptionDispatchRate"); + path = path.queryParam("applied", applied); final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, - new InvocationCallback() { - @Override - public void completed(DispatchRate dispatchRate) { - future.complete(dispatchRate); - } + new InvocationCallback() { + @Override + public void completed(DispatchRate dispatchRate) { + future.complete(dispatchRate); + } - @Override - public void failed(Throwable throwable) { - future.completeExceptionally(getApiException(throwable.getCause())); - } - }); + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); return future; } + @Override + public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException { + return getSubscriptionDispatchRate(topic, false); + } + + @Override + public CompletableFuture getSubscriptionDispatchRateAsync(String topic) { + return getSubscriptionDispatchRateAsync(topic, false); + } + @Override public void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException { try { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 0be27e4c48535..3d8e38ad2a4b0 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -539,6 +539,9 @@ public void namespaces() throws Exception { namespaces.run(split("get-subscription-dispatch-rate myprop/clust/ns1")); verify(mockNamespaces).getSubscriptionDispatchRate("myprop/clust/ns1"); + namespaces.run(split("remove-subscription-dispatch-rate myprop/clust/ns1")); + verify(mockNamespaces).removeSubscriptionDispatchRate("myprop/clust/ns1"); + namespaces.run(split("get-compaction-threshold myprop/clust/ns1")); verify(mockNamespaces).getCompactionThreshold("myprop/clust/ns1"); @@ -779,6 +782,13 @@ public void topics() throws Exception { cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable")); verify(mockTopics).setDeduplicationStatus("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2")); + verify(mockTopics).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", new DispatchRate(-1, -1, 2)); + cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false); + cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index e988e8264000d..8909d7d2e5ef7 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -883,6 +883,19 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Remove subscription configured message-dispatch-rate " + + "for all topics of the namespace") + private class RemoveSubscriptionDispatchRate extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().removeSubscriptionDispatchRate(namespace); + } + } + @Parameters(commandDescription = "Get subscription configured message-dispatch-rate for all topics of the namespace (Disabled if value < 0)") private class GetSubscriptionDispatchRate extends CliCommand { @Parameter(description = "tenant/namespace\n", required = true) @@ -2066,6 +2079,7 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate()); jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate()); + jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate()); jcommander.addCommand("set-publish-rate", new SetPublishRate()); jcommander.addCommand("get-publish-rate", new GetPublishRate()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 71172be2a0714..78e6b768e0b1e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -1817,10 +1817,13 @@ private class GetSubscriptionDispatchRate extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - print(getAdmin().topics().getSubscriptionDispatchRate(persistentTopic)); + print(getAdmin().topics().getSubscriptionDispatchRate(persistentTopic, applied)); } } From 6e591b83699ebdca09d328e61b7ca2a9da8e808a Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Sat, 6 Mar 2021 20:13:29 +0800 Subject: [PATCH 2/2] code style --- .../client/admin/internal/TopicsImpl.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 7196e9db866c4..89d16e70b16ff 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -2458,17 +2458,17 @@ public CompletableFuture getSubscriptionDispatchRateAsync(String t path = path.queryParam("applied", applied); final CompletableFuture future = new CompletableFuture<>(); asyncGetRequest(path, - new InvocationCallback() { - @Override - public void completed(DispatchRate dispatchRate) { - future.complete(dispatchRate); - } + new InvocationCallback() { + @Override + public void completed(DispatchRate dispatchRate) { + future.complete(dispatchRate); + } - @Override - public void failed(Throwable throwable) { - future.completeExceptionally(getApiException(throwable.getCause())); - } - }); + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); return future; }