diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java index a469c5c24ddb9..4243420391993 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java @@ -20,8 +20,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.apache.pulsar.broker.BrokerData; import org.apache.pulsar.broker.BundleData; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.common.naming.NamespaceBundle; /** * This class represents all data that could be relevant when making a load management decision. @@ -59,6 +62,13 @@ public Map getBundleData() { return bundleData; } + public Map getBundleDataForLoadShedding() { + return bundleData.entrySet().stream() + .filter(e -> !NamespaceService.isSystemServiceNamespace( + NamespaceBundle.getBundleNamespace(e.getKey()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + public Map getRecentlyUnloadedBundles() { return recentlyUnloadedBundles; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java index d1b05b4d67752..0141084e022e1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java @@ -96,17 +96,16 @@ public Multimap findBundlesForUnloading(final LoadData loadData, if (localData.getBundles().size() > 1) { // Sort bundles by throughput, then pick the biggest N which combined // make up for at least the minimum throughput to offload - - loadData.getBundleData().entrySet().stream() - .filter(e -> localData.getBundles().contains(e.getKey())) - .map((e) -> { - // Map to throughput value - // Consider short-term byte rate to address system resource burden - String bundle = e.getKey(); - BundleData bundleData = e.getValue(); - TimeAverageMessageData shortTermData = bundleData.getShortTermData(); - double throughput = shortTermData.getMsgThroughputIn() + shortTermData - .getMsgThroughputOut(); + loadData.getBundleDataForLoadShedding().entrySet().stream() + .filter(e -> localData.getBundles().contains(e.getKey())) + .map((e) -> { + // Map to throughput value + // Consider short-term byte rate to address system resource burden + String bundle = e.getKey(); + BundleData bundleData = e.getValue(); + TimeAverageMessageData shortTermData = bundleData.getShortTermData(); + double throughput = shortTermData.getMsgThroughputIn() + shortTermData + .getMsgThroughputOut(); return Pair.of(bundle, throughput); }).filter(e -> { // Only consider bundles that were not already unloaded recently diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java index 29734d6a6ffa3..b9b439ac81c7f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java @@ -95,12 +95,13 @@ public Multimap findBundlesForUnloading(final LoadData loadData, MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false); if (localData.getBundles().size() > 1) { - loadData.getBundleData().entrySet().stream().map((e) -> { - String bundle = e.getKey(); - BundleData bundleData = e.getValue(); - TimeAverageMessageData shortTermData = bundleData.getShortTermData(); - double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); - return Pair.of(bundle, throughput); + loadData.getBundleDataForLoadShedding().entrySet().stream() + .map((e) -> { + String bundle = e.getKey(); + BundleData bundleData = e.getValue(); + TimeAverageMessageData shortTermData = bundleData.getShortTermData(); + double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); + return Pair.of(bundle, throughput); }).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft()) ).filter(e -> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index ae8d03014d0b9..d7931196d3da1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -140,6 +140,7 @@ public enum AddressType { public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor"; public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)"); + public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)"); public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)"); public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s"; public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s"; @@ -1360,6 +1361,12 @@ public static String getSLAMonitorBrokerName(ServiceUnitId ns) { } } + public static boolean isSystemServiceNamespace(String namespace) { + return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() + || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches() + || SLA_NAMESPACE_PATTERN.matcher(namespace).matches(); + } + public boolean registerSLANamespace() throws PulsarServerException { boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false); if (isNameSpaceRegistered) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java index 1531095c32212..98dcb93e7d3db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java @@ -152,6 +152,18 @@ public static String getBundleRange(String namespaceBundle) { return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1); } + public static String getBundleNamespace(String namespaceBundle) { + int index = namespaceBundle.lastIndexOf('/'); + if (index != -1) { + try { + return NamespaceName.get(namespaceBundle.substring(0, index)).toString(); + } catch (Exception e) { + // return itself if meets invalid format + } + } + return namespaceBundle; + } + public NamespaceBundleFactory getNamespaceBundleFactory() { return factory; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index eeedef570eb3f..916d4e6145a6b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -20,6 +20,9 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; +import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; +import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; +import static org.junit.Assert.assertNotEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -509,6 +512,14 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception { } } + @Test + public void testHeartbeatNamespaceMatch() throws Exception { + NamespaceName namespaceName = NamespaceName.get(NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf)); + NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName); + assertTrue(NamespaceService.isSystemServiceNamespace( + NamespaceBundle.getBundleNamespace(namespaceBundle.toString()))); + } + @SuppressWarnings("unchecked") private Pair> splitBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {