Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ protected void rollback(Env env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}

@Override
protected boolean abort(Env env) {
throw new UnsupportedOperationException();
}

@Override
public void deserialize(ByteBuffer byteBuffer) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,6 @@ protected abstract Procedure<Env>[] execute(Env env)
protected abstract void rollback(Env env)
throws IOException, InterruptedException, ProcedureException;

/**
* The abort() call is asynchronous and each procedure must decide how to deal with it, if they
* want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
* abort() method and then the execute() will check if the abort flag is set or not. abort() may
* be called multiple times from the client, so the implementation must be idempotent.
*
* <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
* procedure implementor abort.
*/
protected abstract boolean abort(Env env);

public void serialize(DataOutputStream stream) throws IOException {
// procid
stream.writeLong(this.procId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
Expand Down Expand Up @@ -418,7 +417,6 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
"The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
return;
}
boolean suspended = false;
boolean reExecute;

Procedure<Env>[] subprocs = null;
Expand All @@ -430,9 +428,6 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
if (subprocs != null && subprocs.length == 0) {
subprocs = null;
}
} catch (ProcedureSuspendedException e) {
LOG.debug("Suspend {}", proc);
suspended = true;
} catch (ProcedureYieldException e) {
LOG.debug("Yield {}", proc);
yieldProcedure(proc);
Expand All @@ -455,7 +450,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
}
} else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
LOG.info("Added into timeoutExecutor {}", proc);
} else if (!suspended) {
} else {
proc.setState(ProcedureState.SUCCESS);
}
}
Expand All @@ -470,7 +465,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
return;
}

if (proc.isRunnable() && !suspended && proc.isYieldAfterExecution(this.environment)) {
if (proc.isRunnable() && proc.isYieldAfterExecution(this.environment)) {
yieldProcedure(proc);
return;
}
Expand All @@ -481,7 +476,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
}

releaseLock(proc, false);
if (!suspended && proc.isFinished() && proc.hasParent()) {
if (proc.isFinished() && proc.hasParent()) {
countDownChildren(rootProcStack, proc);
}
}
Expand Down Expand Up @@ -796,21 +791,6 @@ public long getCurrentRunTime() {
}
}

// A worker thread which can be added when core workers are stuck. Will timeout after
// keepAliveTime if there is no procedure to run.
private final class KeepAliveWorkerThread extends WorkerThread {

public KeepAliveWorkerThread(ThreadGroup group) {
super(group, "KAProcExecWorker-");
this.keepAliveTime = TimeUnit.SECONDS.toMillis(10);
}

@Override
protected boolean keepAlive(long lastUpdate) {
return System.currentTimeMillis() - lastUpdate < keepAliveTime;
}
}

private final class WorkerMonitor extends InternalProcedure<Env> {
private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 30000; // 30sec

Expand All @@ -823,7 +803,7 @@ public WorkerMonitor() {
updateTimestamp();
}

private int checkForStuckWorkers() {
private void checkForStuckWorkers() {
// Check if any of the worker is stuck
int stuckCount = 0;
for (WorkerThread worker : workerThreads) {
Expand All @@ -840,31 +820,12 @@ private int checkForStuckWorkers() {
worker.activeProcedure.get().getProcType(),
worker.getCurrentRunTime());
}
return stuckCount;
}

private void checkThreadCount(final int stuckCount) {
// Nothing to do if there are no runnable tasks
if (stuckCount < 1 || !scheduler.hasRunnables()) {
return;
}
// Add a new thread if the worker stuck percentage exceed the threshold limit
// and every handler is active.
final float stuckPerc = ((float) stuckCount) / workerThreads.size();
// Let's add new worker thread more aggressively, as they will timeout finally if there is no
// work to do.
if (stuckPerc >= DEFAULT_WORKER_ADD_STUCK_PERCENTAGE && workerThreads.size() < maxPoolSize) {
final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
LOG.debug("Added new worker thread {}", worker);
}
LOG.warn("There are {} workers stuck", stuckCount);
}

@Override
protected void periodicExecute(Env env) {
final int stuckCount = checkForStuckWorkers();
checkThreadCount(stuckCount);
checkForStuckWorkers();
updateTimestamp();
}
}
Expand Down Expand Up @@ -942,28 +903,6 @@ public long submitProcedure(Procedure<Env> procedure) {
return pushProcedure(procedure);
}

/**
* Abort a specified procedure.
*
* @param procId procedure id
* @param force whether abort the running procdure.
* @return true if the procedure exists and has received the abort.
*/
public boolean abort(long procId, boolean force) {
Procedure<Env> procedure = procedures.get(procId);
if (procedure != null) {
if (!force && procedure.wasExecuted()) {
return false;
}
return procedure.abort(this.environment);
}
return false;
}

public boolean abort(long procId) {
return abort(procId, true);
}

public ProcedureScheduler getScheduler() {
return scheduler;
}
Expand Down
Loading