Skip to content

Commit

Permalink
HBASE-20846 Restore procedure locks when master restarts
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Jul 25, 2018
1 parent e44f506 commit f3f17fa
Show file tree
Hide file tree
Showing 35 changed files with 624 additions and 551 deletions.
Expand Up @@ -163,8 +163,8 @@ public Procedure poll(final long nanos) {
return null; return null;
} }
} }

final Procedure pollResult = dequeue(); final Procedure pollResult = dequeue();

pollCalls++; pollCalls++;
nullPollCalls += (pollResult == null) ? 1 : 0; nullPollCalls += (pollResult == null) ? 1 : 0;
return pollResult; return pollResult;
Expand Down
Expand Up @@ -24,8 +24,9 @@
* Vessel that carries a Procedure and a timeout. * Vessel that carries a Procedure and a timeout.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class DelayedProcedure extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<?>> { class DelayedProcedure<TEnvironment>
public DelayedProcedure(Procedure<?> procedure) { extends DelayedUtil.DelayedContainerWithTimestamp<Procedure<TEnvironment>> {
public DelayedProcedure(Procedure<TEnvironment> procedure) {
super(procedure, procedure.getTimeoutTimestamp()); super(procedure, procedure.getTimeoutTimestamp());
} }
} }

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Expand Up @@ -202,6 +202,9 @@ public static ProcedureProtos.Procedure convertToProtoProcedure(final Procedure
builder.setNonce(proc.getNonceKey().getNonce()); builder.setNonce(proc.getNonceKey().getNonce());
} }


if (proc.hasLock()) {
builder.setLocked(true);
}
return builder.build(); return builder.build();
} }


Expand Down Expand Up @@ -255,6 +258,10 @@ public static Procedure convertToProcedure(final ProcedureProtos.Procedure proto
proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
} }


if (proto.getLocked()) {
proc.lockedWhenLoading();
}

ProcedureStateSerializer serializer = null; ProcedureStateSerializer serializer = null;


