Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce auto bundle split and unloading of split bundle in ModularLoadManager #857

Merged
merged 5 commits into from
Oct 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ loadBalancerBrokerComfortLoadLevelPercentage=65
# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=false

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ loadBalancerBrokerComfortLoadLevelPercentage=65
# enable/disable namespace bundle auto split
loadBalancerAutoBundleSplitEnabled=false

# enable/disable automatic unloading of split bundles
loadBalancerAutoUnloadSplitBundlesEnabled=false

# maximum topics in a bundle, otherwise bundle split will be triggered
loadBalancerNamespaceBundleMaxTopics=1000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Usage threshold to defermine a broker is having just right level of load
private int loadBalancerBrokerComfortLoadLevelPercentage = 65;
// enable/disable automatic namespace bundle split
@FieldContext(dynamic = true)
private boolean loadBalancerAutoBundleSplitEnabled = false;
// enable/disable automatic unloading of split bundles
@FieldContext(dynamic = true)
private boolean loadBalancerAutoUnloadSplitBundlesEnabled = false;
// maximum topics in a bundle, otherwise bundle split will be triggered
private int loadBalancerNamespaceBundleMaxTopics = 1000;
// maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered
Expand Down Expand Up @@ -1100,14 +1104,22 @@ public void setLoadBalancerBrokerComfortLoadLevelPercentage(int percentage) {
this.loadBalancerBrokerComfortLoadLevelPercentage = percentage;
}

