Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.List;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -38,17 +38,17 @@
interface RegionNormalizer extends Configurable {
/**
* Set the master service. Must be called before first call to
* {@link #computePlansForTable(TableName)}.
* {@link #computePlansForTable(TableDescriptor)}.
* @param masterServices master services to use
*/
void setMasterServices(MasterServices masterServices);

/**
* Computes a list of normalizer actions to perform on the target table. This is the primary
* entry-point from the Master driving a normalization activity.
* @param table table to normalize
* @param tableDescriptor table descriptor for table which needs normalize
* @return A list of the normalization actions to perform, or an empty list
* if there's nothing to do.
*/
List<NormalizationPlan> computePlansForTable(TableName table);
List<NormalizationPlan> computePlansForTable(TableDescriptor tableDescriptor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,9 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
return Collections.emptyList();
}

final TableDescriptor tblDesc;
try {
final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
tblDesc = masterServices.getTableDescriptors().get(tableName);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
tableName);
Expand All @@ -190,7 +191,7 @@ private List<NormalizationPlan> calculatePlans(final TableName tableName) {
return Collections.emptyList();
}

final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", tableName);
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.master.normalizer;

import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;
import java.io.IOException;
import java.time.Instant;
import java.time.Period;
import java.util.ArrayList;
Expand All @@ -27,6 +26,7 @@
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.RegionMetrics;
Expand Down Expand Up @@ -184,23 +184,24 @@ public void setMasterServices(final MasterServices masterServices) {
}

@Override
public List<NormalizationPlan> computePlansForTable(final TableName table) {
if (table == null) {
public List<NormalizationPlan> computePlansForTable(final TableDescriptor tableDescriptor) {
if (tableDescriptor == null) {
return Collections.emptyList();
}
TableName table = tableDescriptor.getTableName();
if (table.isSystemTable()) {
LOG.debug("Normalization of system table {} isn't allowed", table);
return Collections.emptyList();
}

final boolean proceedWithSplitPlanning = proceedWithSplitPlanning();
final boolean proceedWithMergePlanning = proceedWithMergePlanning();
final boolean proceedWithSplitPlanning = proceedWithSplitPlanning(tableDescriptor);
final boolean proceedWithMergePlanning = proceedWithMergePlanning(tableDescriptor);
if (!proceedWithMergePlanning && !proceedWithSplitPlanning) {
LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
return Collections.emptyList();
}

final NormalizeContext ctx = new NormalizeContext(table);
final NormalizeContext ctx = new NormalizeContext(tableDescriptor);
if (isEmpty(ctx.getTableRegions())) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -254,41 +255,38 @@ private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
return masterServices.isSplitOrMergeEnabled(masterSwitchType);
}

private boolean proceedWithSplitPlanning() {
return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
private boolean proceedWithSplitPlanning(TableDescriptor tableDescriptor) {
String value = tableDescriptor.getValue(SPLIT_ENABLED_KEY);
return (value == null ? isSplitEnabled() : Boolean.parseBoolean(value)) &&
isMasterSwitchEnabled(MasterSwitchType.SPLIT);
}

private boolean proceedWithMergePlanning() {
return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
private boolean proceedWithMergePlanning(TableDescriptor tableDescriptor) {
String value = tableDescriptor.getValue(MERGE_ENABLED_KEY);
return (value == null ? isMergeEnabled() : Boolean.parseBoolean(value)) &&
isMasterSwitchEnabled(MasterSwitchType.MERGE);
}

/**
* @param tableRegions regions of table to normalize
* @param tableDescriptor the TableDescriptor
* @return average region size depending on
* @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
* Also make sure tableRegions contains regions of the same table
*/
private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions,
final TableDescriptor tableDescriptor) {
if (isEmpty(tableRegions)) {
throw new IllegalStateException(
"Cannot calculate average size of a table without any regions.");
}
TableName table = tableRegions.get(0).getTable();
int targetRegionCount = -1;
long targetRegionSize = -1;
TableName table = tableDescriptor.getTableName();
double avgRegionSize;
try {
TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
if (tableDescriptor != null) {
targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
LOG.debug("Table {} configured with target region count {}, target region size {}", table,
targetRegionCount, targetRegionSize);
}
} catch (IOException e) {
LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
+ " configurations cannot be considered.", table, e);
}
int targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
long targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
LOG.debug("Table {} configured with target region count {}, target region size {}", table,
targetRegionCount, targetRegionSize);

if (targetRegionSize > 0) {
avgRegionSize = targetRegionSize;
} else {
Expand Down Expand Up @@ -316,10 +314,10 @@ private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
*/
private boolean skipForMerge(
final NormalizerConfiguration normalizerConfiguration,
final RegionStates regionStates,
final NormalizeContext ctx,
final RegionInfo regionInfo
) {
final RegionState state = regionStates.getRegionState(regionInfo);
final RegionState state = ctx.getRegionStates().getRegionState(regionInfo);
final String name = regionInfo.getEncodedName();
return
logTraceReason(
Expand All @@ -329,10 +327,10 @@ private boolean skipForMerge(
() -> !Objects.equals(state.getState(), RegionState.State.OPEN),
"skipping merge of region {} because it is not open.", name)
|| logTraceReason(
() -> !isOldEnoughForMerge(normalizerConfiguration, regionInfo),
() -> !isOldEnoughForMerge(normalizerConfiguration, ctx, regionInfo),
"skipping merge of region {} because it is not old enough.", name)
|| logTraceReason(
() -> !isLargeEnoughForMerge(normalizerConfiguration, regionInfo),
() -> !isLargeEnoughForMerge(normalizerConfiguration, ctx, regionInfo),
"skipping merge region {} because it is not large enough.", name);
}

Expand All @@ -342,15 +340,15 @@ private boolean skipForMerge(
*/
private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
final NormalizerConfiguration configuration = normalizerConfiguration;
if (ctx.getTableRegions().size() < configuration.getMinRegionCount()) {
if (ctx.getTableRegions().size() < configuration.getMinRegionCount(ctx)) {
LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
+ " is {}, not computing merge plans.", ctx.getTableName(),
ctx.getTableRegions().size(), configuration.getMinRegionCount());
return Collections.emptyList();
}

final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb();
if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb()) {
if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb(ctx)) {
return Collections.emptyList();
}
LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
Expand All @@ -373,7 +371,7 @@ private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeCo
for (current = rangeStart; current < ctx.getTableRegions().size(); current++) {
final RegionInfo regionInfo = ctx.getTableRegions().get(current);
final long regionSizeMb = getRegionSizeMB(regionInfo);
if (skipForMerge(configuration, ctx.getRegionStates(), regionInfo)) {
if (skipForMerge(configuration, ctx, regionInfo)) {
// this region cannot participate in a range. resume the outer loop.
rangeStart = Math.max(current, rangeStart + 1);
break;
Expand Down Expand Up @@ -451,12 +449,13 @@ private List<NormalizationPlan> computeSplitNormalizationPlans(final NormalizeCo
*/
private static boolean isOldEnoughForMerge(
final NormalizerConfiguration normalizerConfiguration,
final NormalizeContext ctx,
final RegionInfo regionInfo
) {
final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
return currentTime.isAfter(
regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge()));
regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge(ctx)));
}

/**
Expand All @@ -468,9 +467,10 @@ private static boolean isOldEnoughForMerge(
*/
private boolean isLargeEnoughForMerge(
final NormalizerConfiguration normalizerConfiguration,
final NormalizeContext ctx,
final RegionInfo regionInfo
) {
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb();
return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
}

private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
Expand Down Expand Up @@ -541,28 +541,56 @@ public int getMinRegionCount() {
return minRegionCount;
}

public int getMinRegionCount(NormalizeContext context) {
int minRegionCount = context.getOrDefault(MIN_REGION_COUNT_KEY, Integer::parseInt, 0);
if (minRegionCount <= 0) {
minRegionCount = getMinRegionCount();
}
return minRegionCount;
}

public Period getMergeMinRegionAge() {
return mergeMinRegionAge;
}

public Period getMergeMinRegionAge(NormalizeContext context) {
int mergeMinRegionAge = context.getOrDefault(MERGE_MIN_REGION_AGE_DAYS_KEY,
Integer::parseInt, -1);
if (mergeMinRegionAge < 0) {
return getMergeMinRegionAge();
}
return Period.ofDays(mergeMinRegionAge);
}

public long getMergeMinRegionSizeMb() {
return mergeMinRegionSizeMb;
}

public long getMergeMinRegionSizeMb(NormalizeContext context) {
long mergeMinRegionSizeMb = context.getOrDefault(MERGE_MIN_REGION_SIZE_MB_KEY,
Long::parseLong, (long)-1);
if (mergeMinRegionSizeMb < 0) {
mergeMinRegionSizeMb = getMergeMinRegionSizeMb();
}
return mergeMinRegionSizeMb;
}
}

/**
* Inner class caries the state necessary to perform a single invocation of
* {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
* {@link #computePlansForTable(TableDescriptor)}. Grabbing this data from the assignment manager
* up-front allows any computed values to be realized just once.
*/
private class NormalizeContext {
private final TableName tableName;
private final RegionStates regionStates;
private final List<RegionInfo> tableRegions;
private final double averageRegionSizeMb;
private final TableDescriptor tableDescriptor;

public NormalizeContext(final TableName tableName) {
this.tableName = tableName;
public NormalizeContext(final TableDescriptor tableDescriptor) {
this.tableDescriptor = tableDescriptor;
tableName = tableDescriptor.getTableName();
regionStates = SimpleRegionNormalizer.this.masterServices
.getAssignmentManager()
.getRegionStates();
Expand All @@ -574,7 +602,8 @@ public NormalizeContext(final TableName tableName) {
// In order to avoid that, sort the list by RegionInfo.COMPARATOR.
// See HBASE-24376
tableRegions.sort(RegionInfo.COMPARATOR);
averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions);
averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions,
this.tableDescriptor);
}

public TableName getTableName() {
Expand All @@ -592,5 +621,14 @@ public List<RegionInfo> getTableRegions() {
public double getAverageRegionSizeMb() {
return averageRegionSizeMb;
}

public <T> T getOrDefault(String key, Function<String, T> function, T defaultValue) {
String value = tableDescriptor.getValue(key);
if (value == null) {
return defaultValue;
} else {
return function.apply(value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testMergeCounter() throws Exception {
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
when(regionNormalizer.computePlansForTable(tnDescriptor))
.thenReturn(singletonList(new MergeNormalizationPlan.Builder()
.addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
.addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20)
Expand All @@ -160,7 +160,7 @@ public void testSplitCounter() throws Exception {
when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
when(regionNormalizer.computePlansForTable(tnDescriptor))
.thenReturn(singletonList(
new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));

Expand Down Expand Up @@ -192,7 +192,7 @@ public void testRateLimit() throws Exception {
.thenReturn(1L);
when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
.thenReturn(1L);
when(regionNormalizer.computePlansForTable(tn))
when(regionNormalizer.computePlansForTable(tnDescriptor))
.thenReturn(Arrays.asList(
new SplitNormalizationPlan(splitRegionInfo, 2),
new MergeNormalizationPlan.Builder()
Expand Down
Loading