Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix delete system topic clean topic policy #18823

Merged
merged 6 commits into from Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -72,6 +72,7 @@
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
Expand Down Expand Up @@ -224,20 +225,30 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
boolean hasNonSystemTopic = false;
List<String> allSystemTopics = new ArrayList<>();
List<String> allPartitionedSystemTopics = new ArrayList<>();
List<String> topicPolicy = new ArrayList<>();
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
List<String> partitionedTopicPolicy = new ArrayList<>();
for (String topic : allTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedTopics.add(topic);
} else {
allSystemTopics.add(topic);
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
topicPolicy.add(topic);
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
} else {
allSystemTopics.add(topic);
}
}
}
for (String topic : allPartitionedTopics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
allUserCreatedPartitionTopics.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
partitionedTopicPolicy.add(topic);
} else {
allPartitionedSystemTopics.add(topic);
}
}
}
if (!force) {
Expand All @@ -256,6 +267,10 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
return internalDeleteTopicsAsync(allSystemTopics);
}).thenCompose(ignore__ -> {
return internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics);
}).thenCompose(ignore -> {
return internalDeleteTopicsAsync(topicPolicy);
}).thenCompose(ignore__ -> {
return internalDeletePartitionedTopicsAsync(partitionedTopicPolicy);
});
})
.thenCompose(ignore -> pulsar().getNamespaceService()
Expand Down
Expand Up @@ -1221,8 +1221,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,

deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
.thenCompose(ignore -> {
if (!this.getBrokerService().getPulsar().getBrokerService()
.isSystemTopic(TopicName.get(topic))) {
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
return deleteTopicPolicies();
} else {
return CompletableFuture.completedFuture(null);
Expand Down
Expand Up @@ -2064,7 +2064,7 @@ public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws Exception {
}

@Test
public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception {
String namespace = this.testTenant + "/delete-systemTopic";
String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
"testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
Expand Down
Expand Up @@ -1616,4 +1616,33 @@ public void testGetTxnState() throws Exception {
Transaction abortingTxn = transaction;
Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
}
@Test
public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
String NAMESPACE2 = TENANT + "/ns2";
admin.namespaces().createNamespace(NAMESPACE2);
String topic = NAMESPACE2 + "/testDeleteTopicPolicyWhenDeleteSystemTopic";
String systemTopic = NAMESPACE2 + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic).create();
pulsarServiceList.get(0).getConfig().setMaxConsumersPerTopic(6);
admin.topicPolicies().setMaxConsumers(systemTopic, 5);

Integer maxConsumerPerTopic = pulsarServiceList.get(0)
.getTopicPoliciesService()
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
.getMaxConsumerPerTopic();

assertEquals(maxConsumerPerTopic, 5);
admin.topics().delete(systemTopic, true);
try {
pulsarServiceList.get(0)
.getTopicPoliciesService()
.getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS);
fail();
} catch (TimeoutException ignored) {

}

}
}