HBASE-13965 Stochastic Load Balancer JMX Metrics (Lei Chen)
tedyu committed Aug 3, 2015
1 parent 20d1fa3 commit 598cfeb
Showing 13 changed files with 219 additions and 16 deletions.
@@ -181,6 +181,10 @@ public enum ReturnCode {
      * Seek to next key which is given as hint by the filter.
      */
     SEEK_NEXT_USING_HINT,
+    /**
+     * Include KeyValue and done with row, seek to next.
+     */
+    INCLUDE_AND_SEEK_NEXT_ROW,
   }

   /**
@@ -134,7 +134,8 @@ public void filterRowCells(List<Cell> kvs) throws IOException {
   public enum FilterRowRetCode {
     NOT_CALLED,
     INCLUDE, // corresponds to filter.filterRow() returning false
-    EXCLUDE // corresponds to filter.filterRow() returning true
+    EXCLUDE, // corresponds to filter.filterRow() returning true
+    INCLUDE_THIS_FAMILY // exclude other families
   }
   public FilterRowRetCode filterRowCellsWithRet(List<Cell> kvs) throws IOException {
     //To fix HBASE-6429,
@@ -124,6 +124,12 @@ public enum OperationStatusCode {
   /** Config for pluggable load balancers */
   public static final String HBASE_MASTER_LOADBALANCER_CLASS = "hbase.master.loadbalancer.class";

+  /** Config for balancing the cluster by table */
+  public static final String HBASE_MASTER_LOADBALANCE_BYTABLE = "hbase.master.loadbalance.bytable";
+
+  /** The name of the ensemble table */
+  public static final String ENSEMBLE_TABLE_NAME = "hbase:ensemble";
+
   /** Config for pluggable region normalizer */
   public static final String HBASE_MASTER_NORMALIZER_CLASS =
     "hbase.master.normalizer.class";
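For orientation, a minimal sketch of how the two new constants could be consulted once this change is in place. The class name ByTableBalancingCheck and the standalone main method are illustrative only, not part of this commit; the constants and the getBoolean call are the ones added above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    // Hypothetical helper: reads the new by-table balancing flag the same way
    // RegionStates does further down in this diff.
    public class ByTableBalancingCheck {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Defaults to false: all regions are grouped under the single
        // "hbase:ensemble" pseudo-table when the balancer runs.
        boolean byTable =
            conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false);
        System.out.println("balance by table: " + byTable);
        System.out.println("ensemble table: " + HConstants.ENSEMBLE_TABLE_NAME);
      }
    }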
@@ -34,6 +34,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1256,12 +1257,14 @@ public boolean balance() throws IOException {
         this.assignmentManager.getRegionStates().getAssignmentsByTable();

     List<RegionPlan> plans = new ArrayList<RegionPlan>();
+
     //Give the balancer the current cluster state.
     this.balancer.setClusterStatus(getClusterStatus());
-    for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
-      List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
+    for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
+      List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
       if (partialPlans != null) plans.addAll(partialPlans);
     }
+
     long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
     int rpCount = 0; // number of RegionPlans balanced so far
     long totalRegPlanExecTime = 0;
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;

 /**
  * Makes decisions about the placement and movement of Regions across
@@ -64,6 +65,15 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObserver {
    */
   void setMasterServices(MasterServices masterServices);

+  /**
+   * Perform the major balance operation
+   * @param tableName
+   * @param clusterState
+   * @return List of plans
+   */
+  List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName,
+      List<HRegionInfo>> clusterState) throws HBaseIOException;
+
   /**
    * Perform the major balance operation
    * @param clusterState
@@ -953,13 +953,14 @@ protected synchronized double getAverageLoad() {
     Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
         new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
     synchronized (this) {
-      if (!server.getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false)) {
+      if (!server.getConfiguration().getBoolean(
+          HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false)) {
         Map<ServerName, List<HRegionInfo>> svrToRegions =
             new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
         for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
           svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
         }
-        result.put(TableName.valueOf("ensemble"), svrToRegions);
+        result.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), svrToRegions);
       } else {
         for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
           for (HRegionInfo hri: e.getValue()) {
@@ -45,7 +45,6 @@
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RackManager;
@@ -80,6 +79,21 @@ public String getRack(ServerName server) {
       return UNKNOWN_RACK;
     }
   }

+  /**
+   * The constructor that uses the basic MetricsBalancer
+   */
+  protected BaseLoadBalancer() {
+    metricsBalancer = new MetricsBalancer();
+  }
+
+  /**
+   * This Constructor accepts an instance of MetricsBalancer,
+   * which will be used instead of creating a new one
+   */
+  protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
+    this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
+  }
+
   /**
    * An efficient array based implementation similar to ClusterState for keeping
@@ -803,7 +817,7 @@ public String toString() {
     "hbase.balancer.tablesOnMaster";

   protected final Set<String> tablesOnMaster = new HashSet<String>();
-  protected final MetricsBalancer metricsBalancer = new MetricsBalancer();
+  protected MetricsBalancer metricsBalancer = null;
   protected ClusterStatus clusterStatus = null;
   protected ServerName masterServerName;
   protected MasterServices services;
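The two new constructors and the now non-final metricsBalancer field let a subclass route its own metrics object through BaseLoadBalancer. A minimal sketch follows, assuming BaseLoadBalancer leaves only the balanceCluster overloads abstract at this point in the tree; the class name NoOpLoadBalancer and its do-nothing balancing logic are placeholders, not part of the commit.

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.HBaseIOException;
    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.master.RegionPlan;
    import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
    import org.apache.hadoop.hbase.master.balancer.MetricsBalancer;

    // Hypothetical subclass for illustration only.
    public class NoOpLoadBalancer extends BaseLoadBalancer {

      public NoOpLoadBalancer() {
        // Pass a metrics object through the new constructor instead of relying
        // on the default; an extended MetricsBalancer subclass could go here.
        super(new MetricsBalancer());
      }

      @Override
      public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
          throws HBaseIOException {
        return null; // placeholder: produce no region plans
      }

      @Override
      public List<RegionPlan> balanceCluster(TableName tableName,
          Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
        // A table-agnostic balancer can ignore the table name and delegate to the
        // single-argument overload, the same pattern the balancers below use.
        return balanceCluster(clusterState);
      }
    }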
@@ -28,11 +28,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.ServerManager;
@@ -345,4 +347,10 @@ private void assignSecondaryAndTertiaryNodesForRegion(
       globalFavoredNodesAssignmentPlan.updateFavoredNodesMap(region, favoredNodesForRegion);
     }
   }
+
+  @Override
+  public List<RegionPlan> balanceCluster(TableName tableName,
+      Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
+    return balanceCluster(clusterState);
+  }
 }
@@ -25,9 +25,17 @@
  */
 public class MetricsBalancer {

-  private final MetricsBalancerSource source;
+  private MetricsBalancerSource source = null;

   public MetricsBalancer() {
+    initSource();
+  }
+
+  /**
+   * A function to instantiate the metrics source. This function can be overridden in its
+   * subclasses to provide extended sources
+   */
+  protected void initSource() {
     source = CompatibilitySingletonFactory.getInstance(MetricsBalancerSource.class);
   }

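The initSource() hook is what lets a balancer-specific metrics class plug in an extended JMX source. A hypothetical sketch of such a subclass; ExtendedMetricsBalancerSource is a made-up name standing in for whatever source the concrete balancer registers, assumed to extend MetricsBalancerSource and to be resolvable through CompatibilitySingletonFactory like the base source.

    import org.apache.hadoop.hbase.CompatibilitySingletonFactory;

    // Hypothetical subclass; ExtendedMetricsBalancerSource is assumed, not real.
    public class MetricsExtendedBalancer extends MetricsBalancer {

      private ExtendedMetricsBalancerSource extendedSource;

      @Override
      protected void initSource() {
        // Invoked from the MetricsBalancer() constructor, so the extended source
        // is in place before any metric is reported.
        extendedSource =
            CompatibilitySingletonFactory.getInstance(ExtendedMetricsBalancerSource.class);
      }

      // Methods that publish metrics would also be overridden here to report
      // through extendedSource (omitted in this sketch).
    }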
@@ -30,9 +30,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.master.RegionPlan;

 import com.google.common.collect.MinMaxPriorityQueue;
@@ -433,4 +435,10 @@ private void addRegionPlan(final MinMaxPriorityQueue<RegionPlan> regionsToMove,
     rp.setDestination(sn);
     regionsToReturn.add(rp);
   }
+
+  @Override
+  public List<RegionPlan> balanceCluster(TableName tableName,
+      Map<ServerName, List<HRegionInfo>> clusterState) throws HBaseIOException {
+    return balanceCluster(clusterState);
+  }
 }
