Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Bugs Introduced by New Load Manager #332

Merged
merged 21 commits into from
Apr 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import com.yahoo.pulsar.broker.BrokerData;
import com.yahoo.pulsar.broker.BundleData;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.TimeAverageBrokerData;
import com.yahoo.pulsar.broker.TimeAverageMessageData;
Expand All @@ -50,7 +51,7 @@ public LeastLongTermMessageRate(final ServiceConfiguration conf) {
// max_usage < overload_threshold ? 1 / (overload_threshold - max_usage): Inf
// This weight attempts to discourage the placement of bundles on brokers whose system resource usage is high.
private static double getScore(final BrokerData brokerData, final ServiceConfiguration conf) {
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100;
final double overloadThreshold = conf.getLoadBalancerBrokerOverloadedThresholdPercentage() / 100.0;
double totalMessageRate = 0;
for (BundleData bundleData : brokerData.getPreallocatedBundleData().values()) {
final TimeAverageMessageData longTermData = bundleData.getLongTermData();
Expand All @@ -61,14 +62,10 @@ private static double getScore(final BrokerData brokerData, final ServiceConfigu
if (maxUsage > overloadThreshold) {
return Double.POSITIVE_INFINITY;
}
// 1 / weight is the proportion of load this machine should receive in
// proportion to a machine with no system resource burden.
// This attempts to spread out the load in such a way that
// machines only become overloaded if there is too much
// load for the system to handle (e.g., all machines are
// at least nearly overloaded).
final double weight = maxUsage < overloadThreshold ? 1 / (overloadThreshold - maxUsage)
: Double.POSITIVE_INFINITY;
// 1 / weight is the proportion of load this machine should receive in proportion to a machine with no system
// resource burden. This attempts to spread out the load in such a way that machines only become overloaded if
// there is too much load for the system to handle (e.g., all machines are at least nearly overloaded).
final double weight = 1 / (overloadThreshold - maxUsage);
final double totalMessageRateEstimate = totalMessageRate + timeAverageData.getLongTermMsgRateIn()
+ timeAverageData.getLongTermMsgRateOut();
return weight * totalMessageRateEstimate;
Expand All @@ -95,16 +92,26 @@ public String selectBroker(final Set<String> candidates, final BundleData bundle
// Maintain of list of all the best scoring brokers and then randomly
// select one of them at the end.
for (String broker : candidates) {
final double score = getScore(loadData.getBrokerData().get(broker), conf);
log.info("{} got score {}", broker, score);
final BrokerData brokerData = loadData.getBrokerData().get(broker);
final double score = getScore(brokerData, conf);
if (score == Double.POSITIVE_INFINITY) {
final LocalBrokerData localData = brokerData.getLocalData();
log.warn(
"Broker {} is overloaded: CPU: {}%, MEMORY: {}%, DIRECT MEMORY: {}%, BANDWIDTH IN: {}%, "
+ "BANDWIDTH OUT: {}%",
broker, localData.getCpu().percentUsage(), localData.getMemory().percentUsage(),
localData.getDirectMemory().percentUsage(), localData.getBandwidthIn().percentUsage(),
localData.getBandwidthOut().percentUsage());

}
log.debug("{} got score {}", broker, score);
if (score < minScore) {
// Clear best brokers since this score beats the other brokers.
bestBrokers.clear();
bestBrokers.add(broker);
minScore = score;
} else if (score == minScore) {
// Add this broker to best brokers since it ties with the best
// score.
// Add this broker to best brokers since it ties with the best score.
bestBrokers.add(broker);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ private void updateAllBrokerData() {
brokerDataMap.put(broker, new BrokerData(localData));
}
} catch (Exception e) {
log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e);
log.warn("Error reading broker data from cache for broker - [{}], [{}]", broker, e.getMessage());
}
}
} catch (Exception e) {
log.warn("Error reading active brokers list from zookeeper while updating broker data [{}]", e);
log.warn("Error reading active brokers list from zookeeper while updating broker data [{}]", e.getMessage());
}
}

Expand Down Expand Up @@ -486,19 +486,15 @@ public void onUpdate(final String path, final LocalBrokerData data, final Stat s
*/
@Override
public synchronized String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
// ?: Is it too inefficient to make this synchronized? If so, it may be
// a good idea to use weighted random
// or atomic data.

final String bundle = serviceUnit.toString();
if (preallocatedBundleToBroker.containsKey(bundle)) {
// If the given bundle is already in preallocated, return the
// selected broker.
// If the given bundle is already in preallocated, return the selected broker.
return preallocatedBundleToBroker.get(bundle);
}
final BundleData data = loadData.getBundleData().computeIfAbsent(bundle, key -> getBundleDataOrDefault(bundle));
brokerCandidateCache.clear();
LoadManagerShared.applyPolicies(serviceUnit, policies, brokerCandidateCache, loadData.getBrokerData().keySet());
log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle);

// Use the filter pipeline to finalize broker candidates.
for (BrokerFilter filter : filterPipeline) {
Expand Down Expand Up @@ -531,6 +527,9 @@ public void start() throws PulsarServerException {
try {
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath, localData.getJsonBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException.NodeExistsException e) {
// Node may already be created by another load manager: in this case update the data.
zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
} catch (Exception e) {
// Catching exception here to print the right error message
log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
Expand All @@ -554,7 +553,10 @@ public void start() throws PulsarServerException {
*/
@Override
public void stop() throws PulsarServerException {
// Do nothing.
availableActiveBrokers.close();
brokerDataCache.clear();
brokerDataCache.close();
scheduler.shutdown();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ public void start() throws PulsarServerException {
try {
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath,
loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (KeeperException.NodeExistsException e) {
// Node may already be created by another load manager: in this case update the data.
if (loadReport != null) {
pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1);
}

} catch (Exception e) {
// Catching excption here to print the right error message
log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e);
Expand Down Expand Up @@ -1423,6 +1429,9 @@ public void doNamespaceBundleSplit() throws Exception {

@Override
public void stop() throws PulsarServerException {
// do nothing
loadReportCacheZk.clear();
loadReportCacheZk.close();
availableActiveBrokers.close();
scheduler.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;
import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static com.yahoo.pulsar.broker.web.PulsarWebResource.joinPath;
import static com.yahoo.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
Expand All @@ -36,7 +37,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback.StatCallback;
Expand All @@ -46,13 +46,12 @@

import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.PulsarServerException;
import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.lookup.LookupResult;
import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
Expand All @@ -68,11 +67,12 @@
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.common.policies.data.LocalPolicies;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ServiceLookupData;
import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache.Deserializer;
import static com.yahoo.pulsar.broker.admin.AdminResource.jsonMapper;

/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,21 @@ public static final void initZK(ZooKeeper zkc, String selfBrokerUrl) {
*/
private static final void cleanupNamespaceNodes(ZooKeeper zkc, String root, String selfBrokerUrl) throws Exception {
// we don't need a watch here since we are only cleaning up the stale ephemeral nodes from previous session
for (String node : zkc.getChildren(root, false)) {
String currentPath = root + "/" + node;
// retrieve the content and try to decode with ServiceLookupData
List<String> children = zkc.getChildren(currentPath, false);
if (children.size() == 0) {
// clean up a single namespace node
cleanupSingleNamespaceNode(zkc, currentPath, selfBrokerUrl);
} else {
// this is an intermediate node, which means this is v2 namespace path
cleanupNamespaceNodes(zkc, currentPath, selfBrokerUrl);
try {
for (String node : zkc.getChildren(root, false)) {
String currentPath = root + "/" + node;
// retrieve the content and try to decode with ServiceLookupData
List<String> children = zkc.getChildren(currentPath, false);
if (children.size() == 0) {
// clean up a single namespace node
cleanupSingleNamespaceNode(zkc, currentPath, selfBrokerUrl);
} else {
// this is an intermediate node, which means this is v2 namespace path
cleanupNamespaceNodes(zkc, currentPath, selfBrokerUrl);
}
}
} catch (NoNodeException nne) {
LOG.info("No children for [{}]", nne.getPath());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ private void updateConfigurationAndRegisterListeners() {
try {
final LoadManager newLoadManager = LoadManager.create(pulsar);
log.info("Created load manager: {}", className);
pulsar.getLoadManager().get().disableBroker();
pulsar.getLoadManager().get().stop();
newLoadManager.start();
pulsar.getLoadManager().set(newLoadManager);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.test.PortManager;
Expand Down Expand Up @@ -85,6 +87,7 @@
import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble;
import com.yahoo.pulsar.zookeeper.LocalZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperCache;
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

/**
* Start two brokers in the same cluster and have them connect to the same zookeeper. When the PulsarService starts, it
Expand Down Expand Up @@ -279,7 +282,8 @@ public void testUpdateLoadReportAndCheckUpdatedRanking() throws Exception {

private AtomicReference<Map<Long, Set<ResourceUnit>>> getSortedRanking(PulsarService pulsar)
throws NoSuchFieldException, IllegalAccessException {
Field ranking = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass().getDeclaredField("sortedRankings");
Field ranking = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass()
.getDeclaredField("sortedRankings");
ranking.setAccessible(true);
AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRanking = (AtomicReference<Map<Long, Set<ResourceUnit>>>) ranking
.get(pulsar.getLoadManager().get());
Expand Down Expand Up @@ -420,6 +424,23 @@ public void testDestinationAssignmentWithExistingBundles() throws Exception {
}
}

/**
* Ensure that the load manager's zookeeper data cache is shutdown after invoking stop().
*/
@Test
public void testStop() throws Exception {
final SimpleLoadManagerImpl loadManager = (SimpleLoadManagerImpl) pulsarServices[0].getLoadManager().get();
loadManager.stop();
Field loadReportCacheField = SimpleLoadManagerImpl.class.getDeclaredField("loadReportCacheZk");
loadReportCacheField.setAccessible(true);
ZooKeeperDataCache<LoadReport> loadReportCache = (ZooKeeperDataCache<LoadReport>) loadReportCacheField
.get(loadManager);
Field IS_SHUTDOWN_UPDATER = ZooKeeperDataCache.class.getDeclaredField("IS_SHUTDOWN_UPDATER");
IS_SHUTDOWN_UPDATER.setAccessible(true);
final int TRUE = 1;
assert (((AtomicIntegerFieldUpdater<ZooKeeperDataCache>) (IS_SHUTDOWN_UPDATER.get(loadReportCache))).get(loadReportCache) == TRUE);
}

private AtomicReference<Map<String, ResourceQuota>> getRealtimeResourceQuota(PulsarService pulsar)
throws NoSuchFieldException, IllegalAccessException {
Field quotasField = ((SimpleLoadManagerImpl) pulsar.getLoadManager().get()).getClass()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.yahoo.pulsar.broker.loadbalance;

import java.util.Map;

import org.testng.annotations.Test;

import com.yahoo.pulsar.broker.BrokerData;
import com.yahoo.pulsar.broker.BundleData;
import com.yahoo.pulsar.broker.LocalBrokerData;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.TimeAverageBrokerData;
import com.yahoo.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate;
import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage;

public class ModularLoadManagerStrategyTest {
// Test that least long term message rate works correctly.
@Test
public void testLeastLongTermMessageRate() {
BundleData bundleData = new BundleData();
BrokerData brokerData1 = initBrokerData();
BrokerData brokerData2 = initBrokerData();
BrokerData brokerData3 = initBrokerData();
brokerData1.getTimeAverageData().setLongTermMsgRateIn(100);
brokerData2.getTimeAverageData().setLongTermMsgRateIn(200);
brokerData3.getTimeAverageData().setLongTermMsgRateIn(300);
LoadData loadData = new LoadData();
Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
brokerDataMap.put("1", brokerData1);
brokerDataMap.put("2", brokerData2);
brokerDataMap.put("3", brokerData3);
ServiceConfiguration conf = new ServiceConfiguration();
ModularLoadManagerStrategy strategy = new LeastLongTermMessageRate(conf);
assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("1"));
brokerData1.getTimeAverageData().setLongTermMsgRateIn(400);
assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("2"));
brokerData2.getLocalData().setCpu(new ResourceUsage(90, 100));
assert (strategy.selectBroker(brokerDataMap.keySet(), bundleData, loadData, conf).equals("3"));
}

private BrokerData initBrokerData() {
LocalBrokerData localBrokerData = new LocalBrokerData();
localBrokerData.setCpu(new ResourceUsage());
localBrokerData.setMemory(new ResourceUsage());
localBrokerData.setBandwidthIn(new ResourceUsage());
localBrokerData.setBandwidthOut(new ResourceUsage());
BrokerData brokerData = new BrokerData(localBrokerData);
TimeAverageBrokerData timeAverageBrokerData = new TimeAverageBrokerData();
brokerData.setTimeAverageData(timeAverageBrokerData);
return brokerData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
Expand All @@ -36,10 +37,12 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String>
private final ZooKeeperCache cache;
private final String path;
private final List<ZooKeeperCacheListener<Set<String>>> listeners = Lists.newCopyOnWriteArrayList();
private final AtomicBoolean isShutdown;

public ZooKeeperChildrenCache(ZooKeeperCache cache, String path) {
this.cache = cache;
this.path = path;
isShutdown = new AtomicBoolean(false);
}

public Set<String> get() throws KeeperException, InterruptedException {
Expand Down Expand Up @@ -88,6 +91,12 @@ public void unregisterListener(ZooKeeperCacheListener<Set<String>> listener) {
@Override
public void process(WatchedEvent event) {
LOG.debug("[{}] Received ZooKeeper watch event: {}", cache.zkSession.get(), event);
cache.process(event, this);
if (!isShutdown.get()) {
cache.process(event, this);
}
}

public void close() {
isShutdown.set(true);
}
}
Loading