Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-22618 Provide a way to have Heterogeneous deployment #439

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
Expand All @@ -47,6 +49,7 @@
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,6 +85,13 @@
* <li>hbase.master.balancer.stochastic.storefileSizeCost</li>
* </ul>
*
* <p>You can also add custom Cost function by setting the the following configuration value:</p>
* <ul>
* <li>hbase.master.balancer.stochastic.additionalCostFunctions</li>
* </ul>
*
* <p>All custom Cost Functions needs to extends {@link StochasticLoadBalancer.CostFunction}</p>
*
* <p>In addition to the above configurations, the balancer can be tuned by the following
* configuration values:</p>
* <ul>
Expand Down Expand Up @@ -117,6 +127,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private static final String TABLE_FUNCTION_SEP = "_";
protected static final String MIN_COST_NEED_BALANCE_KEY =
"hbase.master.balancer.stochastic.minCostNeedBalance";
protected static final String COST_FUNCTIONS_COST_FUNCTIONS_KEY =
"hbase.master.balancer.stochastic.additionalCostFunctions";

protected static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(StochasticLoadBalancer.class);
Expand All @@ -133,7 +145,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {

private List<CandidateGenerator> candidateGenerators;
private CostFromRegionLoadFunction[] regionLoadFunctions;
private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

// to save and report costs to JMX
private Double curOverallCost = 0d;
Expand Down Expand Up @@ -196,25 +208,57 @@ public synchronized void setConf(Configuration conf) {
};
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
costFunctions = new CostFunction[]{
new RegionCountSkewCostFunction(conf),
new PrimaryRegionCountSkewCostFunction(conf),
new MoveCostFunction(conf),
localityCost,
rackLocalityCost,
new TableSkewCostFunction(conf),
regionReplicaHostCostFunction,
regionReplicaRackCostFunction,
regionLoadFunctions[0],
regionLoadFunctions[1],
regionLoadFunctions[2],
regionLoadFunctions[3],
regionLoadFunctions[4]
};
curFunctionCosts= new Double[costFunctions.length];
tempFunctionCosts= new Double[costFunctions.length];

costFunctions = new ArrayList<>();
costFunctions.add(new RegionCountSkewCostFunction(conf));
costFunctions.add(new PrimaryRegionCountSkewCostFunction(conf));
costFunctions.add(new MoveCostFunction(conf));
costFunctions.add(localityCost);
costFunctions.add(rackLocalityCost);
costFunctions.add(new TableSkewCostFunction(conf));
costFunctions.add(regionReplicaHostCostFunction);
costFunctions.add(regionReplicaRackCostFunction);
costFunctions.add(regionLoadFunctions[0]);
costFunctions.add(regionLoadFunctions[1]);
costFunctions.add(regionLoadFunctions[2]);
costFunctions.add(regionLoadFunctions[3]);
costFunctions.add(regionLoadFunctions[4]);
loadCustomCostFunctions(conf);

curFunctionCosts= new Double[costFunctions.size()];
tempFunctionCosts= new Double[costFunctions.size()];
LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", CostFunctions=" +
Arrays.toString(getCostFunctionNames()) + " etc.");
}

private void loadCustomCostFunctions(Configuration conf) {
String[] functionsNames = conf.getStrings(COST_FUNCTIONS_COST_FUNCTIONS_KEY);

if (null == functionsNames) {
return;
}

costFunctions.addAll(Arrays.stream(functionsNames)
.map(c -> {
Class<? extends CostFunction> klass = null;
try {
klass = (Class<? extends CostFunction>) Class.forName(c);
} catch (ClassNotFoundException e) {
LOG.warn("Cannot load class " + c + "': " + e.getMessage());
}
if (null == klass) {
return null;
}

CostFunction reflected = ReflectionUtils.newInstance(klass, conf);
LOG.info("Successfully loaded custom CostFunction '" +
reflected.getClass().getSimpleName() + "'");

return reflected;
})
.filter(Objects::nonNull)
.collect(Collectors.toList()));
}

