Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26178 Improve data structure and algorithm for BalanceClusterSt… #3575

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ public void postMasterStartupInitialize() {
protected final boolean idleRegionServerExist(BalancerClusterState c){
boolean isServerExistsWithMoreRegions = false;
boolean isServerExistsWithZeroRegions = false;
for (int[] serverList: c.regionsPerServer){
if (serverList.length > 1) {
for (ArrayList<Integer> serverList: c.regionsPerServer){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/List/ArrayList/ ?

if (serverList.size() > 1) {
isServerExistsWithMoreRegions = true;
}
if (serverList.length == 0) {
if (serverList.size() == 0) {
isServerExistsWithZeroRegions = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ abstract class CandidateGenerator {
*/
int pickRandomRegion(BalancerClusterState cluster, int server, double chanceOfNoSwap) {
// Check to see if this is just a move.
if (cluster.regionsPerServer[server].length == 0
if (cluster.regionsPerServer.get(server).size() == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/isEmpty()/size() == 0/ ... it's faster.

|| ThreadLocalRandom.current().nextFloat() < chanceOfNoSwap) {
// signal a move only.
return -1;
}
int rand = ThreadLocalRandom.current().nextInt(cluster.regionsPerServer[server].length);
return cluster.regionsPerServer[server][rand];
int rand = ThreadLocalRandom.current().nextInt(cluster.regionsPerServer.get(server).size());
return cluster.regionsPerServer.get(server).get(rand);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to keep regionsPerServer as ArrayList<ArrayList> because we need to get a random element while keeping it compatible with regionsPerHost/Rack.

}

int pickRandomServer(BalancerClusterState cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private double computeCostForRegionServer(int regionServerIndex) {
double cost = 0;

// for every region on this server get the rl
for (int regionIndex : cluster.regionsPerServer[regionServerIndex]) {
for (Integer regionIndex : cluster.regionsPerServer.get(regionServerIndex)) {
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];

// Now if we found a region load get the type of cost that was requested.
Expand Down Expand Up @@ -79,4 +79,4 @@ protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList
}

protected abstract double getCostFromRl(BalancerRegionLoad rl);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,9 @@ protected BalanceAction generate(BalancerClusterState cluster) {
thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer);
}
if (thisRegion == -1) {
if (cluster.regionsPerServer[thisServer].length > 0) {
if (cluster.regionsPerServer.get(thisServer).size() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/isEmpty()/size()/ and negate the check?

LOG.trace("Could not pick lowest local region even when region server held "
+ cluster.regionsPerServer[thisServer].length + " regions");
+ cluster.regionsPerServer.get(thisServer).size() + " regions");
}
return BalanceAction.NULL_ACTION;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ protected double cost() {
// retrieve capacity for each RS
final ServerName sn = this.cluster.servers[i];
final double limit = this.limitPerRS.getOrDefault(sn, defaultNumberOfRegions);
final double nbrRegions = this.cluster.regionsPerServer[i].length;
final double nbrRegions = this.cluster.regionsPerServer.get(i).size();
final double usage = nbrRegions / limit;
if (usage > targetUsage) {
// cost is the number of regions above the local limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ private Optional<BalanceAction> tryMoveOrSwap(BalancerClusterState cluster, int
// Compare locality gain/loss from swapping fromRegion with regions on toServer
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer) -
getWeightedLocality(cluster, fromRegion, fromServer);
int toServertotalRegions = cluster.regionsPerServer[toServer].length;
int toServertotalRegions = cluster.regionsPerServer.get(toServer).size();
if (toServertotalRegions > 0) {
int startIndex = ThreadLocalRandom.current().nextInt(toServertotalRegions);
for (int i = 0; i < toServertotalRegions; i++) {
int toRegionIndex = (startIndex + i) % toServertotalRegions;
int toRegion = cluster.regionsPerServer[toServer][toRegionIndex];
int toRegion = cluster.regionsPerServer.get(toServer).get(toRegionIndex);
double toRegionLocalityDelta = getWeightedLocality(cluster, toRegion, fromServer) -
getWeightedLocality(cluster, toRegion, toServer);
// If locality would remain neutral or improve, attempt the swap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {

private double computeCostForRegionServer(int regionServerIndex) {
int cost = 0;
for (int regionIdx : cluster.regionsPerServer[regionServerIndex]) {
for (Integer regionIdx : cluster.regionsPerServer.get(regionServerIndex)) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
cost++;
}
Expand All @@ -66,8 +66,12 @@ void prepare(BalancerClusterState cluster) {
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
cost.applyCostsChange(costs -> {
costs[oldServer] = computeCostForRegionServer(oldServer);
costs[newServer] = computeCostForRegionServer(newServer);
if (region == cluster.regionIndexToPrimaryIndex[region]) {
if (oldServer >= 0) {
costs[oldServer]--;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O(n) -> O(1)

}
costs[newServer]++;
}
});
}

Expand All @@ -80,4 +84,4 @@ boolean isNeeded() {
protected double cost() {
return cost.cost();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ void prepare(BalancerClusterState cluster) {
cost.prepare(cluster.numServers);
cost.applyCostsChange(costs -> {
for (int i = 0; i < cluster.numServers; i++) {
costs[i] = cluster.regionsPerServer[i].length;
costs[i] = cluster.regionsPerServer.get(i).size();
}
});
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions);
if (LOG.isTraceEnabled()) {
for (int i = 0; i < cluster.numServers; i++) {
LOG.trace("{} sees server '{}' has {} regions", getClass().getSimpleName(),
cluster.servers[i], cluster.regionsPerServer[i].length);
cluster.servers[i], cluster.regionsPerServer.get(i).size());
}
}
}
Expand All @@ -68,8 +68,8 @@ protected double cost() {
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
cost.applyCostsChange(costs -> {
costs[oldServer] = cluster.regionsPerServer[oldServer].length;
costs[newServer] = cluster.regionsPerServer[newServer].length;
costs[oldServer] = cluster.regionsPerServer.get(oldServer).size();
costs[newServer] = cluster.regionsPerServer.get(newServer).size();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -39,42 +44,37 @@ class RegionReplicaCandidateGenerator extends CandidateGenerator {
* @param regionIndexToPrimaryIndex Cluster.regionsIndexToPrimaryIndex
* @return a regionIndex for the selected primary or -1 if there is no co-locating
*/
int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regionsPerGroup,
int[] regionIndexToPrimaryIndex) {
int currentPrimary = -1;
int currentPrimaryIndex = -1;
int selectedPrimaryIndex = -1;
int selectCoHostedRegionPerGroup(HashMap<Integer, ArrayList<Integer>> primariesOfRegionsPerGroup,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/List/ArrayList/ and s/Map/HashMap/?

Collection<Integer> regionsPerGroup, int[] regionIndexToPrimaryIndex) {

double currentLargestRandom = -1;
// primariesOfRegionsPerGroup is a sorted array. Since it contains the primary region
// ids for the regions hosted in server, a consecutive repetition means that replicas
// are co-hosted
for (int j = 0; j <= primariesOfRegionsPerGroup.length; j++) {
int primary = j < primariesOfRegionsPerGroup.length ? primariesOfRegionsPerGroup[j] : -1;
if (primary != currentPrimary) { // check for whether we see a new primary
int numReplicas = j - currentPrimaryIndex;
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
// decide to select this primary region id or not
double currentRandom = ThreadLocalRandom.current().nextDouble();
// we don't know how many region replicas are co-hosted, we will randomly select one
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
if (currentRandom > currentLargestRandom) {
selectedPrimaryIndex = currentPrimary;
currentLargestRandom = currentRandom;
}
Map.Entry<Integer, ArrayList<Integer>> selectedPrimaryIndexEntry = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/List/ArrayList/ ?


// primariesOfRegionsPerGroup is a hashmap of count of primary index on a server. a count > 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my info, what's a 'count of primary index on a server'? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the number of co-located region replicas in one group. Let me update the comment.

// means that replicas are co-hosted
for(Map.Entry<Integer, ArrayList<Integer>> pair : primariesOfRegionsPerGroup.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/List/ArrayList/ unless you are using explicit ArrayList methods?

if (pair.getValue().size() > 1) { // indicating co-location
// decide to select this primary region id or not
double currentRandom = ThreadLocalRandom.current().nextDouble();
// we don't know how many region replicas are co-hosted, we will randomly select one
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
if (currentRandom > currentLargestRandom) {
selectedPrimaryIndexEntry = pair;
currentLargestRandom = currentRandom;
}
currentPrimary = primary;
currentPrimaryIndex = j;
}
}

// we have found the primary id for the region to move. Now find the actual regionIndex
// with the given primary, prefer to move the secondary region.
for (int regionIndex : regionsPerGroup) {
if (selectedPrimaryIndex == regionIndexToPrimaryIndex[regionIndex]) {
if (selectedPrimaryIndexEntry == null) {
return -1;
}

// we have found the primary id and the set of regions for the region to move.
// now to return one of the secondary
for (Integer regionIndex : selectedPrimaryIndexEntry.getValue()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We store the region indexes of all co-located replicas here so we don't need to do an O(n) lookup on regionIndexToPrimaryIndex.

if (!regionIndex.equals(selectedPrimaryIndexEntry.getKey())) {
// always move the secondary, not the primary
if (selectedPrimaryIndex != regionIndex) {
return regionIndex;
}
return regionIndex;
}
}
return -1;
Expand All @@ -87,8 +87,9 @@ BalanceAction generate(BalancerClusterState cluster) {
return BalanceAction.NULL_ACTION;
}

int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerServer[serverIndex],
cluster.regionsPerServer[serverIndex], cluster.regionIndexToPrimaryIndex);
int regionIndex = selectCoHostedRegionPerGroup(
cluster.primariesOfRegionsPerServer.get(serverIndex),
cluster.regionsPerServer.get(serverIndex), cluster.regionIndexToPrimaryIndex);

// if there are no pairs of region replicas co-hosted, default to random generator
if (regionIndex == -1) {
Expand All @@ -100,5 +101,4 @@ BalanceAction generate(BalancerClusterState cluster) {
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -44,11 +46,12 @@ final void prepare(BalancerClusterState cluster) {

protected final long getMaxCost(BalancerClusterState cluster) {
// max cost is the case where every region replica is hosted together regardless of host
int[] primariesOfRegions = new int[cluster.numRegions];
System.arraycopy(cluster.regionIndexToPrimaryIndex, 0, primariesOfRegions, 0,
cluster.regions.length);

Arrays.sort(primariesOfRegions);
HashMap<Integer, ArrayList<Integer>> primariesOfRegions =
new HashMap<Integer, ArrayList<Integer>>(cluster.numRegions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't do this double allocation on a single line... we give each allocation and assignment its own line... easier to read.

Left-hand side should be Map rather than HashMap? Ditto for List and ArrayList.

for (int i = 0; i < cluster.regionIndexToPrimaryIndex.length; i++) {
BalancerClusterState.addElement(primariesOfRegions,
cluster.regionIndexToPrimaryIndex[i], i);
}

// compute numReplicas from the sorted array
return costPerGroup(primariesOfRegions);
Expand Down Expand Up @@ -80,22 +83,16 @@ protected double cost() {
* @param primariesOfRegions a sorted array of primary regions ids for the regions hosted
* @return a sum of numReplicas-1 squared for each primary region in the group.
*/
protected final long costPerGroup(int[] primariesOfRegions) {
protected final long costPerGroup(HashMap<Integer, ArrayList<Integer>> primariesOfRegions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Map/HashMap/ and s/List/ArrayList/ ?

long cost = 0;
int currentPrimary = -1;
int currentPrimaryIndex = -1;
// primariesOfRegions is a sorted array of primary ids of regions. Replicas of regions
// sharing the same primary will have consecutive numbers in the array.
for (int j = 0; j <= primariesOfRegions.length; j++) {
int primary = j < primariesOfRegions.length ? primariesOfRegions[j] : -1;
if (primary != currentPrimary) { // we see a new primary
int numReplicas = j - currentPrimaryIndex;
// square the cost
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
cost += (numReplicas - 1) * (numReplicas - 1);
}
currentPrimary = primary;
currentPrimaryIndex = j;

// primariesOfRegionsPerGroup is a hashmap of count of primary index on a server. a count > 1
// means that replicas are co-hosted
for(Map.Entry<Integer, ArrayList<Integer>> pair : primariesOfRegions.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/List/ArrayList/

int numReplicas = pair.getValue().size();
// square the cost
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
cost += (numReplicas - 1) * (numReplicas - 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.ArrayList;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -32,7 +35,7 @@ class RegionReplicaHostCostFunction extends RegionReplicaGroupingCostFunction {
"hbase.master.balancer.stochastic.regionReplicaHostCostKey";
private static final float DEFAULT_REGION_REPLICA_HOST_COST_KEY = 100000;

private int[][] primariesOfRegionsPerGroup;
private ArrayList<HashMap<Integer, ArrayList<Integer>>> primariesOfRegionsPerGroup;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage of List of int [] ?

s/List/ArrayList/ ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Map/HashMap/


public RegionReplicaHostCostFunction(Configuration conf) {
this.setMultiplier(
Expand All @@ -46,8 +49,8 @@ protected void loadCosts() {
costsPerGroup = new long[cluster.numHosts];
primariesOfRegionsPerGroup = cluster.multiServersPerHost // either server based or host based
? cluster.primariesOfRegionsPerHost : cluster.primariesOfRegionsPerServer;
for (int i = 0; i < primariesOfRegionsPerGroup.length; i++) {
costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup[i]);
for (int i = 0; i < primariesOfRegionsPerGroup.size(); i++) {
costsPerGroup[i] = costPerGroup(primariesOfRegionsPerGroup.get(i));
}
}

Expand All @@ -60,12 +63,12 @@ protected void regionMoved(int region, int oldServer, int newServer) {
int oldHost = cluster.serverIndexToHostIndex[oldServer];
int newHost = cluster.serverIndexToHostIndex[newServer];
if (newHost != oldHost) {
costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost[oldHost]);
costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost[newHost]);
costsPerGroup[oldHost] = costPerGroup(cluster.primariesOfRegionsPerHost.get(oldHost));
costsPerGroup[newHost] = costPerGroup(cluster.primariesOfRegionsPerHost.get(newHost));
}
} else {
costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer[oldServer]);
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer[newServer]);
costsPerGroup[oldServer] = costPerGroup(cluster.primariesOfRegionsPerServer.get(oldServer));
costsPerGroup[newServer] = costPerGroup(cluster.primariesOfRegionsPerServer.get(newServer));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ BalanceAction generate(BalancerClusterState cluster) {
return super.generate(cluster);
}

int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerRack[rackIndex],
cluster.regionsPerRack[rackIndex], cluster.regionIndexToPrimaryIndex);
int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerRack.get(rackIndex),
cluster.regionsPerRack.get(rackIndex), cluster.regionIndexToPrimaryIndex);

// if there are no pairs of region replicas co-hosted, default to random generator
if (regionIndex == -1) {
Expand All @@ -50,4 +50,4 @@ BalanceAction generate(BalancerClusterState cluster) {
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ protected void loadCosts() {
// max cost is the case where every region replica is hosted together regardless of rack
maxCost = getMaxCost(cluster);
costsPerGroup = new long[cluster.numRacks];
for (int i = 0; i < cluster.primariesOfRegionsPerRack.length; i++) {
costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack[i]);
for (int i = 0; i < cluster.primariesOfRegionsPerRack.size(); i++) {
costsPerGroup[i] = costPerGroup(cluster.primariesOfRegionsPerRack.get(i));
}
}

Expand All @@ -58,8 +58,8 @@ protected void regionMoved(int region, int oldServer, int newServer) {
int oldRack = cluster.serverIndexToRackIndex[oldServer];
int newRack = cluster.serverIndexToRackIndex[newServer];
if (newRack != oldRack) {
costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack[oldRack]);
costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack[newRack]);
costsPerGroup[oldRack] = costPerGroup(cluster.primariesOfRegionsPerRack.get(oldRack));
costsPerGroup[newRack] = costPerGroup(cluster.primariesOfRegionsPerRack.get(newRack));
}
}
}
}