Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update bundle-cache on split-bundle and avoid disabling main bundle #822

Merged
merged 3 commits into from
Oct 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.cache;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
Expand Down Expand Up @@ -90,6 +89,9 @@ public CompletableFuture<Optional<LocalPolicies>> getAsync(String path) {
// create new policies node under Local ZK by coping it from Global ZK
createPolicies(path, true).thenAccept(p -> {
LOG.info("Successfully created local policies for {} -- {}", path, p);
// local-policies have been created but it's not part of policiesCache. so, call
// super.getAsync() which will load it and set the watch on local-policies path
super.getAsync(path);
future.complete(p);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,9 @@ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle) throws
updateNamespaceBundles(nsname, splittedBundles.getLeft(),
(rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(() -> {
if (rc == KeeperException.Code.OK.intValue()) {
// disable old bundle
try {
ownershipCache.disableOwnership(bundle);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed, or just a safety measure to have the parent bundle to be still "active" in case other brokers don't have the local cache updated yet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that protection may need while broker is refreshing cache (invalidate old bundle and assign topic-ownership to new bundles). So, I will add that protection again by disabling namespace in memory only but other broker can still redirect to this broker.

// disable old bundle in memory
getOwnershipCache().updateBundleState(bundle, false);
// invalidate cache as zookeeper has new split
// namespace bundle
bundleFactory.invalidateBundleCache(nsname);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
return future;
});

// local-policies have been changed which has contains namespace bundles
pulsar.getLocalZkCacheService().policiesCache()
.registerListener((String path, LocalPolicies data, Stat stat) -> {
String[] paths = path.split(LOCAL_POLICIES_ROOT + "/");
if (paths.length == 2) {
invalidateBundleCache(new NamespaceName(paths[1]));
}
});

if (pulsar != null && pulsar.getConfigurationCache() != null) {
pulsar.getLocalZkCacheService().policiesCache().registerListener(this);
}
Expand Down Expand Up @@ -225,8 +234,8 @@ public static void validateFullRange(SortedSet<String> partitions) {
checkArgument(partitions.first().equals(FIRST_BOUNDARY) && partitions.last().equals(LAST_BOUNDARY));
}

public static NamespaceBundleFactory createFactory(HashFunction hashFunc) {
return new NamespaceBundleFactory(null, hashFunc);
public static NamespaceBundleFactory createFactory(PulsarService pulsar, HashFunction hashFunc) {
return new NamespaceBundleFactory(pulsar, hashFunc);
}

public static boolean isFullBundle(String bundleRange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.cache;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import java.util.concurrent.Executors;
Expand All @@ -32,9 +35,11 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -58,6 +63,13 @@ public void setup() throws Exception {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor);
localCache = new LocalZooKeeperCacheService(zkCache, null);

// set mock pulsar localzkcache
LocalZooKeeperCacheService localZkCache = mock(LocalZooKeeperCacheService.class);
ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localZkCache);
when(localZkCache.policiesCache()).thenReturn(poilciesCache);
doNothing().when(poilciesCache).registerListener(any());
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());

doReturn(zkCache).when(pulsar).getLocalZkCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.LocalBrokerData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
Expand All @@ -59,10 +61,12 @@
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -124,7 +128,7 @@ public void testSplitAndOwnBundles() throws Exception {
List<NamespaceBundle> bundleList = updatedNsBundles.getBundles();
assertNotNull(bundles);

NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(Hashing.crc32());
NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());

// (1) validate bundleFactory-cache has newly split bundles and removed old parent bundle
Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles = splitBundles(utilityFactory, nsname, bundles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.PulsarService.webAddress;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -31,7 +35,6 @@

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -48,8 +51,10 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.MockZooKeeper;
Expand Down Expand Up @@ -81,7 +86,12 @@ public void setup() throws Exception {
executor = new OrderedSafeExecutor(1, "test");
scheduledExecutor = Executors.newScheduledThreadPool(2);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
when(localCache.policiesCache()).thenReturn(poilciesCache);
doNothing().when(poilciesCache).registerListener(any());

bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
brokerService = mock(BrokerService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.naming.AuthenticationException;
import javax.net.ssl.HttpsURLConnection;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
Expand Down Expand Up @@ -738,6 +740,105 @@ public void start() throws PulsarClientException {
}
}

/**
*
* <pre>
* When broker-1's load-manager splits the bundle and update local-policies, broker-2 should get watch of
* local-policies and update bundleCache so, new lookup can be redirected properly.
*
* (1) Start broker-1 and broker-2
* (2) Make sure broker-2 always assign bundle to broker1
* (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
* (4) Broker-1 will own topic-1
* (5) Split the bundle for topic-1
* (6) Broker-2 should get the watch and update bundle cache
* (7) Make lookup request again to Broker-2 which should succeed.
*
* </pre>
*
* @throws Exception
*/
@Test(timeOut = 5000)
public void testSplitUnloadLookupTest() throws Exception {

log.info("-- Starting {} test --", methodName);

final String namespace = "my-property/use/my-ns";
// (1) Start broker-1
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setBrokerServicePort(PortManager.nextFreePort());
conf2.setBrokerServicePortTls(PortManager.nextFreePort());
conf2.setWebServicePort(PortManager.nextFreePort());
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(conf.getClusterName());
PulsarService pulsar2 = startBroker(conf2);
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

LoadManager loadManager1 = spy(pulsar.getLoadManager().get());
LoadManager loadManager2 = spy(pulsar2.getLoadManager().get());
Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
loadManagerField.setAccessible(true);

// (2) Make sure broker-2 always assign bundle to broker1
// mock: redirect request to leader [2]
doReturn(true).when(loadManager2).isCentralized();
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
// mock: return Broker1 as a Least-loaded broker when leader receies request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));

URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort());
PulsarClient pulsarClient2 = PulsarClient.create(broker2ServiceUrl.toString(), new ClientConfiguration());

// (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
final String topic1 = "persistent://" + namespace + "/topic1";
Consumer consumer1 = pulsarClient2.subscribe(topic1, "my-subscriber-name", new ConsumerConfiguration());

Set<String> serviceUnits1 = pulsar.getNamespaceService().getOwnedServiceUnits().stream()
.map(nb -> nb.toString()).collect(Collectors.toSet());

// (4) Broker-1 will own topic-1
final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
Assert.assertTrue(serviceUnits1.contains(unsplitBundle));
// broker-2 should have this bundle into the cache
DestinationName destination = DestinationName.get(topic1);
NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(destination);
Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle);

// (5) Split the bundle for topic-1
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff");

// (6) Broker-2 should get the watch and update bundle cache
final int retry = 5;
for (int i = 0; i < retry; i++) {
if (pulsar2.getNamespaceService().getBundle(destination).equals(bundleInBroker2) && i != retry - 1) {
Thread.sleep(200);
} else {
break;
}
}

// (7) Make lookup request again to Broker-2 which should succeed.
final String topic2 = "persistent://" + namespace + "/topic2";
Consumer consumer2 = pulsarClient2.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration());

NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
.getBundle(DestinationName.get(topic2));
Assert.assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle));

consumer1.close();
consumer2.close();
pulsarClient2.close();
pulsar2.close();

}
/**** helper classes ****/

public static class MockAuthenticationProvider implements AuthenticationProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,32 @@
*/
package org.apache.pulsar.common.naming;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.testng.annotations.Test;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;

public class NamespaceBundleTest {
private final NamespaceBundleFactory factory = NamespaceBundleFactory.createFactory(Hashing.crc32());
private final NamespaceBundleFactory factory = getNamespaceBundleFactory();

@Test
public void testConstructor() {
Expand Down Expand Up @@ -110,6 +118,16 @@ public void testConstructor() {
assertEquals(bundle.getNamespaceObject().toString(), "pulsar/use/ns");
}

private NamespaceBundleFactory getNamespaceBundleFactory() {
PulsarService pulsar = mock(PulsarService.class);
LocalZooKeeperCacheService localZkCache = mock(LocalZooKeeperCacheService.class);
ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localZkCache);
when(localZkCache.policiesCache()).thenReturn(poilciesCache);
doNothing().when(poilciesCache).registerListener(any());
return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
}

@Test
public void testGetBundle() throws Exception {
NamespaceBundle bundle = factory.getBundle(new NamespaceName("pulsar/use/ns1"),
Expand Down
Loading