Skip to content

Commit

Permalink
HBASE-13759 Procedure v2 - Improve procedure yielding
Browse files Browse the repository at this point in the history
  • Loading branch information
Matteo Bertozzi committed May 29, 2015
1 parent a016b23 commit d86f2fa
Show file tree
Hide file tree
Showing 15 changed files with 398 additions and 47 deletions.
Expand Up @@ -82,10 +82,13 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
* The main code of the procedure. It must be idempotent since execute() * The main code of the procedure. It must be idempotent since execute()
* may be called multiple time in case of machine failure in the middle * may be called multiple time in case of machine failure in the middle
* of the execution. * of the execution.
* @param env the environment passed to the ProcedureExecutor
* @return a set of sub-procedures or null if there is nothing else to execute. * @return a set of sub-procedures or null if there is nothing else to execute.
* @throw ProcedureYieldException the procedure will be added back to the queue and retried later
* @throw InterruptedException the procedure will be added back to the queue and retried later
*/ */
protected abstract Procedure[] execute(TEnvironment env) protected abstract Procedure[] execute(TEnvironment env)
throws ProcedureYieldException; throws ProcedureYieldException, InterruptedException;


/** /**
* The code to undo what done by the execute() code. * The code to undo what done by the execute() code.
Expand All @@ -94,10 +97,12 @@ protected abstract Procedure[] execute(TEnvironment env)
* the execute() call. The implementation must be idempotent since rollback() * the execute() call. The implementation must be idempotent since rollback()
* may be called multiple time in case of machine failure in the middle * may be called multiple time in case of machine failure in the middle
* of the execution. * of the execution.
* @param env the environment passed to the ProcedureExecutor
* @throws IOException temporary failure, the rollback will retry later * @throws IOException temporary failure, the rollback will retry later
* @throw InterruptedException the procedure will be added back to the queue and retried later
*/ */
protected abstract void rollback(TEnvironment env) protected abstract void rollback(TEnvironment env)
throws IOException; throws IOException, InterruptedException;


/** /**
* The abort() call is asynchronous and each procedure must decide how to deal * The abort() call is asynchronous and each procedure must decide how to deal
Expand Down Expand Up @@ -169,12 +174,14 @@ protected void completionCleanup(final TEnvironment env) {
} }


/** /**
* By default, the executor will run procedures start to finish. Return true to make the executor * By default, the executor will try ro run procedures start to finish.
* yield between each flow step to give other procedures time to run their flow steps. * Return true to make the executor yield between each execution step to
* @return Return true if the executor should yield on completion of a flow state step. * give other procedures time to run their steps.
* Defaults to return false. * @param env the environment passed to the ProcedureExecutor
* @return Return true if the executor should yield on completion of an execution step.
* Defaults to return false.
*/ */
protected boolean isYieldAfterSuccessfulFlowStateStep() { protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
return false; return false;
} }


Expand Down Expand Up @@ -404,7 +411,7 @@ protected void setParentProcId(final long parentProcId) {
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
protected Procedure[] doExecute(final TEnvironment env) protected Procedure[] doExecute(final TEnvironment env)
throws ProcedureYieldException { throws ProcedureYieldException, InterruptedException {
try { try {
updateTimestamp(); updateTimestamp();
return execute(env); return execute(env);
Expand All @@ -418,7 +425,8 @@ protected Procedure[] doExecute(final TEnvironment env)
* user-level code rollback(). * user-level code rollback().
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
protected void doRollback(final TEnvironment env) throws IOException { protected void doRollback(final TEnvironment env)
throws IOException, InterruptedException {
try { try {
updateTimestamp(); updateTimestamp();
rollback(env); rollback(env);
Expand Down
Expand Up @@ -732,6 +732,12 @@ private void execLoop(Procedure proc) {
procedureFinished(proc); procedureFinished(proc);
break; break;
} }

// if the procedure is kind enough to pass the slot to someone else, yield
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
runnables.yield(proc);
break;
}
} while (procStack.isFailed()); } while (procStack.isFailed());
} }


Expand Down Expand Up @@ -828,6 +834,11 @@ private boolean executeRollback(final long rootProcId, final RootProcedureState
} }


subprocStack.remove(stackTail); subprocStack.remove(stackTail);

// if the procedure is kind enough to pass the slot to someone else, yield
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
return false;
}
} }