public boolean getLoadBalancerAutoBundleSplitEnabled() {
public boolean isLoadBalancerAutoBundleSplitEnabled() {
return this.loadBalancerAutoBundleSplitEnabled;
}

public void setLoadBalancerAutoBundleSplitEnabled(boolean enabled) {
this.loadBalancerAutoBundleSplitEnabled = enabled;
}

public boolean isLoadBalancerAutoUnloadSplitBundlesEnabled() {
return loadBalancerAutoUnloadSplitBundlesEnabled;
}

public void setLoadBalancerAutoUnloadSplitBundlesEnabled(boolean loadBalancerAutoUnloadSplitBundlesEnabled) {
this.loadBalancerAutoUnloadSplitBundlesEnabled = loadBalancerAutoUnloadSplitBundlesEnabled;
}

public void setLoadBalancerNamespaceMaximumBundles(int bundles) {
this.loadBalancerNamespaceMaximumBundles = bundles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker;

import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.io.IOException;
import java.net.URL;
import java.util.List;
Expand All @@ -42,7 +44,7 @@
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -74,7 +76,6 @@
import com.google.common.collect.Lists;

import io.netty.util.concurrent.DefaultThreadFactory;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

/**
* Main class for Pulsar broker service
Expand Down Expand Up @@ -428,7 +429,7 @@ private void startLoadManagementService() throws PulsarServerException {
if (config.isLoadBalancerEnabled()) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = SimpleLoadManagerImpl.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL;
this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(
new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval,
TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,22 @@ public double getMsgRateOut() {
public void setMsgRateOut(double msgRateOut) {
this.msgRateOut = msgRateOut;
}

/**
* Get the total message rate.
*
* @return Message rate in + message rate out.
*/
public double totalMsgRate() {
return msgRateIn + msgRateOut;
}

/**
* Get the total message throughput.
*
* @return Message throughput in + message throughput out.
*/
public double totalMsgThroughput() {
return msgThroughputIn + msgThroughputOut;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,8 @@ public void unloadNamespaceBundle(@PathParam("property") String property, @PathP
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public void splitNamespaceBundle(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the brokers already have the loadBalancerAutoUnloadSplitBundlesEnabled parameter, wouldn't this unload option be redundant?

Just trigger the split and broker will decide whether to unload or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct but this admin-api is triggered by load-balancer and cli-tool. Sometimes we might want to just split the bundle using cli-tool without unloading it. Therefore, this query-param can give flexibility to perform accordingly.??

log.info("[{}] Split namespace bundle {}/{}/{}/{}", clientAppId(), property, cluster, namespace, bundleRange);

validateSuperUserAccess();
Expand All @@ -855,7 +856,7 @@ public void splitNamespaceBundle(@PathParam("property") String property, @PathPa
true);

try {
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle).get();
pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload).get();
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
} catch (IllegalArgumentException e) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), fqnn.toString(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;

import java.util.Set;

import org.apache.pulsar.broker.PulsarService;

/**
* Load Manager component which determines what bundles should be split into two bundles.
*/
public interface BundleSplitStrategy {
/**
* Determines which bundles, if any, should be split.
*
* @param loadData
* Load data to base decisions on (does not have benefit of preallocated data since this may not be the
* leader broker).
* @param pulsar
* Service to use.
* @return A set of the bundles that should be split.
*/
Set<String> findBundlesToSplit(LoadData loadData, PulsarService pulsar);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface ModularLoadManager {
/**
* As the leader broker, attempt to automatically detect and split hot namespace bundles.
*/
void doNamespaceBundleSplit();
void checkNamespaceBundleSplit();

/**
* Initialize this load manager using the given pulsar service.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.pulsar.broker.LocalBrokerData;
//import org.apache.pulsar.broker.MessageData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;

/**
* Determines which bundles should be split based on various thresholds.
*/
public class BundleSplitterTask implements BundleSplitStrategy {
private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class);
private final Set<String> bundleCache;

/**
* Construct a BundleSplitterTask.
*
* @param pulsar
* Service to construct from.
*/
public BundleSplitterTask(final PulsarService pulsar) {
bundleCache = new HashSet<>();
}

/**
* Determines which bundles should be split based on various thresholds.
*
* @param loadData
* Load data to base decisions on (does not have benefit of preallocated data since this may not be the
* leader broker).
* @param localData
* Local data for the broker we are splitting on.
* @param pulsar
* Service to use.
* @return All bundles who have exceeded configured thresholds in number of topics, number of sessions, total
* message rates, or total throughput.
*/
@Override
public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) {
bundleCache.clear();
final ServiceConfiguration conf = pulsar.getConfiguration();
int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles();
long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics();
long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions();
long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate();
long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI;
loadData.getBrokerData().forEach((broker, brokerData) -> {
LocalBrokerData localData = brokerData.getLocalData();
for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
double totalMessageRate = 0;
double totalMessageThroughput = 0;
// Attempt to consider long-term message data, otherwise effectively ignore.
if (loadData.getBundleData().containsKey(bundle)) {
final TimeAverageMessageData longTermData = loadData.getBundleData().get(bundle).getLongTermData();
totalMessageRate = longTermData.totalMsgRate();
totalMessageThroughput = longTermData.totalMsgThroughput();
}
if (stats.topics > maxBundleTopics || stats.consumerCount + stats.producerCount > maxBundleSessions
|| totalMessageRate > maxBundleMsgRate || totalMessageThroughput > maxBundleBandwidth) {
final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle);
try {
final int bundleCount = pulsar.getNamespaceService()
.getBundleCount(new NamespaceName(namespace));
if (bundleCount < maxBundleCount) {
bundleCache.add(bundle);
} else {
log.warn(
"Could not split namespace bundle {} because namespace {} has too many bundles: {}",
bundle, namespace, bundleCount);
}
} catch (Exception e) {
log.warn("Error while getting bundle count for namespace {}", namespace, e);
}
}
}
});
return bundleCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
Expand All @@ -52,6 +53,9 @@ public class LoadManagerShared {

// Cache for shard brokers according to policies.
private static final Set<String> sharedCache = new HashSet<>();

// update LoadReport at most every 5 seconds
public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);

// Don't allow construction: static method namespace only.
private LoadManagerShared() {
Expand Down
Loading