diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index fe0229b44947d..2fc71a1a0e815 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1117,6 +1117,14 @@ protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, }); } + protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean force) { + if (force) { + internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative); + } else { + internalDeleteSubscription(asyncResponse, subName, authoritative); + } + } + protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) { if (topicName.isGlobal()) { try { @@ -1139,7 +1147,7 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su TopicName topicNamePartition = topicName.getPartition(i); try { futures.add(pulsar().getAdminClient().topics() - .deleteSubscriptionAsync(topicNamePartition.toString(), subName)); + .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false)); } catch (Exception e) { log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName, e); @@ -1205,6 +1213,87 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn } } + protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { + if (topicName.isGlobal()) { + try { + validateGlobalNamespaceOwnership(namespaceName); + } catch (Exception e) { + log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + } + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative); + } else { + getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); + + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics() + .deleteSubscriptionAsync(topicNamePartition.toString(), subName, true)); + } catch (Exception e) { + log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicNamePartition, subName, + e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative); + } + }).exceptionally(ex -> { + log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + } + + private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { + try { + validateAdminAccessForSubscriber(subName, authoritative); + Topic topic = getTopicReference(topicName); + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + sub.deleteForcefully().get(); + log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName); + asyncResponse.resume(Response.noContent().build()); + } catch (Exception e) { + log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e); + if (e instanceof WebApplicationException) { + asyncResponse.resume(e); + } else { + log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e); + asyncResponse.resume(new RestException(e)); + } + } + } + protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) { if (topicName.isGlobal()) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 7cf93b7f0e754..326c1a2dde68b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -367,7 +367,8 @@ public void getPartitionedStatsInternal( @DELETE @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}") - @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.") + @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. " + + "Force delete ignores connected consumers and deletes subscription by explicitly closing them.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @@ -376,10 +377,11 @@ public void getPartitionedStatsInternal( public void deleteSubscription(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName, + @QueryParam("force") @DefaultValue("false") boolean force, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative); + internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 8a6efdc437626..0b6252a170859 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -606,7 +606,8 @@ public void getPartitionedStatsInternal( @DELETE @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}") - @ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.") + @ApiOperation(value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. " + + "Force delete ignores connected consumers and deletes subscription by explicitly closing them.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @@ -625,11 +626,13 @@ public void deleteSubscription( @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Subscription to be deleted") @PathParam("subName") String encodedSubName, + @ApiParam(value = "Disconnect and close all consumers and delete subscription forcefully", defaultValue = "false", type = "boolean") + @QueryParam("force") @DefaultValue("false") boolean force, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(tenant, namespace, encodedTopic); - internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative); + internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index f7d9687523960..85b54152ad9db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -65,6 +65,8 @@ default long getNumberOfEntriesDelayed() { CompletableFuture delete(); + CompletableFuture deleteForcefully(); + CompletableFuture disconnect(); CompletableFuture doUnsubscribe(Consumer consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index f653ee52b4953..73152c32370ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -304,12 +304,53 @@ public synchronized CompletableFuture disconnect() { */ @Override public CompletableFuture delete() { + return delete(false); + } + + /** + * Forcefully close all consumers and deletes the subscription. + * @return + */ + @Override + public CompletableFuture deleteForcefully() { + return delete(true); + } + + /** + * Delete the subscription by closing and deleting its managed cursor. Handle unsubscribe call from admin layer. + * + * @param closeIfConsumersConnected + * Flag indicate whether explicitly close connected consumers before trying to delete subscription. If + * any consumer is connected to it and if this flag is disable then this operation fails. + * @return CompletableFuture indicating the completion of delete operation + */ + private CompletableFuture delete(boolean closeIfConsumersConnected) { CompletableFuture deleteFuture = new CompletableFuture<>(); log.info("[{}][{}] Unsubscribing", topicName, subName); + CompletableFuture closeSubscriptionFuture = new CompletableFuture<>(); + + if (closeIfConsumersConnected) { + this.disconnect().thenRun(() -> { + closeSubscriptionFuture.complete(null); + }).exceptionally(ex -> { + log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex); + closeSubscriptionFuture.completeExceptionally(ex); + return null; + }); + } else { + this.close().thenRun(() -> { + closeSubscriptionFuture.complete(null); + }).exceptionally(exception -> { + log.error("[{}][{}] Error closing subscription", topicName, subName, exception); + closeSubscriptionFuture.completeExceptionally(exception); + return null; + }); + } + // cursor close handles pending delete (ack) operations - this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> { + closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> { synchronized (this) { (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> { log.info("[{}][{}] Successfully deleted subscription", topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 1ae209d4a305a..e7230fa080268 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -850,12 +850,53 @@ public synchronized CompletableFuture disconnect() { */ @Override public CompletableFuture delete() { + return delete(false); + } + + /** + * Forcefully close all consumers and deletes the subscription. + * @return + */ + @Override + public CompletableFuture deleteForcefully() { + return delete(true); + } + + /** + * Delete the subscription by closing and deleting its managed cursor. Handle unsubscribe call from admin layer. + * + * @param closeIfConsumersConnected + * Flag indicate whether explicitly close connected consumers before trying to delete subscription. If + * any consumer is connected to it and if this flag is disable then this operation fails. + * @return CompletableFuture indicating the completion of delete operation + */ + private CompletableFuture delete(boolean closeIfConsumersConnected) { CompletableFuture deleteFuture = new CompletableFuture<>(); log.info("[{}][{}] Unsubscribing", topicName, subName); + CompletableFuture closeSubscriptionFuture = new CompletableFuture<>(); + + if (closeIfConsumersConnected) { + this.disconnect().thenRun(() -> { + closeSubscriptionFuture.complete(null); + }).exceptionally(ex -> { + log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex); + closeSubscriptionFuture.completeExceptionally(ex); + return null; + }); + } else { + this.close().thenRun(() -> { + closeSubscriptionFuture.complete(null); + }).exceptionally(exception -> { + log.error("[{}][{}] Error closing subscription", topicName, subName, exception); + closeSubscriptionFuture.completeExceptionally(exception); + return null; + }); + } + // cursor close handles pending delete (ack) operations - this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> { + closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> { synchronized (this) { (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> { log.info("[{}][{}] Successfully deleted subscription", topicName, subName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 15e23fd1a4eb9..3f6303fc4ebcf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -84,6 +84,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -1336,6 +1337,55 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception { admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2"); } + @Test + public void testDeleteSubscription() throws Exception { + final String subName = "test-sub"; + final String persistentTopicName = "persistent://prop-xyz/ns1/test-sub-topic"; + + // disable auto subscription creation + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + + // create a topic and produce some messages + publishMessagesOnPersistentTopic(persistentTopicName, 5); + assertEquals(admin.topics().getList("prop-xyz/ns1"), + Lists.newArrayList(persistentTopicName)); + + // create the subscription by PulsarAdmin + admin.topics().createSubscription(persistentTopicName, subName, MessageId.earliest); + + assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName)); + + // create consumer and subscription + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + Consumer consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName) + .subscriptionType(SubscriptionType.Exclusive).subscribe(); + + // try to delete the subscription with a connected consumer + try { + admin.topics().deleteSubscription(persistentTopicName, subName); + fail("should have failed"); + } catch (PulsarAdminException.PreconditionFailedException e) { + assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode()); + } + + // failed to delete the subscription + assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName)); + + // try to delete the subscription with a connected consumer forcefully + admin.topics().deleteSubscription(persistentTopicName, subName, true); + + // delete the subscription successfully + assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0); + + // reset to default + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); + + client.close(); + } + @Test(dataProvider = "bundling") public void testClearBacklogOnNamespace(Integer numBundles) throws Exception { admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 2d2d0cfa2b097..b164f2c805a7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -183,7 +183,7 @@ public void testGetSubscriptions() { // 6) Delete the subscription response = mock(AsyncResponse.class); - persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true); + persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", false,true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index cd8e59489098c..7ced674141977 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -939,6 +939,31 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) */ void deleteSubscription(String topic, String subName) throws PulsarAdminException; + /** + * Delete a subscription. + *

+ * Delete a persistent subscription from a topic. There should not be any active consumers on the subscription. + * Force flag deletes subscription forcefully by closing all active consumers. + *

+ * + * @param topic + * topic name + * @param subName + * Subscription name + * @param force + * Delete topic forcefully + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Topic or subscription does not exist + * @throws PreconditionFailedException + * Subscription has active consumers + * @throws PulsarAdminException + * Unexpected error + */ + void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException; + /** * Delete a subscription asynchronously. *

@@ -954,6 +979,24 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) */ CompletableFuture deleteSubscriptionAsync(String topic, String subName); + /** + * Delete a subscription asynchronously. + *

+ * Delete a persistent subscription from a topic. There should not be any active consumers on the subscription. + * Force flag deletes subscription forcefully by closing all active consumers. + *

+ * + * @param topic + * topic name + * @param subName + * Subscription name + * @param force + * Delete topic forcefully + * + * @return a future that can be used to track when the subscription is deleted + */ + CompletableFuture deleteSubscriptionAsync(String topic, String subName, boolean force); + /** * Skip all messages on a topic subscription. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 39a4c122d1d0f..0840409a73c69 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -810,11 +810,31 @@ public void deleteSubscription(String topic, String subName) throws PulsarAdminE } } + @Override + public void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException { + try { + deleteSubscriptionAsync(topic, subName, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override public CompletableFuture deleteSubscriptionAsync(String topic, String subName) { + return deleteSubscriptionAsync(topic, subName, false); + } + + @Override + public CompletableFuture deleteSubscriptionAsync(String topic, String subName, boolean force) { TopicName tn = validateTopic(topic); String encodedSubName = Codec.encode(subName); WebTarget path = topicPath(tn, "subscription", encodedSubName); + path = path.queryParam("force", force); return asyncDeleteRequest(path); } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 283c5fe7c12ac..814df853a7a47 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -639,7 +639,7 @@ void topics() throws Exception { verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s sub1")); - verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1"); + verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1", false); cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", false); @@ -725,7 +725,7 @@ void persistentTopics() throws Exception { verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1"); topics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s sub1")); - verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1"); + verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1", false); topics.run(split("stats persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 9df331fba320a..64d63e0620b06 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -297,13 +297,17 @@ private class DeleteSubscription extends CliCommand { @Parameter(description = "persistent://property/cluster/namespace/topic", required = true) private java.util.List params; + @Parameter(names = { "-f", + "--force" }, description = "Disconnect and close all consumers and delete subscription forcefully") + private boolean force = false; + @Parameter(names = { "-s", "--subscription" }, description = "Subscription to be deleted", required = true) private String subName; @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - persistentTopics.deleteSubscription(persistentTopic, subName); + persistentTopics.deleteSubscription(persistentTopic, subName, force); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index f5954cf38bac4..079405c797f3b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -354,13 +354,17 @@ private class DeleteSubscription extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; + @Parameter(names = { "-f", + "--force" }, description = "Disconnect and close all consumers and delete subscription forcefully") + private boolean force = false; + @Parameter(names = { "-s", "--subscription" }, description = "Subscription to be deleted", required = true) private String subName; @Override void run() throws PulsarAdminException { String topic = validateTopicName(params); - topics.deleteSubscription(topic, subName); + topics.deleteSubscription(topic, subName, force); } } diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index c52f87bad31f3..fe258e2c85e06 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1958,6 +1958,7 @@ Options |Flag|Description|Default| |---|---|---| |`-s`, `--subscription`|The subscription to delete|| +|`-f`, `--force`|Disconnect and close all consumers and delete subscription forcefully|false| ### `stats`