From c7b7c318c39a07dc9b71b69019e3d1f3c2c86d7e Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 17 Nov 2022 10:04:15 +0100 Subject: [PATCH] [proof-of-concept] Introduce the sync() API to guaratee consistency on critical metadata operation paths --- .../pulsar/broker/resources/BaseResources.java | 7 +++++++ .../broker/resources/NamespaceResources.java | 16 +++++++++++++--- .../pulsar/metadata/api/MetadataStore.java | 11 +++++++++++ .../pulsar/metadata/impl/ZKMetadataStore.java | 18 ++++++++++++++++++ 4 files changed, 49 insertions(+), 3 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index e705581a9d54d..42add4271f684 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -93,6 +93,13 @@ protected CompletableFuture> getAsync(String path) { return cache.get(path); } + protected CompletableFuture> refreshAndGetAsync(String path) { + return store.sync(path).thenCompose(___ -> { + cache.invalidate(path); + return cache.get(path); + }); + } + protected void set(String path, Function modifyFunction) throws MetadataStoreException { try { setAsync(path, modifyFunction).get(operationTimeoutSec, TimeUnit.SECONDS); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 899cf01bc4bfc..dd1c428380bad 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -258,8 +258,18 @@ public CompletableFuture> listPartitionedTopicsAsync(NamespaceName } public CompletableFuture> getPartitionedTopicMetadataAsync(TopicName tn) { - return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), - tn.getEncodedLocalName())); + return getPartitionedTopicMetadataAsync(tn, false); + } + + public CompletableFuture> getPartitionedTopicMetadataAsync(TopicName tn, + boolean refresh) { + if (refresh) { + return refreshAndGetAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), + tn.getEncodedLocalName())); + } else { + return getAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), + tn.getEncodedLocalName())); + } } public boolean partitionedTopicExists(TopicName tn) throws MetadataStoreException { @@ -317,7 +327,7 @@ public CompletableFuture isPartitionedTopicBeingDeletedAsync(TopicName if (tn.isPartitioned()) { tn = TopicName.get(tn.getPartitionedTopicName()); } - return getPartitionedTopicMetadataAsync(tn) + return getPartitionedTopicMetadataAsync(tn, true) .thenApply(mdOpt -> mdOpt.map(partitionedTopicMetadata -> partitionedTopicMetadata.deleted) .orElse(false)); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java index 66c9f44d3426a..33942c19520a3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java @@ -50,6 +50,17 @@ public interface MetadataStore extends AutoCloseable { */ CompletableFuture> get(String path); + + /** + * Ensure that the next value read from the local client will be up-to-date with the latest version of the value + * as it can be seen by all the other clients. + * @param path + * @return a handle to the operation + */ + default CompletableFuture sync(String path) { + return CompletableFuture.completedFuture(null); + } + /** * Return all the nodes (lexicographically sorted) that are children to the specific path. * diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 79f05365cfbe1..fdd54bd1013fa 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -163,6 +163,24 @@ protected void receivedSessionEvent(SessionEvent event) { } } + @Override + public CompletableFuture sync(String path) { + CompletableFuture result = new CompletableFuture<>(); + zkc.sync(path, new AsyncCallback.VoidCallback() { + @Override + public void processResult(int rc, String s, Object o) { + Code code = Code.get(rc); + if (code == Code.OK) { + result.complete(null); + } else { + MetadataStoreException e = getException(code, path); + result.completeExceptionally(e); + } + } + }, null); + return result; + } + @Override protected void batchOperation(List ops) { try {