Skip to content

Commit

Permalink
fix shedding heartbeat ns (apache#13208)
Browse files Browse the repository at this point in the history
Related to apache#12252

I found that the problem mentioned in apache#12252 has not been solved, because the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what actually provides is the full name of the bundle.

1. fix the parttern matching problem
2. add a test case for it

This change is already covered by existing tests.

(cherry picked from commit 78e3d8f)
  • Loading branch information
wuzhanpeng authored and lhotari committed May 5, 2022
1 parent e076367 commit a1b2dc4
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import java.util.stream.Collectors;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.NamespaceBundle;

/**
* This class represents all data that could be relevant when making a load management decision.
Expand Down Expand Up @@ -60,6 +62,13 @@ public Map<String, BundleData> getBundleData() {
return bundleData;
}

public Map<String, BundleData> getBundleDataForLoadShedding() {
return bundleData.entrySet().stream()
.filter(e -> !NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Map<String, Long> getRecentlyUnloadedBundles() {
return recentlyUnloadedBundles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

Expand Down Expand Up @@ -99,10 +97,8 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
if (localData.getBundles().size() > 1) {
// Sort bundles by throughput, then pick the biggest N which combined make up for at least the minimum throughput to offload

loadData.getBundleData().entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
&& localData.getBundles().contains(e.getKey()))
loadData.getBundleDataForLoadShedding().entrySet().stream()
.filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

Expand Down Expand Up @@ -98,9 +96,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

if (localData.getBundles().size() > 1) {
loadData.getBundleData().entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches())
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,11 @@ public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
}
}

public static boolean isSystemServiceNamespace(String namespace) {
return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
}

public boolean registerSLANamespace() throws PulsarServerException {
boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
if (isNameSpaceRegistered) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ public static String getBundleRange(String namespaceBundle) {
return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
}

public static String getBundleNamespace(String namespaceBundle) {
int index = namespaceBundle.lastIndexOf('/');
if (index != -1) {
try {
return NamespaceName.get(namespaceBundle.substring(0, index)).toString();
} catch (Exception e) {
// return itself if meets invalid format
}
}
return namespaceBundle;
}

public NamespaceBundleFactory getNamespaceBundleFactory() {
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
Expand All @@ -48,7 +46,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -502,6 +499,16 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception {
}
}

@Test
public void testHeartbeatNamespaceMatch() throws Exception {
NamespaceName namespaceName =
NamespaceName.get(NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf));
NamespaceBundle namespaceBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
assertTrue(NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
}

@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down

0 comments on commit a1b2dc4

Please sign in to comment.