diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 9ad563f7cf7ff..7af568d8780ee 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -38,6 +38,7 @@ import java.util.function.Supplier; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.metadata.api.CacheGetResult; import org.apache.pulsar.metadata.api.MetadataCache; @@ -50,6 +51,7 @@ import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.Stat; +@Slf4j public class MetadataCacheImpl implements MetadataCache, Consumer { private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); @@ -209,8 +211,17 @@ public CompletableFuture create(String path, T value) { store.put(path, content, Optional.of(-1L)) .thenAccept(stat -> { // Make sure we have the value cached before the operation is completed - objCache.put(path, FutureUtils.value(Optional.of(new CacheGetResult<>(value, stat)))); - future.complete(null); + // In addition to caching the value, we need to add a watch on the path, + // so when/if it changes on any other node, we are notified and we can + // update the cache + objCache.get(path).whenComplete( (stat2, ex) -> { + if (ex == null) { + future.complete(null); + } else { + log.error("Exception while getting path {}", path, ex); + future.completeExceptionally(ex.getCause()); + } + }); }).exceptionally(ex -> { if (ex.getCause() instanceof BadVersionException) { // Use already exists exception to provide more self-explanatory error message @@ -226,11 +237,7 @@ public CompletableFuture create(String path, T value) { @Override public CompletableFuture delete(String path) { - return store.delete(path, Optional.empty()) - .thenAccept(v -> { - // Mark in the cache that the object was removed - objCache.put(path, FutureUtils.value(Optional.empty())); - }); + return store.delete(path, Optional.empty()); } @Override diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index f1b43798c4251..4d8327042cec9 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -28,6 +28,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; @@ -50,6 +52,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.Stat; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class MetadataCacheTest extends BaseMetadataStoreTest { @@ -90,6 +93,74 @@ public void emptyCacheTest(String provider, String url) throws Exception { } } + @DataProvider(name = "zk") + public Object[][] zkimplementations() { + return new Object[][] { + { "ZooKeeper", zks.getConnectionString() }, + }; + } + + @Test(dataProvider = "zk") + public void crossStoreUpdates(String provider, String url) throws Exception { + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + @Cleanup + MetadataStore store2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + @Cleanup + MetadataStore store3 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + MetadataCache objCache1 = store1.getMetadataCache(MyClass.class); + MetadataCache objCache2 = store2.getMetadataCache(MyClass.class); + MetadataCache objCache3 = store3.getMetadataCache(MyClass.class); + + List> allCaches = new ArrayList<>(); + allCaches.add(objCache1); + allCaches.add(objCache2); + allCaches.add(objCache3); + + // Add on one cache and remove from another + multiStoreAddDelete(allCaches, 0, 1, "add cache0 del cache1"); + // retry same order to rule out any stale state + multiStoreAddDelete(allCaches, 0, 1, "add cache0 del cache1"); + // Reverse the operations + multiStoreAddDelete(allCaches, 1, 0, "add cache1 del cache0"); + // Ensure that working on same cache continues to work. + multiStoreAddDelete(allCaches, 1, 1, "add cache1 del cache1"); + } + + private void multiStoreAddDelete(List> caches, int addOn, int delFrom, String testName) throws InterruptedException { + MetadataCache addCache = caches.get(addOn); + MetadataCache delCache = caches.get(delFrom); + + String key1 = "/test-key1"; + assertEquals(addCache.getIfCached(key1), Optional.empty()); + + MyClass value1 = new MyClass(testName, 1); + + addCache.create(key1, value1).join(); + // all time for changes to propagate to other caches + Thread.sleep(100); + for (MetadataCache cache: caches) { + if (cache == addCache) { + assertEquals(cache.getIfCached(key1), Optional.of(value1)); + } + assertEquals(cache.get(key1).join(), Optional.of(value1)); + assertEquals(cache.getIfCached(key1), Optional.of(value1)); + } + + delCache.delete(key1).join(); + + // all time for changes to propagate to other caches + Thread.sleep(100); + // The entry should get removed from all caches + for (MetadataCache cache: caches) { + assertEquals(cache.getIfCached(key1), Optional.empty()); + assertEquals(cache.get(key1).join(), Optional.empty()); + } + } + @Test(dataProvider = "impl") public void insertionDeletionWitGenericType(String provider, String url) throws Exception { @Cleanup