Skip to content

Commit

Permalink
HBASE-15073 Finer grained control over normalization actions for Regi…
Browse files Browse the repository at this point in the history
…onNormalizer
  • Loading branch information
tedyu committed Jan 7, 2016
1 parent a9b671b commit d65978f
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 58 deletions.
Expand Up @@ -40,6 +40,8 @@
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ColumnFamilySchema;
Expand Down Expand Up @@ -185,13 +187,14 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {

/**
* <em>INTERNAL</em> Used by shell/rest interface to access this metadata
* attribute which denotes if the table should be treated by region normalizer.
* attribute which denotes the allowed types of action (split/merge) taken when the table is
* processed by the region normalizer.
*
* @see #isNormalizationEnabled()
* @see #getDesiredNormalizationTypes()
*/
public static final String NORMALIZATION_ENABLED = "NORMALIZATION_ENABLED";
private static final Bytes NORMALIZATION_ENABLED_KEY =
new Bytes(Bytes.toBytes(NORMALIZATION_ENABLED));
public static final String NORMALIZATION_MODE = "NORMALIZATION_MODE";
private static final Bytes NORMALIZATION_MODE_KEY =
new Bytes(Bytes.toBytes(NORMALIZATION_MODE));

/** Default durability for HTD is USE_DEFAULT, which defaults to HBase-global default value */
private static final Durability DEFAULT_DURABLITY = Durability.USE_DEFAULT;
Expand Down Expand Up @@ -219,11 +222,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
*/
public static final boolean DEFAULT_COMPACTION_ENABLED = true;

/**
* Constant that denotes whether the table is normalized by default.
*/
public static final boolean DEFAULT_NORMALIZATION_ENABLED = false;

/**
* Constant that denotes the maximum default size of the memstore after which
* the contents are flushed to the store files
Expand All @@ -249,7 +247,7 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
String.valueOf(DEFAULT_DEFERRED_LOG_FLUSH));
DEFAULT_VALUES.put(DURABILITY, DEFAULT_DURABLITY.name()); //use the enum name
DEFAULT_VALUES.put(REGION_REPLICATION, String.valueOf(DEFAULT_REGION_REPLICATION));
DEFAULT_VALUES.put(NORMALIZATION_ENABLED, String.valueOf(DEFAULT_NORMALIZATION_ENABLED));
DEFAULT_VALUES.put(NORMALIZATION_MODE, "");
for (String s : DEFAULT_VALUES.keySet()) {
RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s)));
}
Expand Down Expand Up @@ -640,22 +638,42 @@ public HTableDescriptor setCompactionEnabled(final boolean isEnable) {
}

/**
* Check if normalization enable flag of the table is true. If flag is
* false then no region normalizer won't attempt to normalize this table.
* Checks the normalization flag of the table. If the flag is
* empty then the region normalizer won't attempt to normalize this table.
*
* @return true if region normalization is enabled for this table
* @return list of PlanType if region normalization is enabled for this table;
*         null if region normalization is disabled
*/
public boolean isNormalizationEnabled() {
return isSomething(NORMALIZATION_ENABLED_KEY, DEFAULT_NORMALIZATION_ENABLED);
public List<PlanType> getDesiredNormalizationTypes() {
  // Raw attribute bytes; an absent attribute means normalization was never configured.
  byte [] value = getValue(NORMALIZATION_MODE_KEY);
  if (value == null) {
    return null;
  }
  String strValue = Bytes.toString(value);
  if (strValue.isEmpty()) {
    // An empty flag likewise disables normalization for this table.
    return null;
  }
  // Upper-case once instead of once per containment check; use the short
  // imported PlanType name consistently with the declared return type.
  String upperValue = strValue.toUpperCase();
  List<PlanType> types = new ArrayList<>();
  // "M" enables merges, "S" enables splits; "MS" enables both.
  if (upperValue.contains("M")) {
    types.add(PlanType.MERGE);
  }
  if (upperValue.contains("S")) {
    types.add(PlanType.SPLIT);
  }
  return types;
}

/**
* Setting the table normalization enable flag.
* Sets the types of action allowed by the table normalization mode flag.
*
* @param isEnable True if enable normalization.
*/
public HTableDescriptor setNormalizationEnabled(final boolean isEnable) {
setValue(NORMALIZATION_ENABLED_KEY, isEnable ? TRUE : FALSE);
* @param types String containing desired types of action:
* "M" for region merge
* "S" for region split
* "MS" for region merge / split
*/
public HTableDescriptor setNormalizationMode(final String types) {
  // A null or empty specification stores null, which disables normalization;
  // otherwise the action letters are stored upper-cased ("M", "S", "MS").
  if (types == null || types.isEmpty()) {
    setValue(NORMALIZATION_MODE_KEY, (Bytes) null);
  } else {
    setValue(NORMALIZATION_MODE_KEY, new Bytes(Bytes.toBytes(types.toUpperCase())));
  }
  return this;
}

Expand Down
Expand Up @@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.normalizer;
package org.apache.hadoop.hbase.normalizer;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
Expand Down
Expand Up @@ -72,7 +72,7 @@
import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
Expand Down
Expand Up @@ -92,7 +92,6 @@
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
Expand All @@ -114,6 +113,8 @@
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
Expand Down Expand Up @@ -1323,14 +1324,21 @@ public boolean normalizeRegions() throws IOException {

for (TableName table : allEnabledTables) {
TableDescriptor tblDesc = getTableDescriptors().getDescriptor(table);
if (table.isSystemTable() || (tblDesc != null &&
tblDesc.getHTableDescriptor() != null &&
!tblDesc.getHTableDescriptor().isNormalizationEnabled())) {
LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
+ " table or doesn't have auto normalization turned on");
if (table.isSystemTable()) {
LOG.debug("Skipping normalization for table: " + table + ", as it's system table");
continue;
}
NormalizationPlan plan = this.normalizer.computePlanForTable(table);
List<PlanType> types = null;
if (tblDesc != null &&
tblDesc.getHTableDescriptor() != null) {
types = tblDesc.getHTableDescriptor().getDesiredNormalizationTypes();
if (types == null) {
LOG.debug("Skipping normalization for table: " + table + ", as it"
+ " doesn't have auto normalization turned on");
continue;
}
}
NormalizationPlan plan = this.normalizer.computePlanForTable(table, types);
plan.execute(clusterConnection.getAdmin());
}
}
Expand Down
Expand Up @@ -20,7 +20,7 @@

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;

/**
* Plan which signifies that no normalization is required,
Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;

import java.io.IOException;

Expand Down
Expand Up @@ -18,12 +18,15 @@
*/
package org.apache.hadoop.hbase.master.normalizer;

import java.util.List;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;

/**
* Performs "normalization" of regions on the cluster, making sure that suboptimal
Expand All @@ -47,9 +50,11 @@ public interface RegionNormalizer {
/**
* Computes next optimal normalization plan.
* @param table table to normalize
* @param types desired types of NormalizationPlan
* @return Next (perhaps most urgent) normalization action to perform
*/
NormalizationPlan computePlanForTable(TableName table) throws HBaseIOException;
NormalizationPlan computePlanForTable(TableName table, List<PlanType> types)
throws HBaseIOException;

/**
* Notification for the case where plan couldn't be executed due to constraint violation, such as
Expand Down
Expand Up @@ -27,7 +27,8 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.util.Triple;

import java.util.ArrayList;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class SimpleRegionNormalizer implements RegionNormalizer {
private static final Log LOG = LogFactory.getLog(SimpleRegionNormalizer.class);
private static final int MIN_REGION_COUNT = 3;
private MasterServices masterServices;
private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
private static long[] skippedCount = new long[PlanType.values().length];

/**
* Set the master service.
Expand Down Expand Up @@ -102,10 +103,12 @@ public long getSkippedCount(NormalizationPlan.PlanType type) {
* Action may be either a split, or a merge, or no action.
*
* @param table table to normalize
* @param types desired types of NormalizationPlan
* @return normalization plan to execute
*/
@Override
public NormalizationPlan computePlanForTable(TableName table) throws HBaseIOException {
public NormalizationPlan computePlanForTable(TableName table, List<PlanType> types)
throws HBaseIOException {
if (table == null || table.isSystemTable()) {
LOG.debug("Normalization of system table " + table + " isn't allowed");
return EmptyNormalizationPlan.getInstance();
Expand Down Expand Up @@ -146,7 +149,7 @@ public NormalizationPlan computePlanForTable(TableName table) throws HBaseIOExce

// now; if the largest region is >2 times large than average, we split it, split
// is more high priority normalization action than merge.
if (largestRegion.getSecond() > 2 * avgRegionSize) {
if (types.contains(PlanType.SPLIT) && largestRegion.getSecond() > 2 * avgRegionSize) {
LOG.debug("Table " + table + ", largest region "
+ largestRegion.getFirst().getRegionNameAsString() + " has size "
+ largestRegion.getSecond() + ", more than 2 times than avg size, splitting");
Expand All @@ -167,7 +170,8 @@ public NormalizationPlan computePlanForTable(TableName table) throws HBaseIOExce
}
Triple<HRegionInfo, Long, Integer> candidateRegion = regionsWithSize.get(candidateIdx);
Triple<HRegionInfo, Long, Integer> candidateRegion2 = regionsWithSize.get(candidateIdx+1);
if (candidateRegion.getSecond() + candidateRegion2.getSecond() < avgRegionSize) {
if (types.contains(PlanType.MERGE) &&
candidateRegion.getSecond() + candidateRegion2.getSecond() < avgRegionSize) {
LOG.debug("Table " + table + ", smallest region size: " + candidateRegion.getSecond()
+ " and its smallest neighbor size: " + candidateRegion2.getSecond()
+ ", less than the avg size, merging them");
Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;

import java.io.IOException;
import java.util.Arrays;
Expand Down
Expand Up @@ -26,6 +26,8 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -53,6 +55,18 @@ public class TestSimpleRegionNormalizer {
private static final Log LOG = LogFactory.getLog(TestSimpleRegionNormalizer.class);

private static RegionNormalizer normalizer;
// Fixture: plan types allowing both normalization actions (split and merge).
private static List<PlanType> bothTypes;
static {
bothTypes = new ArrayList<>();
bothTypes.add(PlanType.SPLIT);
bothTypes.add(PlanType.MERGE);
}

// Fixture: plan types allowing splits only; merge plans should not be produced.
private static List<PlanType> splitType;
static {
splitType = new ArrayList<>();
splitType.add(PlanType.SPLIT);
}

// mocks
private static MasterServices masterServices;
Expand All @@ -69,7 +83,7 @@ public void testNoNormalizationForMetaTable() throws HBaseIOException {
Map<byte[], Integer> regionSizes = new HashMap<>();

setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue(plan instanceof EmptyNormalizationPlan);
}

Expand All @@ -88,7 +102,7 @@ public void testNoNormalizationIfTooFewRegions() throws HBaseIOException {
regionSizes.put(hri2.getRegionName(), 15);

setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue((plan instanceof EmptyNormalizationPlan));
}

Expand All @@ -114,14 +128,18 @@ public void testNoNormalizationOnNormalizedCluster() throws HBaseIOException {
hris.add(hri4);
regionSizes.put(hri4.getRegionName(), 10);


setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);
assertTrue(plan instanceof EmptyNormalizationPlan);
}

@Test
public void testMergeOfSmallRegions() throws HBaseIOException {
// Exercise both configurations: merge permitted (true) and split-only (false).
testMergeOfSmallRegions(true);
testMergeOfSmallRegions(false);
}

public void testMergeOfSmallRegions(boolean mergeDesired) throws HBaseIOException {
TableName testTable = TableName.valueOf("testMergeOfSmallRegions");
List<HRegionInfo> hris = new ArrayList<>();
Map<byte[], Integer> regionSizes = new HashMap<>();
Expand All @@ -147,11 +165,16 @@ public void testMergeOfSmallRegions() throws HBaseIOException {
regionSizes.put(hri5.getRegionName(), 16);

setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);

assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
NormalizationPlan plan = normalizer.computePlanForTable(testTable,
mergeDesired ? bothTypes : splitType);

if (mergeDesired) {
assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri2, ((MergeNormalizationPlan) plan).getFirstRegion());
assertEquals(hri3, ((MergeNormalizationPlan) plan).getSecondRegion());
} else {
assertTrue(plan instanceof EmptyNormalizationPlan);
}
}

// Test for situation illustrated in HBASE-14867
Expand Down Expand Up @@ -186,7 +209,7 @@ public void testMergeOfSecondSmallestRegions() throws HBaseIOException {
regionSizes.put(hri6.getRegionName(), 2700);

setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);

assertTrue(plan instanceof MergeNormalizationPlan);
assertEquals(hri5, ((MergeNormalizationPlan) plan).getFirstRegion());
Expand Down Expand Up @@ -220,7 +243,7 @@ public void testMergeOfSmallNonAdjacentRegions() throws HBaseIOException {
regionSizes.put(hri5.getRegionName(), 5);

setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);

assertTrue(plan instanceof EmptyNormalizationPlan);
}
Expand Down Expand Up @@ -248,7 +271,7 @@ public void testSplitOfLargeRegion() throws HBaseIOException {
regionSizes.put(hri4.getRegionName(), 30);

setupMocksForNormalizer(regionSizes, hris);
NormalizationPlan plan = normalizer.computePlanForTable(testTable);
NormalizationPlan plan = normalizer.computePlanForTable(testTable, bothTypes);

assertTrue(plan instanceof SplitNormalizationPlan);
assertEquals(hri4, ((SplitNormalizationPlan) plan).getRegionInfo());
Expand Down

0 comments on commit d65978f

Please sign in to comment.