Skip to content

Commit

Permalink
HBASE-25873 Refactor and cleanup the code for CostFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed May 17, 2021
1 parent 1c6994a commit 42a899e
Show file tree
Hide file tree
Showing 18 changed files with 229 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,4 @@ public BalanceAction undoAction() {
public String toString() {
return getType() + ": " + region + ":" + server;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master.balancer;

import java.util.Collection;
import java.util.Iterator;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -30,18 +31,20 @@ abstract class CostFromRegionLoadAsRateFunction extends CostFromRegionLoadFuncti

@Override
protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
Iterator<BalancerRegionLoad> iter = regionLoadList.iterator();
if (!iter.hasNext()) {
return 0;
}
double previous = getCostFromRl(iter.next());
if (!iter.hasNext()) {
return 0;
}
double cost = 0;
double previous = 0;
boolean isFirst = true;
for (BalancerRegionLoad rl : regionLoadList) {
double current = getCostFromRl(rl);
if (isFirst) {
isFirst = false;
} else {
cost += current - previous;
}
do {
double current = getCostFromRl(iter.next());
cost += current - previous;
previous = current;
}
} while (iter.hasNext());
return Math.max(0, cost / (regionLoadList.size() - 1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
package org.apache.hadoop.hbase.master.balancer;

import java.util.Collection;
import java.util.Deque;
import java.util.Map;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -30,39 +27,29 @@
@InterfaceAudience.Private
abstract class CostFromRegionLoadFunction extends CostFunction {

private ClusterMetrics clusterStatus;
private Map<String, Deque<BalancerRegionLoad>> loads;
private double[] stats;

void setClusterMetrics(ClusterMetrics status) {
this.clusterStatus = status;
}

void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
this.loads = l;
}

@Override
protected final double cost() {
if (clusterStatus == null || loads == null) {
return 0;
}

void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
}

@Override
protected final double cost() {
for (int i = 0; i < stats.length; i++) {
// Cost this server has from RegionLoad
long cost = 0;
double cost = 0;

// for every region on this server get the rl
for (int regionIndex : cluster.regionsPerServer[i]) {
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];

// Now if we found a region load get the type of cost that was requested.
if (regionLoadList != null) {
cost = (long) (cost + getRegionLoadCost(regionLoadList));
cost += getRegionLoadCost(regionLoadList);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void setMultiplier(float m) {
* Called once per LB invocation to give the cost function to initialize it's state, and perform
* any costly calculation.
*/
void init(BalancerClusterState cluster) {
void prepare(BalancerClusterState cluster) {
this.cluster = cluster;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
Expand Down Expand Up @@ -119,7 +120,7 @@ public class HeterogeneousRegionCountCostFunction extends CostFunction {
* any costly calculation.
*/
@Override
void init(final BalancerClusterState cluster) {
void prepare(final BalancerClusterState cluster) {
this.cluster = cluster;
this.loadRules();
}
Expand Down Expand Up @@ -148,6 +149,8 @@ protected double cost() {
/**
* used to load the rule files.
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*(/src/test/.*|HeterogeneousRegionCountCostFunction).java")
void loadRules() {
final List<String> lines = readFile(this.rulesPath);
if (null == lines) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ abstract class LocalityBasedCostFunction extends CostFunction {
abstract int regionIndexToEntityIndex(int region);

@Override
void init(BalancerClusterState cluster) {
super.init(cluster);
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
locality = 0.0;
bestLocality = 0.0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,35 @@ class MoveCostFunction extends CostFunction {
private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;

private final float maxMovesPercent;
private final Configuration conf;
private final OffPeakHours offPeakHours;
private final float moveCost;
private final float moveCostOffPeak;

MoveCostFunction(Configuration conf) {
this.conf = conf;
// What percent of the number of regions a single run of the balancer can move.
maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);

offPeakHours = OffPeakHours.getInstance(conf);
moveCost = conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST);
moveCostOffPeak = conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK);
// Initialize the multiplier so that addCostFunction will add this cost function.
// It may change during later evaluations, due to OffPeakHours.
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
this.setMultiplier(moveCost);
}

@Override
protected double cost() {
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
// Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
// that large benefits are need to overcome the cost of a move.
if (OffPeakHours.getInstance(conf).isOffPeakHour()) {
this.setMultiplier(conf.getFloat(MOVE_COST_OFFPEAK_KEY, DEFAULT_MOVE_COST_OFFPEAK));
if (offPeakHours.isOffPeakHour()) {
this.setMultiplier(moveCostOffPeak);
} else {
this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
this.setMultiplier(moveCost);
}
}

@Override
protected double cost() {
// Try and size the max number of Moves, but always be prepared to move some.
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent), DEFAULT_MAX_MOVES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,25 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
"hbase.master.balancer.stochastic.primaryRegionCountCost";
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;

private final float primaryRegionCountCost;
private double[] stats;

PrimaryRegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
this.setMultiplier(
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
primaryRegionCountCost =
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST);
this.setMultiplier(primaryRegionCountCost);
}

@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (!isNeeded()) {
return;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
}

@Override
Expand All @@ -46,13 +59,6 @@ boolean isNeeded() {

@Override
protected double cost() {
if (!cluster.hasRegionReplicas) {
return 0;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}

for (int i = 0; i < cluster.numServers; i++) {
stats[i] = 0;
for (int regionIdx : cluster.regionsPerServer[i]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,19 @@ class RegionCountSkewCostFunction extends CostFunction {
"hbase.master.balancer.stochastic.regionCountCost";
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;

private double[] stats = null;
private double[] stats;

RegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as it is the most general way to balance data.
this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
}

@Override
void init(BalancerClusterState cluster) {
super.init(cluster);
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions);
if (LOG.isTraceEnabled()) {
Expand All @@ -56,9 +59,6 @@ void init(BalancerClusterState cluster) {

@Override
protected double cost() {
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = cluster.regionsPerServer[i].length;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Arrays;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A cost function for region replicas. We give a high cost for hosting replicas of the same region
* in the same server, host or rack. We do not prevent the case though, since if numReplicas >
* numRegionServers, we still want to keep the replica open.
*/
@InterfaceAudience.Private
abstract class RegionReplicaGroupingCostFunction extends CostFunction {

protected long maxCost = 0;
protected long[] costsPerGroup; // group is either server, host or rack

@Override
final void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (!isNeeded()) {
return;
}
loadCosts();
}

protected abstract void loadCosts();

protected final long getMaxCost(BalancerClusterState cluster) {
// max cost is the case where every region replica is hosted together regardless of host
int[] primariesOfRegions = new int[cluster.numRegions];
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
cluster.regions.length);

Arrays.sort(primariesOfRegions);

// compute numReplicas from the sorted array
return costPerGroup(primariesOfRegions);
}

@Override
boolean isNeeded() {
return cluster.hasRegionReplicas;
}

@Override
protected double cost() {
if (maxCost <= 0) {
return 0;
}

long totalCost = 0;
for (int i = 0; i < costsPerGroup.length; i++) {
totalCost += costsPerGroup[i];
}
return scale(0, maxCost, totalCost);
}

/**
* For each primary region, it computes the total number of replicas in the array (numReplicas)
* and returns a sum of numReplicas-1 squared. For example, if the server hosts regions a, b, c,
* d, e, f where a and b are same replicas, and c,d,e are same replicas, it returns (2-1) * (2-1)
* + (3-1) * (3-1) + (1-1) * (1-1).
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
* @return a sum of numReplicas-1 squared for each primary region in the group.
*/
protected final long costPerGroup(int[] primariesOfRegions) {
long cost = 0;
int currentPrimary = -1;
int currentPrimaryIndex = -1;
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
// sharing the same primary will have consecutive numbers in the array.
for (int j = 0; j <= primariesOfRegions.length; j++) {
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
if (primary != currentPrimary) { // we see a new primary
int numReplicas = j - currentPrimaryIndex;
// square the cost
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
cost += (numReplicas - 1) * (numReplicas - 1);
}
currentPrimary = primary;
currentPrimaryIndex = j;
}
}

return cost;
}
}

0 comments on commit 42a899e

Please sign in to comment.