protected void setCandidateGenerators(List<CandidateGenerator> customCandidateGenerators) {
Expand Down Expand Up @@ -468,8 +512,8 @@ private void updateStochasticCosts(TableName tableName, Double overall, Double[]
"Overall", "Overall cost", overall);

// each cost function
for (int i = 0; i < costFunctions.length; i++) {
CostFunction costFunction = costFunctions[i];
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction costFunction = costFunctions.get(i);
String costFunctionName = costFunction.getClass().getSimpleName();
Double costPercent = (overall == 0) ? 0 : (subCosts[i] / overall);
// TODO: cost function may need a specific description
Expand Down Expand Up @@ -567,9 +611,9 @@ protected void updateCostsWithAction(Cluster cluster, Action action) {
*/
public String[] getCostFunctionNames() {
if (costFunctions == null) return null;
String[] ret = new String[costFunctions.length];
for (int i = 0; i < costFunctions.length; i++) {
CostFunction c = costFunctions[i];
String[] ret = new String[costFunctions.size()];
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction c = costFunctions.get(i);
ret[i] = c.getClass().getSimpleName();
}

Expand All @@ -588,8 +632,8 @@ public String[] getCostFunctionNames() {
protected double computeCost(Cluster cluster, double previousCost) {
double total = 0;

for (int i = 0; i < costFunctions.length; i++) {
CostFunction c = costFunctions[i];
for (int i = 0; i < costFunctions.size(); i++) {
CostFunction c = costFunctions.get(i);
this.tempFunctionCosts[i] = 0.0;

if (c.getMultiplier() <= 0) {
Expand Down Expand Up @@ -972,13 +1016,13 @@ Cluster.Action generate(Cluster cluster) {
/**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
abstract static class CostFunction {
public abstract static class CostFunction {

private float multiplier = 0;

protected Cluster cluster;

CostFunction(Configuration c) {
public CostFunction(Configuration c) {
}

boolean isNeeded() {
Expand Down Expand Up @@ -1027,7 +1071,7 @@ void postAction(Action action) {
protected void regionMoved(int region, int oldServer, int newServer) {
}

abstract double cost();
protected abstract double cost();

@SuppressWarnings("checkstyle:linelength")
/**
Expand Down Expand Up @@ -1124,7 +1168,7 @@ static class MoveCostFunction extends CostFunction {
}

@Override
double cost() {
protected double cost() {
// Try and size the max number of Moves, but always be prepared to move some.
int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
DEFAULT_MAX_MOVES);
Expand Down Expand Up @@ -1159,7 +1203,7 @@ static class RegionCountSkewCostFunction extends CostFunction {
}

@Override
double cost() {
protected double cost() {
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
Expand Down Expand Up @@ -1191,7 +1235,7 @@ static class PrimaryRegionCountSkewCostFunction extends CostFunction {
}

@Override
double cost() {
protected double cost() {
if (!cluster.hasRegionReplicas) {
return 0;
}
Expand Down Expand Up @@ -1228,7 +1272,7 @@ static class TableSkewCostFunction extends CostFunction {
}

@Override
double cost() {
protected double cost() {
double max = cluster.numRegions;
double min = ((double) cluster.numRegions) / cluster.numServers;
double value = 0;
Expand Down Expand Up @@ -1311,7 +1355,7 @@ protected void regionMoved(int region, int oldServer, int newServer) {
}

@Override
double cost() {
protected double cost() {
return 1 - locality;
}

Expand Down Expand Up @@ -1389,7 +1433,7 @@ void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
}

@Override
double cost() {
protected double cost() {
if (clusterStatus == null || loads == null) {
return 0;
}
Expand Down Expand Up @@ -1581,7 +1625,7 @@ boolean isNeeded() {
}

@Override
double cost() {
protected double cost() {
if (maxCost <= 0) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.conf.Configuration;

public class DummyCostFunction extends StochasticLoadBalancer.CostFunction {
public DummyCostFunction(Configuration c) {
super(c);
}

@Override
protected double cost() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -53,6 +47,10 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am getting the following checkstyle issues, when running checkstyle locally:

Error imports AvoidStarImport Using the '.' form of import should be avoided - org.junit.Assert.. 50
Error imports ImportOrder Wrong order for 'org.junit.Assert.*' import. 50

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry for the trouble, I must admit that I was trusting the import style of my IDE 😄

Will fix it in another commit that will be checked locally this time

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Category({ MasterTests.class, MediumTests.class })
public class TestStochasticLoadBalancer extends BalancerTestBase {

Expand Down Expand Up @@ -121,7 +119,8 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
};

private ServerMetrics mockServerMetricsWithCpRequests(ServerName server,
List<RegionInfo> regionsOnServer, long cpRequestCount) {
List<RegionInfo> regionsOnServer,
long cpRequestCount) {
ServerMetrics serverMetrics = mock(ServerMetrics.class);
Map<byte[], RegionMetrics> regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for(RegionInfo info : regionsOnServer){
Expand Down Expand Up @@ -457,6 +456,17 @@ public void testLosingRs() throws Exception {
assertNull(plans);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about test that, upon StochasticLoadBalancer.balanceCluster() call, verify that DummyCostFunctions.cost() get indeed called? That would validate custom cost functions are indeed used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about adding tests with the real cost function I want to implement, is that OK for you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should keep this jira concerned only with the implementation to allow additional cost functions to be added. Then you could propose another jira with the real implementation of your cost functions, which then would also define those functions specific tests. What would you say?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome!

@Test
public void testAdditionalCostFunction() {
conf.set(StochasticLoadBalancer.COST_FUNCTIONS_COST_FUNCTIONS_KEY,
DummyCostFunction.class.getName());

loadBalancer.setConf(conf);
assertTrue(Arrays.
asList(loadBalancer.getCostFunctionNames()).
contains(DummyCostFunction.class.getSimpleName()));
}

// This mock allows us to test the LocalityCostFunction
private class MockCluster extends BaseLoadBalancer.Cluster {

Expand Down