From 076325974d14547d1eac198a607bebfb23698b16 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 12 Oct 2017 16:44:41 -0700 Subject: [PATCH 1/3] Update bundle-cache on split-bundle and avoid disabling main bundle --- .../cache/LocalZooKeeperCacheService.java | 4 +- .../broker/namespace/NamespaceService.java | 2 - .../common/naming/NamespaceBundleFactory.java | 9 ++ .../client/api/BrokerServiceLookupTest.java | 101 ++++++++++++++++++ 4 files changed, 113 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java index 71deae987e2ee..66a1ffaab65a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.cache; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT; import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; @@ -90,6 +89,9 @@ public CompletableFuture> getAsync(String path) { // create new policies node under Local ZK by coping it from Global ZK createPolicies(path, true).thenAccept(p -> { LOG.info("Successfully created local policies for {} -- {}", path, p); + // local-policies have been created but it's not part of policiesCache. so, call + // super.getAsync() which will load it and set the watch on local-policies path + super.getAsync(path); future.complete(p); }).exceptionally(ex -> { future.completeExceptionally(ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 7c8ad491a7d81..f87cc5faddf2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -571,9 +571,7 @@ public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle) throws updateNamespaceBundles(nsname, splittedBundles.getLeft(), (rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(() -> { if (rc == KeeperException.Code.OK.intValue()) { - // disable old bundle try { - ownershipCache.disableOwnership(bundle); // invalidate cache as zookeeper has new split // namespace bundle bundleFactory.invalidateBundleCache(nsname); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index b6a617efb0919..58a7481ee17bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -89,6 +89,15 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { return future; }); + // local-policies have been changed which has contains namespace bundles + pulsar.getLocalZkCacheService().policiesCache() + .registerListener((String path, LocalPolicies data, Stat stat) -> { + String[] paths = path.split(LOCAL_POLICIES_ROOT + "/"); + if (paths.length == 2) { + invalidateBundleCache(new NamespaceName(paths[1])); + } + }); + if (pulsar != null && pulsar.getConfigurationCache() != null) { pulsar.getLocalZkCacheService().policiesCache().registerListener(this); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 9ba6f43074a08..ccd1ba4c46ef7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -38,6 +38,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.net.ssl.HttpsURLConnection; @@ -68,6 +69,7 @@ import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; @@ -738,6 +740,105 @@ public void start() throws PulsarClientException { } } + /** + * + *
+     * When broker-1's load-manager splits the bundle and update local-policies, broker-2 should get watch of
+     * local-policies and update bundleCache so, new lookup can be redirected properly.
+     * 
+     * (1) Start broker-1 and broker-2
+     * (2) Make sure broker-2 always assign bundle to broker1
+     * (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
+     * (4) Broker-1 will own topic-1
+     * (5) Split the bundle for topic-1
+     * (6) Broker-2 should get the watch and update bundle cache
+     * (7) Make lookup request again to Broker-2 which should succeed.
+     * 
+     * 
+ * + * @throws Exception + */ + @Test(timeOut = 5000) + public void testSplitUnloadLookupTest() throws Exception { + + log.info("-- Starting {} test --", methodName); + + final String namespace = "my-property/use/my-ns"; + // (1) Start broker-1 + ServiceConfiguration conf2 = new ServiceConfiguration(); + conf2.setBrokerServicePort(PortManager.nextFreePort()); + conf2.setBrokerServicePortTls(PortManager.nextFreePort()); + conf2.setWebServicePort(PortManager.nextFreePort()); + conf2.setWebServicePortTls(PortManager.nextFreePort()); + conf2.setAdvertisedAddress("localhost"); + conf2.setClusterName(conf.getClusterName()); + PulsarService pulsar2 = startBroker(conf2); + pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); + pulsar2.getLoadManager().get().writeLoadReportOnZookeeper(); + + pulsar.getLoadManager().get().writeLoadReportOnZookeeper(); + pulsar2.getLoadManager().get().writeLoadReportOnZookeeper(); + + LoadManager loadManager1 = spy(pulsar.getLoadManager().get()); + LoadManager loadManager2 = spy(pulsar2.getLoadManager().get()); + Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); + loadManagerField.setAccessible(true); + + // (2) Make sure broker-2 always assign bundle to broker1 + // mock: redirect request to leader [2] + doReturn(true).when(loadManager2).isCentralized(); + loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2)); + // mock: return Broker1 as a Least-loaded broker when leader receies request [3] + doReturn(true).when(loadManager1).isCentralized(); + SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null); + doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class)); + loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1)); + + URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort()); + PulsarClient pulsarClient2 = PulsarClient.create(broker2ServiceUrl.toString(), new ClientConfiguration()); + + // (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch + final String topic1 = "persistent://" + namespace + "/topic1"; + Consumer consumer1 = pulsarClient2.subscribe(topic1, "my-subscriber-name", new ConsumerConfiguration()); + + Set serviceUnits1 = pulsar.getNamespaceService().getOwnedServiceUnits().stream() + .map(nb -> nb.toString()).collect(Collectors.toSet()); + + // (4) Broker-1 will own topic-1 + final String unsplitBundle = namespace + "/0x00000000_0xffffffff"; + Assert.assertTrue(serviceUnits1.contains(unsplitBundle)); + // broker-2 should have this bundle into the cache + DestinationName destination = DestinationName.get(topic1); + NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(destination); + Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle); + + // (5) Split the bundle for topic-1 + admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff"); + + // (6) Broker-2 should get the watch and update bundle cache + final int retry = 5; + for (int i = 0; i < retry; i++) { + if (pulsar2.getNamespaceService().getBundle(destination).equals(bundleInBroker2) && i != retry - 1) { + Thread.sleep(200); + } else { + break; + } + } + + // (7) Make lookup request again to Broker-2 which should succeed. + final String topic2 = "persistent://" + namespace + "/topic2"; + Consumer consumer2 = pulsarClient2.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration()); + + NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService() + .getBundle(DestinationName.get(topic2)); + Assert.assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle)); + + consumer1.close(); + consumer2.close(); + pulsarClient2.close(); + pulsar2.close(); + + } /**** helper classes ****/ public static class MockAuthenticationProvider implements AuthenticationProvider { From 027445a3849a5515dc2316eba6e6c0bdd2bb996f Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 12 Oct 2017 18:23:58 -0700 Subject: [PATCH 2/3] disable bundle temporary until refresh cache with new bundles and invalidate old bundle --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index f87cc5faddf2f..fef11d5036a7f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -572,6 +572,8 @@ public CompletableFuture splitAndOwnBundle(NamespaceBundle bundle) throws (rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(() -> { if (rc == KeeperException.Code.OK.intValue()) { try { + // disable old bundle in memory + getOwnershipCache().updateBundleState(bundle, false); // invalidate cache as zookeeper has new split // namespace bundle bundleFactory.invalidateBundleCache(nsname); From 256b697d435677cad21de5cb8aabd4c973c00786 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Fri, 13 Oct 2017 11:09:27 -0700 Subject: [PATCH 3/3] Fix NamespacebundleFactory test init --- .../common/naming/NamespaceBundleFactory.java | 4 ++-- .../broker/cache/ResourceQuotaCacheTest.java | 12 ++++++++++ .../namespace/NamespaceServiceTest.java | 6 ++++- .../broker/namespace/OwnershipCacheTest.java | 14 +++++++++-- .../common/naming/NamespaceBundleTest.java | 20 +++++++++++++++- .../common/naming/NamespaceBundlesTest.java | 24 ++++++++++++++++--- 6 files changed, 71 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 58a7481ee17bc..b5944886a1fd8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -234,8 +234,8 @@ public static void validateFullRange(SortedSet partitions) { checkArgument(partitions.first().equals(FIRST_BOUNDARY) && partitions.last().equals(LAST_BOUNDARY)); } - public static NamespaceBundleFactory createFactory(HashFunction hashFunc) { - return new NamespaceBundleFactory(null, hashFunc); + public static NamespaceBundleFactory createFactory(PulsarService pulsar, HashFunction hashFunc) { + return new NamespaceBundleFactory(pulsar, hashFunc); } public static boolean isFullBundle(String bundleRange) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java index b044663ff67de..860b9f10eb950 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java @@ -18,8 +18,11 @@ */ package org.apache.pulsar.broker.cache; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import java.util.concurrent.Executors; @@ -32,9 +35,11 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.zookeeper.LocalZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperCache; +import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.MockZooKeeper; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -58,6 +63,13 @@ public void setup() throws Exception { scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor); localCache = new LocalZooKeeperCacheService(zkCache, null); + + // set mock pulsar localzkcache + LocalZooKeeperCacheService localZkCache = mock(LocalZooKeeperCacheService.class); + ZooKeeperDataCache poilciesCache = mock(ZooKeeperDataCache.class); + when(pulsar.getLocalZkCacheService()).thenReturn(localZkCache); + when(localZkCache.policiesCache()).thenReturn(poilciesCache); + doNothing().when(poilciesCache).registerListener(any()); bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); doReturn(zkCache).when(pulsar).getLocalZkCache(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index f584e8a881cd4..523c8c6e745a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -45,6 +45,8 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.LocalBrokerData; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; @@ -59,10 +61,12 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.policies.data.loadbalancer.LoadReport; +import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; @@ -124,7 +128,7 @@ public void testSplitAndOwnBundles() throws Exception { List bundleList = updatedNsBundles.getBundles(); assertNotNull(bundles); - NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(Hashing.crc32()); + NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); // (1) validate bundleFactory-cache has newly split bundles and removed old parent bundle Pair> splitBundles = splitBundles(utilityFactory, nsname, bundles, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index dce8b50a05082..dd4879e31bc35 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -20,9 +20,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.pulsar.broker.PulsarService.webAddress; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -31,7 +35,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -48,8 +51,10 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.zookeeper.LocalZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperCache; +import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.MockZooKeeper; @@ -81,7 +86,12 @@ public void setup() throws Exception { executor = new OrderedSafeExecutor(1, "test"); scheduledExecutor = Executors.newScheduledThreadPool(2); zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor); - localCache = new LocalZooKeeperCacheService(zkCache, null); + localCache = spy(new LocalZooKeeperCacheService(zkCache, null)); + ZooKeeperDataCache poilciesCache = mock(ZooKeeperDataCache.class); + when(pulsar.getLocalZkCacheService()).thenReturn(localCache); + when(localCache.policiesCache()).thenReturn(poilciesCache); + doNothing().when(poilciesCache).registerListener(any()); + bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); nsService = mock(NamespaceService.class); brokerService = mock(BrokerService.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java index 2ae869aca18fc..7b3261fce6c67 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleTest.java @@ -18,16 +18,24 @@ */ package org.apache.pulsar.common.naming; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.testng.annotations.Test; import com.google.common.collect.BoundType; @@ -35,7 +43,7 @@ import com.google.common.hash.Hashing; public class NamespaceBundleTest { - private final NamespaceBundleFactory factory = NamespaceBundleFactory.createFactory(Hashing.crc32()); + private final NamespaceBundleFactory factory = getNamespaceBundleFactory(); @Test public void testConstructor() { @@ -110,6 +118,16 @@ public void testConstructor() { assertEquals(bundle.getNamespaceObject().toString(), "pulsar/use/ns"); } + private NamespaceBundleFactory getNamespaceBundleFactory() { + PulsarService pulsar = mock(PulsarService.class); + LocalZooKeeperCacheService localZkCache = mock(LocalZooKeeperCacheService.class); + ZooKeeperDataCache poilciesCache = mock(ZooKeeperDataCache.class); + when(pulsar.getLocalZkCacheService()).thenReturn(localZkCache); + when(localZkCache.policiesCache()).thenReturn(poilciesCache); + doNothing().when(poilciesCache).registerListener(any()); + return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + } + @Test public void testGetBundle() throws Exception { NamespaceBundle bundle = factory.getBundle(new NamespaceName("pulsar/use/ns1"), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java index c2cfd6a266d1f..d56984eb55873 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundlesTest.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.common.naming; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -31,11 +35,15 @@ import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.zookeeper.ZooKeeperDataCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; @@ -48,7 +56,7 @@ public class NamespaceBundlesTest { - private final NamespaceBundleFactory factory = NamespaceBundleFactory.createFactory(Hashing.crc32()); + private final NamespaceBundleFactory factory = getNamespaceBundleFactory(); @SuppressWarnings("unchecked") @Test @@ -115,6 +123,16 @@ public void testConstructor() throws Exception { factory.getBundle(nsFld, Range.range(0x40000000l, BoundType.CLOSED, 0xffffffffl, BoundType.CLOSED))); } + private NamespaceBundleFactory getNamespaceBundleFactory() { + PulsarService pulsar = mock(PulsarService.class); + LocalZooKeeperCacheService localZkCache = mock(LocalZooKeeperCacheService.class); + ZooKeeperDataCache poilciesCache = mock(ZooKeeperDataCache.class); + when(pulsar.getLocalZkCacheService()).thenReturn(localZkCache); + when(localZkCache.policiesCache()).thenReturn(poilciesCache); + doNothing().when(poilciesCache).registerListener(any()); + return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32()); + } + @Test public void testFindBundle() throws Exception { SortedSet partitions = Sets.newTreeSet(); @@ -172,7 +190,7 @@ public void testsplitBundles() throws Exception { assertEquals(totalExpectedSplitBundles, splitBundles.getLeft().getBundles().size()); // (2) split in 4: first bundle from above split bundles - NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(Hashing.crc32()); + NamespaceBundleFactory utilityFactory = getNamespaceBundleFactory(); NamespaceBundles bundles2 = splitBundles.getLeft(); NamespaceBundle testChildBundle = bundles2.getBundles().get(0); @@ -212,7 +230,7 @@ public void testSplitBundleInTwo() throws Exception { // (2) split: [0x00000000,0x7fffffff] => [0x00000000_0x3fffffff,0x3fffffff_0x7fffffff], // [0x7fffffff,0xffffffff] => [0x7fffffff_0xbfffffff,0xbfffffff_0xffffffff] - NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(Hashing.crc32()); + NamespaceBundleFactory utilityFactory = getNamespaceBundleFactory(); assertBundles(utilityFactory, nsname, bundle, splitBundles, NO_BUNDLES); // (3) split: [0x00000000,0x3fffffff] => [0x00000000_0x1fffffff,0x1fffffff_0x3fffffff],