Skip to content

Commit

Permalink
[fix][broker] Fix delete system topic clean topic policy (#18823)
Browse files Browse the repository at this point in the history
If users set topic policy for system topic, then delete this system topic, the topic policy should be deleted.

Only change_events topic do not need to clear topic policies.

(cherry picked from commit 93c41de)
  • Loading branch information
liangyepianzhou committed Mar 16, 2023
1 parent cc68533 commit dd4ce57
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 17 deletions.
Expand Up @@ -73,6 +73,7 @@
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand Down Expand Up @@ -310,22 +311,36 @@ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean auth
// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
// remove system topics first.
Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
if (!topics.isEmpty()) {
for (String topic : topics) {
try {
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
partitionedTopicPolicySystemTopic.add(topic);
} else {
noPartitionedTopicPolicySystemTopic.add(topic);
}
} else {
futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
}
} catch (Exception ex) {
log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
return;
}
}
}
FutureUtil.waitForAll(futures).thenCompose(__ -> {
List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
FutureUtil.waitForAll(futures)
.thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
.thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
.thenCompose(__ -> {
List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then we do not need to delete the bundle
deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> {
if (ownership.isPresent()) {
Expand Down Expand Up @@ -475,27 +490,41 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
Set<String> nonPartitionedTopics = new HashSet<>();
Set<String> allSystemTopics = new HashSet<>();
Set<String> allPartitionedSystemTopics = new HashSet<>();
Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();

for (String topic : topics) {
try {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
} else {
allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
}
continue;
}
String partitionedTopic = topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic)) {
// Distinguish partitioned topic to avoid duplicate deletion of the same schema
topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
partitionedTopic, true, true));
partitionedTopics.add(partitionedTopic);
}
} else {
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
allSystemTopics.add(topic);
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
noPartitionedTopicPolicySystemTopic.add(topic);
} else {
allSystemTopics.add(topic);
}
continue;
}
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
topic, true, true));
nonPartitionedTopics.add(topic);
}
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
} catch (Exception e) {
String errorMessage = String.format("Failed to force delete topic %s, "
+ "but the previous deletion command of partitioned-topics:%s "
Expand All @@ -508,11 +537,6 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
}
}

for (String partitionedTopic : partitionedTopics) {
topicFutures.add(namespaceResources().getPartitionedTopicResources()
.deletePartitionedTopicAsync(TopicName.get(partitionedTopic)));
}

if (log.isDebugEnabled()) {
log.debug("Successfully send deletion command of partitioned-topics:{} "
+ "and non-partitioned-topics:{} in namespace:{}.",
Expand All @@ -524,6 +548,9 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
.thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics))
.thenCompose((ignore) ->
internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
.thenCompose(ignore ->
internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
.thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
.handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
Expand Down
Expand Up @@ -1185,8 +1185,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
.thenCompose(ignore -> {
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
&& brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
return deleteTopicPolicies();
} else {
return CompletableFuture.completedFuture(null);
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;

import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -57,11 +58,13 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import lombok.Cleanup;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -81,6 +84,7 @@
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand All @@ -103,6 +107,8 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
Expand Down Expand Up @@ -1938,7 +1944,7 @@ public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws Exception {
}

@Test
public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception {
String namespace = this.testTenant + "/delete-systemTopic";
String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
"testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
Expand All @@ -1958,4 +1964,33 @@ public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
// 4. delete the policies topic and the topic wil not to clear topic polices
admin.topics().delete(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
}
@Test
public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
conf.setTopicLevelPoliciesEnabled(true);
conf.setSystemTopicEnabled(true);
Field field = PulsarService.class.getDeclaredField("topicPoliciesService");
field.setAccessible(true);
field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));

String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic";
admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", "usc", "usw")));

admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(systemTopic).create();
admin.topicPolicies().setMaxConsumers(systemTopic, 5);

Integer maxConsumerPerTopic = pulsar
.getTopicPoliciesService()
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
.getMaxConsumerPerTopic();

assertEquals(maxConsumerPerTopic, 5);
admin.topics().delete(systemTopic, true);
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS);
assertNull(topicPolicies);
}
}

0 comments on commit dd4ce57

Please sign in to comment.