Skip to content

Commit

Permalink
HBASE-24588 : Submit task for NormalizationPlan (#1933) (#1985)
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
virajjasani committed Jun 27, 2020
1 parent 6b6acb5 commit 66489bb
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,9 @@ default boolean balancer(boolean force) throws IOException {

/**
* Invoke region normalizer. Can NOT run for various reasons. Check logs.
* This is a non-blocking invocation to region normalizer. If return value is true, it means
* the request was submitted successfully. We need to check logs for the details of which regions
* were split/merged.
*
* @return <code>true</code> if region normalizer ran, <code>false</code> otherwise.
* @throws IOException if a remote or network exception occurs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1284,7 +1284,7 @@ public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, b
return;
}

MergeTableRegionsRequest request = null;
final MergeTableRegionsRequest request;
try {
request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
forcible, ng.getNonceGroup(), ng.newNonce());
Expand All @@ -1294,8 +1294,8 @@ public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, b
}

addListener(
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
(s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
this.procedureCall(tableName, request,
MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
new MergeTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> {
if (err2 != null) {
Expand Down Expand Up @@ -1470,7 +1470,7 @@ public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint)
private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
CompletableFuture<Void> future = new CompletableFuture<>();
TableName tableName = hri.getTable();
SplitTableRegionRequest request = null;
final SplitTableRegionRequest request;
try {
request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
ng.newNonce());
Expand All @@ -1480,8 +1480,8 @@ private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
}

addListener(
this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
this.procedureCall(tableName,
request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
new SplitTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> {
if (err2 != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1904,44 +1904,50 @@ public boolean normalizeRegions() throws IOException {
}

try {
final List<TableName> allEnabledTables = new ArrayList<>(
tableStateManager.getTablesInStates(TableState.State.ENABLED));
final List<TableName> allEnabledTables =
new ArrayList<>(tableStateManager.getTablesInStates(TableState.State.ENABLED));
Collections.shuffle(allEnabledTables);

try (final Admin admin = clusterConnection.getAdmin()) {
for (TableName table : allEnabledTables) {
if (table.isSystemTable()) {
continue;
}
final TableDescriptor tblDesc = getTableDescriptors().get(table);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug("Skipping table {} because normalization is disabled in its"
+ " table properties.", table);
continue;
}
final List<Long> submittedPlanProcIds = new ArrayList<>();
for (TableName table : allEnabledTables) {
if (table.isSystemTable()) {
continue;
}
final TableDescriptor tblDesc = getTableDescriptors().get(table);
if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
LOG.debug(
"Skipping table {} because normalization is disabled in its table properties.", table);
continue;
}

// make one last check that the cluster isn't shutting down before proceeding.
if (skipRegionManagementAction("region normalizer")) {
return false;
}
// make one last check that the cluster isn't shutting down before proceeding.
if (skipRegionManagementAction("region normalizer")) {
return false;
}

final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", table);
continue;
}
final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
if (CollectionUtils.isEmpty(plans)) {
LOG.debug("No normalization required for table {}.", table);
continue;
}

// as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
// limiting of merge requests due to this serial loop.
for (NormalizationPlan plan : plans) {
plan.execute(admin);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
mergePlanCount++;
}
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
// submit task , so there's no artificial rate-
// limiting of merge/split requests due to this serial loop.
for (NormalizationPlan plan : plans) {
long procId = plan.submit(this);
submittedPlanProcIds.add(procId);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
mergePlanCount++;
}
}
int totalPlansSubmitted = submittedPlanProcIds.size();
if (totalPlansSubmitted > 0 && LOG.isDebugEnabled()) {
LOG.debug("Normalizer plans submitted. Total plans count: {} , procID list: {}",
totalPlansSubmitted, submittedPlanProcIds);
}
}
} finally {
normalizationInProgressLock.unlock();
Expand Down Expand Up @@ -1983,8 +1989,8 @@ public long mergeRegions(
" failed because merge switch is off");
}

final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(r -> r.getEncodedName()).
collect(Collectors.joining(", "));
final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName)
.collect(Collectors.joining(", "));
return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) {
@Override
protected void run() throws IOException {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

import java.io.IOException;

import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,6 +42,20 @@ public MergeNormalizationPlan(RegionInfo firstRegion, RegionInfo secondRegion) {
this.secondRegion = secondRegion;
}

/**
* {@inheritDoc}
*/
@Override
public long submit(MasterServices masterServices) throws IOException {
LOG.info("Executing merging normalization plan: " + this);
// Do not use force=true as corner cases can happen, non adjacent regions,
// merge with a merged child region with no GC done yet, it is going to
// cause all different issues.
return masterServices
.mergeRegions(new RegionInfo[] { firstRegion, secondRegion }, false, HConstants.NO_NONCE,
HConstants.NO_NONCE);
}

@Override
public PlanType getType() {
return PlanType.MERGE;
Expand All @@ -62,20 +77,4 @@ public String toString() {
'}';
}

/**
* {@inheritDoc}
*/
@Override
public void execute(Admin admin) {
LOG.info("Executing merging normalization plan: " + this);
try {
// Do not use force=true as corner cases can happen, non adjacent regions,
// merge with a merged child region with no GC done yet, it is going to
// cause all different issues.
admin.mergeRegionsAsync(firstRegion.getEncodedNameAsBytes(),
secondRegion.getEncodedNameAsBytes(), false);
} catch (IOException ex) {
LOG.error("Error during region merge: ", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
*/
package org.apache.hadoop.hbase.master.normalizer;

import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import java.io.IOException;

/**
* Interface for normalization plan.
Expand All @@ -33,10 +34,13 @@ enum PlanType {
}

/**
* Executes normalization plan on cluster (does actual splitting/merging work).
* @param admin instance of Admin
* Submits normalization plan on cluster (does actual splitting/merging work) and
* returns proc Id to caller.
* @param masterServices instance of {@link MasterServices}
* @return Proc Id for the submitted task
* @throws IOException If plan submission to Admin fails
*/
void execute(Admin admin);
long submit(MasterServices masterServices) throws IOException;

/**
* @return the type of this plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
*/
package org.apache.hadoop.hbase.master.normalizer;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,6 +42,14 @@ public SplitNormalizationPlan(RegionInfo regionInfo, byte[] splitPoint) {
this.splitPoint = splitPoint;
}

/**
* {@inheritDoc}
*/
@Override
public long submit(MasterServices masterServices) throws IOException {
return masterServices.splitRegion(regionInfo, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
}

@Override
public PlanType getType() {
return PlanType.SPLIT;
Expand Down Expand Up @@ -69,16 +79,4 @@ public String toString() {
'}';
}

/**
* {@inheritDoc}
*/
@Override
public void execute(Admin admin) {
LOG.info("Executing splitting normalization plan: " + this);
try {
admin.splitRegionAsync(regionInfo.getRegionName()).get();
} catch (Exception ex) {
LOG.error("Error during region split: ", ex);
}
}
}
Loading

0 comments on commit 66489bb

Please sign in to comment.