From d9b461cc771cfe485959ba8ada764858eda7a65e Mon Sep 17 00:00:00 2001 From: lenovo Date: Thu, 20 Feb 2020 23:43:00 +0800 Subject: [PATCH 1/6] Support force deleting subscription. --- .../broker/admin/v1/PersistentTopics.java | 6 ++- .../broker/admin/v2/PersistentTopics.java | 7 ++- .../pulsar/broker/service/Subscription.java | 2 + .../NonPersistentSubscription.java | 43 ++++++++++++++++++- .../persistent/PersistentSubscription.java | 43 ++++++++++++++++++- .../pulsar/broker/admin/AdminApiTest.java | 40 +++++++++++++++++ .../broker/admin/PersistentTopicsTest.java | 2 +- .../apache/pulsar/client/admin/Topics.java | 28 +++++++++++- .../client/admin/internal/TopicsImpl.java | 12 ++++-- .../pulsar/admin/cli/CmdPersistentTopics.java | 6 ++- .../apache/pulsar/admin/cli/CmdTopics.java | 6 ++- site2/docs/reference-pulsar-admin.md | 1 + 12 files changed, 183 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 362adc81c4a52..4053b24b176b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -367,7 +367,8 @@ public void getPartitionedStatsInternal( @DELETE @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}") - @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.") + @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there's any active consumers it. " + + "Force delete ignores connected consumers and deletes subscription by explicitly closing them.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 403, message = "Don't have admin permission"), @@ -376,10 +377,11 @@ public void getPartitionedStatsInternal( public void deleteSubscription(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName, + @QueryParam("force") @DefaultValue("false") boolean force, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(property, cluster, namespace, encodedTopic); - internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative); + internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index b2fc28b010a69..b8c96689fef06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -603,7 +603,8 @@ public void getPartitionedStatsInternal( @DELETE @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}") - @ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.") + @ApiOperation(value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there's any active consumers it. " + + "Force delete ignores connected consumers and deletes subscription by explicitly closing them.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), @@ -622,11 +623,13 @@ public void deleteSubscription( @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Subscription to be deleted") @PathParam("subName") String encodedSubName, + @ApiParam(value = "Disconnect and close all consumers and delete subscription forcefully", defaultValue = "false", type = "boolean") + @QueryParam("force") @DefaultValue("false") boolean force, @ApiParam(value = "Is authentication required to perform this operation") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { try { validateTopicName(tenant, namespace, encodedTopic); - internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative); + internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index f7d9687523960..85b54152ad9db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -65,6 +65,8 @@ default long getNumberOfEntriesDelayed() { CompletableFuture delete(); + CompletableFuture deleteForcefully(); + CompletableFuture disconnect(); CompletableFuture doUnsubscribe(Consumer consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index f653ee52b4953..73152c32370ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -304,12 +304,53 @@ public synchronized CompletableFuture disconnect() { */ @Override public CompletableFuture delete() { + return delete(false); + } + + /** + * Forcefully close all consumers and deletes the subscription. + * @return + */ + @Override + public CompletableFuture deleteForcefully() { + return delete(true); + } + + /** + * Delete the subscription by closing and deleting its managed cursor. Handle unsubscribe call from admin layer. + * + * @param closeIfConsumersConnected + * Flag indicate whether explicitly close connected consumers before trying to delete subscription. If + * any consumer is connected to it and if this flag is disable then this operation fails. + * @return CompletableFuture indicating the completion of delete operation + */ + private CompletableFuture delete(boolean closeIfConsumersConnected) { CompletableFuture deleteFuture = new CompletableFuture<>(); log.info("[{}][{}] Unsubscribing", topicName, subName); + CompletableFuture closeSubscriptionFuture = new CompletableFuture<>(); + + if (closeIfConsumersConnected) { + this.disconnect().thenRun(() -> { + closeSubscriptionFuture.complete(null); + }).exceptionally(ex -> { + log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex); + closeSubscriptionFuture.completeExceptionally(ex); + return null; + }); + } else { + this.close().thenRun(() -> { + closeSubscriptionFuture.complete(null); + }).exceptionally(exception -> { + log.error("[{}][{}] Error closing subscription", topicName, subName, exception); + closeSubscriptionFuture.completeExceptionally(exception); + return null; + }); + } + // cursor close handles pending delete (ack) operations - this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> { + closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> { synchronized (this) { (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> { log.info("[{}][{}] Successfully deleted subscription", topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 9bcd2874dc1bc..6718667a2f230 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -850,12 +850,53 @@ public synchronized CompletableFuture disconnect() { */ @Override public CompletableFuture delete() { + return delete(false); + } + + /** + * Forcefully close all consumers and deletes the subscription. + * @return + */ + @Override + public CompletableFuture deleteForcefully() { + return delete(true); + } + + /** + * Delete the subscription by closing and deleting its managed cursor. Handle unsubscribe call from admin layer. + * + * @param closeIfConsumersConnected + * Flag indicate whether explicitly close connected consumers before trying to delete subscription. If + * any consumer is connected to it and if this flag is disable then this operation fails. + * @return CompletableFuture indicating the completion of delete operation + */ + private CompletableFuture delete(boolean closeIfConsumersConnected) { CompletableFuture deleteFuture = new CompletableFuture<>(); log.info("[{}][{}] Unsubscribing", topicName, subName); + CompletableFuture closeSubscriptionFuture = new CompletableFuture<>(); + + if (closeIfConsumersConnected) { + this.disconnect().thenRun(() -> { + closeSubscriptionFuture.complete(null); + }).exceptionally(ex -> { + log.error("[{}][{}] Error disconnecting and closing subscription", topicName, subName, ex); + closeSubscriptionFuture.completeExceptionally(ex); + return null; + }); + } else { + this.close().thenRun(() -> { + closeSubscriptionFuture.complete(null); + }).exceptionally(exception -> { + log.error("[{}][{}] Error closing subscription", topicName, subName, exception); + closeSubscriptionFuture.completeExceptionally(exception); + return null; + }); + } + // cursor close handles pending delete (ack) operations - this.close().thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> { + closeSubscriptionFuture.thenCompose(v -> topic.unsubscribe(subName)).thenAccept(v -> { synchronized (this) { (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)).thenRun(() -> { log.info("[{}][{}] Successfully deleted subscription", topicName, subName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index d8326285076fb..9a163bbdc1710 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1302,6 +1302,46 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception { admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2"); } + @Test(dataProvider = "topicName") + public void testDeleteSubscription(String topicName) throws Exception { + final String subName = topicName; + final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName; + + // create a topic and produce some messages + publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 5); + assertEquals(admin.topics().getList("prop-xyz/ns1"), + Lists.newArrayList("persistent://prop-xyz/ns1/" + topicName)); + + // create consumer and subscription + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + Consumer consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName) + .subscriptionType(SubscriptionType.Exclusive).subscribe(); + + assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName)); + + // try to delete the subscription with a connected consumer + try { + admin.topics().deleteSubscription(persistentTopicName, subName); + fail("should have failed"); + } catch (PulsarAdminException.PreconditionFailedException e) { + assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode()); + } + + // failed to delete the subscription + assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName)); + + // try to delete the subscription with a connected consumer forcefully + admin.topics().deleteSubscription(persistentTopicName, subName, true); + + // delete the subscription successfully + assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0); + + client.close(); + } + @Test(dataProvider = "bundling") public void testClearBacklogOnNamespace(Integer numBundles) throws Exception { admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index cac35a6e1eb18..46f513a484dbf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -180,7 +180,7 @@ public void testGetSubscriptions() { // 6) Delete the subscription response = mock(AsyncResponse.class); - persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true); + persistentTopics.deleteSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", false,true); responseCaptor = ArgumentCaptor.forClass(Response.class); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 0a004628de6b0..d3a3885399338 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -769,6 +769,30 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) */ CompletableFuture getPartitionedInternalStatsAsync(String topic); + /** + * Delete a subscription. + *

+ * Delete a persistent subscription from a topic. There should not be any active consumers on the subscription. + *

+ * + * @param topic + * topic name + * @param subName + * Subscription name + * @param force + * Delete topic forcefully + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Topic or subscription does not exist + * @throws PreconditionFailedException + * Subscription has active consumers + * @throws PulsarAdminException + * Unexpected error + */ + void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException; + /** * Delete a subscription. *

@@ -801,10 +825,12 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) * topic name * @param subName * Subscription name + * @param force + * Delete topic forcefully * * @return a future that can be used to track when the subscription is deleted */ - CompletableFuture deleteSubscriptionAsync(String topic, String subName); + CompletableFuture deleteSubscriptionAsync(String topic, String subName, boolean force); /** * Skip all messages on a topic subscription. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index c61359a74f0b5..61d9d78567d33 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -619,9 +619,9 @@ public void failed(Throwable throwable) { } @Override - public void deleteSubscription(String topic, String subName) throws PulsarAdminException { + public void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException { try { - deleteSubscriptionAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + deleteSubscriptionAsync(topic, subName, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); } catch (InterruptedException e) { @@ -633,10 +633,16 @@ public void deleteSubscription(String topic, String subName) throws PulsarAdminE } @Override - public CompletableFuture deleteSubscriptionAsync(String topic, String subName) { + public void deleteSubscription(String topic, String subName) throws PulsarAdminException { + deleteSubscription(topic, subName, false); + } + + @Override + public CompletableFuture deleteSubscriptionAsync(String topic, String subName, boolean force) { TopicName tn = validateTopic(topic); String encodedSubName = Codec.encode(subName); WebTarget path = topicPath(tn, "subscription", encodedSubName); + path = path.queryParam("force", force); return asyncDeleteRequest(path); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 9df331fba320a..64d63e0620b06 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -297,13 +297,17 @@ private class DeleteSubscription extends CliCommand { @Parameter(description = "persistent://property/cluster/namespace/topic", required = true) private java.util.List params; + @Parameter(names = { "-f", + "--force" }, description = "Disconnect and close all consumers and delete subscription forcefully") + private boolean force = false; + @Parameter(names = { "-s", "--subscription" }, description = "Subscription to be deleted", required = true) private String subName; @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - persistentTopics.deleteSubscription(persistentTopic, subName); + persistentTopics.deleteSubscription(persistentTopic, subName, force); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index f5954cf38bac4..079405c797f3b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -354,13 +354,17 @@ private class DeleteSubscription extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; + @Parameter(names = { "-f", + "--force" }, description = "Disconnect and close all consumers and delete subscription forcefully") + private boolean force = false; + @Parameter(names = { "-s", "--subscription" }, description = "Subscription to be deleted", required = true) private String subName; @Override void run() throws PulsarAdminException { String topic = validateTopicName(params); - topics.deleteSubscription(topic, subName); + topics.deleteSubscription(topic, subName, force); } } diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index c52f87bad31f3..fe258e2c85e06 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1958,6 +1958,7 @@ Options |Flag|Description|Default| |---|---|---| |`-s`, `--subscription`|The subscription to delete|| +|`-f`, `--force`|Disconnect and close all consumers and delete subscription forcefully|false| ### `stats` From 62924a18d4f273bb4618c4fc69376028e5f120a3 Mon Sep 17 00:00:00 2001 From: lenovo Date: Fri, 21 Feb 2020 12:02:01 +0800 Subject: [PATCH 2/6] Fix a unit test --- .../java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 283c5fe7c12ac..814df853a7a47 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -639,7 +639,7 @@ void topics() throws Exception { verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1"); cmdTopics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s sub1")); - verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1"); + verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1", false); cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", false); @@ -725,7 +725,7 @@ void persistentTopics() throws Exception { verify(mockTopics).getSubscriptions("persistent://myprop/clust/ns1/ds1"); topics.run(split("unsubscribe persistent://myprop/clust/ns1/ds1 -s sub1")); - verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1"); + verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1", false); topics.run(split("stats persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1"); From de4eb0fe429361d983366cbc26312f4c4aa606be Mon Sep 17 00:00:00 2001 From: lenovo Date: Tue, 25 Feb 2020 15:13:00 +0800 Subject: [PATCH 3/6] Fix a flaky test --- .../org/apache/pulsar/broker/admin/v1/PersistentTopics.java | 2 +- .../org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 2 +- .../test/java/org/apache/pulsar/broker/admin/AdminApiTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 4053b24b176b0..9763efc5e316f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -367,7 +367,7 @@ public void getPartitionedStatsInternal( @DELETE @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}") - @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there's any active consumers it. " + @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. " + "Force delete ignores connected consumers and deletes subscription by explicitly closing them.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index b8c96689fef06..4766e28dc2dac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -603,7 +603,7 @@ public void getPartitionedStatsInternal( @DELETE @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}") - @ApiOperation(value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there's any active consumers it. " + @ApiOperation(value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. " + "Force delete ignores connected consumers and deletes subscription by explicitly closing them.") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 9a163bbdc1710..a6541eab5dd87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1337,7 +1337,7 @@ public void testDeleteSubscription(String topicName) throws Exception { admin.topics().deleteSubscription(persistentTopicName, subName, true); // delete the subscription successfully - assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0); + // assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0); client.close(); } From 7452bcab9cbc77e5dc877d22a0ea158de4ce9715 Mon Sep 17 00:00:00 2001 From: lenovo Date: Thu, 5 Mar 2020 11:55:00 +0800 Subject: [PATCH 4/6] Improve the unit test to cover force deleting --- .../apache/pulsar/broker/admin/AdminApiTest.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index a6541eab5dd87..cb4c7e8623e7e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -82,6 +82,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; @@ -1307,11 +1308,19 @@ public void testDeleteSubscription(String topicName) throws Exception { final String subName = topicName; final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName; + // disable auto subscription creation + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); + // create a topic and produce some messages publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 5); assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList("persistent://prop-xyz/ns1/" + topicName)); + // create the subscription by PulsarAdmin + admin.topics().createSubscription(topicName, subName, MessageId.earliest); + + assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName)); + // create consumer and subscription PulsarClient client = PulsarClient.builder() .serviceUrl(pulsar.getWebServiceAddress()) @@ -1320,8 +1329,6 @@ public void testDeleteSubscription(String topicName) throws Exception { Consumer consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName) .subscriptionType(SubscriptionType.Exclusive).subscribe(); - assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName)); - // try to delete the subscription with a connected consumer try { admin.topics().deleteSubscription(persistentTopicName, subName); @@ -1337,7 +1344,10 @@ public void testDeleteSubscription(String topicName) throws Exception { admin.topics().deleteSubscription(persistentTopicName, subName, true); // delete the subscription successfully - // assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0); + assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0); + + // reset to default + pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); client.close(); } From 07a222ed9e5b9a5411325f3a30b032fe2a20c737 Mon Sep 17 00:00:00 2001 From: lenovo Date: Sun, 8 Mar 2020 15:32:16 +0800 Subject: [PATCH 5/6] Address penghui's comment --- .../admin/impl/PersistentTopicsBase.java | 91 ++++++++++++++++++- .../pulsar/broker/admin/AdminApiTest.java | 14 +-- 2 files changed, 97 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 685d1952535c6..44ef6a5388866 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1045,6 +1045,14 @@ protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, }); } + protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative, boolean force) { + if (force) { + internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative); + } else { + internalDeleteSubscription(asyncResponse, subName, authoritative); + } + } + protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) { if (topicName.isGlobal()) { try { @@ -1067,7 +1075,7 @@ protected void internalDeleteSubscription(AsyncResponse asyncResponse, String su TopicName topicNamePartition = topicName.getPartition(i); try { futures.add(pulsar().getAdminClient().topics() - .deleteSubscriptionAsync(topicNamePartition.toString(), subName)); + .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false)); } catch (Exception e) { log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicNamePartition, subName, e); @@ -1133,6 +1141,87 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn } } + protected void internalDeleteSubscriptionForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { + if (topicName.isGlobal()) { + try { + validateGlobalNamespaceOwnership(namespaceName); + } catch (Exception e) { + log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return; + } + } + // If the topic name is a partition name, no need to get partition topic metadata again + if (topicName.isPartitioned()) { + internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative); + } else { + getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { + if (partitionMetadata.partitions > 0) { + final List> futures = Lists.newArrayList(); + + for (int i = 0; i < partitionMetadata.partitions; i++) { + TopicName topicNamePartition = topicName.getPartition(i); + try { + futures.add(pulsar().getAdminClient().topics() + .deleteSubscriptionAsync(topicNamePartition.toString(), subName, true)); + } catch (Exception e) { + log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicNamePartition, subName, + e); + asyncResponse.resume(new RestException(e)); + return; + } + } + + FutureUtil.waitForAll(futures).handle((result, exception) -> { + if (exception != null) { + Throwable t = exception.getCause(); + if (t instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return null; + } else { + log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, t); + asyncResponse.resume(new RestException(t)); + return null; + } + } + + asyncResponse.resume(Response.noContent().build()); + return null; + }); + } else { + internalDeleteSubscriptionForNonPartitionedTopicForcefully(asyncResponse, subName, authoritative); + } + }).exceptionally(ex -> { + log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + } + + private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse asyncResponse, String subName, boolean authoritative) { + try { + validateAdminAccessForSubscriber(subName, authoritative); + Topic topic = getTopicReference(topicName); + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found")); + return; + } + sub.deleteForcefully().get(); + log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName); + asyncResponse.resume(Response.noContent().build()); + } catch (Exception e) { + log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e); + if (e instanceof WebApplicationException) { + asyncResponse.resume(e); + } else { + log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e); + asyncResponse.resume(new RestException(e)); + } + } + } + protected void internalSkipAllMessages(AsyncResponse asyncResponse, String subName, boolean authoritative) { if (topicName.isGlobal()) { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index cb4c7e8623e7e..5e0747134d250 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1303,21 +1303,21 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception { admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2"); } - @Test(dataProvider = "topicName") - public void testDeleteSubscription(String topicName) throws Exception { - final String subName = topicName; - final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName; + @Test + public void testDeleteSubscription() throws Exception { + final String subName = "test-sub"; + final String persistentTopicName = "persistent://prop-xyz/ns1/test-sub-topic"; // disable auto subscription creation pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false); // create a topic and produce some messages - publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 5); + publishMessagesOnPersistentTopic(persistentTopicName, 5); assertEquals(admin.topics().getList("prop-xyz/ns1"), - Lists.newArrayList("persistent://prop-xyz/ns1/" + topicName)); + Lists.newArrayList(persistentTopicName)); // create the subscription by PulsarAdmin - admin.topics().createSubscription(topicName, subName, MessageId.earliest); + admin.topics().createSubscription(persistentTopicName, subName, MessageId.earliest); assertEquals(admin.topics().getSubscriptions(persistentTopicName), Lists.newArrayList(subName)); From 221b91f97b5e88d83356e74c3c9c9215705d10de Mon Sep 17 00:00:00 2001 From: lenovo Date: Mon, 9 Mar 2020 19:27:51 +0800 Subject: [PATCH 6/6] Add an override method to avoid API breaking change --- .../apache/pulsar/client/admin/Topics.java | 25 ++++++++++++++++--- .../client/admin/internal/TopicsImpl.java | 18 +++++++++++-- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index d3a3885399338..4df3c3139b6a8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -779,8 +779,6 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) * topic name * @param subName * Subscription name - * @param force - * Delete topic forcefully * * @throws NotAuthorizedException * Don't have admin permission @@ -791,18 +789,21 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) * @throws PulsarAdminException * Unexpected error */ - void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException; + void deleteSubscription(String topic, String subName) throws PulsarAdminException; /** * Delete a subscription. *

* Delete a persistent subscription from a topic. There should not be any active consumers on the subscription. + * Force flag deletes subscription forcefully by closing all active consumers. *

* * @param topic * topic name * @param subName * Subscription name + * @param force + * Delete topic forcefully * * @throws NotAuthorizedException * Don't have admin permission @@ -813,12 +814,28 @@ PartitionedTopicInternalStats getPartitionedInternalStats(String topic) * @throws PulsarAdminException * Unexpected error */ - void deleteSubscription(String topic, String subName) throws PulsarAdminException; + void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException; + + /** + * Delete a subscription asynchronously. + *

+ * Delete a persistent subscription from a topic. There should not be any active consumers on the subscription. + *

+ * + * @param topic + * topic name + * @param subName + * Subscription name + * + * @return a future that can be used to track when the subscription is deleted + */ + CompletableFuture deleteSubscriptionAsync(String topic, String subName); /** * Delete a subscription asynchronously. *

* Delete a persistent subscription from a topic. There should not be any active consumers on the subscription. + * Force flag deletes subscription forcefully by closing all active consumers. *

* * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 61d9d78567d33..d8e548fb075e9 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -618,6 +618,20 @@ public void failed(Throwable throwable) { return future; } + @Override + public void deleteSubscription(String topic, String subName) throws PulsarAdminException { + try { + deleteSubscriptionAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override public void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException { try { @@ -633,8 +647,8 @@ public void deleteSubscription(String topic, String subName, boolean force) thro } @Override - public void deleteSubscription(String topic, String subName) throws PulsarAdminException { - deleteSubscription(topic, subName, false); + public CompletableFuture deleteSubscriptionAsync(String topic, String subName) { + return deleteSubscriptionAsync(topic, subName, false); } @Override