From 5d5ec947249df50bf35a78b6a2a0d3b00d97ca66 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Fri, 12 May 2023 19:29:42 +0800 Subject: [PATCH] [fix][broker] Allow Access to System Topic Metadata for Reader Creation Post-Namespace Deletion (#20304) ## Motivation After initiating the snapshot segment function, deletion of topics necessitates the activation of readers. Furthermore, these readers should be opened and deleted as they are used, which implies that we should not pre-store readers. However, after initiating the deletion of namespaces currently, it is not allowed to obtain the metadata of partition topics or lookup, making it impossible to create readers. This results in the inability to delete namespaces. ## Modification Allow the acquisition of system topic metadata after initiating namespace deletion, thus creating readers to clean up topic data. --- .../admin/impl/PersistentTopicsBase.java | 14 +++++- .../pulsar/broker/lookup/TopicLookupBase.java | 44 +++++++++++-------- .../broker/transaction/TransactionTest.java | 21 +++++++++ 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9dcbe13d6159b..ed7cd70c6411c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin.impl; +import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; @@ -4408,8 +4409,13 @@ public CompletableFuture getPartitionedTopicMetadata( // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can // serve/redirect request else fail partitioned-metadata-request so, client fails while creating // producer/consumer + // It is necessary for system topic operations because system topics are used to store metadata + // and other vital information. Even after namespace starting deletion,, + // we need to access the metadata of system topics to create readers and clean up topic data. + // If we don't do this, it can prevent namespace deletion due to inaccessible readers. authorizationFuture.thenCompose(__ -> - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())) + checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), + SystemTopicNames.isSystemTopic(topicName))) .thenCompose(res -> pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) .thenAccept(metadata -> { @@ -4436,7 +4442,11 @@ public static CompletableFuture unsafeGetPartitionedTo // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can // serve/redirect request else fail partitioned-metadata-request so, client fails while creating // producer/consumer - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()) + // It is necessary for system topic operations because system topics are used to store metadata + // and other vital information. Even after namespace starting deletion,, + // we need to access the metadata of system topics to create readers and clean up topic data. + // If we don't do this, it can prevent namespace deletion due to inaccessible readers. + checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), isSystemTopic(topicName)) .thenCompose(res -> pulsar.getBrokerService() .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) .thenAccept(metadata -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 3b64d2a9f8393..bd70201cba55d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -41,6 +41,7 @@ import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; @@ -221,26 +222,31 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe // (2) authorize client checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> { // (3) validate global namespace + // It is necessary for system topic operations because system topics are used to store metadata + // and other vital information. Even after namespace starting deletion, + // we need to access the metadata of system topics to create readers and clean up topic data. + // If we don't do this, it can prevent namespace deletion due to inaccessible readers. checkLocalOrGetPeerReplicationCluster(pulsarService, - topicName.getNamespaceObject()).thenAccept(peerClusterData -> { - if (peerClusterData == null) { - // (4) all validation passed: initiate lookup - validationFuture.complete(null); - return; - } - // if peer-cluster-data is present it means namespace is owned by that peer-cluster and - // request should be redirect to the peer-cluster - if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl()) - && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) { - validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError, - "Redirected cluster's brokerService url is not configured", - requestId)); - return; - } - validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(), - peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, - requestId, - false)); + topicName.getNamespaceObject(), SystemTopicNames.isSystemTopic(topicName)) + .thenAccept(peerClusterData -> { + if (peerClusterData == null) { + // (4) all validation passed: initiate lookup + validationFuture.complete(null); + return; + } + // if peer-cluster-data is present it means namespace is owned by that peer-cluster + // and request should be redirect to the peer-cluster + if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl()) + && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) { + validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError, + "Redirected cluster's brokerService url is not configured", + requestId)); + return; + } + validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(), + peerClusterData.getBrokerServiceUrlTls(), true, + LookupType.Redirect, requestId, + false)); }).exceptionally(ex -> { Throwable throwable = FutureUtil.unwrapCompletionException(ex); if (throwable instanceof RestException restException){ diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index c3533e70cf8be..c4ec2ec766e32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -272,6 +272,27 @@ public void testCreateTransactionSystemTopic() throws Exception { } } + @Test + public void testCanDeleteNamespaceWhenEnableTxnSegmentedSnapshot() throws Exception { + // Enable the segmented snapshot feature + pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true); + pulsarServiceList.get(0).getConfig().setForceDeleteNamespaceAllowed(true); + + // Create a new namespace + String namespaceName = TENANT + "/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic"; + admin.namespaces().createNamespace(namespaceName); + + // Create a new topic in the namespace + String topicName = "persistent://" + namespaceName + "/newTopic"; + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + producer.close(); + + // Destroy the namespace after the test + admin.namespaces().deleteNamespace(namespaceName, true); + pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(false); + } + @Test public void brokerNotInitTxnManagedLedgerTopic() throws Exception { String subName = "test";