Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24588 : Submit task for NormalizationPlan #1933

Merged
merged 6 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, b
return;
}

MergeTableRegionsRequest request = null;
final MergeTableRegionsRequest request;
try {
request = RequestConverter.buildMergeTableRegionsRequest(encodedNameOfRegionsToMerge,
forcible, ng.getNonceGroup(), ng.newNonce());
Expand All @@ -1304,8 +1304,8 @@ public CompletableFuture<Void> mergeRegions(List<byte[]> nameOfRegionsToMerge, b
}

addListener(
this.<MergeTableRegionsRequest, MergeTableRegionsResponse> procedureCall(tableName, request,
(s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(),
this.procedureCall(tableName, request,
MasterService.Interface::mergeTableRegions, MergeTableRegionsResponse::getProcId,
ndimiduk marked this conversation as resolved.
Show resolved Hide resolved
new MergeTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> {
if (err2 != null) {
Expand Down Expand Up @@ -1480,7 +1480,7 @@ public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint)
private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
CompletableFuture<Void> future = new CompletableFuture<>();
TableName tableName = hri.getTable();
SplitTableRegionRequest request = null;
final SplitTableRegionRequest request;
try {
request = RequestConverter.buildSplitTableRegionRequest(hri, splitPoint, ng.getNonceGroup(),
ng.newNonce());
Expand All @@ -1490,8 +1490,8 @@ private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
}

addListener(
this.<SplitTableRegionRequest, SplitTableRegionResponse> procedureCall(tableName,
request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(),
this.procedureCall(tableName,
request, MasterService.Interface::splitRegion, SplitTableRegionResponse::getProcId,
new SplitTableRegionProcedureBiConsumer(tableName)),
(ret, err2) -> {
if (err2 != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ public void run() {
// Cached clusterId on stand by masters to serve clusterID requests from clients.
private final CachedClusterId cachedClusterId;

// Split/Merge Normalization plan executes asynchronously and the caller blocks on
// waiting max 5 sec for single plan to complete with success/failure.
private static final int NORMALIZATION_PLAN_WAIT_TIMEOUT = 5;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i like to include a unit in these types of constants. i.e., NORMALIZATION_PLAN_WAIT_TIMEOUT_SEC.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not convinced we should do this, per my other comment :)


public static class RedirectServlet extends HttpServlet {
private static final long serialVersionUID = 2894774810058302473L;
private final int regionServerInfoPort;
Expand Down Expand Up @@ -1935,6 +1939,8 @@ public boolean normalizeRegions() throws IOException {
Collections.shuffle(allEnabledTables);

try (final Admin admin = asyncClusterConnection.toConnection().getAdmin()) {
int failedNormalizationPlans = 0;
final List<Future<?>> submittedPlanList = new ArrayList<>();
for (TableName table : allEnabledTables) {
if (table.isSystemTable()) {
continue;
Expand All @@ -1957,17 +1963,30 @@ public boolean normalizeRegions() throws IOException {
continue;
}

// as of this writing, `plan.execute()` is non-blocking, so there's no artificial rate-
// limiting of merge requests due to this serial loop.
// as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
// submit task , so there's no artificial rate-
// limiting of merge/split requests due to this serial loop.
for (NormalizationPlan plan : plans) {
plan.execute(admin);
Future<Void> future = plan.submit(admin);
submittedPlanList.add(future);
if (plan.getType() == PlanType.SPLIT) {
splitPlanCount++;
} else if (plan.getType() == PlanType.MERGE) {
mergePlanCount++;
}
}
}
for (Future<?> submittedPlan : submittedPlanList) {
try {
submittedPlan.get(NORMALIZATION_PLAN_WAIT_TIMEOUT, TimeUnit.SECONDS);
} catch (Exception e) {
failedNormalizationPlans++;
LOG.error("Submitted normalization plan failed with error: ", e);
}
}
int totalNormalizationPlans = submittedPlanList.size();
LOG.info("Normalizer run was able to successfully execute {}/{} merge or split plans",
totalNormalizationPlans - failedNormalizationPlans, totalNormalizationPlans);
}
} finally {
normalizationInProgressLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
* Plan which signifies that no normalization is required,
Expand All @@ -44,7 +45,8 @@ public static EmptyNormalizationPlan getInstance(){
* No-op for empty plan.
*/
@Override
public void execute(Admin admin) {
public Future<Void> submit(Admin admin) {
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.normalizer;

import java.io.IOException;
import java.util.concurrent.Future;

import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
Expand All @@ -41,6 +42,21 @@ public MergeNormalizationPlan(RegionInfo firstRegion, RegionInfo secondRegion) {
this.secondRegion = secondRegion;
}

/**
* {@inheritDoc}
*/
@Override
public Future<Void> submit(Admin admin) throws IOException {
LOG.info("Executing merging normalization plan: " + this);
byte[][] regionsToMerge = new byte[2][];
regionsToMerge[0] = firstRegion.getEncodedNameAsBytes();
regionsToMerge[1] = secondRegion.getEncodedNameAsBytes();
// Do not use force=true as corner cases can happen, non adjacent regions,
// merge with a merged child region with no GC done yet, it is going to
// cause all different issues.
return admin.mergeRegionsAsync(regionsToMerge, false);
}

@Override
public PlanType getType() {
return PlanType.MERGE;
Expand All @@ -62,20 +78,4 @@ public String toString() {
'}';
}

/**
* {@inheritDoc}
*/
@Override
public void execute(Admin admin) {
LOG.info("Executing merging normalization plan: " + this);
try {
// Do not use force=true as corner cases can happen, non adjacent regions,
// merge with a merged child region with no GC done yet, it is going to
// cause all different issues.
admin.mergeRegionsAsync(firstRegion.getEncodedNameAsBytes(),
secondRegion.getEncodedNameAsBytes(), false);
} catch (IOException ex) {
LOG.error("Error during region merge: ", ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import java.io.IOException;
import java.util.concurrent.Future;

/**
* Interface for normalization plan.
Expand All @@ -33,10 +35,13 @@ enum PlanType {
}

/**
* Executes normalization plan on cluster (does actual splitting/merging work).
* Submits normalization plan on cluster (does actual splitting/merging work) and
* returns Future reference for caller.
* @param admin instance of Admin
* @return Future reference to submitted task
* @throws IOException If plan submission to Admin fails
*/
void execute(Admin admin);
Future<Void> submit(Admin admin) throws IOException;

/**
* @return the type of this plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.hadoop.hbase.master.normalizer;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Future;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -40,6 +42,15 @@ public SplitNormalizationPlan(RegionInfo regionInfo, byte[] splitPoint) {
this.splitPoint = splitPoint;
}

/**
* {@inheritDoc}
*/
@Override
public Future<Void> submit(final Admin admin) throws IOException {
LOG.info("Executing splitting normalization plan: " + this);
return admin.splitRegionAsync(regionInfo.getRegionName());
}

@Override
public PlanType getType() {
return PlanType.SPLIT;
Expand Down Expand Up @@ -69,16 +80,4 @@ public String toString() {
'}';
}

/**
* {@inheritDoc}
*/
@Override
public void execute(Admin admin) {
LOG.info("Executing splitting normalization plan: " + this);
try {
admin.splitRegionAsync(regionInfo.getRegionName()).get();
} catch (Exception ex) {
LOG.error("Error during region split: ", ex);
}
}
}