Skip to content

Commit

Permalink
Rebase + add split-bundle-optimization(avoid not-eligible bundle) + f…
Browse files Browse the repository at this point in the history
…ix testModularLoadManagerSplitBundle
  • Loading branch information
rdhabalia committed Oct 26, 2017
1 parent e43ac65 commit 18073ac
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ResourceQuota;
Expand Down Expand Up @@ -599,12 +600,18 @@ public void checkNamespaceBundleSplit() {
final boolean unloadSplitBundles = pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
synchronized (bundleSplitStrategy) {
final Set<String> bundlesToBeSplit = bundleSplitStrategy.findBundlesToSplit(loadData, pulsar);
NamespaceBundleFactory namespaceBundleFactory = pulsar.getNamespaceService().getNamespaceBundleFactory();
for (String bundleName : bundlesToBeSplit) {
try {
log.info("Load-manager splitting budnle {} and unloading {}", bundleName, unloadSplitBundles);
final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName,
LoadManagerShared.getBundleRangeFromBundleName(bundleName), unloadSplitBundles);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName);
if (!namespaceBundleFactory
.canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) {
continue;
}
log.info("Load-manager splitting budnle {} and unloading {}", bundleName, unloadSplitBundles);
pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, bundleRange,
unloadSplitBundles);
// Make sure the same bundle is not selected again.
loadData.getBundleData().remove(bundleName);
localData.getLastStats().remove(bundleName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;

Expand Down Expand Up @@ -147,6 +148,16 @@ public NamespaceBundle getBundle(NamespaceName nsname, Range<Long> hashRange) {
return new NamespaceBundle(nsname, hashRange, this);
}

public NamespaceBundle getBundle(String namespace, String bundleRange) {
checkArgument(bundleRange.contains("_"), "Invalid bundle range");
String[] boundaries = bundleRange.split("_");
Long lowerEndpoint = Long.decode(boundaries[0]);
Long upperEndpoint = Long.decode(boundaries[1]);
Range<Long> hashRange = Range.range(lowerEndpoint, BoundType.CLOSED, upperEndpoint,
(upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN);
return getBundle(new NamespaceName(namespace), hashRange);
}

public NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
return bundlesCache.synchronous().get(fqnn).getFullBundle();
}
Expand Down Expand Up @@ -231,7 +242,7 @@ public Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundl
return null;
}

private boolean canSplitBundle(NamespaceBundle bundle) {
public boolean canSplitBundle(NamespaceBundle bundle) {
Range<Long> range = bundle.getKeyRange();
return range.upperEndpoint() - range.lowerEndpoint() > 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ public void testSplitBundleWithUnDividedRange() throws Exception {
// split bundles
try {
namespaces.splitNamespaceBundle(testProperty, testLocalCluster, bundledNsLocal, "0x08375b1a_0x08375b1b",
false);
false, false);
} catch (RestException re) {
assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ public void testSplitUnloadLookupTest() throws Exception {

// (7) Make lookup request again to Broker-2 which should succeed.
final String topic2 = "persistent://" + namespace + "/topic2";
Consumer consumer2 = pulsarClient2.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration());
Consumer consumer2 = pulsarClient.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration());

NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
.getBundle(DestinationName.get(topic2));
Expand All @@ -848,7 +848,7 @@ public void testSplitUnloadLookupTest() throws Exception {
/**
*
* <pre>
* When broker-1's Modula-rload-manager splits the bundle and update local-policies, broker-2 should get watch of
* When broker-1's Modular-load-manager splits the bundle and update local-policies, broker-2 should get watch of
* local-policies and update bundleCache so, new lookup can be redirected properly.
*
* (1) Start broker-1 and broker-2
Expand Down Expand Up @@ -958,7 +958,7 @@ public void testModularLoadManagerSplitBundle() throws Exception {

// (7) Make lookup request again to Broker-2 which should succeed.
final String topic2 = "persistent://" + namespace + "/topic2";
Consumer consumer2 = pulsarClient2.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration());
Consumer consumer2 = pulsarClient.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration());

NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
.getBundle(DestinationName.get(topic2));
Expand Down

0 comments on commit 18073ac

Please sign in to comment.