From f844defc11f9fecba0c7bd8c1d8f08592b3c4979 Mon Sep 17 00:00:00 2001 From: technoboy Date: Fri, 17 Dec 2021 17:22:01 +0800 Subject: [PATCH 1/5] Fix delete namespace with 'Cannot delete non empty bundle' issue. --- .../broker/admin/impl/NamespacesBase.java | 43 +++++++++++-------- ...temTopicBasedTopicPoliciesServiceTest.java | 27 +++++++++++- 2 files changed, 52 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 912b9ecb4e86f..a99febd46c992 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -262,29 +262,38 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth // remove from owned namespace map and ephemeral node from ZK final List> futures = Lists.newArrayList(); - try { - // remove system topics first. - if (!topics.isEmpty()) { - for (String topic : topics) { - pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> { - topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully())); + // remove system topics first. + if (!topics.isEmpty()) { + for (String topic : topics) { + pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> { + topicOptional.ifPresent(systemTopic -> { + futures.add(systemTopic.deleteForcefully()); }); - } + }); } - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle bundle : bundles.getBundles()) { - // check if the bundle is owned by any broker, if not then we do not need to delete the bundle - if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) { - futures.add(pulsar().getAdminClient().namespaces() - .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange())); + } + FutureUtil.waitForAll(futures).thenAccept(r -> { + futures.clear(); + try { + NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundles(namespaceName); + for (NamespaceBundle bundle : bundles.getBundles()) { + // check if the bundle is owned by any broker, if not then we do not need to delete the bundle + if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) { + futures.add(pulsar().getAdminClient().namespaces() + .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange())); + } } + } catch (Exception e) { + log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e); + asyncResponse.resume(new RestException(e)); + return; } - } catch (Exception e) { + }).exceptionally(e -> { log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e); asyncResponse.resume(new RestException(e)); - return; - } + return null; + }); FutureUtil.waitForAll(futures).handle((result, exception) -> { if (exception != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 1b4b2fdf2b97f..300db45d37ec9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -65,6 +65,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic private static final String NAMESPACE1 = "system-topic/namespace-1"; private static final String NAMESPACE2 = "system-topic/namespace-2"; private static final String NAMESPACE3 = "system-topic/namespace-3"; + private static final String NAMESPACE4 = "system-topic/namespace-4"; private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1"); private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2"); @@ -72,6 +73,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic private static final TopicName TOPIC4 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-2"); private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1"); private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2"); + private static final TopicName TOPIC7 = TopicName.get("persistent", NamespaceName.get(NAMESPACE4), "topic-7"); private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService; @@ -279,7 +281,6 @@ public void testListenerCleanupByPartition() throws Exception { } - private void prepareData() throws PulsarAdminException { admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); admin.tenants().createTenant("system-topic", @@ -287,12 +288,14 @@ private void prepareData() throws PulsarAdminException { admin.namespaces().createNamespace(NAMESPACE1); admin.namespaces().createNamespace(NAMESPACE2); admin.namespaces().createNamespace(NAMESPACE3); + admin.namespaces().createNamespace(NAMESPACE4); admin.lookups().lookupTopic(TOPIC1.toString()); admin.lookups().lookupTopic(TOPIC2.toString()); admin.lookups().lookupTopic(TOPIC3.toString()); admin.lookups().lookupTopic(TOPIC4.toString()); admin.lookups().lookupTopic(TOPIC5.toString()); admin.lookups().lookupTopic(TOPIC6.toString()); + admin.topics().createNonPartitionedTopic(TOPIC7.toString()); systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); } @@ -373,4 +376,26 @@ public void run() { } }); } + + @Test + public void testDeleteNamespaceWithSystemTopicDeleted() throws Exception { + TopicPolicies initPolicy = TopicPolicies.builder() + .maxConsumerPerTopic(10) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC7, initPolicy).get(); + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(systemTopicBasedTopicPoliciesService + .getPoliciesCacheInit(TOPIC7.getNamespaceObject()))); + Awaitility.await().untilAsserted(() -> { + Assert.assertNotNull(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC7)); + Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC7).getMaxConsumerPerTopic().intValue(), 10); + }); + + admin.topics().delete(TOPIC7.getPartitionedTopicName()); + admin.namespaces().deleteNamespace(NAMESPACE4); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse(admin.namespaces().getNamespaces("system-topic").contains(NAMESPACE4)); + Assert.assertNull(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC7)); + }); + } } From b19e703981f58caebe343a6914524db6c1058e89 Mon Sep 17 00:00:00 2001 From: technoboy Date: Tue, 21 Dec 2021 19:42:57 +0800 Subject: [PATCH 2/5] Add removeTopicPoliciesCache. --- .../pulsar/broker/admin/impl/NamespacesBase.java | 4 +++- .../SystemTopicBasedTopicPoliciesService.java | 14 ++++++++++++++ .../broker/service/TopicPoliciesService.java | 11 +++++++++++ ...SystemTopicBasedTopicPoliciesServiceTest.java | 16 +++++++++++++++- 4 files changed, 43 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index a99febd46c992..504f913924ea1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -267,7 +267,9 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth for (String topic : topics) { pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> { topicOptional.ifPresent(systemTopic -> { - futures.add(systemTopic.deleteForcefully()); + futures.add(pulsar().getTopicPoliciesService() + .removeTopicPoliciesCache(TopicName.get(topic)) + .thenAccept(__ -> systemTopic.deleteForcefully())); }); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 4da9d59fbe558..4ab1439c6efd9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -204,6 +204,20 @@ public TopicPolicies getTopicPolicies(TopicName topicName, : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); } + @Override + public CompletableFuture removeTopicPoliciesCache(TopicName topicName) { + NamespaceName namespace = topicName.getNamespaceObject(); + CompletableFuture> readerCompletableFuture = + readerCaches.remove(namespace); + if (readerCompletableFuture != null) { + readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync); + ownedBundlesCountPerNamespace.remove(namespace); + policyCacheInitMap.remove(namespace); + policiesCache.entrySet().removeIf(entry -> entry.getKey().getNamespaceObject().equals(namespace)); + } + return CompletableFuture.completedFuture(null); + } + @Override public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index e4066d9deeebf..75e6cf43438a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -77,6 +77,12 @@ public interface TopicPoliciesService { */ TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; + /** + * Remove policies from current cache. + * @param topicName topic name + */ + CompletableFuture removeTopicPoliciesCache(TopicName topicName); + /** * When getting TopicPolicies, if the initialization has not been completed, * we will go back off and try again until time out. @@ -160,6 +166,11 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) return null; } + @Override + public CompletableFuture removeTopicPoliciesCache(TopicName topicName) { + return CompletableFuture.completedFuture(null); + } + @Override public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 300db45d37ec9..22c07d87a94ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -395,7 +395,21 @@ public void testDeleteNamespaceWithSystemTopicDeleted() throws Exception { admin.namespaces().deleteNamespace(NAMESPACE4); Awaitility.await().untilAsserted(() -> { Assert.assertFalse(admin.namespaces().getNamespaces("system-topic").contains(NAMESPACE4)); - Assert.assertNull(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC7)); }); } + + @Test + public void testRemoveTopicPoliciesCache() throws Exception { + TopicPolicies initPolicy = TopicPolicies.builder() + .maxConsumerPerTopic(10) + .build(); + systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC7, initPolicy).get(); + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(systemTopicBasedTopicPoliciesService + .getPoliciesCacheInit(TOPIC7.getNamespaceObject()))); + systemTopicBasedTopicPoliciesService.removeTopicPoliciesCache(TOPIC7); + Awaitility.await().untilAsserted(() -> + Assert.assertNull(systemTopicBasedTopicPoliciesService + .getPoliciesCacheInit(TOPIC7.getNamespaceObject()))); + } } From 6af029354acb89a9cced5445dfd3d4165ad6d3c1 Mon Sep 17 00:00:00 2001 From: technoboy Date: Wed, 29 Dec 2021 20:39:17 +0800 Subject: [PATCH 3/5] Call topic.terminate --- .../pulsar/broker/admin/impl/NamespacesBase.java | 3 +-- .../SystemTopicBasedTopicPoliciesService.java | 14 -------------- .../broker/service/TopicPoliciesService.java | 11 ----------- .../SystemTopicBasedTopicPoliciesServiceTest.java | 15 --------------- 4 files changed, 1 insertion(+), 42 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 504f913924ea1..abaf209d1c2ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -267,8 +267,7 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth for (String topic : topics) { pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> { topicOptional.ifPresent(systemTopic -> { - futures.add(pulsar().getTopicPoliciesService() - .removeTopicPoliciesCache(TopicName.get(topic)) + futures.add(((PersistentTopic) systemTopic).terminate() .thenAccept(__ -> systemTopic.deleteForcefully())); }); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 4ab1439c6efd9..4da9d59fbe558 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -204,20 +204,6 @@ public TopicPolicies getTopicPolicies(TopicName topicName, : policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); } - @Override - public CompletableFuture removeTopicPoliciesCache(TopicName topicName) { - NamespaceName namespace = topicName.getNamespaceObject(); - CompletableFuture> readerCompletableFuture = - readerCaches.remove(namespace); - if (readerCompletableFuture != null) { - readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync); - ownedBundlesCountPerNamespace.remove(namespace); - policyCacheInitMap.remove(namespace); - policiesCache.entrySet().removeIf(entry -> entry.getKey().getNamespaceObject().equals(namespace)); - } - return CompletableFuture.completedFuture(null); - } - @Override public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { return policiesCache.get(TopicName.get(topicName.getPartitionedTopicName())); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 75e6cf43438a0..e4066d9deeebf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -77,12 +77,6 @@ public interface TopicPoliciesService { */ TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; - /** - * Remove policies from current cache. - * @param topicName topic name - */ - CompletableFuture removeTopicPoliciesCache(TopicName topicName); - /** * When getting TopicPolicies, if the initialization has not been completed, * we will go back off and try again until time out. @@ -166,11 +160,6 @@ public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) return null; } - @Override - public CompletableFuture removeTopicPoliciesCache(TopicName topicName) { - return CompletableFuture.completedFuture(null); - } - @Override public TopicPolicies getTopicPoliciesIfExists(TopicName topicName) { return null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 22c07d87a94ef..3c87a6afdaaf0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -397,19 +397,4 @@ public void testDeleteNamespaceWithSystemTopicDeleted() throws Exception { Assert.assertFalse(admin.namespaces().getNamespaces("system-topic").contains(NAMESPACE4)); }); } - - @Test - public void testRemoveTopicPoliciesCache() throws Exception { - TopicPolicies initPolicy = TopicPolicies.builder() - .maxConsumerPerTopic(10) - .build(); - systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC7, initPolicy).get(); - Awaitility.await().untilAsserted(() -> - Assert.assertTrue(systemTopicBasedTopicPoliciesService - .getPoliciesCacheInit(TOPIC7.getNamespaceObject()))); - systemTopicBasedTopicPoliciesService.removeTopicPoliciesCache(TOPIC7); - Awaitility.await().untilAsserted(() -> - Assert.assertNull(systemTopicBasedTopicPoliciesService - .getPoliciesCacheInit(TOPIC7.getNamespaceObject()))); - } } From 3a73d89b4d7c2c113694f609fa9d548abf3a840d Mon Sep 17 00:00:00 2001 From: technoboy Date: Mon, 10 Jan 2022 17:05:09 +0800 Subject: [PATCH 4/5] Call adminClient to process the topic. --- .../pulsar/broker/admin/impl/NamespacesBase.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index abaf209d1c2ba..d2ae26af68ad8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -60,6 +60,7 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -265,12 +266,15 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth // remove system topics first. if (!topics.isEmpty()) { for (String topic : topics) { - pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> { - topicOptional.ifPresent(systemTopic -> { - futures.add(((PersistentTopic) systemTopic).terminate() - .thenAccept(__ -> systemTopic.deleteForcefully())); - }); - }); + try { + Topics cli = pulsar().getAdminClient().topics(); + futures.add(cli.terminateTopicAsync(topic).thenCompose(__ -> + cli.deleteAsync(topic, true, true))); + } catch (Exception ex) { + log.error("[{}] Failed to delete system topic {}", topic, clientAppId(), ex); + asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex)); + return; + } } } FutureUtil.waitForAll(futures).thenAccept(r -> { From 90336641acf3ff5f61152e28fdceee2e7c43320a Mon Sep 17 00:00:00 2001 From: technoboy Date: Tue, 11 Jan 2022 14:22:53 +0800 Subject: [PATCH 5/5] use getCause. --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d2ae26af68ad8..cb2d4a6a4efc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -295,8 +295,8 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth return; } }).exceptionally(e -> { - log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e); - asyncResponse.resume(new RestException(e)); + log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e.getCause()); + asyncResponse.resume(new RestException(e.getCause())); return null; });