Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport "HBASE-24628 Region normalizer now respects a rate limit" to branch-2 #2520

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 37 additions & 140 deletions hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -51,7 +50,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
Expand Down Expand Up @@ -118,11 +116,8 @@
import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
Expand Down Expand Up @@ -196,7 +191,6 @@
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
Expand Down Expand Up @@ -224,7 +218,6 @@
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
Expand Down Expand Up @@ -325,9 +318,6 @@ public void run() {
// Tracker for split and merge state
private SplitOrMergeTracker splitOrMergeTracker;

// Tracker for region normalizer state
private RegionNormalizerTracker regionNormalizerTracker;

private ClusterSchemaService clusterSchemaService;

public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
Expand Down Expand Up @@ -389,11 +379,8 @@ public void run() {
private final LockManager lockManager = new LockManager(this);

private LoadBalancer balancer;
// a lock to prevent concurrent normalization actions.
private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
private RegionNormalizer normalizer;
private BalancerChore balancerChore;
private RegionNormalizerChore normalizerChore;
private RegionNormalizerManager regionNormalizerManager;
private ClusterStatusChore clusterStatusChore;
private ClusterStatusPublisher clusterStatusPublisherChore = null;
private SnapshotCleanerChore snapshotCleanerChore = null;
Expand Down Expand Up @@ -448,9 +435,6 @@ public void run() {
// handle table states
private TableStateManager tableStateManager;

private long splitPlanCount;
private long mergePlanCount;

/* Handle favored nodes information */
private FavoredNodesManager favoredNodesManager;

Expand Down Expand Up @@ -775,27 +759,19 @@ public MetricsMaster getMasterMetrics() {
}

/**
* <p>
* Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
* should have already been initialized along with {@link ServerManager}.
* </p>
* <p>
* Will be overridden in tests.
* </p>
*/
@VisibleForTesting
protected void initializeZKBasedSystemTrackers()
throws IOException, InterruptedException, KeeperException, ReplicationException {
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
this.normalizer.setMasterServices(this);
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();

this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
this.normalizer.setMasterServices(this);
this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
this.regionNormalizerTracker.start();
this.regionNormalizerManager =
RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
this.regionNormalizerManager.start();

this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
this.splitOrMergeTracker.start();
Expand Down Expand Up @@ -875,10 +851,10 @@ protected AssignmentManager createAssignmentManager(MasterServices master) {
* </ol>
* </li>
* <li>If this is a new deploy, schedule a InitMetaProcedure to initialize meta</li>
* <li>Start necessary service threads - balancer, catalog janior, executor services, and also the
* procedure executor, etc. Notice that the balancer must be created first as assignment manager
* may use it when assigning regions.</li>
* <li>Wait for meta to be initialized if necesssary, start table state manager.</li>
* <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
* the procedure executor, etc. Notice that the balancer must be created first as assignment
* manager may use it when assigning regions.</li>
* <li>Wait for meta to be initialized if necessary, start table state manager.</li>
* <li>Wait for enough region servers to check-in</li>
* <li>Let assignment manager load data from meta and construct region states</li>
* <li>Start all other things such as chore services, etc</li>
Expand Down Expand Up @@ -1091,8 +1067,7 @@ private void finishActiveMasterInitialization(MonitoredTask status)
getChoreService().scheduleChore(clusterStatusChore);
this.balancerChore = new BalancerChore(this);
getChoreService().scheduleChore(balancerChore);
this.normalizerChore = new RegionNormalizerChore(this);
getChoreService().scheduleChore(normalizerChore);
getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
this.catalogJanitorChore = new CatalogJanitor(this);
getChoreService().scheduleChore(catalogJanitorChore);
this.hbckChore = new HbckChore(this);
Expand Down Expand Up @@ -1508,6 +1483,9 @@ protected void stopServiceThreads() {
// example.
stopProcedureExecutor();

if (regionNormalizerManager != null) {
regionNormalizerManager.stop();
}
if (this.quotaManager != null) {
this.quotaManager.stop();
}
Expand Down Expand Up @@ -1626,7 +1604,7 @@ private void stopChores() {
choreService.cancelChore(this.expiredMobFileCleanerChore);
choreService.cancelChore(this.mobCompactChore);
choreService.cancelChore(this.balancerChore);
choreService.cancelChore(this.normalizerChore);
choreService.cancelChore(getRegionNormalizerManager().getRegionNormalizerChore());
choreService.cancelChore(this.clusterStatusChore);
choreService.cancelChore(this.catalogJanitorChore);
choreService.cancelChore(this.clusterStatusPublisherChore);
Expand Down Expand Up @@ -1726,7 +1704,9 @@ public boolean balance() throws IOException {
* @param action the name of the action under consideration, for logging.
* @return {@code true} when the caller should exit early, {@code false} otherwise.
*/
private boolean skipRegionManagementAction(final String action) {
@Override
public boolean skipRegionManagementAction(final String action) {
// Note: this method could be `default` on MasterServices if but for logging.
if (!isInitialized()) {
LOG.debug("Master has not been initialized, don't run {}.", action);
return true;
Expand Down Expand Up @@ -1871,24 +1851,16 @@ public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans)
}

@Override
public RegionNormalizer getRegionNormalizer() {
return this.normalizer;
public RegionNormalizerManager getRegionNormalizerManager() {
return regionNormalizerManager;
}

public boolean normalizeRegions() throws IOException {
return normalizeRegions(new NormalizeTableFilterParams.Builder().build());
}

/**
* Perform normalization of cluster.
*
* @return true if an existing normalization was already in progress, or if a new normalization
* was performed successfully; false otherwise (specifically, if HMaster finished initializing
* or normalization is globally disabled).
*/
public boolean normalizeRegions(final NormalizeTableFilterParams ntfp) throws IOException {
final long startTime = EnvironmentEdgeManager.currentTime();
if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
@Override
public boolean normalizeRegions(
final NormalizeTableFilterParams ntfp,
final boolean isHighPriority
) throws IOException {
if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) {
LOG.debug("Region normalization is disabled, don't run region normalizer.");
return false;
}
Expand All @@ -1899,70 +1871,17 @@ public boolean normalizeRegions(final NormalizeTableFilterParams ntfp) throws IO
return false;
}

if (!normalizationInProgressLock.tryLock()) {
// Don't run the normalizer concurrently
LOG.info("Normalization already in progress. Skipping request.");
return true;
}

int affectedTables = 0;
try {
final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
.stream()
.map(TableDescriptor::getTableName)
.collect(Collectors.toSet());
final Set<TableName> allEnabledTables =
tableStateManager.getTablesInStates(TableState.State.ENABLED);
final List<TableName> targetTables =
new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
Collections.shuffle(targetTables);

final List<Long> submittedPlanProcIds = new ArrayList<>();
for (TableName table : targetTables) {
if (table.isSystemTable()) {
continue;
}
final TableDescriptor tblDesc = getTableDescriptors().get(table);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug(
"Skipping table {} because normalization is disabled in its table properties.", table);
continue;
}

// make one last check that the cluster isn't shutting down before proceeding.
if (skipRegionManagementAction("region normalizer")) {
return false;
}

final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", table);
continue;
}

affectedTables++;
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
// submit task , so there's no artificial rate-
// limiting of merge/split requests due to this serial loop.
for (NormalizationPlan plan : plans) {
long procId = plan.submit(this);
submittedPlanProcIds.add(procId);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
mergePlanCount++;
}
}
}
final long endTime = EnvironmentEdgeManager.currentTime();
LOG.info("Normalizer ran successfully in {}. Submitted {} plans, affecting {} tables.",
Duration.ofMillis(endTime - startTime), submittedPlanProcIds.size(), affectedTables);
LOG.debug("Normalizer submitted procID list: {}", submittedPlanProcIds);
} finally {
normalizationInProgressLock.unlock();
}
return true;
final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
.stream()
.map(TableDescriptor::getTableName)
.collect(Collectors.toSet());
final Set<TableName> allEnabledTables =
tableStateManager.getTablesInStates(TableState.State.ENABLED);
final List<TableName> targetTables =
new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
Collections.shuffle(targetTables);
return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority);
}

/**
Expand Down Expand Up @@ -2969,20 +2888,6 @@ public double getAverageLoad() {
return regionStates.getAverageLoad();
}

/*
* @return the count of region split plans executed
*/
public long getSplitPlanCount() {
return splitPlanCount;
}

/*
* @return the count of region merge plans executed
*/
public long getMergePlanCount() {
return mergePlanCount;
}

@Override
public boolean registerService(Service instance) {
/*
Expand Down Expand Up @@ -3487,8 +3392,7 @@ public boolean isBalancerOn() {
*/
public boolean isNormalizerOn() {
return !isInMaintenanceMode()
&& regionNormalizerTracker != null
&& regionNormalizerTracker.isNormalizerOn();
&& getRegionNormalizerManager().isNormalizerOn();
}

/**
Expand All @@ -3514,13 +3418,6 @@ public String getLoadBalancerClassName() {
.getDefaultLoadBalancerClass().getName());
}

/**
* @return RegionNormalizerTracker instance
*/
public RegionNormalizerTracker getRegionNormalizerTracker() {
return regionNormalizerTracker;
}

public SplitOrMergeTracker getSplitOrMergeTracker() {
return splitOrMergeTracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1927,9 +1927,7 @@ public SetSplitOrMergeEnabledResponse setSplitOrMergeEnabled(RpcController contr
master.cpHost.postSetSplitOrMergeEnabled(newValue, switchType);
}
}
} catch (IOException e) {
throw new ServiceException(e);
} catch (KeeperException e) {
} catch (IOException | KeeperException e) {
throw new ServiceException(e);
}
return response.build();
Expand All @@ -1954,7 +1952,8 @@ public NormalizeResponse normalize(RpcController controller,
.namespace(request.hasNamespace() ? request.getNamespace() : null)
.build();
return NormalizeResponse.newBuilder()
.setNormalizerRan(master.normalizeRegions(ntfp))
// all API requests are considered priority requests.
.setNormalizerRan(master.normalizeRegions(ntfp, true))
.build();
} catch (IOException ex) {
throw new ServiceException(ex);
Expand All @@ -1967,20 +1966,27 @@ public SetNormalizerRunningResponse setNormalizerRunning(RpcController controlle
rpcPreCheck("setNormalizerRunning");

// Sets normalizer on/off flag in ZK.
boolean prevValue = master.getRegionNormalizerTracker().isNormalizerOn();
boolean newValue = request.getOn();
try {
master.getRegionNormalizerTracker().setNormalizerOn(newValue);
} catch (KeeperException ke) {
LOG.warn("Error flipping normalizer switch", ke);
}
// TODO: this method is totally broken in terms of atomicity of actions and values read.
// 1. The contract has this RPC returning the previous value. There isn't a ZKUtil method
// that lets us retrieve the previous value as part of setting a new value, so we simply
// perform a read before issuing the update. Thus we have a data race opportunity, between
// when the `prevValue` is read and whatever is actually overwritten.
// 2. Down in `setNormalizerOn`, the call to `createAndWatch` inside of the catch clause can
// itself fail in the event that the znode already exists. Thus, another data race, between
// when the initial `setData` call is notified of the absence of the target znode and the
// subsequent `createAndWatch`, with another client creating said node.
// That said, there's supposed to be only one active master and thus there's supposed to be
// only one process with the authority to modify the value.
final boolean prevValue = master.getRegionNormalizerManager().isNormalizerOn();
final boolean newValue = request.getOn();
master.getRegionNormalizerManager().setNormalizerOn(newValue);
LOG.info("{} set normalizerSwitch={}", master.getClientIdAuditPrefix(), newValue);
return SetNormalizerRunningResponse.newBuilder().setPrevNormalizerValue(prevValue).build();
}

@Override
public IsNormalizerEnabledResponse isNormalizerEnabled(RpcController controller,
IsNormalizerEnabledRequest request) throws ServiceException {
IsNormalizerEnabledRequest request) {
IsNormalizerEnabledResponse.Builder response = IsNormalizerEnabledResponse.newBuilder();
response.setEnabled(master.isNormalizerOn());
return response.build();
Expand Down
Loading