Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker]Ensure namespace deletion doesn't fail #22627

Merged
merged 5 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,21 @@ protected CompletableFuture<Void> deleteAsync(String path) {
}

protected CompletableFuture<Void> deleteIfExistsAsync(String path) {
return cache.exists(path).thenCompose(exists -> {
if (!exists) {
return CompletableFuture.completedFuture(null);
log.info("Deleting path: {}", path);
CompletableFuture<Void> future = new CompletableFuture<>();
cache.delete(path).whenComplete((ignore, ex) -> {
if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) {
log.info("Path {} did not exist in metadata store", path);
future.complete(null);
} else if (ex != null) {
log.info("Cannot path from metadata store: {}", path, ex);
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
future.completeExceptionally(ex);
} else {
log.info("Deleted path from metadata store: {}", path);
future.complete(null);
}
CompletableFuture<Void> future = new CompletableFuture<>();
cache.delete(path).whenComplete((ignore, ex) -> {
if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) {
future.complete(null);
} else if (ex != null) {
future.completeExceptionally(ex);
} else {
future.complete(null);
}
});
return future;
});
return future;
}

protected boolean exists(String path) throws MetadataStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void deleteLocalPolicies(NamespaceName ns) throws MetadataStoreException
}

public CompletableFuture<Void> deleteLocalPoliciesAsync(NamespaceName ns) {
return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
}

public CompletableFuture<Void> deleteLocalPoliciesTenantAsync(String tenant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void deletePolicies(NamespaceName ns) throws MetadataStoreException{
}

public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
}

public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreException{
Expand Down Expand Up @@ -155,10 +155,18 @@ public static boolean pathIsNamespaceLocalPolicies(String path) {
&& path.substring(LOCAL_POLICIES_ROOT.length() + 1).contains("/");
}

// clear resource of `/namespace/{namespaceName}` for zk-node
/**
* Clear resource of `/namespace/{namespaceName}` for zk-node.
* @param ns the namespace name
* @return a handle to the results of the operation
* */
//
public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString());
return deleteIfExistsAsync(namespacePath);
// please beware that this will delete all the children of the namespace
// including the ownership nodes (ephemeral nodes)
// see ServiceUnitUtils.path(ns) for the ownership node path
return getStore().deleteRecursive(namespacePath);
}

// clear resource of `/namespace/{tenant}` for zk-node
Expand Down Expand Up @@ -303,11 +311,14 @@ public CompletableFuture<Void> deletePartitionedTopicAsync(TopicName tn) {

public CompletableFuture<Void> clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
final String globalPartitionedPath = joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
log.info("Clearing partitioned topic metadata for namespace {}, path is {}",
namespaceName, globalPartitionedPath);
return getStore().deleteRecursive(globalPartitionedPath);
}

public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String tenant) {
final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant);
log.info("Clearing partitioned topic metadata for tenant {}, path is {}", tenant, partitionedTopicPath);
return deleteIfExistsAsync(partitionedTopicPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ public CompletableFuture<List<String>> getExistingPartitions(NamespaceName ns, T
);
}

public CompletableFuture<Void> deletePersistentTopicAsync(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.delete(path, Optional.of(-1L));
}

