Skip to content

Commit

Permalink
fix shedding heartbeat ns (apache#13208)
Browse files Browse the repository at this point in the history
Related to apache#12252

### Motivation

I found that the problem mentioned in apache#12252 has not been solved, because the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what actually provides is the full name of the bundle.

### Modifications

1. fix the parttern matching problem
2. add a test case for it

### Verifying this change

This change is already covered by existing tests.
  • Loading branch information
wuzhanpeng committed Dec 13, 2021
1 parent ba00002 commit 78e3d8f
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.NamespaceBundle;

/**
* This class represents all data that could be relevant when making a load management decision.
Expand Down Expand Up @@ -59,6 +62,13 @@ public Map<String, BundleData> getBundleData() {
return bundleData;
}

public Map<String, BundleData> getBundleDataForLoadShedding() {
return bundleData.entrySet().stream()
.filter(e -> !NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Map<String, Long> getRecentlyUnloadedBundles() {
return recentlyUnloadedBundles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
Expand Down Expand Up @@ -102,10 +100,8 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
// Sort bundles by throughput, then pick the biggest N which combined
// make up for at least the minimum throughput to offload

loadData.getBundleData().entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
&& localData.getBundles().contains(e.getKey()))
loadData.getBundleDataForLoadShedding().entrySet().stream()
.filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.HashMap;
Expand Down Expand Up @@ -105,9 +103,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

if (localData.getBundles().size() > 1) {
loadData.getBundleData().entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches())
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
Expand Down Expand Up @@ -64,7 +62,7 @@ public class UniformLoadShedder implements LoadSheddingStrategy {
public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
Map<String, BrokerData> brokersData = loadData.getBrokerData();
Map<String, BundleData> loadBundleData = loadData.getBundleData();
Map<String, BundleData> loadBundleData = loadData.getBundleDataForLoadShedding();
Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();

MutableObject<String> overloadedBroker = new MutableObject<>();
Expand Down Expand Up @@ -126,9 +124,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
// Sort bundles by throughput, then pick the bundle which can help to reduce load uniformly with
// under-loaded broker
loadBundleData.entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
&& overloadedBrokerData.getBundles().contains(e.getKey()))
.filter(e -> overloadedBrokerData.getBundles().contains(e.getKey()))
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,12 @@ public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
}
}

public static boolean isSystemServiceNamespace(String namespace) {
return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
|| SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
}

public boolean registerSLANamespace() throws PulsarServerException {
boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
if (isNameSpaceRegistered) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ public static String getBundleRange(String namespaceBundle) {
return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
}

public static String getBundleNamespace(String namespaceBundle) {
int index = namespaceBundle.lastIndexOf('/');
if (index != -1) {
try {
return NamespaceName.get(namespaceBundle.substring(0, index)).toString();
} catch (Exception e) {
// return itself if meets invalid format
}
}
return namespaceBundle;
}

public NamespaceBundleFactory getNamespaceBundleFactory() {
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.namespace;

import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -594,6 +596,14 @@ public void testSplitBundleWithHighestThroughput() throws Exception {
}
}

@Test
public void testHeartbeatNamespaceMatch() throws Exception {
NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
assertTrue(NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
}

@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down

0 comments on commit 78e3d8f

Please sign in to comment.