Skip to content

Commit

Permalink
[fix][broker] Fix delete system topic clean topic policy (apache#18823)
Browse files Browse the repository at this point in the history
### Motivation
If users set topic policy for system topic, then delete this system topic, the topic policy should be deleted.

### Modification
Only change_events topic do not need to clear topic policies.
  • Loading branch information
liangyepianzhou committed Dec 9, 2022
1 parent 66f8f8c commit 93c41de
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 5 deletions.
Expand Up @@ -72,6 +72,7 @@
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
Expand Down Expand Up @@ -224,20 +225,30 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
boolean hasNonSystemTopic = false;
List<String> allSystemTopics = new ArrayList<>();
List<String> allPartitionedSystemTopics = new ArrayList<>();
List<String> topicPolicy = new ArrayList<>();
List<String> partitionedTopicPolicy = new ArrayList<>();
for (String topic : allTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedTopics.add(topic);
} else {
allSystemTopics.add(topic);
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
topicPolicy.add(topic);
} else {
allSystemTopics.add(topic);
}
}
}
for (String topic : allPartitionedTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedPartitionTopics.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
partitionedTopicPolicy.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
}
}
}
if (!force) {
Expand All @@ -256,6 +267,10 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
return internalDeleteTopicsAsync(allSystemTopics);
}).thenCompose(ignore__ -> {
return internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
}).thenCompose(ignore -> {
return internalDeleteTopicsAsync(topicPolicy);
}).thenCompose(ignore__ -> {
return internalDeletePartitionedTopicsAsync(partitionedTopicPolicy);
});
})
.thenCompose(ignore -> pulsar().getNamespaceService()
Expand Down
Expand Up @@ -1221,8 +1221,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,

deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
.thenCompose(ignore -> {
if (!this.getBrokerService().getPulsar().getBrokerService()
.isSystemTopic(TopicName.get(topic))) {
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
return deleteTopicPolicies();
} else {
return CompletableFuture.completedFuture(null);
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;

import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -62,12 +63,14 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import lombok.Cleanup;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -88,6 +91,7 @@
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand All @@ -110,6 +114,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -2050,7 +2055,7 @@ public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws Exception {
}

@Test
public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception {
String namespace = this.testTenant + "/delete-systemTopic";
String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
"testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
Expand All @@ -2070,4 +2075,33 @@ public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
// 4. delete the policies topic and the topic wil not to clear topic polices
admin.topics().delete(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
}
@Test
public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
conf.setTopicLevelPoliciesEnabled(true);
conf.setSystemTopicEnabled(true);
Field field = PulsarService.class.getDeclaredField("topicPoliciesService");
field.setAccessible(true);
field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));

String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic";
admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", "usc", "usw")));

admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(systemTopic).create();
admin.topicPolicies().setMaxConsumers(systemTopic, 5);

Integer maxConsumerPerTopic = pulsar
.getTopicPoliciesService()
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
.getMaxConsumerPerTopic();

assertEquals(maxConsumerPerTopic, 5);
admin.topics().delete(systemTopic, true);
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS);
assertNull(topicPolicies);
}
}

0 comments on commit 93c41de

Please sign in to comment.