if (proto.getStateMessageCount() > 0) { if (proto.getStateMessageCount() > 0) {
Expand Down
Expand Up @@ -22,11 +22,9 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;

import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;


/** /**
Expand All @@ -42,17 +40,16 @@
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
class RootProcedureState { class RootProcedureState<TEnvironment> {
private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);


private enum State { private enum State {
RUNNING, // The Procedure is running or ready to run RUNNING, // The Procedure is running or ready to run
FAILED, // The Procedure failed, waiting for the rollback executing FAILED, // The Procedure failed, waiting for the rollback executing
ROLLINGBACK, // The Procedure failed and the execution was rolledback ROLLINGBACK, // The Procedure failed and the execution was rolledback
} }


private Set<Procedure> subprocs = null; private Set<Procedure<TEnvironment>> subprocs = null;
private ArrayList<Procedure> subprocStack = null; private ArrayList<Procedure<TEnvironment>> subprocStack = null;
private State state = State.RUNNING; private State state = State.RUNNING;
private int running = 0; private int running = 0;


Expand Down Expand Up @@ -91,22 +88,19 @@ protected synchronized void unsetRollback() {
} }


protected synchronized long[] getSubprocedureIds() { protected synchronized long[] getSubprocedureIds() {
if (subprocs == null) return null; if (subprocs == null) {
int index = 0; return null;
final long[] subIds = new long[subprocs.size()];
for (Procedure proc: subprocs) {
subIds[index++] = proc.getProcId();
} }
return subIds; return subprocs.stream().mapToLong(Procedure::getProcId).toArray();
} }


protected synchronized List<Procedure> getSubproceduresStack() { protected synchronized List<Procedure<TEnvironment>> getSubproceduresStack() {
return subprocStack; return subprocStack;
} }


protected synchronized RemoteProcedureException getException() { protected synchronized RemoteProcedureException getException() {
if (subprocStack != null) { if (subprocStack != null) {
for (Procedure proc: subprocStack) { for (Procedure<TEnvironment> proc: subprocStack) {
if (proc.hasException()) { if (proc.hasException()) {
return proc.getException(); return proc.getException();
} }
Expand All @@ -118,8 +112,10 @@ protected synchronized RemoteProcedureException getException() {
/** /**
* Called by the ProcedureExecutor to mark the procedure step as running. * Called by the ProcedureExecutor to mark the procedure step as running.
*/ */
protected synchronized boolean acquire(final Procedure proc) { protected synchronized boolean acquire(Procedure<TEnvironment> proc) {
if (state != State.RUNNING) return false; if (state != State.RUNNING) {
return false;
}


running++; running++;
return true; return true;
Expand All @@ -128,7 +124,7 @@ protected synchronized boolean acquire(final Procedure proc) {
/** /**
* Called by the ProcedureExecutor to mark the procedure step as finished. * Called by the ProcedureExecutor to mark the procedure step as finished.
*/ */
protected synchronized void release(final Procedure proc) { protected synchronized void release(Procedure<TEnvironment> proc) {
running--; running--;
} }


Expand All @@ -142,7 +138,7 @@ protected synchronized void abort() {
* Called by the ProcedureExecutor after the procedure step is completed, * Called by the ProcedureExecutor after the procedure step is completed,
* to add the step to the rollback list (or procedure stack) * to add the step to the rollback list (or procedure stack)
*/ */
protected synchronized void addRollbackStep(final Procedure proc) { protected synchronized void addRollbackStep(Procedure<TEnvironment> proc) {
if (proc.isFailed()) { if (proc.isFailed()) {
state = State.FAILED; state = State.FAILED;
} }
Expand All @@ -153,8 +149,10 @@ protected synchronized void addRollbackStep(final Procedure proc) {
subprocStack.add(proc); subprocStack.add(proc);
} }


protected synchronized void addSubProcedure(final Procedure proc) { protected synchronized void addSubProcedure(Procedure<TEnvironment> proc) {
if (!proc.hasParent()) return; if (!proc.hasParent()) {
return;
}
if (subprocs == null) { if (subprocs == null) {
subprocs = new HashSet<>(); subprocs = new HashSet<>();
} }
Expand All @@ -168,7 +166,7 @@ protected synchronized void addSubProcedure(final Procedure proc) {
* to the store only the Procedure we executed, and nothing else. * to the store only the Procedure we executed, and nothing else.
* on load we recreate the full stack by aggregating each procedure stack-positions. * on load we recreate the full stack by aggregating each procedure stack-positions.
*/ */
protected synchronized void loadStack(final Procedure proc) { protected synchronized void loadStack(Procedure<TEnvironment> proc) {
addSubProcedure(proc); addSubProcedure(proc);
int[] stackIndexes = proc.getStackIndexes(); int[] stackIndexes = proc.getStackIndexes();
if (stackIndexes != null) { if (stackIndexes != null) {
Expand Down Expand Up @@ -196,7 +194,7 @@ protected synchronized void loadStack(final Procedure proc) {
*/ */
protected synchronized boolean isValid() { protected synchronized boolean isValid() {
if (subprocStack != null) { if (subprocStack != null) {
for (Procedure proc: subprocStack) { for (Procedure<TEnvironment> proc : subprocStack) {
if (proc == null) { if (proc == null) {
return false; return false;
} }
Expand Down
Expand Up @@ -31,15 +31,15 @@
* @see InlineChore * @see InlineChore
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class TimeoutExecutorThread extends StoppableThread { class TimeoutExecutorThread<TEnvironment> extends StoppableThread {


private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class); private static final Logger LOG = LoggerFactory.getLogger(TimeoutExecutorThread.class);


private final ProcedureExecutor<?> executor; private final ProcedureExecutor<TEnvironment> executor;


private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>(); private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();


public TimeoutExecutorThread(ProcedureExecutor<?> executor, ThreadGroup group) { public TimeoutExecutorThread(ProcedureExecutor<TEnvironment> executor, ThreadGroup group) {
super(group, "ProcExecTimeout"); super(group, "ProcExecTimeout");
setDaemon(true); setDaemon(true);
this.executor = executor; this.executor = executor;
Expand All @@ -65,7 +65,7 @@ public void run() {
if (task instanceof InlineChore) { if (task instanceof InlineChore) {
execInlineChore((InlineChore) task); execInlineChore((InlineChore) task);
} else if (task instanceof DelayedProcedure) { } else if (task instanceof DelayedProcedure) {
execDelayedProcedure((DelayedProcedure) task); execDelayedProcedure((DelayedProcedure<TEnvironment>) task);
} else { } else {
LOG.error("CODE-BUG unknown timeout task type {}", task); LOG.error("CODE-BUG unknown timeout task type {}", task);
} }
Expand All @@ -77,29 +77,29 @@ public void add(InlineChore chore) {
queue.add(chore); queue.add(chore);
} }


public void add(Procedure<?> procedure) { public void add(Procedure<TEnvironment> procedure) {
assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(), LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
procedure.getTimeoutTimestamp()); procedure.getTimeoutTimestamp());
queue.add(new DelayedProcedure(procedure)); queue.add(new DelayedProcedure<>(procedure));
} }


public boolean remove(Procedure<?> procedure) { public boolean remove(Procedure<TEnvironment> procedure) {
return queue.remove(new DelayedProcedure(procedure)); return queue.remove(new DelayedProcedure<>(procedure));
} }


private void execInlineChore(InlineChore chore) { private void execInlineChore(InlineChore chore) {
chore.run(); chore.run();
add(chore); add(chore);
} }


private void execDelayedProcedure(DelayedProcedure delayed) { private void execDelayedProcedure(DelayedProcedure<TEnvironment> delayed) {
// TODO: treat this as a normal procedure, add it to the scheduler and // TODO: treat this as a normal procedure, add it to the scheduler and
// let one of the workers handle it. // let one of the workers handle it.
// Today we consider ProcedureInMemoryChore as InlineChores // Today we consider ProcedureInMemoryChore as InlineChores
Procedure<?> procedure = delayed.getObject(); Procedure<TEnvironment> procedure = delayed.getObject();
if (procedure instanceof ProcedureInMemoryChore) { if (procedure instanceof ProcedureInMemoryChore) {
executeInMemoryChore((ProcedureInMemoryChore) procedure); executeInMemoryChore((ProcedureInMemoryChore<TEnvironment>) procedure);
// if the procedure is in a waiting state again, put it back in the queue // if the procedure is in a waiting state again, put it back in the queue
procedure.updateTimestamp(); procedure.updateTimestamp();
if (procedure.isWaiting()) { if (procedure.isWaiting()) {
Expand All @@ -111,7 +111,7 @@ private void execDelayedProcedure(DelayedProcedure delayed) {
} }
} }


private void executeInMemoryChore(ProcedureInMemoryChore chore) { private void executeInMemoryChore(ProcedureInMemoryChore<TEnvironment> chore) {
if (!chore.isWaiting()) { if (!chore.isWaiting()) {
return; return;
} }
Expand All @@ -126,12 +126,12 @@ private void executeInMemoryChore(ProcedureInMemoryChore chore) {
} }
} }


private void executeTimedoutProcedure(Procedure proc) { private void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
// The procedure received a timeout. if the procedure itself does not handle it, // The procedure received a timeout. if the procedure itself does not handle it,
// call abort() and add the procedure back in the queue for rollback. // call abort() and add the procedure back in the queue for rollback.
if (proc.setTimeoutFailure(executor.getEnvironment())) { if (proc.setTimeoutFailure(executor.getEnvironment())) {
long rootProcId = executor.getRootProcedureId(proc); long rootProcId = executor.getRootProcedureId(proc);
RootProcedureState procStack = executor.getProcStack(rootProcId); RootProcedureState<TEnvironment> procStack = executor.getProcStack(rootProcId);
procStack.abort(); procStack.abort();
executor.getStore().update(proc); executor.getStore().update(proc);
executor.getScheduler().addFront(proc); executor.getScheduler().addFront(proc);
Expand Down
Expand Up @@ -35,14 +35,20 @@
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value; import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;


@Category({MasterTests.class, LargeTests.class}) /**
* For now we do not guarantee this, we will restore the locks when restarting ProcedureExecutor so
* we should use lock to obtain the correct order. Ignored.
*/
@Ignore
@Category({ MasterTests.class, LargeTests.class })
public class TestProcedureReplayOrder { public class TestProcedureReplayOrder {


@ClassRule @ClassRule
Expand Down
Expand Up @@ -227,19 +227,13 @@ protected LockState acquireLock(final TestProcEnv env) {
protected void releaseLock(final TestProcEnv env) { protected void releaseLock(final TestProcEnv env) {
LOG.info("RELEASE LOCK " + this + " " + hasLock); LOG.info("RELEASE LOCK " + this + " " + hasLock);
lock.set(false); lock.set(false);
hasLock = false;
} }


@Override @Override
protected boolean holdLock(final TestProcEnv env) { protected boolean holdLock(final TestProcEnv env) {
return true; return true;
} }


@Override
protected boolean hasLock(final TestProcEnv env) {
return hasLock;
}

public ArrayList<Long> getTimestamps() { public ArrayList<Long> getTimestamps() {
return timestamps; return timestamps;
} }
Expand Down
3 changes: 3 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Procedure.proto
Expand Up @@ -63,6 +63,9 @@ message Procedure {
// Nonce to prevent same procedure submit by multiple times // Nonce to prevent same procedure submit by multiple times
optional uint64 nonce_group = 13 [default = 0]; optional uint64 nonce_group = 13 [default = 0];
optional uint64 nonce = 14 [default = 0]; optional uint64 nonce = 14 [default = 0];

// whether the procedure has held the lock
optional bool locked = 16 [default = false];
} }


/** /**
Expand Down
Expand Up @@ -81,8 +81,8 @@ public TableNamespaceManager getTableNamespaceManager() {
return this.tableNamespaceManager; return this.tableNamespaceManager;
} }


private long submitProcedure(final Procedure<?> procedure, final NonceKey nonceKey) private long submitProcedure(final Procedure<MasterProcedureEnv> procedure,
throws ServiceNotRunningException { final NonceKey nonceKey) throws ServiceNotRunningException {
checkIsRunning(); checkIsRunning();
ProcedureExecutor<MasterProcedureEnv> pe = this.masterServices.getMasterProcedureExecutor(); ProcedureExecutor<MasterProcedureEnv> pe = this.masterServices.getMasterProcedureExecutor();
return pe.submitProcedure(procedure, nonceKey); return pe.submitProcedure(procedure, nonceKey);
Expand Down
Expand Up @@ -923,7 +923,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc
InitMetaProcedure initMetaProc = null; InitMetaProcedure initMetaProc = null;
if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO) if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO)
.isOffline()) { .isOffline()) {
Optional<Procedure<?>> optProc = procedureExecutor.getProcedures().stream() Optional<Procedure<MasterProcedureEnv>> optProc = procedureExecutor.getProcedures().stream()
.filter(p -> p instanceof InitMetaProcedure).findAny(); .filter(p -> p instanceof InitMetaProcedure).findAny();
if (optProc.isPresent()) { if (optProc.isPresent()) {
initMetaProc = (InitMetaProcedure) optProc.get(); initMetaProc = (InitMetaProcedure) optProc.get();
Expand Down Expand Up @@ -3202,7 +3202,8 @@ public List<Procedure<?>> getProcedures() throws IOException {
cpHost.preGetProcedures(); cpHost.preGetProcedures();
} }


final List<Procedure<?>> procList = this.procedureExecutor.getProcedures(); @SuppressWarnings({ "unchecked", "rawtypes" })
List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();


if (cpHost != null) { if (cpHost != null) {
cpHost.postGetProcedures(procList); cpHost.postGetProcedures(procList);
Expand Down Expand Up @@ -3717,7 +3718,7 @@ public ReplicationPeerManager getReplicationPeerManager() {
HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap = HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
new HashMap<>(peerList.size()); new HashMap<>(peerList.size());
peerList.stream() peerList.stream()
.forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList())); .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>()));
for (ServerName serverName : serverNames) { for (ServerName serverName : serverNames) {
List<ReplicationLoadSource> replicationLoadSources = List<ReplicationLoadSource> replicationLoadSources =
getServerManager().getLoad(serverName).getReplicationLoadSourceList(); getServerManager().getLoad(serverName).getReplicationLoadSourceList();
Expand Down
Expand Up @@ -148,9 +148,4 @@ protected void deserializeStateData(ProcedureStateSerializer serializer)
serializer.deserialize(MasterProcedureProtos.GCRegionStateData.class); serializer.deserialize(MasterProcedureProtos.GCRegionStateData.class);
setRegion(ProtobufUtil.toRegionInfo(msg.getRegionInfo())); setRegion(ProtobufUtil.toRegionInfo(msg.getRegionInfo()));
} }

@Override
protected org.apache.hadoop.hbase.procedure2.Procedure.LockState acquireLock(MasterProcedureEnv env) {
return super.acquireLock(env);
}
} }

0 comments on commit f3f17fa

Please sign in to comment.