Skip to content

Commit

Permalink
Use historical data from old API in new API (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbeyreese authored and rdhabalia committed May 2, 2017
1 parent f7bcff6 commit a7f6973
Showing 1 changed file with 26 additions and 0 deletions.
Expand Up @@ -32,7 +32,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.yahoo.pulsar.broker.TimeAverageMessageData;
import com.yahoo.pulsar.broker.loadbalance.LoadManager;
import com.yahoo.pulsar.common.policies.data.ResourceQuota;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
Expand Down Expand Up @@ -91,6 +93,9 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
// The number of effective samples to keep for observing short term data.
public static final int NUM_SHORT_SAMPLES = 10;

// Path to ZNode whose children contain ResourceQuota jsons.
public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace";

// Path to ZNode containing TimeAverageBrokerData jsons for each broker.
public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average";

Expand Down Expand Up @@ -270,8 +275,29 @@ private BundleData getBundleDataOrDefault(final String bundle) {
BundleData bundleData = null;
try {
final String bundleZPath = getBundleDataZooKeeperPath(bundle);
final String quotaZPath = String.format("%s/%s", RESOURCE_QUOTA_ZPATH, bundle);
if (zkClient.exists(bundleZPath, null) != null) {
bundleData = readJson(zkClient.getData(bundleZPath, null, null), BundleData.class);
} else if (zkClient.exists(quotaZPath, null) != null) {
final ResourceQuota quota = readJson(zkClient.getData(quotaZPath, null, null), ResourceQuota.class);
bundleData = new BundleData(NUM_SHORT_SAMPLES, NUM_LONG_SAMPLES);
// Initialize from existing resource quotas if new API ZNodes do not exist.
final TimeAverageMessageData shortTermData = bundleData.getShortTermData();
final TimeAverageMessageData longTermData = bundleData.getLongTermData();

shortTermData.setMsgRateIn(quota.getMsgRateIn());
shortTermData.setMsgRateOut(quota.getMsgRateOut());
shortTermData.setMsgThroughputIn(quota.getBandwidthIn());
shortTermData.setMsgThroughputOut(quota.getBandwidthOut());

longTermData.setMsgRateIn(quota.getMsgRateIn());
longTermData.setMsgRateOut(quota.getMsgRateOut());
longTermData.setMsgThroughputIn(quota.getBandwidthIn());
longTermData.setMsgThroughputOut(quota.getBandwidthOut());

// Assume ample history.
shortTermData.setNumSamples(NUM_SHORT_SAMPLES);
longTermData.setNumSamples(NUM_LONG_SAMPLES);
}
} catch (Exception e) {
log.warn("Error when trying to find bundle {} on zookeeper: {}", bundle, e);
Expand Down

0 comments on commit a7f6973

Please sign in to comment.