Skip to content

Commit

Permalink
HBASE-24418 Consolidate Normalizer implementations
Browse files Browse the repository at this point in the history
Simplify our Normalizer story to have just a single, configurable
implementation.

* fold the features of `MergeNormalizer` into
  `SimpleRegionNormalizer`, removing the intermediate abstract class.
* configuration keys for merge-only features now share a common
  structure.
* add configuration to selectively disable normalizer split/merge
  operations.
* `RegionNormalizer` now extends `Configurable` instead of creating a
  new instance of `HBaseConfiguration` or snooping one off of other
  fields.
* avoid the extra RPCs by using `MasterServices` instead of
  `MasterRpcServices`.
* boost test coverage of all the various flags and feature
  combinations.
  • Loading branch information
ndimiduk committed May 27, 2020
1 parent a9205f8 commit 171d1fc
Show file tree
Hide file tree
Showing 10 changed files with 1,097 additions and 1,378 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
Expand Down Expand Up @@ -400,6 +401,8 @@ public void run() {
private final LockManager lockManager = new LockManager(this);

private RSGroupBasedLoadBalancer balancer;
// a lock to prevent concurrent normalization actions.
private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
private RegionNormalizer normalizer;
private BalancerChore balancerChore;
private RegionNormalizerChore normalizerChore;
Expand Down Expand Up @@ -797,7 +800,6 @@ protected void initializeZKBasedSystemTrackers()
this.balancer.setConf(conf);
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
this.normalizer.setMasterServices(this);
this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();

Expand Down Expand Up @@ -1887,7 +1889,6 @@ public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans)
}

@Override
@VisibleForTesting
public RegionNormalizer getRegionNormalizer() {
return this.normalizer;
}
Expand All @@ -1911,43 +1912,51 @@ public boolean normalizeRegions() throws IOException {
return false;
}

synchronized (this.normalizer) {
if (!normalizationInProgressLock.tryLock()) {
// Don't run the normalizer concurrently
LOG.info("Normalization already in progress. Skipping request.");
} else {
try {
List<TableName> allEnabledTables = new ArrayList<>(
tableStateManager.getTablesInStates(TableState.State.ENABLED));
Collections.shuffle(allEnabledTables);

List<TableName> allEnabledTables = new ArrayList<>(
this.tableStateManager.getTablesInStates(TableState.State.ENABLED));

Collections.shuffle(allEnabledTables);

for (TableName table : allEnabledTables) {
TableDescriptor tblDesc = getTableDescriptors().get(table);
if (table.isSystemTable() || (tblDesc != null &&
!tblDesc.isNormalizationEnabled())) {
LOG.trace("Skipping normalization for {}, as it's either system"
+ " table or doesn't have auto normalization turned on", table);
continue;
}
for (TableName table : allEnabledTables) {
if (table.isSystemTable()) {
continue;
}
final TableDescriptor tblDesc = getTableDescriptors().get(table);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug(
"Skipping {} because normalization is disabled in its table properties.", table);
continue;
}

// make one last check that the cluster isn't shutting down before proceeding.
if (skipRegionManagementAction("region normalizer")) {
return false;
}
// make one last check that the cluster isn't shutting down before proceeding.
if (skipRegionManagementAction("region normalizer")) {
return false;
}

final List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
if (CollectionUtils.isEmpty(plans)) {
return true;
}
final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
if (CollectionUtils.isEmpty(plans)) {
return true;
}

try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
for (NormalizationPlan plan : plans) {
plan.execute(admin);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
mergePlanCount++;
try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
// as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
// limiting of merge requests due to this serial loop.
for (NormalizationPlan plan : plans) {
plan.execute(admin);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
mergePlanCount++;
}
}
}
}
} finally {
normalizationInProgressLock.unlock();
}
}
// If Region did not generate any plans, it means the cluster is already balanced.
Expand Down

This file was deleted.

Loading

0 comments on commit 171d1fc

Please sign in to comment.