Skip to content

Commit

Permalink
HBASE-17515 Reduce memory footprint of RegionLoads kept by Stochastic…
Browse files Browse the repository at this point in the history
…LoadBalancer (Tim Brown)
  • Loading branch information
tedyu committed Jan 26, 2017
1 parent 4254d7e commit a8bb27b
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 30 deletions.
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.master.balancer;

import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
* Wrapper class for the few fields required by the {@link StochasticLoadBalancer}
* from the full {@link RegionLoad}.
*/
@InterfaceStability.Evolving
class BalancerRegionLoad {
private final long readRequestsCount;
private final long writeRequestsCount;
private final int memStoreSizeMB;
private final int storefileSizeMB;

BalancerRegionLoad(RegionLoad regionLoad) {
readRequestsCount = regionLoad.getReadRequestsCount();
writeRequestsCount = regionLoad.getWriteRequestsCount();
memStoreSizeMB = regionLoad.getMemStoreSizeMB();
storefileSizeMB = regionLoad.getStorefileSizeMB();
}

public long getReadRequestsCount() {
return readRequestsCount;
}

public long getWriteRequestsCount() {
return writeRequestsCount;
}

public int getMemStoreSizeMB() {
return memStoreSizeMB;
}

public int getStorefileSizeMB() {
return storefileSizeMB;
}
}
Expand Up @@ -118,7 +118,7 @@ protected static class Cluster {


ArrayList<String> tables; ArrayList<String> tables;
HRegionInfo[] regions; HRegionInfo[] regions;
Deque<RegionLoad>[] regionLoads; Deque<BalancerRegionLoad>[] regionLoads;
private RegionLocationFinder regionFinder; private RegionLocationFinder regionFinder;


int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
Expand Down Expand Up @@ -166,7 +166,7 @@ protected static class Cluster {


protected Cluster( protected Cluster(
Map<ServerName, List<HRegionInfo>> clusterState, Map<ServerName, List<HRegionInfo>> clusterState,
Map<String, Deque<RegionLoad>> loads, Map<String, Deque<BalancerRegionLoad>> loads,
RegionLocationFinder regionFinder, RegionLocationFinder regionFinder,
RackManager rackManager) { RackManager rackManager) {
this(null, clusterState, loads, regionFinder, rackManager); this(null, clusterState, loads, regionFinder, rackManager);
Expand All @@ -176,7 +176,7 @@ protected Cluster(
protected Cluster( protected Cluster(
Collection<HRegionInfo> unassignedRegions, Collection<HRegionInfo> unassignedRegions,
Map<ServerName, List<HRegionInfo>> clusterState, Map<ServerName, List<HRegionInfo>> clusterState,
Map<String, Deque<RegionLoad>> loads, Map<String, Deque<BalancerRegionLoad>> loads,
RegionLocationFinder regionFinder, RegionLocationFinder regionFinder,
RackManager rackManager) { RackManager rackManager) {


Expand Down Expand Up @@ -431,7 +431,7 @@ protected Cluster(


/** Helper for Cluster constructor to handle a region */ /** Helper for Cluster constructor to handle a region */
private void registerRegion(HRegionInfo region, int regionIndex, private void registerRegion(HRegionInfo region, int regionIndex,
int serverIndex, Map<String, Deque<RegionLoad>> loads, int serverIndex, Map<String, Deque<BalancerRegionLoad>> loads,
RegionLocationFinder regionFinder) { RegionLocationFinder regionFinder) {
String tableName = region.getTable().getNameAsString(); String tableName = region.getTable().getNameAsString();
if (!tablesToIndex.containsKey(tableName)) { if (!tablesToIndex.containsKey(tableName)) {
Expand All @@ -448,7 +448,7 @@ private void registerRegion(HRegionInfo region, int regionIndex,


// region load // region load
if (loads != null) { if (loads != null) {
Deque<RegionLoad> rl = loads.get(region.getRegionNameAsString()); Deque<BalancerRegionLoad> rl = loads.get(region.getRegionNameAsString());
// That could have failed if the RegionLoad is using the other regionName // That could have failed if the RegionLoad is using the other regionName
if (rl == null) { if (rl == null) {
// Try getting the region load using encoded name. // Try getting the region load using encoded name.
Expand Down
Expand Up @@ -113,7 +113,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
private static final Random RANDOM = new Random(System.currentTimeMillis()); private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class); private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);


Map<String, Deque<RegionLoad>> loads = new HashMap<String, Deque<RegionLoad>>(); Map<String, Deque<BalancerRegionLoad>> loads = new HashMap<String, Deque<BalancerRegionLoad>>();


// values are defaults // values are defaults
private int maxSteps = 1000000; private int maxSteps = 1000000;
Expand Down Expand Up @@ -498,25 +498,24 @@ private List<RegionPlan> createRegionPlans(Cluster cluster) {
private synchronized void updateRegionLoad() { private synchronized void updateRegionLoad() {
// We create a new hashmap so that regions that are no longer there are removed. // We create a new hashmap so that regions that are no longer there are removed.
// However we temporarily need the old loads so we can use them to keep the rolling average. // However we temporarily need the old loads so we can use them to keep the rolling average.
Map<String, Deque<RegionLoad>> oldLoads = loads; Map<String, Deque<BalancerRegionLoad>> oldLoads = loads;
loads = new HashMap<String, Deque<RegionLoad>>(); loads = new HashMap<String, Deque<BalancerRegionLoad>>();


for (ServerName sn : clusterStatus.getServers()) { for (ServerName sn : clusterStatus.getServers()) {
ServerLoad sl = clusterStatus.getLoad(sn); ServerLoad sl = clusterStatus.getLoad(sn);
if (sl == null) { if (sl == null) {
continue; continue;
} }
for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) { for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
Deque<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey())); Deque<BalancerRegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
if (rLoads == null) { if (rLoads == null) {
// There was nothing there // There was nothing there
rLoads = new ArrayDeque<RegionLoad>(); rLoads = new ArrayDeque<BalancerRegionLoad>();
} else if (rLoads.size() >= numRegionLoadsToRemember) { } else if (rLoads.size() >= numRegionLoadsToRemember) {
rLoads.remove(); rLoads.remove();
} }
rLoads.add(entry.getValue()); rLoads.add(new BalancerRegionLoad(entry.getValue()));
loads.put(Bytes.toString(entry.getKey()), rLoads); loads.put(Bytes.toString(entry.getKey()), rLoads);

} }
} }


Expand Down Expand Up @@ -1251,7 +1250,7 @@ void setServices(MasterServices srvc) {
abstract static class CostFromRegionLoadFunction extends CostFunction { abstract static class CostFromRegionLoadFunction extends CostFunction {


private ClusterStatus clusterStatus = null; private ClusterStatus clusterStatus = null;
private Map<String, Deque<RegionLoad>> loads = null; private Map<String, Deque<BalancerRegionLoad>> loads = null;
private double[] stats = null; private double[] stats = null;
CostFromRegionLoadFunction(Configuration conf) { CostFromRegionLoadFunction(Configuration conf) {
super(conf); super(conf);
Expand All @@ -1261,7 +1260,7 @@ void setClusterStatus(ClusterStatus status) {
this.clusterStatus = status; this.clusterStatus = status;
} }


void setLoads(Map<String, Deque<RegionLoad>> l) { void setLoads(Map<String, Deque<BalancerRegionLoad>> l) {
this.loads = l; this.loads = l;
} }


Expand All @@ -1281,7 +1280,7 @@ void setLoads(Map<String, Deque<RegionLoad>> l) {


// for every region on this server get the rl // for every region on this server get the rl
for(int regionIndex:cluster.regionsPerServer[i]) { for(int regionIndex:cluster.regionsPerServer[i]) {
Collection<RegionLoad> regionLoadList = cluster.regionLoads[regionIndex]; Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];


// Now if we found a region load get the type of cost that was requested. // Now if we found a region load get the type of cost that was requested.
if (regionLoadList != null) { if (regionLoadList != null) {
Expand All @@ -1297,15 +1296,15 @@ void setLoads(Map<String, Deque<RegionLoad>> l) {
return costFromArray(stats); return costFromArray(stats);
} }


protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) { protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
double cost = 0; double cost = 0;
for (RegionLoad rl : regionLoadList) { for (BalancerRegionLoad rl : regionLoadList) {
cost += getCostFromRl(rl); cost += getCostFromRl(rl);
} }
return cost / regionLoadList.size(); return cost / regionLoadList.size();
} }


protected abstract double getCostFromRl(RegionLoad rl); protected abstract double getCostFromRl(BalancerRegionLoad rl);
} }


/** /**
Expand All @@ -1320,11 +1319,11 @@ abstract static class CostFromRegionLoadAsRateFunction extends CostFromRegionLoa
} }


@Override @Override
protected double getRegionLoadCost(Collection<RegionLoad> regionLoadList) { protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
double cost = 0; double cost = 0;
double previous = 0; double previous = 0;
boolean isFirst = true; boolean isFirst = true;
for (RegionLoad rl : regionLoadList) { for (BalancerRegionLoad rl : regionLoadList) {
double current = getCostFromRl(rl); double current = getCostFromRl(rl);
if (isFirst) { if (isFirst) {
isFirst = false; isFirst = false;
Expand Down Expand Up @@ -1354,7 +1353,7 @@ static class ReadRequestCostFunction extends CostFromRegionLoadAsRateFunction {
} }


@Override @Override
protected double getCostFromRl(RegionLoad rl) { protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getReadRequestsCount(); return rl.getReadRequestsCount();
} }
} }
Expand All @@ -1375,7 +1374,7 @@ static class WriteRequestCostFunction extends CostFromRegionLoadAsRateFunction {
} }


@Override @Override
protected double getCostFromRl(RegionLoad rl) { protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getWriteRequestsCount(); return rl.getWriteRequestsCount();
} }
} }
Expand Down Expand Up @@ -1553,7 +1552,7 @@ static class MemstoreSizeCostFunction extends CostFromRegionLoadAsRateFunction {
} }


@Override @Override
protected double getCostFromRl(RegionLoad rl) { protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getMemStoreSizeMB(); return rl.getMemStoreSizeMB();
} }
} }
Expand All @@ -1573,7 +1572,7 @@ static class StoreFileCostFunction extends CostFromRegionLoadFunction {
} }


@Override @Override
protected double getCostFromRl(RegionLoad rl) { protected double getCostFromRl(BalancerRegionLoad rl) {
return rl.getStorefileSizeMB(); return rl.getStorefileSizeMB();
} }
} }
Expand Down
Expand Up @@ -69,7 +69,7 @@ public void testKeepRegionLoad() throws Exception {
ServerLoad sl = mock(ServerLoad.class); ServerLoad sl = mock(ServerLoad.class);


RegionLoad rl = mock(RegionLoad.class); RegionLoad rl = mock(RegionLoad.class);
when(rl.getStores()).thenReturn(i); when(rl.getStorefileSizeMB()).thenReturn(i);


Map<byte[], RegionLoad> regionLoadMap = Map<byte[], RegionLoad> regionLoadMap =
new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR); new TreeMap<byte[], RegionLoad>(Bytes.BYTES_COMPARATOR);
Expand All @@ -85,11 +85,11 @@ public void testKeepRegionLoad() throws Exception {
assertTrue(loadBalancer.loads.get(REGION_KEY) != null); assertTrue(loadBalancer.loads.get(REGION_KEY) != null);
assertTrue(loadBalancer.loads.get(REGION_KEY).size() == 15); assertTrue(loadBalancer.loads.get(REGION_KEY).size() == 15);


Queue<RegionLoad> loads = loadBalancer.loads.get(REGION_KEY); Queue<BalancerRegionLoad> loads = loadBalancer.loads.get(REGION_KEY);
int i = 0; int i = 0;
while(loads.size() > 0) { while(loads.size() > 0) {
RegionLoad rl = loads.remove(); BalancerRegionLoad rl = loads.remove();
assertEquals(i + (numClusterStatusToAdd - 15), rl.getStores()); assertEquals(i + (numClusterStatusToAdd - 15), rl.getStorefileSizeMB());
i ++; i ++;
} }
} }
Expand Down Expand Up @@ -232,9 +232,9 @@ public void testTableSkewCost() {


@Test @Test
public void testRegionLoadCost() { public void testRegionLoadCost() {
List<RegionLoad> regionLoads = new ArrayList<>(); List<BalancerRegionLoad> regionLoads = new ArrayList<>();
for (int i = 1; i < 5; i++) { for (int i = 1; i < 5; i++) {
RegionLoad regionLoad = mock(RegionLoad.class); BalancerRegionLoad regionLoad = mock(BalancerRegionLoad.class);
when(regionLoad.getReadRequestsCount()).thenReturn(new Long(i)); when(regionLoad.getReadRequestsCount()).thenReturn(new Long(i));
when(regionLoad.getStorefileSizeMB()).thenReturn(i); when(regionLoad.getStorefileSizeMB()).thenReturn(i);
regionLoads.add(regionLoad); regionLoads.add(regionLoad);
Expand Down

0 comments on commit a8bb27b

Please sign in to comment.