Skip to content

Commit

Permalink
fix shedding heartbeat ns (#13208)
Browse files Browse the repository at this point in the history
Related to #12252

I found that the problem mentioned in #12252 has not been solved, because the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what actually provides is the full name of the bundle.

1. fix the parttern matching problem
2. add a test case for it

This change is already covered by existing tests.

(cherry picked from commit 78e3d8f)
  • Loading branch information
wuzhanpeng authored and michaeljmarshall committed Feb 11, 2022
1 parent 06219e2 commit 418589b
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.NamespaceBundle;

/**
* This class represents all data that could be relevant when making a load management decision.
Expand Down Expand Up @@ -59,6 +62,13 @@ public Map<String, BundleData> getBundleData() {
return bundleData;
}

public Map<String, BundleData> getBundleDataForLoadShedding() {
return bundleData.entrySet().stream()
.filter(e -> !NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Map<String, Long> getRecentlyUnloadedBundles() {
return recentlyUnloadedBundles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
Expand Down Expand Up @@ -98,9 +97,8 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
// Sort bundles by throughput, then pick the biggest N which combined
// make up for at least the minimum throughput to offload

loadData.getBundleData().entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& localData.getBundles().contains(e.getKey()))
loadData.getBundleDataForLoadShedding().entrySet().stream()
.filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.HashMap;
Expand Down Expand Up @@ -92,8 +91,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

if (localData.getBundles().size() > 1) {
loadData.getBundleData().entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches())
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,11 @@ public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
}
}

public static boolean isSystemServiceNamespace(String namespace) {
return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
}

public boolean registerSLANamespace() throws PulsarServerException {
boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
if (isNameSpaceRegistered) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ public static String getBundleRange(String namespaceBundle) {
return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
}

public static String getBundleNamespace(String namespaceBundle) {
int index = namespaceBundle.lastIndexOf('/');
if (index != -1) {
try {
return NamespaceName.get(namespaceBundle.substring(0, index)).toString();
} catch (Exception e) {
// return itself if meets invalid format
}
}
return namespaceBundle;
}

public NamespaceBundleFactory getNamespaceBundleFactory() {
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.pulsar.broker.namespace;

import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -509,6 +511,15 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception {
}
}

@Test
public void testHeartbeatNamespaceMatch() throws Exception {
String namespaceNameString = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
NamespaceName namespaceName = NamespaceName.get(namespaceNameString);
NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
assertTrue(NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
}

@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down

0 comments on commit 418589b

Please sign in to comment.