Skip to content

Commit

Permalink
HBASE-21465 Retry on reportRegionStateTransition can lead to unexpect…
Browse files Browse the repository at this point in the history
…ed errors
  • Loading branch information
Apache9 committed Nov 13, 2018
1 parent 55fa8f4 commit ffb003e
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 67 deletions.
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master.assignment;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseIOException;
Expand All @@ -40,6 +43,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RegionStateTransitionStateData;
Expand Down Expand Up @@ -346,71 +350,129 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
return false; // 'false' means that this procedure handled the timeout
}

private void reportTransitionOpened(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName, TransitionCode code, long openSeqNum) throws IOException {
switch (code) {
case OPENED:
if (openSeqNum < 0) {
throw new UnexpectedStateException("Received report unexpected " + code +
" transition openSeqNum=" + openSeqNum + ", " + regionNode);
}
if (openSeqNum <= regionNode.getOpenSeqNum()) {
if (openSeqNum != 0) {
LOG.warn("Skip update of openSeqNum for {} with {} because the currentSeqNum={}",
regionNode, openSeqNum, regionNode.getOpenSeqNum());
}
} else {
regionNode.setOpenSeqNum(openSeqNum);
}
env.getAssignmentManager().regionOpened(regionNode);
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
// we are done
regionNode.unsetProcedure(this);
}
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
break;
case FAILED_OPEN:
// just wake up the procedure and see if we can retry
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
break;
default:
throw new UnexpectedStateException(
"Received report unexpected " + code + " transition openSeqNum=" + openSeqNum + ", " +
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN.");
/**
 * Decides whether an open-related report ({@code OPENED} or {@code FAILED_OPEN}) should be acted
 * upon, or dropped because it looks like a retried call.
 * <p>
 * The report is accepted only when the region node is in {@code OPENING} state and this procedure
 * is currently in {@code REGION_STATE_TRANSITION_CONFIRM_OPENED}. In every other case a warning
 * is logged and the report is rejected.
 * @param regionNode the region state node the report refers to
 * @param serverName the server that sent the report, used for logging only
 * @param code the reported transition code, used for logging only
 * @return {@code true} if the report should be processed, {@code false} if it should be ignored
 */
private boolean isOpening(RegionStateNode regionNode, ServerName serverName,
    TransitionCode code) {
  if (regionNode.isInState(State.OPENING)) {
    if (getCurrentState() == REGION_STATE_TRANSITION_CONFIRM_OPENED) {
      // Both the region and the procedure are waiting for this report.
      return true;
    }
    // Region is OPENING but the procedure is at a different step.
    LOG.warn(
      "Received report {} transition from {}, pid={}, but the TRSP is not in {} state," +
        " should be a retry, ignore",
      code, serverName, getProcId(), REGION_STATE_TRANSITION_CONFIRM_OPENED);
    return false;
  }
  // Region is not OPENING at all.
  LOG.warn("Received report {} transition from {}, pid={}, but the region {} is not in" +
    " OPENING state, should be a retry, ignore", code, serverName, getProcId(), regionNode);
  return false;
}

/**
 * Handles an {@code OPENED} report for the given region.
 * <p>
 * Reports rejected by {@link #isOpening} are dropped silently. A report whose
 * {@code openSeqNum} is non-zero and not greater than the one already recorded on the region node
 * is also dropped, with a warning. Otherwise the region is marked as opened and the procedure
 * event is woken.
 * @param env the master procedure environment
 * @param regionNode the region state node the report refers to
 * @param serverName the server that sent the report
 * @param openSeqNum the open sequence number carried by the report
 * @throws IOException an {@code UnexpectedStateException} is thrown when {@code openSeqNum} is
 *           negative
 */
private void reportTransitionOpen(MasterProcedureEnv env, RegionStateNode regionNode,
    ServerName serverName, long openSeqNum) throws IOException {
  boolean accepted = isOpening(regionNode, serverName, TransitionCode.OPENED);
  if (!accepted) {
    return;
  }
  if (openSeqNum < 0) {
    throw new UnexpectedStateException("Received report unexpected " + TransitionCode.OPENED +
      " transition openSeqNum=" + openSeqNum + ", " + regionNode + ", proc=" + this);
  }
  // The openSeqNum acts as a fence: a report that is not a retry must carry a value greater than
  // the one already recorded. A value of 0 is let through without updating the recorded value.
  if (openSeqNum > regionNode.getOpenSeqNum()) {
    regionNode.setOpenSeqNum(openSeqNum);
  } else if (openSeqNum != 0) {
    LOG.warn("Skip update of region state for {} with openSeqNum={}, pid={} because the" +
      " currentSeqNum={}", regionNode, openSeqNum, getProcId(), regionNode.getOpenSeqNum());
    return;
  }
  env.getAssignmentManager().regionOpened(regionNode);
  if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_OPENED) {
    // Confirming the open is the final step of this procedure, so detach it from the region.
    regionNode.unsetProcedure(this);
  }
  regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
}

/**
 * Handles a {@code FAILED_OPEN} report for the given region.
 * <p>
 * Reports rejected by {@link #isOpening}, or sent by a server other than the region's current
 * location, are ignored. Otherwise the procedure event is woken so the procedure can run again.
 * @param env the master procedure environment
 * @param regionNode the region state node the report refers to
 * @param serverName the server that sent the report
 */
private void reportTransitionFailedOpen(MasterProcedureEnv env, RegionStateNode regionNode,
    ServerName serverName) {
  if (!isOpening(regionNode, serverName, TransitionCode.FAILED_OPEN)) {
    return;
  }
  // FAILED_OPEN carries no openSeqNum, so the reporting server is compared against the region's
  // recorded location instead.
  ServerName currentLocation = regionNode.getRegionLocation();
  if (currentLocation.equals(serverName)) {
    // Simply wake the procedure and let it decide whether to retry.
    // Note that reaching this point does not prove the call is not a retry, because the open may
    // have been retried on the same server. The assumption is that once the region state is
    // OPENING and the TRSP state is REGION_STATE_TRANSITION_CONFIRM_OPENED, the TRSP must be
    // suspended on the procedure event. Waking it here makes the TRSP execute and schedule a new
    // OpenRegionProcedure, and one successful open finishes the work, so the TRSP cannot get
    // stuck.
    // TODO: maybe send the procedure id of the OpenRegionProcedure to the region server and have
    // the region server send it back when done, which would make retries easy to detect.
    regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
  } else {
    LOG.warn(
      "Received report {} transition from {}, pid={}, but the region {} is not on it," +
        " should be a retry, ignore",
      TransitionCode.FAILED_OPEN, serverName, getProcId(), regionNode);
  }
}

// we do not need seqId for closing a region
private void reportTransitionClosed(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName, TransitionCode code) throws IOException {
switch (code) {
case CLOSED:
env.getAssignmentManager().regionClosed(regionNode, true);
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
// we are done
regionNode.unsetProcedure(this);
}
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
break;
default:
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
regionNode.toShortString() + ", " + this + ", expected CLOSED.");
ServerName serverName) throws IOException {
if (!regionNode.isInState(State.CLOSING)) {
LOG.warn(
"Received report {} transition from {}, pid={}, but the region {} is not in" +
" CLOSING state, should be a retry, ignore",
TransitionCode.CLOSED, serverName, getProcId(), regionNode);
return;
}
if (getCurrentState() != REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
LOG.warn(
"Received report {} transition from {}, pid={} but the proc is not in {}" +
" state, should be a retry, ignore",
TransitionCode.CLOSED, serverName, getProcId(), REGION_STATE_TRANSITION_CONFIRM_CLOSED);
return;
}
if (!regionNode.getRegionLocation().equals(serverName)) {
LOG.warn(
"Received report {} transition from {}, pid={}, but the region {} is not on it," +
" should be a retry, ignore",
TransitionCode.CLOSED, serverName, getProcId(), regionNode);
return;
}
env.getAssignmentManager().regionClosed(regionNode, true);
if (lastState == RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED) {
// we are done
regionNode.unsetProcedure(this);
}
regionNode.getProcedureEvent().wake(env.getProcedureScheduler());
}

