diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 4011a48207512..00e381e07292f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -197,22 +197,21 @@ protected CompletableFuture deleteAsync(String path) { } protected CompletableFuture deleteIfExistsAsync(String path) { - return cache.exists(path).thenCompose(exists -> { - if (!exists) { - return CompletableFuture.completedFuture(null); + log.info("Deleting path: {}", path); + CompletableFuture future = new CompletableFuture<>(); + cache.delete(path).whenComplete((ignore, ex) -> { + if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) { + log.info("Path {} did not exist in metadata store", path); + future.complete(null); + } else if (ex != null) { + log.info("Failed to delete path from metadata store: {}", path, ex); + future.completeExceptionally(ex); + } else { + log.info("Deleted path from metadata store: {}", path); + future.complete(null); } - CompletableFuture future = new CompletableFuture<>(); - cache.delete(path).whenComplete((ignore, ex) -> { - if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) { - future.complete(null); - } else if (ex != null) { - future.completeExceptionally(ex); - } else { - future.complete(null); - } - }); - return future; }); + return future; } protected boolean exists(String path) throws MetadataStoreException { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index c6b658c3bd025..ae3479fde59b8 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -79,7 +79,7 @@ public void deleteLocalPolicies(NamespaceName ns) throws MetadataStoreException } public CompletableFuture deleteLocalPoliciesAsync(NamespaceName ns) { - return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); + return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString())); } public CompletableFuture deleteLocalPoliciesTenantAsync(String tenant) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 975b23192f949..9d7c60cd34453 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -115,7 +115,7 @@ public void deletePolicies(NamespaceName ns) throws MetadataStoreException{ } public CompletableFuture deletePoliciesAsync(NamespaceName ns){ - return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString())); + return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, ns.toString())); } public Optional getPolicies(NamespaceName ns) throws MetadataStoreException{ @@ -155,10 +155,18 @@ public static boolean pathIsNamespaceLocalPolicies(String path) { && path.substring(LOCAL_POLICIES_ROOT.length() + 1).contains("/"); } - // clear resource of `/namespace/{namespaceName}` for zk-node + /** + * Clear resource of `/namespace/{namespaceName}` for zk-node. + * @param ns the namespace name + * @return a handle to the results of the operation + * */ + // public CompletableFuture deleteNamespaceAsync(NamespaceName ns) { final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString()); - return deleteIfExistsAsync(namespacePath); + // please beware that this will delete all the children of the namespace + // including the ownership nodes (ephemeral nodes) + // see ServiceUnitUtils.path(ns) for the ownership node path + return getStore().deleteRecursive(namespacePath); } // clear resource of `/namespace/{tenant}` for zk-node @@ -303,11 +311,14 @@ public CompletableFuture deletePartitionedTopicAsync(TopicName tn) { public CompletableFuture clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) { final String globalPartitionedPath = joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString()); + log.info("Clearing partitioned topic metadata for namespace {}, path is {}", + namespaceName, globalPartitionedPath); return getStore().deleteRecursive(globalPartitionedPath); } public CompletableFuture clearPartitionedTopicTenantAsync(String tenant) { final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant); + log.info("Clearing partitioned topic metadata for tenant {}, path is {}", tenant, partitionedTopicPath); return deleteIfExistsAsync(partitionedTopicPath); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index 413184764f52b..f607da76b3c11 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -75,11 +75,6 @@ public CompletableFuture> getExistingPartitions(NamespaceName ns, T ); } - public CompletableFuture deletePersistentTopicAsync(TopicName topic) { - String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding(); - return store.delete(path, Optional.of(-1L)); - } - public CompletableFuture createPersistentTopicAsync(TopicName topic) { String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding(); return store.put(path, new byte[0], Optional.of(-1L)) @@ -93,38 +88,20 @@ public CompletableFuture persistentTopicExists(TopicName topic) { public CompletableFuture clearNamespacePersistence(NamespaceName ns) { String path = MANAGED_LEDGER_PATH + "/" + ns; - return store.exists(path) - .thenCompose(exists -> { - if (exists) { - return store.delete(path, Optional.empty()); - } else { - return CompletableFuture.completedFuture(null); - } - }); + log.info("Clearing namespace persistence for namespace: {}, path {}", ns, path); + return store.deleteIfExists(path, Optional.empty()); } public CompletableFuture clearDomainPersistence(NamespaceName ns) { String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent"; - return store.exists(path) - .thenCompose(exists -> { - if (exists) { - return store.delete(path, Optional.empty()); - } else { - return CompletableFuture.completedFuture(null); - } - }); + log.info("Clearing domain persistence for namespace: {}, path {}", ns, path); + return store.deleteIfExists(path, Optional.empty()); } public CompletableFuture clearTenantPersistence(String tenant) { String path = MANAGED_LEDGER_PATH + "/" + tenant; - return store.exists(path) - .thenCompose(exists -> { - if (exists) { - return store.deleteRecursive(path); - } else { - return CompletableFuture.completedFuture(null); - } - }); + log.info("Clearing tenant persistence for tenant: {}, path {}", tenant, path); + return store.deleteRecursive(path); } void handleNotification(Notification notification) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 5f2dccc3e9c24..ca67a24460721 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -309,8 +309,14 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime clientAppId(), ex); return FutureUtil.failedFuture(ex); } + log.info("[{}] Deleting namespace bundle {}/{}", clientAppId(), + namespaceName, bundle.getBundleRange()); return admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), force); + } else { + log.warn("[{}] Skipping deleting namespace bundle {}/{} " + + "as it's not owned by any broker", + clientAppId(), namespaceName, bundle.getBundleRange()); } return CompletableFuture.completedFuture(null); }) @@ -321,8 +327,11 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime final Throwable rc = FutureUtil.unwrapCompletionException(error); if (rc instanceof MetadataStoreException) { if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) { + KeeperException.NotEmptyException ne = + (KeeperException.NotEmptyException) rc.getCause(); log.info("[{}] There are in-flight topics created during the namespace deletion, " - + "retry to delete the namespace again.", namespaceName); + + "retry to delete the namespace again. (path {} is not empty on metadata)", + namespaceName, ne.getPath()); final int next = retryTimes - 1; if (next > 0) { // async recursive @@ -330,7 +339,8 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime } else { callback.completeExceptionally( new RestException(Status.CONFLICT, "The broker still have in-flight topics" - + " created during namespace deletion, please try again.")); + + " created during namespace deletion (path " + ne.getPath() + ") " + + "is not empty on metadata store, please try again.")); // drop out recursive } return; @@ -476,6 +486,8 @@ protected CompletableFuture internalClearZkSources() { @SuppressWarnings("deprecation") protected CompletableFuture internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative, boolean force) { + log.info("[{}] Deleting namespace bundle {}/{} authoritative:{} force:{}", + clientAppId(), namespaceName, bundleRange, authoritative, force); return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 6d18d6d61b08e..5156246bb5efb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -543,7 +543,8 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) } else { Throwable cause = FutureUtil.unwrapCompletionException(ex); if (cause instanceof PulsarClientException.AlreadyClosedException) { - log.warn("Read more topic policies exception, close the read now!", ex); + log.info("Closing the topic policies reader for {}", + reader.getSystemTopic().getTopicName()); cleanCacheAndCloseReader( reader.getSystemTopic().getTopicName().getNamespaceObject(), false); } else { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index 33942c19520a3..89b0e7a6fe1c0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -23,9 +23,12 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.Consumer; import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Metadata store client interface. @@ -36,6 +39,8 @@ @Beta public interface MetadataStore extends AutoCloseable { + Logger LOGGER = LoggerFactory.getLogger(MetadataStore.class); + /** * Read the value of one key, identified by the path * @@ -121,6 +126,23 @@ default CompletableFuture sync(String path) { */ CompletableFuture delete(String path, Optional expectedVersion); + default CompletableFuture deleteIfExists(String path, Optional expectedVersion) { + return delete(path, expectedVersion) + .exceptionally(e -> { + if (e.getCause() instanceof NotFoundException) { + LOGGER.info("Path {} not found while deleting (this is not a problem)", path); + return null; + } else { + if (expectedVersion.isEmpty()) { + LOGGER.info("Failed to delete path {}", path, e); + } else { + LOGGER.info("Failed to delete path {} with expected version {}", path, expectedVersion, e); + } + throw new CompletionException(e); + } + }); + } + /** * Delete a key-value pair and all the children nodes. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index 0a35664391455..fa827bb40e706 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -360,6 +360,7 @@ public void accept(Notification n) { @Override public final CompletableFuture delete(String path, Optional expectedVersion) { + log.info("Deleting path: {} (v. {})", path, expectedVersion); if (isClosed()) { return FutureUtil.failedFuture( new MetadataStoreException.AlreadyClosedException()); @@ -405,11 +406,13 @@ private CompletableFuture deleteInternal(String path, Optional expec } metadataCaches.forEach(c -> c.invalidate(path)); + log.info("Deleted path: {} (v. {})", path, expectedVersion); }); } @Override public CompletableFuture deleteRecursive(String path) { + log.info("Deleting recursively path: {}", path); if (isClosed()) { return FutureUtil.failedFuture( new MetadataStoreException.AlreadyClosedException()); @@ -419,13 +422,9 @@ public CompletableFuture deleteRecursive(String path) { children.stream() .map(child -> deleteRecursive(path + "/" + child)) .collect(Collectors.toList()))) - .thenCompose(__ -> exists(path)) - .thenCompose(exists -> { - if (exists) { - return delete(path, Optional.empty()); - } else { - return CompletableFuture.completedFuture(null); - } + .thenCompose(__ -> { + log.info("After deleting all children, now deleting path: {}", path); + return deleteIfExists(path, Optional.empty()); }); }