Skip to content

Commit

Permalink
HBASE-25767 CandidateGenerator.getRandomIterationOrder is too slow on…
Browse files Browse the repository at this point in the history
… large cluster (#3149)

Signed-off-by: XinSun <ddupgs@gmail.com>
Signed-off-by: Yulin Niu <niuyulin@apache.org>
  • Loading branch information
Apache9 committed Apr 13, 2021
1 parent de012d7 commit 5910e9e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 70 deletions.
Expand Up @@ -18,13 +18,8 @@

package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.RegionInfo;

import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -135,17 +130,4 @@ protected BaseLoadBalancer.Cluster.Action getAction(int fromServer, int fromRegi
return BaseLoadBalancer.Cluster.NullAction;
}
}

/**
 * Produces the indexes {@code 0} through {@code length - 1} in shuffled order.
 *
 * @param length number of indexes to generate
 * @return a new list holding each index exactly once, reordered by {@link Collections#shuffle}
 */
List<Integer> getRandomIterationOrder(int length) {
  final List<Integer> indexes = new ArrayList<>(length);
  int next = 0;
  while (next < length) {
    indexes.add(next);
    next++;
  }
  Collections.shuffle(indexes);
  return indexes;
}

}
Expand Up @@ -18,42 +18,30 @@

package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.master.MasterServices;

import org.apache.hbase.thirdparty.com.google.common.base.Optional;

import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
class LocalityBasedCandidateGenerator extends CandidateGenerator {

private MasterServices masterServices;

/**
 * Creates a generator that keeps a reference to the given master services.
 *
 * @param masterServices value stored in the {@code masterServices} field
 */
LocalityBasedCandidateGenerator(MasterServices masterServices) {
this.masterServices = masterServices;
}

@Override
BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) {
if (this.masterServices == null) {
int thisServer = pickRandomServer(cluster);
// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);
return pickRandomRegions(cluster, thisServer, otherServer);
}

// Randomly iterate through regions until you find one that is not on ideal host
for (int region : getRandomIterationOrder(cluster.numRegions)) {
int currentServer = cluster.regionIndexToServerIndex[region];
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) {
Optional<BaseLoadBalancer.Cluster.Action> potential = tryMoveOrSwap(cluster,
currentServer, region,
cluster.getOrComputeRegionsToMostLocalEntities(
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]
);
if (potential.isPresent()) {
return potential.get();
// iterate through regions until you find one that is not on ideal host
// start from a random point so that we do not always balance the regions at the front first
if (cluster.numRegions > 0) {
int startIndex = ThreadLocalRandom.current().nextInt(cluster.numRegions);
for (int i = 0; i < cluster.numRegions; i++) {
int region = (startIndex + i) % cluster.numRegions;
int currentServer = cluster.regionIndexToServerIndex[region];
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) {
Optional<BaseLoadBalancer.Cluster.Action> potential = tryMoveOrSwap(cluster,
currentServer, region, cluster.getOrComputeRegionsToMostLocalEntities(
BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]);
if (potential.isPresent()) {
return potential.get();
}
}
}
}
Expand All @@ -69,25 +57,25 @@ private Optional<BaseLoadBalancer.Cluster.Action> tryMoveOrSwap(BaseLoadBalancer
// Compare locality gain/loss from swapping fromRegion with regions on toServer
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer)
- getWeightedLocality(cluster, fromRegion, fromServer);
for (int toRegionIndex : getRandomIterationOrder(cluster.regionsPerServer[toServer].length)) {
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer)
- getWeightedLocality(cluster, toRegion, toServer);
// If locality would remain neutral or improve, attempt the swap
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
int toServertotalRegions = cluster.regionsPerServer[toServer].length;
if (toServertotalRegions > 0) {
int startIndex = ThreadLocalRandom.current().nextInt(toServertotalRegions);
for (int i = 0; i < toServertotalRegions; i++) {
int toRegionIndex = (startIndex + i) % toServertotalRegions;
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer) -
getWeightedLocality(cluster, toRegion, toServer);
// If locality would remain neutral or improve, attempt the swap
if (fromRegionLocalityDelta + toRegionLocalityDelta >= 0) {
return Optional.of(getAction(fromServer, fromRegion, toServer, toRegion));
}
}
}
return Optional.absent();
return Optional.empty();
}

/**
 * Server-level weighted locality of {@code region} with respect to {@code server}, obtained by
 * delegating to {@code cluster.getOrComputeWeightedLocality}.
 *
 * @param cluster cluster state to query
 * @param region index of the region
 * @param server index of the server
 * @return the value reported by the cluster for {@code LocalityType.SERVER}
 */
private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) {
  final BaseLoadBalancer.Cluster.LocalityType serverLevel =
    BaseLoadBalancer.Cluster.LocalityType.SERVER;
  return cluster.getOrComputeWeightedLocality(region, server, serverLevel);
}

/**
 * Replaces the master services reference held by this generator.
 *
 * @param services new value for the {@code masterServices} field
 */
void setServices(MasterServices services) {
  masterServices = services;
}

}
Expand Up @@ -38,7 +38,6 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BalancerDecision;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
Expand Down Expand Up @@ -187,7 +186,7 @@ public synchronized void setConf(Configuration conf) {
numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);
if (localityCandidateGenerator == null) {
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
localityCandidateGenerator = new LocalityBasedCandidateGenerator();
}
localityCost = new ServerLocalityCostFunction(conf);
rackLocalityCost = new RackLocalityCostFunction(conf);
Expand Down Expand Up @@ -309,18 +308,16 @@ public void updateMetricsSize(int size) {
}
}

/**
 * Hands the master services to the superclass, then forwards the same reference to the
 * locality candidate generator.
 *
 * @param masterServices reference passed on to both recipients
 */
@Override
public synchronized void setMasterServices(MasterServices masterServices) {
  super.setMasterServices(masterServices);
  localityCandidateGenerator.setServices(masterServices);
}

/**
 * Reports whether either replica cost function sees a non-zero cost for the given cluster.
 * <p>
 * The host-level function is initialised and checked first; the rack-level function is only
 * initialised when the host-level cost is not greater than zero.
 *
 * @param c cluster state passed to each cost function's {@code init}
 * @return {@code true} if the host-level or the rack-level cost is greater than zero
 */
@Override
protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) {
  regionReplicaHostCostFunction.init(c);
  // Each cost is evaluated exactly once; a second identical check could never add information.
  if (regionReplicaHostCostFunction.cost() > 0) {
    return true;
  }
  regionReplicaRackCostFunction.init(c);
  return regionReplicaRackCostFunction.cost() > 0;
}

Expand Down

0 comments on commit 5910e9e

Please sign in to comment.