// Should be called with RegionStateNode locked
public void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode,
ServerName serverName, TransitionCode code, long seqId) throws IOException {
switch (getCurrentState()) {
case REGION_STATE_TRANSITION_CONFIRM_OPENED:
reportTransitionOpened(env, regionNode, serverName, code, seqId);
// It is possible that the previous reportRegionStateTransition call succeeded on the master
// side, but before the result was returned to the region server, the RPC connection broke or
// the master restarted. The region server will call reportRegionStateTransition again in this
// scenario, so here we need to check whether this is a retry.
switch (code) {
case OPENED:
reportTransitionOpen(env, regionNode, serverName, seqId);
break;
case REGION_STATE_TRANSITION_CONFIRM_CLOSED:
reportTransitionClosed(env, regionNode, serverName, code);
case FAILED_OPEN:
reportTransitionFailedOpen(env, regionNode, serverName);
break;
case CLOSED:
reportTransitionClosed(env, regionNode, serverName);
break;
default:
LOG.warn("{} received unexpected report transition call from {}, code={}, seqId={}", this,
serverName, code, seqId);
throw new UnexpectedStateException("Received report unexpected " + code + " transition, " +
regionNode.toShortString() + ", " + this + ", expected OPENED or FAILED_OPEN or CLOSED.");
}
}

Expand Down Expand Up @@ -478,9 +540,8 @@ protected RegionStateTransitionState getInitialState() {
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
super.serializeStateData(serializer);
RegionStateTransitionStateData.Builder builder =
RegionStateTransitionStateData.newBuilder().setInitialState(initialState)
.setLastState(lastState).setForceNewPlan(forceNewPlan);
RegionStateTransitionStateData.Builder builder = RegionStateTransitionStateData.newBuilder()
.setInitialState(initialState).setLastState(lastState).setForceNewPlan(forceNewPlan);
if (assignCandidate != null) {
builder.setAssignCandidate(ProtobufUtil.toServerName(assignCandidate));
}
Expand Down
Expand Up @@ -261,12 +261,12 @@ protected TransitRegionStateProcedure createUnassignProcedure(RegionInfo hri) {

protected void sendTransitionReport(final ServerName serverName,
final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
final TransitionCode state) throws IOException {
final TransitionCode state, long seqId) throws IOException {
ReportRegionStateTransitionRequest.Builder req =
ReportRegionStateTransitionRequest.newBuilder();
req.setServer(ProtobufUtil.toServerName(serverName));
req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
.setTransitionCode(state).setOpenSeqNum(1).build());
.setTransitionCode(state).setOpenSeqNum(seqId).build());
am.reportRegionStateTransition(req.build());
}

Expand All @@ -286,15 +286,18 @@ protected class GoodRsExecutor extends NoopRsExecutor {
@Override
protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
throws IOException {
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
long previousOpenSeqNum =
am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
previousOpenSeqNum + 2);
// Concurrency?
// Now update the state of our cluster in regionsToRegionServers.
SortedSet<byte[]> regions = regionsToRegionServers.get(server);
if (regions == null) {
regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
regionsToRegionServers.put(server, regions);
}
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
if (regions.contains(hri.getRegionName())) {
throw new UnsupportedOperationException(hri.getRegionNameAsString());
}
Expand All @@ -306,7 +309,7 @@ protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo op
protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
throws IOException {
RegionInfo hri = am.getRegionInfo(regionName);
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
return CloseRegionResponse.newBuilder().setClosed(true).build();
}
}
Expand Down Expand Up @@ -497,18 +500,18 @@ public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProcedure
@Override
protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
throws IOException {
switch (rand.nextInt(6)) {
RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
long previousOpenSeqNum =
am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
switch (rand.nextInt(3)) {
case 0:
LOG.info("Return OPENED response");
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
previousOpenSeqNum + 2);
return OpenRegionResponse.RegionOpeningState.OPENED;
case 1:
LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
case 2:
LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN);
sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
default:
// fall out
Expand All @@ -534,7 +537,7 @@ protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionNa
boolean closed = rand.nextBoolean();
if (closed) {
RegionInfo hri = am.getRegionInfo(regionName);
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
}
resp.setClosed(closed);
return resp.build();
Expand Down

0 comments on commit ffb003e

Please sign in to comment.