Skip to content

Commit

Permalink
HBASE-28199 Phase I: Suspend TRSP and SCP when updating meta
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Nov 17, 2023
1 parent 9e74cc0 commit 518ce00
Show file tree
Hide file tree
Showing 19 changed files with 851 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1695,9 +1695,6 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
}
}

// Add the procedure to the stack
procStack.addRollbackStep(procedure);

// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (
Expand All @@ -1715,7 +1712,11 @@ private void execProcedure(RootProcedureState<TEnvironment> procStack,
// Commit the transaction even if a suspend (state may have changed). Note this append
// can take a bunch of time to complete.
if (procedure.needPersistence()) {
updateStoreOnExec(procStack, procedure, subprocs);
// Add the procedure to the stack
synchronized (procStack) {
procStack.addRollbackStep(procedure);
updateStoreOnExec(procStack, procedure, subprocs);
}
}

// if the store is not running we are aborting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
Expand Down Expand Up @@ -1989,71 +1991,78 @@ public RegionInfo getRegionInfo(final String encodedRegionName) {
// Should only be called in TransitRegionStateProcedure(and related procedures), as the locking
// and pre-assumptions are very tricky.
// ============================================================================================
private void transitStateAndUpdate(RegionStateNode regionNode, RegionState.State newState,
RegionState.State... expectedStates) throws IOException {
private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode,
RegionState.State newState, RegionState.State... expectedStates) {
RegionState.State state = regionNode.getState();
regionNode.transitionState(newState, expectedStates);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
regionNode.transitionState(newState, expectedStates);
} catch (UnexpectedStateException e) {
return FutureUtils.failedFuture(e);
}
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e != null) {
// revert
regionNode.setState(state);
}
}
});
return future;
}

// should be called within the synchronized block of RegionStateNode
void regionOpening(RegionStateNode regionNode) throws IOException {
CompletableFuture<Void> regionOpening(RegionStateNode regionNode) {
// As in SCP, for performance reason, there is no TRSP attached with this region, we will not
// update the region state, which means that the region could be in any state when we want to
// assign it after a RS crash. So here we do not pass the expectedStates parameter.
transitStateAndUpdate(regionNode, State.OPENING);
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
return transitStateAndUpdate(regionNode, State.OPENING).thenAccept(r -> {
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
});
}

// should be called under the RegionStateNode lock
// The parameter 'giveUp' means whether we will try to open the region again, if it is true, then
// we will persist the FAILED_OPEN state into hbase:meta.
void regionFailedOpen(RegionStateNode regionNode, boolean giveUp) throws IOException {
CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giveUp) {
RegionState.State state = regionNode.getState();
ServerName regionLocation = regionNode.getRegionLocation();
if (giveUp) {
regionNode.setState(State.FAILED_OPEN);
regionNode.setRegionLocation(null);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
if (!giveUp) {
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
return CompletableFuture.completedFuture(null);
}
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
regionNode.setState(State.FAILED_OPEN);
regionNode.setRegionLocation(null);
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e == null) {
if (regionLocation != null) {
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
} else {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
});
return future;
}

// should be called under the RegionStateNode lock
void regionClosing(RegionStateNode regionNode) throws IOException {
transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING);

RegionInfo hri = regionNode.getRegionInfo();
// Set meta has not initialized early. so people trying to create/edit tables will wait
if (isMetaRegion(hri)) {
setMetaAssigned(hri, false);
}
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
CompletableFuture<Void> regionClosing(RegionStateNode regionNode) {
return transitStateAndUpdate(regionNode, State.CLOSING, STATES_EXPECTED_ON_CLOSING)
.thenAccept(r -> {
RegionInfo hri = regionNode.getRegionInfo();
// Set meta has not initialized early. so people trying to create/edit tables will wait
if (isMetaRegion(hri)) {
setMetaAssigned(hri, false);
}
regionStates.addRegionToServer(regionNode);
// update the operation count metrics
metrics.incrementOperationCounter();
});
}

// for open and close, they will first be persist to the procedure store in
Expand All @@ -2062,15 +2071,17 @@ void regionClosing(RegionStateNode regionNode) throws IOException {
// RegionRemoteProcedureBase is woken up, we will persist the RegionStateNode to hbase:meta.

// should be called under the RegionStateNode lock
void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
void regionOpenedWithoutPersistingToMeta(RegionStateNode regionNode)
throws UnexpectedStateException {
regionNode.transitionState(State.OPEN, STATES_EXPECTED_ON_OPEN);
RegionInfo regionInfo = regionNode.getRegionInfo();
regionStates.addRegionToServer(regionNode);
regionStates.removeFromFailedOpen(regionInfo);
}

// should be called under the RegionStateNode lock
void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOException {
void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode)
throws UnexpectedStateException {
ServerName regionLocation = regionNode.getRegionLocation();
regionNode.transitionState(State.CLOSED, STATES_EXPECTED_ON_CLOSED);
regionNode.setRegionLocation(null);
Expand All @@ -2080,40 +2091,41 @@ void regionClosedWithoutPersistingToMeta(RegionStateNode regionNode) throws IOEx
}
}

// should be called under the RegionStateNode lock
CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
return regionStateStore.updateRegionLocation(regionNode).thenAccept(r -> {
RegionInfo regionInfo = regionNode.getRegionInfo();
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
// on table that contains state.
setMetaAssigned(regionInfo, true);
}
});
}

