diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c65476bf01f64..a13c121b970fd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1797,6 +1797,7 @@ public class ServiceConfiguration implements PulsarConfiguration { + "will have 66% msgRate difference and load balancer can unload bundles from broker-1 " + "to broker-2)" ) + private double loadBalancerMsgRateDifferenceShedderThreshold = 50; @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java index 6e102061c9981..a562cf7df0e69 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingStrategy.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance; import com.google.common.collect.Multimap; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; /** @@ -26,6 +27,20 @@ */ public interface LoadSheddingStrategy { + /** + * Recommend that all of the returned bundles be unloaded. + * + * @param loadData + * The load data to used to make the unloading decision. + * @param conf + * The service configuration. + * @return A map from all selected bundles to the brokers on which they reside. + */ + default Multimap findBundlesForUnloading(PulsarService pulsar, LoadData loadData, + ServiceConfiguration conf) { + return findBundlesForUnloading(loadData, conf); + } + /** * Recommend that all of the returned bundles be unloaded. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index e4c0266707703..23978fee07f2b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -598,7 +598,7 @@ public synchronized void doLoadShedding() { recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout); for (LoadSheddingStrategy strategy : loadSheddingPipeline) { - final Multimap bundlesToUnload = strategy.findBundlesForUnloading(loadData, conf); + final Multimap bundlesToUnload = strategy.findBundlesForUnloading(pulsar, loadData, conf); bundlesToUnload.asMap().forEach((broker, bundles) -> { bundles.forEach(bundle -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java index 8133f432ac4c4..8110114cfe333 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java @@ -22,17 +22,23 @@ import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.apache.commons.lang3.mutable.MutableDouble; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.broker.BrokerData; import org.apache.pulsar.broker.BundleData; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TimeAverageMessageData; import org.apache.pulsar.broker.loadbalance.LoadData; import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy; +import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; +import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; +import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +57,46 @@ public class UniformLoadShedder implements LoadSheddingStrategy { private final Multimap selectedBundlesCache = ArrayListMultimap.create(); + @Override + public Multimap findBundlesForUnloading(PulsarService pulsar, final LoadData loadData, + final ServiceConfiguration conf) { + selectedBundlesCache.clear(); + + Map> isolatedBrokersGroups = new HashMap<>(); + String clusterName = conf.getClusterName(); + try { + Optional nsPoliciesResult = pulsar.getPulsarResources().getNamespaceResources() + .getIsolationPolicies().getIsolationDataPolicies(clusterName); + Map nsPolicies = nsPoliciesResult.get().getPolicies(); + if (nsPoliciesResult.isPresent()) { + pulsar.getLoadManager().get().getAvailableBrokers().forEach(broker -> { + if (nsPolicies != null) { + nsPolicies.forEach((name, policyData) -> { + NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData); + if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) { + if (nsPolicyImpl.isPrimaryBroker(broker)) { + BrokerData brokerData = loadData.getBrokerData().get(broker); + Map brokers = isolatedBrokersGroups.computeIfAbsent(name, + (v) -> new HashMap<>()); + brokers.put(broker, brokerData); + } + } + }); + } + }); + + isolatedBrokersGroups.forEach((broker, brokersData) -> { + findBundlesForUnloading(brokersData, loadData.getBundleData(), + loadData.getRecentlyUnloadedBundles(), conf, selectedBundlesCache); + }); + return selectedBundlesCache; + } + } catch (Exception e) { + log.error("Failed to get namespace isolation-policies {}", clusterName, e); + } + return findBundlesForUnloading(loadData, conf); + } + /** * Attempt to shed some bundles off every broker which is overloaded. * @@ -63,10 +109,15 @@ public class UniformLoadShedder implements LoadSheddingStrategy { @Override public Multimap findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) { selectedBundlesCache.clear(); - Map brokersData = loadData.getBrokerData(); - Map loadBundleData = loadData.getBundleData(); - Map recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles(); + findBundlesForUnloading(loadData.getBrokerData(), loadData.getBundleData(), + loadData.getRecentlyUnloadedBundles(), conf, selectedBundlesCache); + return selectedBundlesCache; + } + public Multimap findBundlesForUnloading(Map brokersData, + Map loadBundleData, + Map recentlyUnloadedBundles, ServiceConfiguration conf, + Multimap selectedBundlesCache) { MutableObject overloadedBroker = new MutableObject<>(); MutableObject underloadedBroker = new MutableObject<>(); MutableDouble maxMsgRate = new MutableDouble(-1); @@ -77,7 +128,7 @@ public Multimap findBundlesForUnloading(final LoadData loadData, double msgRate = data.getLocalData().getMsgRateIn() + data.getLocalData().getMsgRateOut(); double throughputRate = data.getLocalData().getMsgThroughputIn() + data.getLocalData().getMsgThroughputOut(); - if (data.getLocalData().getBundles().size() > 1 // broker with one bundle can't be considered for + if (data.getLocalData().getBundles().size() > 1 // broker with bundle one bundle can't be considered for // bundle unloading && (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue())) { overloadedBroker.setValue(broker); @@ -95,7 +146,7 @@ public Multimap findBundlesForUnloading(final LoadData loadData, // discrepancy is higher than threshold. if that matches then try to unload bundle from overloaded brokers to // give chance of uniform load distribution. double msgRateDifferencePercentage = ((maxMsgRate.getValue() - minMsgRate.getValue()) * 100) - / (minMsgRate.getValue()); + / (Math.max(minMsgRate.getValue(), 1)); double msgThroughputDifferenceRate = maxThroughputRate.getValue() / minThroughputgRate.getValue(); // if the threshold matches then find out how much load needs to be unloaded by considering number of msgRate @@ -108,6 +159,7 @@ public Multimap findBundlesForUnloading(final LoadData loadData, .getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold(); if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) { + if (log.isDebugEnabled()) { log.debug( "Found bundles for uniform load balancing. " @@ -125,6 +177,8 @@ public Multimap findBundlesForUnloading(final LoadData loadData, if (overloadedBrokerData.getBundles().size() > 1) { // Sort bundles by throughput, then pick the bundle which can help to reduce load uniformly with // under-loaded broker + // Sort bundles by throughput, then pick the biggest N which combined + // make up for at least the minimum throughput to offload loadBundleData.entrySet().stream() .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches() && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches() @@ -140,6 +194,8 @@ public Multimap findBundlesForUnloading(final LoadData loadData, }).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft())) .filter(e -> overloadedBrokerData.getBundles().contains(e.getLeft())) .sorted((e1, e2) -> Double.compare(e2.getRight(), e1.getRight())).forEach((e) -> { + // Map to throughput value + // Consider short-term byte rate to address system resource burden String bundle = e.getLeft(); BundleData bundleData = e.getMiddle(); TimeAverageMessageData shortTermData = bundleData.getShortTermData(); @@ -163,7 +219,6 @@ public Multimap findBundlesForUnloading(final LoadData loadData, }); } } - return selectedBundlesCache; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java new file mode 100644 index 0000000000000..d24b096a64c31 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedderTest.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.pulsar.broker.BrokerData; +import org.apache.pulsar.broker.BundleData; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.TimeAverageMessageData; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyResources; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; +import org.apache.pulsar.common.policies.impl.MinAvailablePolicy; +import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +@Test(groups = "broker") +public class UniformLoadShedderTest { + + private final UniformLoadShedder shedder = new UniformLoadShedder(); + private final ServiceConfiguration conf; + + public UniformLoadShedderTest() { + conf = new ServiceConfiguration(); + } + + @Test + public void testBrokerWithSingleBundle() { + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + broker1.setBandwidthIn(new ResourceUsage(999, 1000)); + broker1.setBandwidthOut(new ResourceUsage(999, 1000)); + broker1.setBundles(Sets.newHashSet("bundle-1")); + + BundleData bundle1 = new BundleData(); + TimeAverageMessageData db1 = new TimeAverageMessageData(); + db1.setMsgThroughputIn(1000); + db1.setMsgThroughputOut(1000); + bundle1.setShortTermData(db1); + loadData.getBundleData().put("bundle-1", bundle1); + + loadData.getBrokerData().put("broker-1", new BrokerData(broker1)); + + assertTrue(shedder.findBundlesForUnloading(loadData, conf).isEmpty()); + } + + @Test + public void testBrokerWithMsgRateIn() { + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + LocalBrokerData broker2 = new LocalBrokerData(); + LocalBrokerData broker3 = new LocalBrokerData(); + + // broker-1 with 80K msgRate + updateBundleData(loadData, "broker1", broker1, "bundle11", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle12", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle13", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle14", 10000, 10000, true); + updateBundleData(loadData, "broker1", broker1, "bundle15", 10000, 10000, true); + updateBundleData(loadData, "broker1", broker1, "bundle16", 10000, 10000, true); + // broker-2 with 40K msgRate + updateBundleData(loadData, "broker2", broker2, "bundle21", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle22", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle23", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle24", 5000, 5000, true); + // broker-3 with 20K msgRate + updateBundleData(loadData, "broker3", broker3, "bundle31", 5000, 5000, true); + updateBundleData(loadData, "broker3", broker3, "bundle32", 5000, 5000, true); + + Multimap bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf); + + assertFalse(bundlesToUnload.isEmpty()); + assertEquals(bundlesToUnload.get("broker1"), Lists.newArrayList("bundle14", "bundle13")); + } + + @Test + public void testBrokerWithMsgRateOut() { + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + LocalBrokerData broker2 = new LocalBrokerData(); + LocalBrokerData broker3 = new LocalBrokerData(); + + // broker-1 with 700MB throughput + updateBundleData(loadData, "broker1", broker1, "bundle11", 50, 50, false); + updateBundleData(loadData, "broker1", broker1, "bundle12", 100, 100, false); + updateBundleData(loadData, "broker1", broker1, "bundle13", 50, 100, false); + updateBundleData(loadData, "broker1", broker1, "bundle14", 100, 100, false); + updateBundleData(loadData, "broker1", broker1, "bundle15", 100, 100, false); + // broker-2 with 400MB throughput + updateBundleData(loadData, "broker2", broker2, "bundle21", 50, 50, false); + updateBundleData(loadData, "broker2", broker2, "bundle22", 50, 50, false); + updateBundleData(loadData, "broker2", broker2, "bundle23", 50, 50, false); + updateBundleData(loadData, "broker2", broker2, "bundle24", 50, 50, false); + // broker-3 with 200MB throughput + updateBundleData(loadData, "broker3", broker3, "bundle31", 50, 50, false); + updateBundleData(loadData, "broker3", broker3, "bundle32", 50, 50, false); + + Multimap bundlesToUnload = shedder.findBundlesForUnloading(loadData, conf); + + assertFalse(bundlesToUnload.isEmpty()); + assertEquals(bundlesToUnload.get("broker1"), Lists.newArrayList("bundle14", "bundle11")); + } + + @Test + public void testBrokerWithoutIsolationGroup() throws Exception { + conf.setClusterName("test"); + PulsarService pulsar = mock(PulsarService.class); + PulsarResources resources = mock(PulsarResources.class); + doReturn(resources).when(pulsar).getPulsarResources(); + NamespaceResources nsResouce = mock(NamespaceResources.class); + doReturn(nsResouce).when(resources).getNamespaceResources(); + IsolationPolicyResources isolationPolicies = mock(IsolationPolicyResources.class); + doReturn(isolationPolicies).when(nsResouce).getIsolationPolicies(); + NamespaceIsolationPolicies nsIsolation = mock(NamespaceIsolationPolicies.class); + Optional policies = Optional.empty(); + doReturn(policies).when(isolationPolicies).getIsolationDataPolicies(anyString()); + + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + LocalBrokerData broker2 = new LocalBrokerData(); + LocalBrokerData broker3 = new LocalBrokerData(); + + // broker-1 with 80K msgRate + updateBundleData(loadData, "broker1", broker1, "bundle11", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle12", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle13", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle14", 10000, 10000, true); + updateBundleData(loadData, "broker1", broker1, "bundle15", 10000, 10000, true); + updateBundleData(loadData, "broker1", broker1, "bundle16", 10000, 10000, true); + // broker-2 with 40K msgRate + updateBundleData(loadData, "broker2", broker2, "bundle21", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle22", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle23", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle24", 5000, 5000, true); + // broker-3 with 20K msgRate + updateBundleData(loadData, "broker3", broker3, "bundle31", 5000, 5000, true); + updateBundleData(loadData, "broker3", broker3, "bundle32", 5000, 5000, true); + + Multimap bundlesToUnload = shedder.findBundlesForUnloading(pulsar, loadData, conf); + + assertFalse(bundlesToUnload.isEmpty()); + assertEquals(bundlesToUnload.get("broker1"), Lists.newArrayList("bundle14", "bundle13")); + } + + @Test + public void testBrokerWithIsolationGroup() throws Exception { + + conf.setClusterName("test"); + PulsarService pulsar = mock(PulsarService.class); + PulsarResources resources = mock(PulsarResources.class); + doReturn(resources).when(pulsar).getPulsarResources(); + NamespaceResources nsResouce = mock(NamespaceResources.class); + doReturn(nsResouce).when(resources).getNamespaceResources(); + IsolationPolicyResources isolationPolicies = mock(IsolationPolicyResources.class); + doReturn(isolationPolicies).when(nsResouce).getIsolationPolicies(); + // get available brokers + LoadManager manager = mock(LoadManager.class); + doReturn(Sets.newHashSet("broker1", "broker2", "broker3")).when(manager).getAvailableBrokers(); + AtomicReference loadManager = new AtomicReference<>(manager); + doReturn(loadManager).when(pulsar).getLoadManager(); + // isolation + NamespaceIsolationPolicies nsIsolation = mock(NamespaceIsolationPolicies.class); + Optional policies = Optional.of(nsIsolation); + NamespaceIsolationDataImpl isolationImpl = mock(NamespaceIsolationDataImpl.class); + doReturn(Collections.singletonMap("policy1", isolationImpl)).when(nsIsolation).getPolicies(); + doReturn(policies).when(isolationPolicies).getIsolationDataPolicies(anyString()); + doReturn(Lists.newArrayList()).when(isolationImpl).getSecondary(); + AutoFailoverPolicyType type = AutoFailoverPolicyType.min_available; + Map params = new HashMap<>(); + params.put(MinAvailablePolicy.MIN_LIMIT_KEY, "10"); + params.put(MinAvailablePolicy.USAGE_THRESHOLD_KEY, "10"); + doReturn(AutoFailoverPolicyData.builder().policyType(type).parameters(params).build()).when(isolationImpl) + .getAutoFailoverPolicy(); + + Optional nsPoliciesResult = pulsar.getPulsarResources().getNamespaceResources() + .getIsolationPolicies().getIsolationDataPolicies("test"); + + doReturn(Lists.newArrayList()).when(isolationImpl).getNamespaces(); + + LoadData loadData = new LoadData(); + + LocalBrokerData broker1 = new LocalBrokerData(); + LocalBrokerData broker2 = new LocalBrokerData(); + LocalBrokerData broker3 = new LocalBrokerData(); + + // broker-1 with 80K msgRate + updateBundleData(loadData, "broker1", broker1, "bundle11", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle12", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle13", 5000, 5000, true); + updateBundleData(loadData, "broker1", broker1, "bundle14", 10000, 10000, true); + updateBundleData(loadData, "broker1", broker1, "bundle15", 10000, 10000, true); + updateBundleData(loadData, "broker1", broker1, "bundle16", 10000, 10000, true); + // broker-2 with 40K msgRate + updateBundleData(loadData, "broker2", broker2, "bundle21", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle22", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle23", 5000, 5000, true); + updateBundleData(loadData, "broker2", broker2, "bundle24", 5000, 5000, true); + // broker-3 with 20K msgRate + updateBundleData(loadData, "broker3", broker3, "bundle31", 5000, 5000, true); + updateBundleData(loadData, "broker3", broker3, "bundle32", 5000, 5000, true); + + // (1) isolation broker2,3 + doReturn(Lists.newArrayList("broker2", "broker3")).when(isolationImpl).getPrimary(); + Multimap bundlesToUnload = shedder.findBundlesForUnloading(pulsar, loadData, conf); + assertFalse(bundlesToUnload.isEmpty()); + assertEquals(bundlesToUnload.get("broker2"), Lists.newArrayList("bundle24")); + + // (2) isolation broker1,3 + doReturn(Lists.newArrayList("broker1", "broker3")).when(isolationImpl).getPrimary(); + bundlesToUnload = shedder.findBundlesForUnloading(pulsar, loadData, conf); + assertFalse(bundlesToUnload.isEmpty()); + assertEquals(bundlesToUnload.get("broker1"), Lists.newArrayList("bundle14", "bundle13")); + + // (3) isolation broker1,2 + doReturn(Lists.newArrayList("broker1", "broker2")).when(isolationImpl).getPrimary(); + bundlesToUnload = shedder.findBundlesForUnloading(pulsar, loadData, conf); + assertFalse(bundlesToUnload.isEmpty()); + assertEquals(bundlesToUnload.get("broker1"), Lists.newArrayList("bundle14")); + } + + private void updateBundleData(LoadData loadData, String brokerName, LocalBrokerData brokerData, String bundleName, + int rateIn, int rateOut, boolean msgRate) { + Map bundleData = loadData.getBundleData(); + BundleData bundle1 = new BundleData(); + TimeAverageMessageData bundleData1 = new TimeAverageMessageData(); + if (msgRate) { + bundleData1.setMsgRateIn(rateIn); + bundleData1.setMsgRateOut(rateOut); + bundle1.setShortTermData(bundleData1); + } else { + bundleData1.setMsgThroughputIn(rateIn); + bundleData1.setMsgThroughputOut(rateOut); + bundle1.setShortTermData(bundleData1); + } + bundleData.put(bundleName, bundle1); + + if (msgRate) { + brokerData.setMsgRateIn(brokerData.getMsgRateIn() + rateIn); + brokerData.setMsgRateOut(brokerData.getMsgRateOut() + rateOut); + } else { + brokerData.setMsgThroughputIn(brokerData.getMsgThroughputIn() + rateIn); + ; + brokerData.setMsgThroughputOut(brokerData.getMsgThroughputOut() + rateOut); + } + brokerData.getBundles().add(bundleName); + + BrokerData brokerData1 = new BrokerData(brokerData); + loadData.getBrokerData().put(brokerName, brokerData1); + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicy.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicy.java index 6d9cece4ae5f2..f923ae82a5160 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicy.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/impl/MinAvailablePolicy.java @@ -32,8 +32,8 @@ @Data @AllArgsConstructor public class MinAvailablePolicy extends AutoFailoverPolicy { - private static final String MIN_LIMIT_KEY = "min_limit"; - private static final String USAGE_THRESHOLD_KEY = "usage_threshold"; + public static final String MIN_LIMIT_KEY = "min_limit"; + public static final String USAGE_THRESHOLD_KEY = "usage_threshold"; private static final int MAX_USAGE_THRESHOLD = 100; @SuppressWarnings("checkstyle:MemberName")