// Finalize the procedure state // Finalize the procedure state
Expand All @@ -851,6 +862,9 @@ private boolean executeRollback(final Procedure proc) {
LOG.debug("rollback attempt failed for " + proc, e); LOG.debug("rollback attempt failed for " + proc, e);
} }
return false; return false;
} catch (InterruptedException e) {
handleInterruptedException(proc, e);
return false;
} catch (Throwable e) { } catch (Throwable e) {
// Catch NullPointerExceptions or similar errors... // Catch NullPointerExceptions or similar errors...
LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e); LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
Expand All @@ -859,9 +873,7 @@ private boolean executeRollback(final Procedure proc) {
// allows to kill the executor before something is stored to the wal. // allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery. // useful to test the procedure recovery.
if (testing != null && testing.shouldKillBeforeStoreUpdate()) { if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
if (LOG.isDebugEnabled()) { LOG.debug("TESTING: Kill before store update");
LOG.debug("TESTING: Kill before store update");
}
stop(); stop();
return false; return false;
} }
Expand All @@ -877,6 +889,7 @@ private boolean executeRollback(final Procedure proc) {
} else { } else {
store.update(proc); store.update(proc);
} }

return true; return true;
} }


Expand Down Expand Up @@ -912,10 +925,14 @@ private void execProcedure(final RootProcedureState procStack, final Procedure p
} }
} catch (ProcedureYieldException e) { } catch (ProcedureYieldException e) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Yield procedure: " + procedure); LOG.trace("Yield procedure: " + procedure + ": " + e.getMessage());
} }
runnables.yield(procedure); runnables.yield(procedure);
return; return;
} catch (InterruptedException e) {
handleInterruptedException(procedure, e);
runnables.yield(procedure);
return;
} catch (Throwable e) { } catch (Throwable e) {
// Catch NullPointerExceptions or similar errors... // Catch NullPointerExceptions or similar errors...
String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure; String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
Expand Down Expand Up @@ -974,9 +991,7 @@ private void execProcedure(final RootProcedureState procStack, final Procedure p
// allows to kill the executor before something is stored to the wal. // allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery. // useful to test the procedure recovery.
if (testing != null && testing.shouldKillBeforeStoreUpdate()) { if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
if (LOG.isDebugEnabled()) { LOG.debug("TESTING: Kill before store update");
LOG.debug("TESTING: Kill before store update");
}
stop(); stop();
return; return;
} }
Expand All @@ -999,6 +1014,11 @@ private void execProcedure(final RootProcedureState procStack, final Procedure p
return; return;
} }


// if the procedure is kind enough to pass the slot to someone else, yield
if (reExecute && procedure.isYieldAfterExecutionStep(getEnvironment())) {
return;
}

assert (reExecute && subprocs == null) || !reExecute; assert (reExecute && subprocs == null) || !reExecute;
} while (reExecute); } while (reExecute);


Expand Down Expand Up @@ -1035,6 +1055,18 @@ private void execProcedure(final RootProcedureState procStack, final Procedure p
} }
} }


private void handleInterruptedException(final Procedure proc, final InterruptedException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("got an interrupt during " + proc + ". suspend and retry it later.", e);
}

// NOTE: We don't call Thread.currentThread().interrupt()
// because otherwise all the subsequent calls e.g. Thread.sleep() will throw
// the InterruptedException. If the master is going down, we will be notified
// and the executor/store will be stopped.
// (The interrupted procedure will be retried on the next run)
}

