Skip to content

Commit

Permalink
Introduce RUNNING execution state
Browse files Browse the repository at this point in the history
1) Now the execution state can be both RUNNABLE and RUNNING.
2) Refactored JobExecutor into smaller pieces.
3) Removed obsolete RESTART_REQUESTED, FINISHED_HANDLER task run result
statuses.
4) Removed SUSPENDING task execution state. We will not implement
it in this release.
  • Loading branch information
mederly committed Feb 23, 2021
1 parent b399e0e commit f7a03b7
Show file tree
Hide file tree
Showing 20 changed files with 945 additions and 705 deletions.
Expand Up @@ -33,7 +33,7 @@ public enum TaskDtoExecutionState {
public static TaskDtoExecutionState fromTaskExecutionStatus(TaskExecutionStateType executionStatus, boolean running) {
if (running) {
if (executionStatus == TaskExecutionStateType.SUSPENDED) {
return SUSPENDING; // todo remove when no longer needed
return SUSPENDING;
} else {
return TaskDtoExecutionState.RUNNING;
}
Expand All @@ -44,7 +44,6 @@ public static TaskDtoExecutionState fromTaskExecutionStatus(TaskExecutionStateTy
case RUNNING: return RUNNING;
case WAITING: return WAITING;
case SUSPENDED: return SUSPENDED;
case SUSPENDING: return SUSPENDING;
case CLOSED: return CLOSED;
default: throw new IllegalArgumentException("executionStatus = " + executionStatus);
}
Expand Down
Expand Up @@ -3603,17 +3603,6 @@
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
<xsd:enumeration value="suspending">
<xsd:annotation>
<xsd:documentation>
Task suspension was requested and is in progress.
When the task really stops, the status will be switched to SUSPENDED.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumMember name="SUSPENDING"/>
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
<xsd:enumeration value="closed">
<xsd:annotation>
<xsd:documentation>
Expand Down
Expand Up @@ -33,8 +33,6 @@ public interface TaskEvent extends Event {

boolean isInterrupted();

boolean isRestartRequested();

OperationResultStatus getOperationResultStatus();

String getMessage();
Expand Down
Expand Up @@ -69,20 +69,14 @@ public boolean isPermanentError() {
@Override
public boolean isFinished() {
return taskRunResult != null &&
(taskRunResult.getRunResultStatus() == TaskRunResult.TaskRunResultStatus.FINISHED ||
taskRunResult.getRunResultStatus() == TaskRunResult.TaskRunResultStatus.FINISHED_HANDLER);
taskRunResult.getRunResultStatus() == TaskRunResult.TaskRunResultStatus.FINISHED;
}

@Override
public boolean isInterrupted() {
return taskRunResult != null && taskRunResult.getRunResultStatus() == TaskRunResult.TaskRunResultStatus.INTERRUPTED;
}

@Override
public boolean isRestartRequested() {
return taskRunResult != null && taskRunResult.getRunResultStatus() == TaskRunResult.TaskRunResultStatus.RESTART_REQUESTED;
}

@Override
public boolean isStatusType(EventStatusType eventStatus) {
if (eventStatus == null) {
Expand Down
Expand Up @@ -20,8 +20,7 @@ public enum RTaskExecutionState implements SchemaEnum<TaskExecutionStateType> {
WAITING(TaskExecutionStateType.WAITING),
SUSPENDED(TaskExecutionStateType.SUSPENDED),
CLOSED(TaskExecutionStateType.CLOSED),
RUNNING(TaskExecutionStateType.RUNNING),
SUSPENDING(TaskExecutionStateType.SUSPENDING);
RUNNING(TaskExecutionStateType.RUNNING);

private TaskExecutionStateType status;

Expand Down
Expand Up @@ -353,6 +353,11 @@ default boolean isSingle() {
default boolean isLooselyBound() {
return getBinding() == TaskBindingType.LOOSE;
}

/** Returns true if the task is tightly bound. */
default boolean isTightlyBound() {
return !isLooselyBound();
}
//endregion

//region Handler URI, category, archetype
Expand Down
Expand Up @@ -32,14 +32,6 @@ public enum TaskRunResultStatus {
*/
FINISHED,

/**
* The task run has finished, and this was the last run of the current handler.
*
* For single-run tasks, the effect is the same as of FINISHED value.
* However, for recurring tasks, this return value causes current handler to be removed from the handler stack.
*/
FINISHED_HANDLER,

/**
* The run has failed.
*
Expand All @@ -63,18 +55,9 @@ public enum TaskRunResultStatus {
* Task run hasn't finished but nevertheless it must end (for now). An example of such a situation is
* when the long-living task run execution is requested to stop (e.g. when suspending the task or
* shutting down the node).
*
* For single-run tasks this state means that the task SHOULD NOT be closed, nor the handler should
* be removed from the handler stack.
*/
INTERRUPTED,

/**
* Task has to be restarted, typically because a new handler was put onto the handler stack during
* the task run.
*/
RESTART_REQUESTED,

/**
* Task has entered waiting state. TODO. EXPERIMENTAL.
*/
Expand Down
Expand Up @@ -63,7 +63,7 @@ public class RunningTaskQuartzImpl extends TaskQuartzImpl implements RunningTask
*/
private final Map<String, RunningTaskQuartzImpl> lightweightAsynchronousSubtasks = new ConcurrentHashMap<>();

private RunningTaskQuartzImpl parentForLightweightAsynchronousTask; // EXPERIMENTAL
private RunningTaskQuartzImpl parentForLightweightAsynchronousTask;

/**
* Is the task handler allowed to run, or should it stop as soon as possible?
Expand Down
Expand Up @@ -163,11 +163,6 @@ private boolean canContinue(RunningTask task, TaskRunResult runResult) {
} else if (runResult.getRunResultStatus() == TaskRunResultStatus.TEMPORARY_ERROR) {
LOGGER.trace("Task encountered temporary error, continuing with the execution cycle. Task = {}", task);
return false;
} else if (runResult.getRunResultStatus() == TaskRunResultStatus.RESTART_REQUESTED) {
// in case of RESTART_REQUESTED we have to get (new) current handler and restart it
// this is implemented by pushHandler and by Quartz
LOGGER.trace("Task returned RESTART_REQUESTED state, exiting the execution cycle. Task = {}", task);
return false;
} else if (runResult.getRunResultStatus() == TaskRunResultStatus.PERMANENT_ERROR) {
LOGGER.info("Task encountered permanent error, suspending the task. Task = {}", task);
return false;
Expand All @@ -177,9 +172,6 @@ private boolean canContinue(RunningTask task, TaskRunResult runResult) {
} else if (runResult.getRunResultStatus() == TaskRunResultStatus.IS_WAITING) {
LOGGER.trace("Task switched to waiting state, exiting the execution cycle. Task = {}", task);
return true;
} else if (runResult.getRunResultStatus() == TaskRunResultStatus.FINISHED_HANDLER) {
LOGGER.trace("Task handler finished with FINISHED_HANDLER, calling task.finishHandler() and exiting the execution cycle. Task = {}", task);
return true;
} else {
throw new IllegalStateException("Invalid value for Task's runResultStatus: " + runResult.getRunResultStatus() + " for task " + task);
}
Expand Down
@@ -0,0 +1,83 @@
package com.evolveum.midpoint.task.quartzimpl.run;

import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.schema.SearchResultList;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.quartzimpl.RunningTaskQuartzImpl;
import com.evolveum.midpoint.task.quartzimpl.TaskBeans;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.NodeType;

import static com.evolveum.midpoint.task.quartzimpl.run.StopJobException.Severity.ERROR;

/**
* We have a suspicion that Quartz (in some cases) allows multiple instances of a given task to run concurrently.
* So we use "node" property to check this.
*
* Note that if the task is being recovered, its "node" property should be already cleared.
*
* The following algorithm is only approximate one. It could generate false positives e.g. if a node goes down abruptly
* (i.e. without stopping tasks cleanly) and then restarts sooner than in "nodeTimeout" (30) seconds. Therefore, its use
* is currently optional.
*/
class ConcurrentExecutionChecker {

private static final Trace LOGGER = TraceManager.getTrace(ConcurrentExecutionChecker.class);
private final RunningTaskQuartzImpl task;
private final TaskBeans beans;

public ConcurrentExecutionChecker(RunningTaskQuartzImpl task, TaskBeans beans) {
this.task = task;
this.beans = beans;
}

public void check(OperationResult result) throws ObjectNotFoundException, SchemaException, StopJobException {
task.refresh(result);
String executingAtNode = task.getNode();
if (executingAtNode == null) {
LOGGER.trace("Current node is null, we assume no concurrent execution");
return;
}

LOGGER.debug("Task {} seems to be executing on node {}", task, executingAtNode);
if (executingAtNode.equals(beans.configuration.getNodeId())) {
RunningTaskQuartzImpl locallyRunningTask = beans.localNodeState
.getLocallyRunningTaskByIdentifier(task.getTaskIdentifier());
if (locallyRunningTask != null) {
throw new StopJobException(ERROR, "Current task %s seems to be already running in thread %s on the"
+ " local node. We will NOT start it here.", null, task, locallyRunningTask.getExecutingThread());
} else {
LOGGER.warn("Current task {} seemed to be already running on the local node but it cannot be found"
+ " there now. Therefore we continue with the Quartz job execution.", task);
}
} else {
ObjectQuery query = beans.prismContext.queryFor(NodeType.class)
.item(NodeType.F_NODE_IDENTIFIER).eq(executingAtNode)
.build();
SearchResultList<PrismObject<NodeType>> nodes = beans.nodeRetriever.searchNodes(query, null, result);
if (nodes.size() > 1) {
throw new IllegalStateException("More than one node with identifier " + executingAtNode + ": " + nodes);
} else if (nodes.size() == 1) {
NodeType remoteNode = nodes.get(0).asObjectable();
if (beans.clusterManager.isCheckingIn(remoteNode)) {
// We should probably contact the remote node and check if the task is really running there.
// But let's keep things simple for the time being.
throw new StopJobException(ERROR,
"Current task %s seems to be already running at node %s that is alive or starting. "
+ "We will NOT start it here.", null, task, remoteNode.getNodeIdentifier());
} else {
LOGGER.warn("Current task {} seems to be already running at node {} but this node is not currently "
+ "checking in (last: {}). So we will start the task here.", task,
remoteNode.getNodeIdentifier(), remoteNode.getLastCheckInTime());
}
} else {
LOGGER.warn("Current task {} seems to be already running at node {} but this node cannot be found"
+ "in the repository. So we will start the task here.", task, executingAtNode);
}
}
}
}

0 comments on commit f7a03b7

Please sign in to comment.