// should be called under the RegionStateNode lock
// for SCP
public void regionClosedAbnormally(RegionStateNode regionNode) throws IOException {
public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode) {
RegionState.State state = regionNode.getState();
ServerName regionLocation = regionNode.getRegionLocation();
regionNode.transitionState(State.ABNORMALLY_CLOSED);
regionNode.setState(State.ABNORMALLY_CLOSED);
regionNode.setRegionLocation(null);
boolean succ = false;
try {
regionStateStore.updateRegionLocation(regionNode);
succ = true;
} finally {
if (!succ) {
CompletableFuture<Void> future = regionStateStore.updateRegionLocation(regionNode);
FutureUtils.addListener(future, (r, e) -> {
if (e == null) {
if (regionLocation != null) {
regionNode.setLastHost(regionLocation);
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
} else {
// revert
regionNode.setState(state);
regionNode.setRegionLocation(regionLocation);
}
}
if (regionLocation != null) {
regionNode.setLastHost(regionLocation);
regionStates.removeRegionFromServer(regionLocation, regionNode);
}
}

void persistToMeta(RegionStateNode regionNode) throws IOException {
regionStateStore.updateRegionLocation(regionNode);
RegionInfo regionInfo = regionNode.getRegionInfo();
if (isMetaRegion(regionInfo) && regionNode.getState() == State.OPEN) {
// Usually we'd set a table ENABLED at this stage but hbase:meta is ALWAYs enabled, it
// can't be disabled -- so skip the RPC (besides... enabled is managed by TableStateManager
// which is backed by hbase:meta... Avoid setting ENABLED to avoid having to update state
// on table that contains state.
setMetaAssigned(regionInfo, true);
}
});
return future;
}

// ============================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -29,6 +30,7 @@
import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureFutureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
Expand Down Expand Up @@ -73,6 +75,8 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur

private RetryCounter retryCounter;

private CompletableFuture<Void> future;

protected RegionRemoteProcedureBase() {
}

Expand Down Expand Up @@ -268,11 +272,21 @@ private void unattach(MasterProcedureEnv env) {
getParent(env).unattachRemoteProc(this);
}

private CompletableFuture<Void> getFuture() {
return future;
}

private void setFuture(CompletableFuture<Void> f) {
future = f;
}

@Override
protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
RegionStateNode regionNode = getRegionNode(env);
regionNode.lock();
if (future == null) {
regionNode.lock(this);
}
try {
switch (state) {
case REGION_REMOTE_PROCEDURE_DISPATCH: {
Expand All @@ -294,16 +308,29 @@ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throw new ProcedureSuspendedException();
}
case REGION_REMOTE_PROCEDURE_REPORT_SUCCEED:
env.getAssignmentManager().persistToMeta(regionNode);
unattach(env);
if (
ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
() -> unattach(env))
) {
return null;
}
ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
env.getAssignmentManager().persistToMeta(regionNode), env, () -> unattach(env));
return null;
case REGION_REMOTE_PROCEDURE_DISPATCH_FAIL:
// the remote call is failed so we do not need to change the region state, just return.
unattach(env);
return null;
case REGION_REMOTE_PROCEDURE_SERVER_CRASH:
env.getAssignmentManager().regionClosedAbnormally(regionNode);
unattach(env);
if (
ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
() -> unattach(env))
) {
return null;
}
ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
env.getAssignmentManager().regionClosedAbnormally(regionNode), env,
() -> unattach(env));
return null;
default:
throw new IllegalStateException("Unknown state: " + state);
Expand All @@ -314,12 +341,11 @@ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
}
long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
LOG.warn("Failed updating meta, suspend {}secs {}; {};", backoff / 1000, this, regionNode, e);
setTimeout(Math.toIntExact(backoff));
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
skipPersistence();
throw new ProcedureSuspendedException();
throw suspend(Math.toIntExact(backoff), true);
} finally {
regionNode.unlock();
if (future == null) {
regionNode.unlock(this);
}
}
}

Expand Down

0 comments on commit 518ce00

Please sign in to comment.