Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] [broker] support broker isolation for uniform load shedder strategy #13080

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "will have 66% msgRate difference and load balancer can unload bundles from broker-1 "
+ "to broker-2)"
)

private double loadBalancerMsgRateDifferenceShedderThreshold = 50;
@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,28 @@
package org.apache.pulsar.broker.loadbalance;

import com.google.common.collect.Multimap;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;

/**
* Load management component which determines the criteria for unloading bundles.
*/
public interface LoadSheddingStrategy {

/**
* Recommend that all of the returned bundles be unloaded.
*
* @param loadData
* The load data to used to make the unloading decision.
* @param conf
* The service configuration.
* @return A map from all selected bundles to the brokers on which they reside.
*/
default Multimap<String, String> findBundlesForUnloading(PulsarService pulsar, LoadData loadData,
ServiceConfiguration conf) {
return findBundlesForUnloading(loadData, conf);
}

/**
* Recommend that all of the returned bundles be unloaded.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ public synchronized void doLoadShedding() {
recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout);

for (LoadSheddingStrategy strategy : loadSheddingPipeline) {
final Multimap<String, String> bundlesToUnload = strategy.findBundlesForUnloading(loadData, conf);
final Multimap<String, String> bundlesToUnload = strategy.findBundlesForUnloading(pulsar, loadData, conf);

bundlesToUnload.asMap().forEach((broker, bundles) -> {
bundles.forEach(bundle -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,23 @@
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,6 +57,46 @@ public class UniformLoadShedder implements LoadSheddingStrategy {

private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();

@Override
public Multimap<String, String> findBundlesForUnloading(PulsarService pulsar, final LoadData loadData,
final ServiceConfiguration conf) {
selectedBundlesCache.clear();

Map<String, Map<String, BrokerData>> isolatedBrokersGroups = new HashMap<>();
String clusterName = conf.getClusterName();
try {
Optional<NamespaceIsolationPolicies> nsPoliciesResult = pulsar.getPulsarResources().getNamespaceResources()
.getIsolationPolicies().getIsolationDataPolicies(clusterName);
Map<String, NamespaceIsolationDataImpl> nsPolicies = nsPoliciesResult.get().getPolicies();
if (nsPoliciesResult.isPresent()) {
pulsar.getLoadManager().get().getAvailableBrokers().forEach(broker -> {
if (nsPolicies != null) {
nsPolicies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
if (nsPolicyImpl.isPrimaryBroker(broker)) {
BrokerData brokerData = loadData.getBrokerData().get(broker);
Map<String, BrokerData> brokers = isolatedBrokersGroups.computeIfAbsent(name,
(v) -> new HashMap<>());
brokers.put(broker, brokerData);
}
}
});
}
});

isolatedBrokersGroups.forEach((broker, brokersData) -> {
findBundlesForUnloading(brokersData, loadData.getBundleData(),
loadData.getRecentlyUnloadedBundles(), conf, selectedBundlesCache);
});
return selectedBundlesCache;
}
} catch (Exception e) {
log.error("Failed to get namespace isolation-policies {}", clusterName, e);
}
return findBundlesForUnloading(loadData, conf);
}

/**
* Attempt to shed some bundles off every broker which is overloaded.
*
Expand All @@ -63,10 +109,15 @@ public class UniformLoadShedder implements LoadSheddingStrategy {
@Override
public Multimap<String, String> findBundlesForUnloading(final LoadData loadData, final ServiceConfiguration conf) {
selectedBundlesCache.clear();
Map<String, BrokerData> brokersData = loadData.getBrokerData();
Map<String, BundleData> loadBundleData = loadData.getBundleData();
Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
findBundlesForUnloading(loadData.getBrokerData(), loadData.getBundleData(),
loadData.getRecentlyUnloadedBundles(), conf, selectedBundlesCache);
return selectedBundlesCache;
}

public Multimap<String, String> findBundlesForUnloading(Map<String, BrokerData> brokersData,
Map<String, BundleData> loadBundleData,
Map<String, Long> recentlyUnloadedBundles, ServiceConfiguration conf,
Multimap<String, String> selectedBundlesCache) {
MutableObject<String> overloadedBroker = new MutableObject<>();
MutableObject<String> underloadedBroker = new MutableObject<>();
MutableDouble maxMsgRate = new MutableDouble(-1);
Expand All @@ -77,7 +128,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
double msgRate = data.getLocalData().getMsgRateIn() + data.getLocalData().getMsgRateOut();
double throughputRate = data.getLocalData().getMsgThroughputIn()
+ data.getLocalData().getMsgThroughputOut();
if (data.getLocalData().getBundles().size() > 1 // broker with one bundle can't be considered for
if (data.getLocalData().getBundles().size() > 1 // broker with bundle one bundle can't be considered for
// bundle unloading
&& (msgRate > maxMsgRate.getValue() || throughputRate > maxThroughputRate.getValue())) {
overloadedBroker.setValue(broker);
Expand All @@ -95,7 +146,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
// discrepancy is higher than threshold. if that matches then try to unload bundle from overloaded brokers to
// give chance of uniform load distribution.
double msgRateDifferencePercentage = ((maxMsgRate.getValue() - minMsgRate.getValue()) * 100)
/ (minMsgRate.getValue());
/ (Math.max(minMsgRate.getValue(), 1));
double msgThroughputDifferenceRate = maxThroughputRate.getValue() / minThroughputgRate.getValue();

// if the threshold matches then find out how much load needs to be unloaded by considering number of msgRate
Expand All @@ -108,6 +159,7 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();

if (isMsgRateThresholdExceeded || isMsgThroughputThresholdExceeded) {

if (log.isDebugEnabled()) {
log.debug(
"Found bundles for uniform load balancing. "
Expand All @@ -125,6 +177,8 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
if (overloadedBrokerData.getBundles().size() > 1) {
// Sort bundles by throughput, then pick the bundle which can help to reduce load uniformly with
// under-loaded broker
// Sort bundles by throughput, then pick the biggest N which combined
// make up for at least the minimum throughput to offload
loadBundleData.entrySet().stream()
.filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
&& !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()
Expand All @@ -140,6 +194,8 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
}).filter(e -> !recentlyUnloadedBundles.containsKey(e.getLeft()))
.filter(e -> overloadedBrokerData.getBundles().contains(e.getLeft()))
.sorted((e1, e2) -> Double.compare(e2.getRight(), e1.getRight())).forEach((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
String bundle = e.getLeft();
BundleData bundleData = e.getMiddle();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
Expand All @@ -163,7 +219,6 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
});
}
}

return selectedBundlesCache;
}
}
Loading