From 798b34fd2f4d4998ef2b7836dcd0d7f28883338b Mon Sep 17 00:00:00 2001 From: Surinder Singh Date: Tue, 8 Jun 2021 14:09:47 -0700 Subject: [PATCH] [Issue 10860][pulsar-metadata] Ensure metadata cache consistency across brokers (#10862) * Added metadata cache test to simulate multi broker cache * fix create and delete ops on cache 1. During create we should add a watch for the path in ZooKeeper. Without this we will not be notified if the znode is changed on another broker. 2. Similarly, when deleting, the cache should be invalidated. But we shouldn't add an entry to the cache. The path could get added again on some other broker. In that case we need to go and fetch the entry from ZooKeeper. Adding an empty entry to the cache prevents that. * Address review comments Also add a small delay in test to allow notifications to propagate to other caches. Without this, the tests occasionally fail. Co-authored-by: Surinder Singh --- .../cache/impl/MetadataCacheImpl.java | 21 ++++-- .../pulsar/metadata/MetadataCacheTest.java | 71 +++++++++++++++++++ 2 files changed, 85 insertions(+), 7 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index 9ad563f7cf7ff..7af568d8780ee 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -38,6 +38,7 @@ import java.util.function.Supplier; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.metadata.api.CacheGetResult; import org.apache.pulsar.metadata.api.MetadataCache; @@ -50,6 +51,7 @@ import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.Stat; +@Slf4j public class MetadataCacheImpl implements MetadataCache, 
Consumer { private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); @@ -209,8 +211,17 @@ public CompletableFuture create(String path, T value) { store.put(path, content, Optional.of(-1L)) .thenAccept(stat -> { // Make sure we have the value cached before the operation is completed - objCache.put(path, FutureUtils.value(Optional.of(new CacheGetResult<>(value, stat)))); - future.complete(null); + // In addition to caching the value, we need to add a watch on the path, + // so when/if it changes on any other node, we are notified and we can + // update the cache + objCache.get(path).whenComplete( (stat2, ex) -> { + if (ex == null) { + future.complete(null); + } else { + log.error("Exception while getting path {}", path, ex); + future.completeExceptionally(ex.getCause()); + } + }); }).exceptionally(ex -> { if (ex.getCause() instanceof BadVersionException) { // Use already exists exception to provide more self-explanatory error message @@ -226,11 +237,7 @@ public CompletableFuture create(String path, T value) { @Override public CompletableFuture delete(String path) { - return store.delete(path, Optional.empty()) - .thenAccept(v -> { - // Mark in the cache that the object was removed - objCache.put(path, FutureUtils.value(Optional.empty())); - }); + return store.delete(path, Optional.empty()); } @Override diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java index f1b43798c4251..4d8327042cec9 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java @@ -28,6 +28,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Optional; import 
java.util.TreeMap; @@ -50,6 +52,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.apache.pulsar.metadata.api.Stat; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class MetadataCacheTest extends BaseMetadataStoreTest { @@ -90,6 +93,74 @@ public void emptyCacheTest(String provider, String url) throws Exception { } } + @DataProvider(name = "zk") + public Object[][] zkimplementations() { + return new Object[][] { + { "ZooKeeper", zks.getConnectionString() }, + }; + } + + @Test(dataProvider = "zk") + public void crossStoreUpdates(String provider, String url) throws Exception { + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + @Cleanup + MetadataStore store2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + @Cleanup + MetadataStore store3 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build()); + + MetadataCache objCache1 = store1.getMetadataCache(MyClass.class); + MetadataCache objCache2 = store2.getMetadataCache(MyClass.class); + MetadataCache objCache3 = store3.getMetadataCache(MyClass.class); + + List> allCaches = new ArrayList<>(); + allCaches.add(objCache1); + allCaches.add(objCache2); + allCaches.add(objCache3); + + // Add on one cache and remove from another + multiStoreAddDelete(allCaches, 0, 1, "add cache0 del cache1"); + // retry same order to rule out any stale state + multiStoreAddDelete(allCaches, 0, 1, "add cache0 del cache1"); + // Reverse the operations + multiStoreAddDelete(allCaches, 1, 0, "add cache1 del cache0"); + // Ensure that working on same cache continues to work. 
+ multiStoreAddDelete(allCaches, 1, 1, "add cache1 del cache1"); + } + + private void multiStoreAddDelete(List> caches, int addOn, int delFrom, String testName) throws InterruptedException { + MetadataCache addCache = caches.get(addOn); + MetadataCache delCache = caches.get(delFrom); + + String key1 = "/test-key1"; + assertEquals(addCache.getIfCached(key1), Optional.empty()); + + MyClass value1 = new MyClass(testName, 1); + + addCache.create(key1, value1).join(); + // all time for changes to propagate to other caches + Thread.sleep(100); + for (MetadataCache cache: caches) { + if (cache == addCache) { + assertEquals(cache.getIfCached(key1), Optional.of(value1)); + } + assertEquals(cache.get(key1).join(), Optional.of(value1)); + assertEquals(cache.getIfCached(key1), Optional.of(value1)); + } + + delCache.delete(key1).join(); + + // all time for changes to propagate to other caches + Thread.sleep(100); + // The entry should get removed from all caches + for (MetadataCache cache: caches) { + assertEquals(cache.getIfCached(key1), Optional.empty()); + assertEquals(cache.get(key1).join(), Optional.empty()); + } + } + @Test(dataProvider = "impl") public void insertionDeletionWitGenericType(String provider, String url) throws Exception { @Cleanup