private void sendProcedureLoadedNotification(final long procId) { private void sendProcedureLoadedNotification(final long procId) {
if (!this.listeners.isEmpty()) { if (!this.listeners.isEmpty()) {
for (ProcedureExecutorListener listener: this.listeners) { for (ProcedureExecutorListener listener: this.listeners) {
Expand Down
Expand Up @@ -42,7 +42,7 @@ public abstract class SequentialProcedure<TEnvironment> extends Procedure<TEnvir


@Override @Override
protected Procedure[] doExecute(final TEnvironment env) protected Procedure[] doExecute(final TEnvironment env)
throws ProcedureYieldException { throws ProcedureYieldException, InterruptedException {
updateTimestamp(); updateTimestamp();
try { try {
Procedure[] children = !executed ? execute(env) : null; Procedure[] children = !executed ? execute(env) : null;
Expand All @@ -54,7 +54,8 @@ protected Procedure[] doExecute(final TEnvironment env)
} }


@Override @Override
protected void doRollback(final TEnvironment env) throws IOException { protected void doRollback(final TEnvironment env)
throws IOException, InterruptedException {
updateTimestamp(); updateTimestamp();
if (executed) { if (executed) {
try { try {
Expand Down
Expand Up @@ -57,15 +57,15 @@ protected enum Flow {
* Flow.HAS_MORE_STATE if there is another step. * Flow.HAS_MORE_STATE if there is another step.
*/ */
protected abstract Flow executeFromState(TEnvironment env, TState state) protected abstract Flow executeFromState(TEnvironment env, TState state)
throws ProcedureYieldException; throws ProcedureYieldException, InterruptedException;


/** /**
* called to perform the rollback of the specified state * called to perform the rollback of the specified state
* @param state state to rollback * @param state state to rollback
* @throws IOException temporary failure, the rollback will retry later * @throws IOException temporary failure, the rollback will retry later
*/ */
protected abstract void rollbackState(TEnvironment env, TState state) protected abstract void rollbackState(TEnvironment env, TState state)
throws IOException; throws IOException, InterruptedException;


/** /**
* Convert an ordinal (or state id) to an Enum (or more descriptive) state object. * Convert an ordinal (or state id) to an Enum (or more descriptive) state object.
Expand Down Expand Up @@ -95,12 +95,24 @@ protected void setNextState(final TState state) {
setNextState(getStateId(state)); setNextState(getStateId(state));
} }


/**
* By default, the executor will try ro run all the steps of the procedure start to finish.
* Return true to make the executor yield between execution steps to
* give other procedures time to run their steps.
* @param state the state we are going to execute next.
* @return Return true if the executor should yield before the execution of the specified step.
* Defaults to return false.
*/
protected boolean isYieldBeforeExecuteFromState(TEnvironment env, TState state) {
return false;
}

@Override @Override
protected Procedure[] execute(final TEnvironment env) protected Procedure[] execute(final TEnvironment env)
throws ProcedureYieldException { throws ProcedureYieldException, InterruptedException {
updateTimestamp(); updateTimestamp();
try { try {
TState state = stateCount > 0 ? getState(states[stateCount-1]) : getInitialState(); TState state = getCurrentState();
if (stateCount == 0) { if (stateCount == 0) {
setNextState(getStateId(state)); setNextState(getStateId(state));
} }
Expand All @@ -115,16 +127,26 @@ protected Procedure[] execute(final TEnvironment env)
} }


@Override @Override
protected void rollback(final TEnvironment env) throws IOException { protected void rollback(final TEnvironment env)
throws IOException, InterruptedException {
try { try {
updateTimestamp(); updateTimestamp();
rollbackState(env, stateCount > 0 ? getState(states[stateCount-1]) : getInitialState()); rollbackState(env, getCurrentState());
stateCount--; stateCount--;
} finally { } finally {
updateTimestamp(); updateTimestamp();
} }
} }


@Override
protected boolean isYieldAfterExecutionStep(final TEnvironment env) {
return isYieldBeforeExecuteFromState(env, getCurrentState());
}

private TState getCurrentState() {
return stateCount > 0 ? getState(states[stateCount-1]) : getInitialState();
}

/** /**
* Set the next state for the procedure. * Set the next state for the procedure.
* @param stateId the ordinal() of the state enum (or state id) * @param stateId the ordinal() of the state enum (or state id)
Expand Down

0 comments on commit d86f2fa

Please sign in to comment.