Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -262,29 +263,42 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth

// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
try {
// remove system topics first.
if (!topics.isEmpty()) {
for (String topic : topics) {
pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> {
topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully()));
});
// remove system topics first.
if (!topics.isEmpty()) {
for (String topic : topics) {
try {
Topics cli = pulsar().getAdminClient().topics();
futures.add(cli.terminateTopicAsync(topic).thenCompose(__ ->
cli.deleteAsync(topic, true, true)));
} catch (Exception ex) {
log.error("[{}] Failed to delete system topic {}", topic, clientAppId(), ex);
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
return;
}
}
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then we do not need to delete the bundle
if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
futures.add(pulsar().getAdminClient().namespaces()
.deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()));
}
FutureUtil.waitForAll(futures).thenAccept(r -> {
futures.clear();
try {
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then we do not need to delete the bundle
if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
futures.add(pulsar().getAdminClient().namespaces()
.deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()));
}
}
} catch (Exception e) {
log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return;
}
} catch (Exception e) {
log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
asyncResponse.resume(new RestException(e));
return;
}
}).exceptionally(e -> {
log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e.getCause());
asyncResponse.resume(new RestException(e.getCause()));
return null;
});

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
private static final String NAMESPACE1 = "system-topic/namespace-1";
private static final String NAMESPACE2 = "system-topic/namespace-2";
private static final String NAMESPACE3 = "system-topic/namespace-3";
private static final String NAMESPACE4 = "system-topic/namespace-4";

private static final TopicName TOPIC1 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-1");
private static final TopicName TOPIC2 = TopicName.get("persistent", NamespaceName.get(NAMESPACE1), "topic-2");
private static final TopicName TOPIC3 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-1");
private static final TopicName TOPIC4 = TopicName.get("persistent", NamespaceName.get(NAMESPACE2), "topic-2");
private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1");
private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2");
private static final TopicName TOPIC7 = TopicName.get("persistent", NamespaceName.get(NAMESPACE4), "topic-7");

private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService;

Expand Down Expand Up @@ -279,20 +281,21 @@ public void testListenerCleanupByPartition() throws Exception {
}



private void prepareData() throws PulsarAdminException {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("system-topic",
new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
admin.namespaces().createNamespace(NAMESPACE1);
admin.namespaces().createNamespace(NAMESPACE2);
admin.namespaces().createNamespace(NAMESPACE3);
admin.namespaces().createNamespace(NAMESPACE4);
admin.lookups().lookupTopic(TOPIC1.toString());
admin.lookups().lookupTopic(TOPIC2.toString());
admin.lookups().lookupTopic(TOPIC3.toString());
admin.lookups().lookupTopic(TOPIC4.toString());
admin.lookups().lookupTopic(TOPIC5.toString());
admin.lookups().lookupTopic(TOPIC6.toString());
admin.topics().createNonPartitionedTopic(TOPIC7.toString());
systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
}

Expand Down Expand Up @@ -373,4 +376,25 @@ public void run() {
}
});
}

@Test
public void testDeleteNamespaceWithSystemTopicDeleted() throws Exception {
TopicPolicies initPolicy = TopicPolicies.builder()
.maxConsumerPerTopic(10)
.build();
systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC7, initPolicy).get();
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(systemTopicBasedTopicPoliciesService
.getPoliciesCacheInit(TOPIC7.getNamespaceObject())));
Awaitility.await().untilAsserted(() -> {
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC7));
Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC7).getMaxConsumerPerTopic().intValue(), 10);
});

admin.topics().delete(TOPIC7.getPartitionedTopicName());
admin.namespaces().deleteNamespace(NAMESPACE4);
Awaitility.await().untilAsserted(() -> {
Assert.assertFalse(admin.namespaces().getNamespaces("system-topic").contains(NAMESPACE4));
});
}
}