Introduce new load manager API #303

Merged
merged 32 commits into from
Mar 30, 2017

bobbeyreese
Contributor

Motivation

The current load balancer implementation has demonstrated poor performance in some scenarios, particularly when doing a cold restart or when the load reaches unexpected heights at some brokers. This PR attempts to first address the cold start issue.

Modifications

  • Added third load balancing strategy based on least messages per second.
  • Added load simulation servers, which contain producers and consumers used to simulate load.
    • com.yahoo.pulsar.testclient.LoadSimulationServer
    • pulsar-perf simulation-server <options>
  • Added controller to send commands to load simulation servers in a shell, such as adding (potentially groups of) producers/consumers, changing message rates/sizes, and stopping them.
    • com.yahoo.pulsar.testclient.LoadSimulationController
    • pulsar-perf simulation-controller <options>
    • Can also run scripts.
  • Added broker monitor which will watch and print load report information.
    • com.yahoo.pulsar.testclient.BrokerMonitor
    • pulsar-perf monitor <options>
    • Also available in new API: com.yahoo.pulsar.testclient.NewBrokerMonitor and pulsar-perf new-monitor <options>
  • Added proposal for new load balancing API which is simpler, more modular, and written with less code.
    • Interface: com.yahoo.pulsar.broker.loadbalance.NewLoadManager
    • Implementation: com.yahoo.pulsar.broker.loadbalance.impl.NewLoadManagerImpl
    • May implement LoadManager via com.yahoo.pulsar.broker.loadbalance.impl.NewLoadManagerWrapper
    • Added broker filter pipeline, a sequence of objects that blacklist brokers for selection in exceptional cases (e.g., high CPU).
      • com.yahoo.pulsar.broker.loadbalance.BrokerFilter
      • Has no current implementations.
    • Added new placement strategy API, which uses all the data available to the leader broker to make bundle assignment decisions.
      • com.yahoo.pulsar.broker.loadbalance.NewPlacementStrategy
      • Currently implemented only by com.yahoo.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate
    • Added load shedding strategy, which uses all the information available to the leader broker to decide if any bundles should be unloaded.
      • com.yahoo.pulsar.broker.loadbalance.LoadSheddingStrategy
      • Implemented only by com.yahoo.pulsar.broker.loadbalance.impl.DeviationShedder (untested, abstract)
  • Added dynamically configurable fields which allow us to switch between strategies and managers (i.e., between old and new API).
  • Added allocated and preallocated data to the load report, as well as the bundle gains/losses since the last update.
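
As an illustration of the "least message rate" placement idea described above, here is a minimal sketch. It assumes the leader holds a map from broker name to long-term message rate; the class `LeastRateSketch` and the method `selectBroker` are hypothetical names and do not reproduce the PR's `LeastLongTermMessageRate` implementation.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: NOT the PR's LeastLongTermMessageRate implementation.
final class LeastRateSketch {
    // Returns the broker with the smallest long-term message rate,
    // or an empty Optional when there are no candidates.
    static Optional<String> selectBroker(Map<String, Double> longTermMsgRateByBroker) {
        return longTermMsgRateByBroker.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }
}
```

A real strategy would also have to account for bundles that were just assigned but whose load has not been reported yet, which is what the preallocated data in the load report is for.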

Result

  • We will have the option to use, configure, and switch to a new load balancing strategy.
  • We will be able to simulate load and test load balancing strategies more easily.
  • We will still be able to use the old load balancer if we want to.

@yahoocla

Thank you for submitting this pull request, however I do not see a valid CLA on file for you. Before we can merge this request please visit https://yahoocla.herokuapp.com/ and agree to the terms. Thanks! 😄

Contributor

@merlimat left a comment

Just added a few comments on non-substantive aspects. I will go through it in more depth later.

conf/broker.conf Outdated
@@ -293,3 +293,30 @@ keepAliveIntervalSeconds=30

# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
brokerServicePurgeInactiveFrequencyInSeconds=60

# Number of samples to use for short term time window
numShortSamples=50
Contributor

Group all the settings under a common prefix, e.g. something like brokerLoadManagerXYZ...

conf/broker.conf Outdated
defaultMsgRateOut=50

# Name of load manager to use
loadManagerName=SimpleLoadManager
Contributor

Use full class name

conf/broker.conf Outdated
numShortSamples=50

# Number of samples to use for long term time window
numLongSamples=1000
Contributor

Try not to abbreviate the config variable names too much. For example, this could be called brokerLoadManagerNumberOfSamplesLongTermWindow=1000

// Percentage of change to trigger load report update
private int loadBalancerReportUpdateThresholdPercentage = 10;
// maximum interval to update load report
- private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
+ private int loadBalancerReportUpdateMaxIntervalMinutes = 1;
Contributor

By lowering this interval to 1 minute, every broker will update its own load report every minute, even if there were no substantial changes in the traffic.

Whenever one broker updates the z-node, all the other brokers get the watch notification, reload that load report, and redo the ranking. That can be expensive, which is why the default waits up to 15 minutes between updates when the load does not change by more than X%.

Contributor Author

This makes sense, I will revert the default back to 15 minutes.
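
The rule discussed in this thread (rewrite the load report when the load changes by more than a threshold percentage, or when a maximum interval has elapsed) can be sketched as follows. This is a simplified illustration under stated assumptions: `LoadReportUpdatePolicy` and `shouldUpdate` are hypothetical names, and the broker's real comparison covers several resource metrics rather than a single value.

```java
// Hypothetical sketch of "update on X% change, or after a maximum interval".
final class LoadReportUpdatePolicy {
    // Returns true if the load report should be rewritten to ZooKeeper.
    static boolean shouldUpdate(double oldValue, double newValue,
                                int thresholdPercentage,
                                long minutesSinceLastUpdate, int maxIntervalMinutes) {
        if (minutesSinceLastUpdate >= maxIntervalMinutes) {
            return true; // force a refresh so the report never gets too stale
        }
        if (oldValue == 0) {
            return newValue != 0; // any change away from zero counts as substantial
        }
        double changePercentage = Math.abs(newValue - oldValue) * 100.0 / Math.abs(oldValue);
        return changePercentage > thresholdPercentage;
    }
}
```

With a threshold of 10 and a maximum interval of 15, a change from 100 to 105 one minute after the last write would not trigger an update, while the same unchanged value would still be rewritten once 15 minutes have passed.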

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.ZooKeeper;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.*;
Contributor

Please use the Eclipse formatter profile and do not collapse the imports into wildcard imports.

} catch (KeeperException.NodeExistsException e) {
// ignore the exception, node might be present already
}
}

Thread.sleep(5000);
Contributor

Is this sleep needed?

LoadReport loadReport = null;
Thread.sleep(5000);
Contributor

same here

"Not enough of primaries [{}] available for namespace - [{}], "
+ "adding shared [{}] as possible candidate owners",
primaries.size(), namespace.toString(), shared.size());
"Not enough of primaries [{}] available for namespace - [{}], "
Contributor

Please use the eclipse formatter to minimize formatting diffs

bin/pulsar-perf Outdated
@@ -137,6 +137,14 @@ if [ "$COMMAND" == "produce" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "consume" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "monitor" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.BrokerMonitor "$@"
Contributor

What is the difference between BrokerMonitor and NewBrokerMonitor ?

Contributor Author

The difference is that one monitors data when SimpleLoadManagerImpl is used and the other monitors data when ModularLoadManagerImpl is used. I have renamed them to SimpleLoadManagerBrokerMonitor and ModularLoadManagerBrokerMonitor to better reflect the difference.

bin/pulsar-perf Outdated
elif [ "$COMMAND" == "simulation-server" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationServer "$@"
elif [ "$COMMAND" == "simulation-controller" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationController "$@"
Contributor

Also update pulsar_help() above to include the new commands in the help message.

@merlimat added the type/enhancement label Mar 24, 2017
@merlimat added this to the 1.17 milestone Mar 24, 2017
Contributor

@merlimat left a comment

Overall looks very nice. Just a few comments and formatting issues (use Eclipse with the formatting profile from src/formatter.xml).

Once this is merged, it would also be good to have a document in the "Internal Docs" section explaining the mechanism behind the load manager and, even better, some visualization of the load distribution comparing the old and new load managers.

private double loadManagerDefaultMessageRateOut = 50;

// Name of load manager to use
private String loadManagerClassName = "com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";
Contributor

Use SimpleLoadManagerImpl.class.getName(), so that it gets updated if the class is renamed.

Contributor Author

There's a bit of an issue with this, in that pulsar-broker depends on pulsar-broker-common and maven doesn't allow circular dependencies. Without package reorganization I'm not sure it can be done this way.

Contributor

Ok, no problem then

* ZooKeeper but are maintained by the leader broker (Map<String, BundleData>).
*/
public class BrokerData {
private LocalBrokerData localData;
Contributor

Formatting seems a bit off, it should be 4 spaces and no tabs

@@ -201,6 +208,28 @@ public void close() throws PulsarServerException {
}
}

private class LoadManagerWatcher implements Watcher {
public void process(final WatchedEvent event) {
new Thread(() -> {
Contributor

It's preferable to reuse an existing executor instead of spawning new threads.
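
A minimal sketch of that suggestion: the notification callback hands its work to an executor supplied by the caller instead of creating a thread per event. The class `SharedExecutorWatcher` and its `process` method are hypothetical stand-ins, not the ZooKeeper `Watcher` interface or the PR's `LoadManagerWatcher`.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch: a watcher-like callback that reuses a shared executor.
final class SharedExecutorWatcher {
    private final Executor executor;        // owned (and shut down) by the caller
    private final Consumer<String> handler; // the actual work to do per event

    SharedExecutorWatcher(Executor executor, Consumer<String> handler) {
        this.executor = executor;
        this.handler = handler;
    }

    void process(String event) {
        // Return quickly from the notification thread; run the work on the shared pool.
        executor.execute(() -> handler.accept(event));
    }
}
```

Passing the executor in through the constructor also makes the callback easy to test, since a test can supply a same-thread executor such as `Runnable::run`.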

public void process(final WatchedEvent event) {
new Thread(() -> {
try {
LOG.info("Attempting to change load manager");
Contributor

ServiceConfiguration already supports "dynamic" variables that can be overridden in ZK. You can register to be notified whenever a particular field is updated. Please check with @rdhabalia for more info.

try {
final ServiceConfiguration conf = pulsar.getConfiguration();
final Class<?> loadManagerClass = Class.forName(conf.getLoadManagerClassName());
// Assume there is a constructor with one argument of PulsarService.
Contributor

Instead of assuming a particular constructor, a common practice with interfaces and reflection is to assume default constructor and then enforce an init method in the interface:

void initialize(PulsarService service);

Contributor Author

That sounds fine. The only downside is that I would lose the ability to declare many fields final, but that is probably not a big deal, so I will change it.
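
The pattern under discussion (a default constructor plus an `initialize` method enforced by the interface) might look like the sketch below. All names here are hypothetical: `Plugin` stands in for the load manager interface and a plain `String` stands in for `PulsarService`.

```java
// Hypothetical sketch of "default constructor + initialize" instantiation via reflection.
final class PluginLoader {
    // Stand-in for the load manager interface.
    interface Plugin {
        void initialize(String context);
        String describe();
    }

    static final class EchoPlugin implements Plugin {
        // Cannot be final: it is assigned in initialize(), not in the constructor.
        private String context;

        public EchoPlugin() { }

        @Override public void initialize(String context) { this.context = context; }
        @Override public String describe() { return "echo:" + context; }
    }

    // Instantiates the named class through its no-argument constructor, then initializes it.
    static Plugin create(String className, String context) {
        try {
            Class<?> clazz = Class.forName(className);
            Plugin plugin = (Plugin) clazz.getDeclaredConstructor().newInstance();
            plugin.initialize(context);
            return plugin;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

The trade-off mentioned in the reply is visible in `EchoPlugin`: the `context` field has to be mutable because it is set after construction.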

private Producer getNewProducer() throws Exception {
while (true) {
try {
return client.createProducerAsync(topic, producerConf).get();
Contributor

this could be just

return client.createProducer(topic, producerConf);

if (!stop.get()) {
// The Producer failed due to an exception: attempt to get
// another producer.
producer = getNewProducer();
Contributor

Once you have a producer, it's good forever. Even if you get an exception, the producer instance is still valid and will recover by itself.

Contributor Author

Hmm, it seems that for whatever reason, I am experiencing issues related to netty/direct memory if I do not attempt to make a new producer:

io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
        at io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:63)
        at com.yahoo.pulsar.common.compression.CompressionCodecNone.encode(CompressionCodecNone.java:27)
        at com.yahoo.pulsar.client.impl.BatchMessageContainer.getCompressedBatchMetadataAndPayload(BatchMessageContainer.java:107)
        at com.yahoo.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1019)
        at com.yahoo.pulsar.client.impl.ProducerImpl.doBatchSendAndAdd(ProducerImpl.java:293)
        at com.yahoo.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:237)
        at com.yahoo.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:134)
        at com.yahoo.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:47)
        at com.yahoo.pulsar.testclient.LoadSimulationServer$TradeUnit.start(LoadSimulationServer.java:112)
        at com.yahoo.pulsar.testclient.LoadSimulationServer.lambda$handle$5(LoadSimulationServer.java:212)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)

As a result, I can end up losing a bunch of bundles in the simulation. I will leave the getNewProducer logic in place for now and try to find a better solution later.

@@ -39,11 +39,26 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
Contributor

Don't specify an artifact version outside the <dependencyManagement> section of the parent pom. In this case, just omit the version.

@@ -159,8 +160,7 @@ private ResourceUsage getMemUsage() {
}

private boolean isPhysicalNic(Path path) {
- try {
- path = Files.isSymbolicLink(path) ? Files.readSymbolicLink(path) : path;
+ path = Files.isSymbolicLink(path) ? path.toAbsolutePath() : path;
Contributor

Why remove the try/catch here?

Contributor Author

readSymbolicLink can throw IOException, but since it was changed, there is no possibility of a checked exception being thrown. I could change it to catch Exception instead, though this catches more exceptions than in the original code.

// select one of them at the end.
for (String broker : candidates) {
final double score = getScore(loadData.getBrokerData().get(broker), conf);
log.info("{} got score {}", broker, score);
Contributor

Should this line be logged at debug level instead?

bin/pulsar-perf Outdated
consume Run a consumer
simple-monitor Continuously receive broker data when using SimpleLoadManagerImpl
modular-monitor Continuously receive broker data when using ModularLoadManagerImpl
simulation-server Run a simulation server acting as a Pulsar client
Contributor

Should we name it simulation-client?

@@ -0,0 +1,52 @@
package com.yahoo.pulsar.broker;
Contributor

This file needs the copyright license header.

Contributor Author

I am not sure why this diff is not marked outdated: the copyright header is present if you look at Files Changed.

Contributor

Sure, we can see it now.

final String newLoadManagerName =
new String(getZkClient().getData(DYNAMIC_LOAD_MANAGER_ZPATH, this, null));

config.setLoadManagerClassName(newLoadManagerName);
Contributor

Should we make loadManagerClassName dynamic, using the feature introduced in #186? Then we could register a listener and would not need this class to update the load balancer.

if (lm instanceof SimpleLoadManagerImpl) {
return ((SimpleLoadManagerImpl) lm).getResourceAvailabilityFor(ns).asMap();
} else {
return Collections.emptyMap();
Contributor

What if we just return null rather than creating a new collection?
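
For context on this trade-off: the Javadoc for `Collections.emptyMap()` states that implementations need not create a separate `Map` object for each call, and OpenJDK returns a shared immutable instance, so the empty-map branch does not necessarily cost an allocation. The sketch below uses hypothetical names (`EmptyMapSketch`, `availabilityOrEmpty`, `total`) to show what callers gain from a non-null return value.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: returning an empty map instead of null.
final class EmptyMapSketch {
    // Returns the given map, or an immutable empty map when there is no data.
    static Map<String, Long> availabilityOrEmpty(Map<String, Long> data) {
        return data != null ? data : Collections.<String, Long>emptyMap();
    }

    // Callers can iterate without a null check.
    static long total(Map<String, Long> availability) {
        long sum = 0;
        for (long value : availability.values()) {
            sum += value;
        }
        return sum;
    }
}
```

Returning null would instead require every caller to guard against it before iterating.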

@@ -79,6 +83,11 @@
void doNamespaceBundleSplit() throws Exception;

/**
* Determine the broker root.
*/
String getBrokerRoot();
Contributor

Should we name it getBrokerRootName(), since we are returning a name rather than the root node?

}

// Form a score for a broker using its preallocated bundle data and time
// average data.
Contributor

Here again, is it possible to document how we calculate the score?

// wireless nics don't report speed, ignore them.
return false;
}
path = Files.isSymbolicLink(path) ? path.toAbsolutePath() : path;
Contributor

Shouldn't we call readSymbolicLink if the file is a symbolic link?

import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener<LocalBrokerData> {
public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/new-brokers";
Contributor

Does this mean we no longer write to /loadbalance/brokers once this load manager is enabled? During migration, an old broker still reads the broker's URL from /loadbalance/brokers when redirecting a lookup request, so if that node is not present the lookup may fail.

Contributor Author

@bobbeyreese Mar 27, 2017

This was resolved by having NamespaceService call getBrokerRoot instead of using the hardcoded path

EDIT: I see now there is little necessity to separate the two broker roots and have removed getBrokerRoot in favor of simply using /loadbalance/brokers

return broker;
}

private void policyFilter(final ServiceUnitId serviceUnit) {
Contributor

Is this the same candidate-filtering logic as SimpleLoadManagerImpl.getFinalCandidates()? If so, should we move it to a common place so that new load managers do not have to duplicate it?

Contributor Author

It is very similar, but at the moment the difference in the APIs (Multimap<Long, ResourceUnit> vs. Set<String>) significantly complicates refactoring this portion of the code.

Contributor Author

I had some time today and decided to go ahead and share the code, along with other duplicate code. It makes a somewhat invasive change to SimpleLoadManagerImpl which could possibly impact performance. If there are major concerns there, I will revert back to the previous version.


final Set<String> oldBundles = lastLoadReport.getBundles();
final Set<String> newBundles = loadReport.getBundles();
final Set<String> bundleGains = new HashSet<>();
Contributor

@rdhabalia Mar 25, 2017

Is it possible to move this collection to class level and reuse it? The load report runs every x minutes, so allocating it each time may contribute a little to GC.
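
A sketch of what the reviewer describes: the gain and loss sets live at class level and are cleared and refilled on each update rather than reallocated. `BundleDelta` and its members are hypothetical names, not the PR's code.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: reuse two class-level sets to track bundle gains and losses.
final class BundleDelta {
    final Set<String> gains = new HashSet<>();
    final Set<String> losses = new HashSet<>();

    // Recomputes gains (in new, not in old) and losses (in old, not in new) in place.
    void update(Set<String> oldBundles, Set<String> newBundles) {
        gains.clear();
        losses.clear();
        for (String bundle : newBundles) {
            if (!oldBundles.contains(bundle)) {
                gains.add(bundle);
            }
        }
        for (String bundle : oldBundles) {
            if (!newBundles.contains(bundle)) {
                losses.add(bundle);
            }
        }
    }
}
```

One caveat with this design is that the shared sets are not thread-safe, so it only works if `update` and any readers run on the same thread or are otherwise synchronized.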

// the ranks are bounded. Otherwise (as is the case in LOADBALANCER_STRATEGY_LEAST_MSG), the ranks are simply
// the total message rate which is in the range [0,Infinity) so they are unbounded. The
// "boundedness" affects how two ranks are compared to see which one is better
boolean unboundedRanks = getLoadBalancerPlacementStrategy().equals(LOADBALANCER_STRATEGY_LEAST_MSG);
synchronized (resourceUnitRankings) {
Contributor

Since both methods accessing this map are synchronized, can we remove this synchronized block?

// Long term data for this bundle. The time frame of this data is determined
// by the number of long term samples
// and the bundle update period.
private TimeAverageMessageData longTermData;
Contributor

Since BundleData is only maintained in memory by the leader, we will lose that info when the leader restarts. Wouldn't it be useful to preserve bundle-specific historical data across restarts?

Contributor Author

@bobbeyreese Mar 28, 2017

It is also maintained on ZooKeeper in /loadbalance/broker-data (it is basically the replacement for resource quotas). It is written to ZooKeeper via writeBundleDataOnZooKeeper.

*/
@Override
public void onUpdate(final String path, final LocalBrokerData data, final Stat stat) {
scheduler.submit(this::updateAll);
Contributor

If this is invoked whenever any broker updates its load data, shouldn't we update the data for just that broker instead of all of them?

Contributor Author

@bobbeyreese Mar 28, 2017

You're right, good catch.

EDIT: Actually, the local data is updated via writeBrokerDataOnZooKeeper, which is called periodically by LoadReportUpdaterTask. This updateAll periodic invocation ensures that the load data is up to date; while we could delegate it only to the leader broker, this could complicate things a little bit because:

  • The load manager does not know directly whether it is the leader broker.
  • Since it is implemented as a watch we would have to make special stipulations based on whether the broker is the leader broker.

I think that, since updating invokes reads and not writes, it is not so bad for every broker to do it. It is actually similar to SimpleLoadManagerImpl's behavior, since onUpdate updates the rankings for it, for each broker.

* @return The name of the selected broker, as it appears on ZooKeeper.
*/
@Override
public synchronized String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {
Contributor

Why does this method need to be synchronized?

Contributor Author

@bobbeyreese Mar 28, 2017

Without the synchronized block, decisions will be made without the latest preallocated data. If many selections are processed within a short time frame, this could result in the least loaded broker receiving too many assignments. A weighted random procedure could counteract this, but it is a little less reliable and more complicated.
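
The reasoning above can be sketched as follows: each selection adds the expected load of the new bundle to the chosen broker before the next selection runs, and `synchronized` guarantees that the next caller sees that preallocation. `PreallocatingSelector` and `select` are hypothetical names, and a single `double` per broker stands in for the real load data.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: serialized selection with preallocated load.
final class PreallocatingSelector {
    private final Map<String, Double> loadByBroker = new HashMap<>();

    PreallocatingSelector(Map<String, Double> initialLoad) {
        loadByBroker.putAll(initialLoad);
    }

    // synchronized: each selection must observe the preallocation made by the previous one.
    synchronized String select(double expectedBundleLoad) {
        String best = null;
        double bestLoad = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> entry : loadByBroker.entrySet()) {
            if (entry.getValue() < bestLoad) {
                best = entry.getKey();
                bestLoad = entry.getValue();
            }
        }
        if (best != null) {
            // Preallocate: count the new bundle against the chosen broker immediately.
            loadByBroker.merge(best, expectedBundleLoad, Double::sum);
        }
        return best;
    }
}
```

Without the preallocation step, a burst of selections would all see the same least loaded broker and pile onto it, which is the failure mode described in the reply.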

@@ -199,6 +199,10 @@
private boolean loadBalancerEnabled = false;
// load placement strategy
private String loadBalancerPlacementStrategy = "weightedRandomSelection"; // weighted random selection
// load placement secondary strategy (used to silently test an alternate strategy)
private String loadBalancerSecondaryStrategy = null;
Contributor

Do we really need loadBalancerSecondaryStrategy? Can't we update loadBalancerPlacementStrategy dynamically to change the strategy?


// Name of load manager to use
@FieldContext(dynamic = true)
private String loadManagerClassName = "com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";
Contributor

We should place all the fields at the beginning, followed by the getters/setters.

// load placement secondary strategy (used to silently test an alternate strategy)
private String loadBalancerSecondaryStrategy = null;
// are all bundle placement operations forwarded to a lead broker
private boolean loadBalancerIsCentralized = false;
Contributor

I think we don't need this field. Right now we derive loadBalancerIsCentralized from the strategy: if the strategy is leastLoadedServer, loadBalancerIsCentralized is true, otherwise false. So each LoadBalancer implements an isCentralized method.
Since we already have a loadBalancer implementation, LoadBalancerIml.isCentralized() should have a predefined value and it shouldn't be dynamic, right?

System.out.println("\nBroker Data for " + broker + ":");
System.out.println("---------------");

System.out.println("\nNum Topics: " + localBrokerData.getNumTopics());
Contributor

Can we please replace System.out.print with a logger?

Contributor Author

Is this strictly necessary? This is meant to feed continuous information in a pretty/human-readable way and the info lines are going to add a lot of clutter.

Contributor

That's true, though the advantage of using a logger is that you can choose to change levels or save to a file through configuration. Also, every line will have a timestamp.

private Set<String> bundles;

// The bundles gained since the last invocation of update.
private Set<String> lastBundleGains;
Contributor

Do we use lastBundleGains in the load manager?

Contributor Author

It is not used in the load manager, but it is used in the monitor to alert users to recent bundle gains.

* which are not written to ZooKeeper but are maintained by the leader broker (Map<String, BundleData>).
*/
public class BrokerData {
private LocalBrokerData localData;
Contributor

I think LocalBrokerData is a replacement for LoadReport, so can't we add the extra information to the existing LoadReport and use that? That way we would not have to maintain two classes, and any load manager could deserialize it. Is that correct?

Contributor Author

While that is true, I think pushing the new fields into LoadReport would add unnecessary clutter to the JSON for users of SimpleLoadManagerImpl and vice-versa for ModularLoadManagerImpl. I think they are different enough to keep separate.

Contributor

ok

@saandrews
Contributor

Can we create a separate pull request to document the steps to enable/disable the new load manager?

/ newUsage.directMemory.limit)
: 0;
? ((newUsage.directMemory.usage - oldUsage.directMemory.usage) * 100
/ newUsage.directMemory.limit)
Contributor

formatting. spaces

Contributor Author

It seems to be formatted correctly according to our formatter

try {
log.info("Attempting to change load manager");
final LoadManager newLoadManager = LoadManager.create(pulsar);
log.info("Created load manager: {}", className);
Contributor

Should we have just one info statement, after creating the load manager?

@@ -199,6 +199,7 @@
private boolean loadBalancerEnabled = false;
// load placement strategy
private String loadBalancerPlacementStrategy = "weightedRandomSelection"; // weighted random selection
// load placement secondary strategy (used to silently test an alternate strategy)
Contributor

I think you forgot to remove this line.

bin/pulsar-perf Outdated
produce Run a producer
consume Run a consumer
simple-monitor Continuously receive broker data when using SimpleLoadManagerImpl
modular-monitor Continuously receive broker data when using ModularLoadManagerImpl
Contributor

Nit-picking: how about calling the 2 commands like :

monitor-simple-load-manager
monitor-modular-load-manager

Otherwise the adjective seems to be referred to the "monitor" itself.

Files.readAllBytes(path.resolve("speed"));
return true;
} catch (Exception e) {
// wireless nics don't report speed, ignore them.
Contributor

Shouldn't we still be skipping "virtual" NICs?

Contributor Author

We decided that, since virtual NICs will not be able to resolve speed (?), we may as well let the catch block deal with them. That way, we do not need to resolve symbolic links or get absolute paths.

Contributor

If you're 100% sure that it works, go ahead 😄 . Otherwise just pull back the if-not-virtual check.
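
For comparison, a conservative variant that keeps the explicit virtual check could look like the sketch below. It assumes a Linux-style layout in which entries under /sys/class/net are symlinks and virtual devices resolve to a path containing `/virtual/`; `NicCheckSketch` is a hypothetical name and this is not the code that was merged.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: keep the if-not-virtual check AND the speed probe.
final class NicCheckSketch {
    // path is assumed to be an entry such as /sys/class/net/eth0.
    static boolean isPhysicalNic(Path path) {
        try {
            // Resolve symlinks so that virtual devices show their real location.
            Path real = path.toRealPath();
            if (real.toString().contains("/virtual/")) {
                return false;
            }
            // Some NICs (e.g. wireless) cannot report a speed; the read then fails.
            Files.readAllBytes(real.resolve("speed"));
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Keeping both checks avoids relying on the unverified assumption that reading `speed` always fails for virtual interfaces.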

Contributor

@saandrews left a comment

Good effort to revive the load balancer Brad & Bobbey 👍

Contributor

@merlimat left a comment

👍

@saandrews merged commit 1e02afb into apache:master Mar 30, 2017