public CompletableFuture<Void> createPersistentTopicAsync(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.put(path, new byte[0], Optional.of(-1L))
Expand All @@ -93,38 +88,20 @@ public CompletableFuture<Boolean> persistentTopicExists(TopicName topic) {

public CompletableFuture<Void> clearNamespacePersistence(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns;
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing namespace persistence for namespace: {}, path {}", ns, path);
return store.deleteIfExists(path, Optional.empty());
}

public CompletableFuture<Void> clearDomainPersistence(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing domain persistence for namespace: {}, path {}", ns, path);
return store.deleteIfExists(path, Optional.empty());
}

public CompletableFuture<Void> clearTenantPersistence(String tenant) {
String path = MANAGED_LEDGER_PATH + "/" + tenant;
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.deleteRecursive(path);
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing tenant persistence for tenant: {}, path {}", tenant, path);
return store.deleteIfExists(path, Optional.empty());
}

void handleNotification(Notification notification) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,14 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
clientAppId(), ex);
return FutureUtil.failedFuture(ex);
}
log.info("[{}] Deleting namespace bundle {}/{}", clientAppId(),
namespaceName, bundle.getBundleRange());
return admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(),
bundle.getBundleRange(), force);
} else {
log.warn("[{}] Skipping deleting namespace bundle {}/{} "
+ "as it's not owned by any broker",
clientAppId(), namespaceName, bundle.getBundleRange());
}
return CompletableFuture.completedFuture(null);
})
Expand All @@ -321,16 +327,20 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
final Throwable rc = FutureUtil.unwrapCompletionException(error);
if (rc instanceof MetadataStoreException) {
if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) {
KeeperException.NotEmptyException ne =
(KeeperException.NotEmptyException) rc.getCause();
log.info("[{}] There are in-flight topics created during the namespace deletion, "
+ "retry to delete the namespace again.", namespaceName);
+ "retry to delete the namespace again. (path {} is not empty on metadata)",
namespaceName, ne.getPath());
final int next = retryTimes - 1;
if (next > 0) {
// async recursive
internalRetryableDeleteNamespaceAsync0(force, next, callback);
} else {
callback.completeExceptionally(
new RestException(Status.CONFLICT, "The broker still have in-flight topics"
+ " created during namespace deletion, please try again."));
+ " created during namespace deletion (path " + ne.getPath() + ") "
+ "is not empty on metadata store, please try again."));
// drop out recursive
}
return;
Expand Down Expand Up @@ -476,6 +486,8 @@ protected CompletableFuture<Void> internalClearZkSources() {
@SuppressWarnings("deprecation")
protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
boolean force) {
log.info("[{}] Deleting namespace bundle {}/{} authoritative:{} force:{}",
clientAppId(), namespaceName, bundleRange, authoritative, force);
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader)
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarClientException.AlreadyClosedException) {
log.warn("Read more topic policies exception, close the read now!", ex);
log.info("Closing the topic policies reader for {}",
reader.getSystemTopic().getTopicName());
cleanCacheAndCloseReader(
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Metadata store client interface.
Expand All @@ -36,6 +39,8 @@
@Beta
public interface MetadataStore extends AutoCloseable {

Logger LOGGER = LoggerFactory.getLogger(MetadataStore.class);

/**
* Read the value of one key, identified by the path
*
Expand Down Expand Up @@ -121,6 +126,23 @@ default CompletableFuture<Void> sync(String path) {
*/
CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion);

default CompletableFuture<Void> deleteIfExists(String path, Optional<Long> expectedVersion) {
return delete(path, expectedVersion)
.exceptionally(e -> {
if (e.getCause() instanceof NotFoundException) {
LOGGER.info("Path {} not found while deleting (this is not a problem)", path);
return null;
} else {
if (expectedVersion.isEmpty()) {
LOGGER.info("Failed to delete path {}", path, e);
} else {
LOGGER.info("Failed to delete path {} with expected version {}", path, expectedVersion, e);
}
throw new CompletionException(e);
}
});
}

/**
* Delete a key-value pair and all the children nodes.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public void accept(Notification n) {

@Override
public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
log.info("Deleting path: {} (v. {})", path, expectedVersion);
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
Expand Down Expand Up @@ -405,11 +406,13 @@ private CompletableFuture<Void> deleteInternal(String path, Optional<Long> expec
}

metadataCaches.forEach(c -> c.invalidate(path));
log.info("Deleted path: {} (v. {})", path, expectedVersion);
});
}

@Override
public CompletableFuture<Void> deleteRecursive(String path) {
log.info("Deleting recursively path: {}", path);
if (isClosed()) {
return FutureUtil.failedFuture(
new MetadataStoreException.AlreadyClosedException());
Expand All @@ -419,13 +422,9 @@ public CompletableFuture<Void> deleteRecursive(String path) {
children.stream()
.map(child -> deleteRecursive(path + "/" + child))
.collect(Collectors.toList())))
.thenCompose(__ -> exists(path))
.thenCompose(exists -> {
if (exists) {
return delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
.thenCompose(__ -> {
log.info("After deleting all children, now deleting path: {}", path);
return deleteIfExists(path, Optional.empty());
});
}

Expand Down