From 78e3d8f7d872746db962be36ad3de49dac1ef015 Mon Sep 17 00:00:00 2001 From: Zhanpeng Wu Date: Mon, 13 Dec 2021 19:44:55 +0800 Subject: [PATCH] fix shedding heartbeat ns (#13208) Related to #12252 ### Motivation I found that the problem mentioned in #12252 has not been solved, because the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what actually provides is the full name of the bundle. ### Modifications 1. fix the parttern matching problem 2. add a test case for it ### Verifying this change This change is already covered by existing tests. --- .../apache/pulsar/broker/loadbalance/LoadData.java | 10 ++++++++++ .../broker/loadbalance/impl/OverloadShedder.java | 8 ++------ .../broker/loadbalance/impl/ThresholdShedder.java | 6 +----- .../broker/loadbalance/impl/UniformLoadShedder.java | 8 ++------ .../pulsar/broker/namespace/NamespaceService.java | 6 ++++++ .../apache/pulsar/common/naming/NamespaceBundle.java | 12 ++++++++++++ .../broker/namespace/NamespaceServiceTest.java | 10 ++++++++++ 7 files changed, 43 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java index a469c5c24ddb9..4243420391993 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java @@ -20,8 +20,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.apache.pulsar.broker.BrokerData; import org.apache.pulsar.broker.BundleData; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.common.naming.NamespaceBundle; /** * This class represents all data that could be relevant when making a load management decision. @@ -59,6 +62,13 @@ public Map getBundleData() { return bundleData; } + public Map getBundleDataForLoadShedding() { + return bundleData.entrySet().stream() + .filter(e -> !NamespaceService.isSystemServiceNamespace( + NamespaceBundle.getBundleNamespace(e.getKey()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + public Map getRecentlyUnloadedBundles() { return recentlyUnloadedBundles; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java index 3f33fa353c2ab..985ed6fd5f81e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import java.util.Map; @@ -102,10 +100,8 @@ public Multimap findBundlesForUnloading(final LoadData loadData, // Sort bundles by throughput, then pick the biggest N which combined // make up for at least the minimum throughput to offload - loadData.getBundleData().entrySet().stream() - .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches() - && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches() - && localData.getBundles().contains(e.getKey())) + loadData.getBundleDataForLoadShedding().entrySet().stream() + .filter(e -> localData.getBundles().contains(e.getKey())) .map((e) -> { // Map to throughput value // Consider short-term byte rate to address system resource burden diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java index 1a17384bb00d2..62a29b9f22d49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import java.util.HashMap; @@ -105,9 +103,7 @@ public Multimap findBundlesForUnloading(final LoadData loadData, MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false); if (localData.getBundles().size() > 1) { - loadData.getBundleData().entrySet().stream() - .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches() - && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()) + loadData.getBundleDataForLoadShedding().entrySet().stream() .map((e) -> { String bundle = e.getKey(); BundleData bundleData = e.getValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java index 8133f432ac4c4..800873a99a5c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import java.util.Map; @@ -64,7 +62,7 @@ public class UniformLoadShedder implements LoadSheddingStrategy { public Multimap findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) { selectedBundlesCache.clear(); Map brokersData = loadData.getBrokerData(); - Map loadBundleData = loadData.getBundleData(); + Map loadBundleData = loadData.getBundleDataForLoadShedding(); Map recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles(); MutableObject overloadedBroker = new MutableObject<>(); @@ -126,9 +124,7 @@ public Multimap findBundlesForUnloading(final LoadData loadData, // Sort bundles by throughput, then pick the bundle which can help to reduce load uniformly with // under-loaded broker loadBundleData.entrySet().stream() - .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches() - && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches() - && overloadedBrokerData.getBundles().contains(e.getKey())) + .filter(e -> overloadedBrokerData.getBundles().contains(e.getKey())) .map((e) -> { String bundle = e.getKey(); BundleData bundleData = e.getValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 497b7098692a9..e83eaadaa0aa1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1327,6 +1327,12 @@ public static String getSLAMonitorBrokerName(ServiceUnitId ns) { } } + public static boolean isSystemServiceNamespace(String namespace) { + return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() + || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches() + || SLA_NAMESPACE_PATTERN.matcher(namespace).matches(); + } + public boolean registerSLANamespace() throws PulsarServerException { boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false); if (isNameSpaceRegistered) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java index 1531095c32212..98dcb93e7d3db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java @@ -152,6 +152,18 @@ public static String getBundleRange(String namespaceBundle) { return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1); } + public static String getBundleNamespace(String namespaceBundle) { + int index = namespaceBundle.lastIndexOf('/'); + if (index != -1) { + try { + return NamespaceName.get(namespaceBundle.substring(0, index)).toString(); + } catch (Exception e) { + // return itself if meets invalid format + } + } + return namespaceBundle; + } + public NamespaceBundleFactory getNamespaceBundleFactory() { return factory; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index e021d79bcc540..d36568f033768 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.namespace; +import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; +import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import static org.junit.Assert.assertNotEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -594,6 +596,14 @@ public void testSplitBundleWithHighestThroughput() throws Exception { } } + @Test + public void testHeartbeatNamespaceMatch() throws Exception { + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf); + NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName); + assertTrue(NamespaceService.isSystemServiceNamespace( + NamespaceBundle.getBundleNamespace(namespaceBundle.toString()))); + } + @SuppressWarnings("unchecked") private Pair> splitBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {