Skip to content

Commit

Permalink
Use Optional<String> for returning least loaded broker (#1122)
Browse files Browse the repository at this point in the history
* Use Optional<String> for returning least loaded broker

* Addressed comments

* Fixed unit tests

* More test fixes
  • Loading branch information
merlimat committed Jan 30, 2018
1 parent 53619ff commit d6d4fa3
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 123 deletions.
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance;

import java.util.List;
import java.util.Optional;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -55,7 +56,7 @@ public interface LoadManager {
/**
* Returns the Least Loaded Resource Unit decided by some algorithm or criteria which is implementation specific
*/
ResourceUnit getLeastLoaded(ServiceUnitId su) throws Exception;
Optional<ResourceUnit> getLeastLoaded(ServiceUnitId su) throws Exception;

/**
* Generate the load report
Expand Down
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.loadbalance;

import java.util.Optional;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.ServiceUnitId;
Expand All @@ -33,7 +35,7 @@ public interface ModularLoadManager {

/**
* As any broker, disable the broker this manager is running on.
*
*
* @throws PulsarServerException
* If ZooKeeper failed to disable the broker.
*/
Expand All @@ -57,24 +59,24 @@ public interface ModularLoadManager {

/**
* As the leader broker, find a suitable broker for the assignment of the given bundle.
*
*
* @param serviceUnit
* ServiceUnitId for the bundle.
* @return The name of the selected broker, as it appears on ZooKeeper.
*/
String selectBrokerForAssignment(ServiceUnitId serviceUnit);
Optional<String> selectBrokerForAssignment(ServiceUnitId serviceUnit);

/**
* As any broker, start the load manager.
*
*
* @throws PulsarServerException
* If an unexpected error prevented the load manager from being started.
*/
void start() throws PulsarServerException;

/**
* As any broker, stop the load manager.
*
*
* @throws PulsarServerException
* If an unexpected error occurred when attempting to stop the load manager.
*/
Expand All @@ -97,7 +99,7 @@ public interface ModularLoadManager {

/**
* Return :{@link Deserializer} to deserialize load-manager load report
*
*
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;

import java.util.Optional;
import java.util.Set;

import org.apache.pulsar.broker.BundleData;
Expand All @@ -32,7 +33,7 @@ public interface ModularLoadManagerStrategy {

/**
* Find a suitable broker to assign the given bundle to.
*
*
* @param candidates
* The candidates for which the bundle may be assigned.
* @param bundleToAssign
Expand All @@ -43,12 +44,12 @@ public interface ModularLoadManagerStrategy {
* The service configuration.
* @return The name of the selected broker as it appears on ZooKeeper.
*/
String selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
Optional<String> selectBroker(Set<String> candidates, BundleData bundleToAssign, LoadData loadData,
ServiceConfiguration conf);

/**
* Create a placement strategy using the configuration.
*
*
* @param conf
* ServiceConfiguration to use.
* @return A placement strategy from the given configurations.
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.ArrayList;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

Expand Down Expand Up @@ -57,7 +58,7 @@ private static double getScore(final BrokerData brokerData, final ServiceConfigu
log.warn("Broker {} is overloaded: max usage={}", brokerData.getLocalData().getWebServiceUrl(), maxUsage);
return Double.POSITIVE_INFINITY;
}

double totalMessageRate = 0;
for (BundleData bundleData : brokerData.getPreallocatedBundleData().values()) {
final TimeAverageMessageData longTermData = bundleData.getLongTermData();
Expand All @@ -79,7 +80,7 @@ private static double getScore(final BrokerData brokerData, final ServiceConfigu

/**
* Find a suitable broker to assign the given bundle to.
*
*
* @param candidates
* The candidates for which the bundle may be assigned.
* @param bundleToAssign
Expand All @@ -91,7 +92,7 @@ private static double getScore(final BrokerData brokerData, final ServiceConfigu
* @return The name of the selected broker as it appears on ZooKeeper.
*/
@Override
public String selectBroker(final Set<String> candidates, final BundleData bundleToAssign, final LoadData loadData,
public Optional<String> selectBroker(final Set<String> candidates, final BundleData bundleToAssign, final LoadData loadData,
final ServiceConfiguration conf) {
bestBrokers.clear();
double minScore = Double.POSITIVE_INFINITY;
Expand Down Expand Up @@ -125,6 +126,12 @@ public String selectBroker(final Set<String> candidates, final BundleData bundle
// Assign randomly in this case.
bestBrokers.addAll(candidates);
}
return bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size()));

if (bestBrokers.isEmpty()) {
// If still, it means there are no available brokers at this point
return Optional.empty();
}

return Optional.of(bestBrokers.get(ThreadLocalRandom.current().nextInt(bestBrokers.size())));
}
}
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -131,7 +130,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach

// Strategy to use for splitting bundles.
private BundleSplitStrategy bundleSplitStrategy;

// Service configuration belonging to the pulsar service.
private ServiceConfiguration conf;

Expand Down Expand Up @@ -174,10 +173,10 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach

// ZooKeeper belonging to the pulsar service.
private ZooKeeper zkClient;

// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;

private Map<String, String> brokerToFailureDomainMap;

private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
Expand All @@ -197,7 +196,7 @@ public ModularLoadManagerImpl() {
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = Maps.newHashMap();

this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
Expand Down Expand Up @@ -253,7 +252,7 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception
}

bundleSplitStrategy = new BundleSplitterTask(pulsar);

conf = pulsar.getConfiguration();

// Initialize the default stats to assume for unseen bundles (hard-coded for now).
Expand All @@ -273,12 +272,12 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception
localData.setPersistentTopicsEnabled(pulsar.getConfiguration().isEnablePersistentTopics());
localData.setNonPersistentTopicsEnabled(pulsar.getConfiguration().isEnableNonPersistentTopics());


placementStrategy = ModularLoadManagerStrategy.create(conf);
policies = new SimpleResourceAllocationPolicies(pulsar);
zkClient = pulsar.getZkClient();
filterPipeline.add(new BrokerVersionFilter());

refreshBrokerToFailureDomainMap();
// register listeners for domain changes
pulsar.getConfigurationCache().failureDomainListCache()
Expand Down Expand Up @@ -591,7 +590,7 @@ public synchronized void doLoadShedding() {
for (Map.Entry<String, String> entry : bundlesToUnload.entrySet()) {
final String broker = entry.getKey();
final String bundle = entry.getValue();

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
if(!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
Expand Down Expand Up @@ -640,7 +639,7 @@ public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle
*/
@Override
public void checkNamespaceBundleSplit() {

if (!conf.isLoadBalancerAutoBundleSplitEnabled() || pulsar.getLeaderElectionService() == null
|| !pulsar.getLeaderElectionService().isLeader()) {
return;
Expand Down Expand Up @@ -673,7 +672,7 @@ public void checkNamespaceBundleSplit() {
}
}
}

}

/**
Expand All @@ -692,13 +691,13 @@ public void onUpdate(final String path, final LocalBrokerData data, final Stat s
* @return The name of the selected broker, as it appears on ZooKeeper.
*/
@Override
public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
public Optional<String> selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
// Use brokerCandidateCache as a lock to reduce synchronization.
synchronized (brokerCandidateCache) {
final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the selected broker.
return preallocatedBundleToBroker.get(bundle);
return Optional.of(preallocatedBundleToBroker.get(bundle));
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle,
key -> getBundleDataOrDefault(bundle));
Expand Down Expand Up @@ -737,13 +736,18 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
}

// Choose a broker among the potentially smaller filtered list, when possible
String broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
Optional<String> broker = placementStrategy.selectBroker(brokerCandidateCache, data, loadData, conf);
if (log.isDebugEnabled()) {
log.debug("Selected broker {} from candidate brokers {}", broker, brokerCandidateCache);
}

if (!broker.isPresent()) {
// No brokers available
return broker;
}

final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
final double maxUsage = loadData.getBrokerData().get(broker).getLocalData().getMaxResourceUsage();
final double maxUsage = loadData.getBrokerData().get(broker.get()).getLocalData().getMaxResourceUsage();
if (maxUsage > overloadThreshold) {
// All brokers that were in the filtered list were overloaded, so check if there is a better broker
LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, brokerCandidateCache, getAvailableBrokers(),
Expand All @@ -752,12 +756,12 @@ public String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
}

// Add new bundle to preallocated.
loadData.getBrokerData().get(broker).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker);
loadData.getBrokerData().get(broker.get()).getPreallocatedBundleData().put(bundle, data);
preallocatedBundleToBroker.put(bundle, broker.get());

final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle);
brokerToNamespaceToBundleRange.get(broker).computeIfAbsent(namespaceName, k -> new HashSet<>())
brokerToNamespaceToBundleRange.get(broker.get()).computeIfAbsent(namespaceName, k -> new HashSet<>())
.add(bundleRange);
return broker;
}
Expand Down Expand Up @@ -827,7 +831,7 @@ public void stop() throws PulsarServerException {

/**
* As any broker, retrieve the namespace bundle stats and system resource usage to update data local to this broker.
* @return
* @return
*/
@Override
public LocalBrokerData updateLocalBrokerData() {
Expand Down Expand Up @@ -902,7 +906,7 @@ public void writeBundleDataOnZooKeeper() {
}
}
}

private void deleteBundleDataFromZookeeper(String bundle) {
final String zooKeeperPath = getBundleDataZooKeeperPath(bundle);
try {
Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -63,9 +64,14 @@ public LoadManagerReport generateLoadReport() {
}

@Override
public ResourceUnit getLeastLoaded(final ServiceUnitId serviceUnit) {
return new SimpleResourceUnit(String.format("http://%s", loadManager.selectBrokerForAssignment(serviceUnit)),
new PulsarResourceDescription());
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
if (leastLoadedBroker.isPresent()) {
return Optional.of(new SimpleResourceUnit(String.format("http://%s", leastLoadedBroker.get()),
new PulsarResourceDescription()));
} else {
return Optional.empty();
}
}

@Override
Expand Down Expand Up @@ -112,7 +118,7 @@ public void writeResourceQuotasToZooKeeper() {
public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
return loadManager.getLoadReportDeserializer();
}

public ModularLoadManager getLoadManager() {
return loadManager;
}
Expand Down

0 comments on commit d6d4fa3

Please sign in to comment.