Skip to content

Commit

Permalink
Implement suspend/resume for multi-node tasks
Browse files Browse the repository at this point in the history
Now it is possible to correctly suspend/resume multi-node tasks,
i.e. the whole task trees. Also, the waiting reason value of WORKFLOW
is now deprecated.
  • Loading branch information
mederly committed Mar 12, 2018
1 parent 0546e7f commit a7549d2
Show file tree
Hide file tree
Showing 23 changed files with 889 additions and 185 deletions.
Expand Up @@ -1992,6 +1992,30 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="unpauseAction" type="tns:TaskUnpauseActionType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
What to do after task is unpaused (i.e. stopped waiting)?
The default for single-run tasks is executeImmediately (mainly for backwards compatibility
reasons); the default for recurring tasks is reschedule. It is expected that this item is
set by the code that puts the task into waiting state.
</xsd:documentation>
<xsd:appinfo>
<a:since>3.8</a:since>
</xsd:appinfo>
</xsd:annotation>
</xsd:element>
<xsd:element name="stateBeforeSuspend" type="tns:TaskExecutionStatusType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
What was the task state before it was suspended? It could be either runnable or waiting.
This is important because we want to restore the correct state on resume.
</xsd:documentation>
<xsd:appinfo>
<a:since>3.8</a:since>
</xsd:appinfo>
</xsd:annotation>
</xsd:element>
<xsd:element name="node" type="xsd:string" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down Expand Up @@ -2572,8 +2596,11 @@
<xsd:annotation>
<xsd:documentation>
The task is waiting for a workflow process (that it monitors/shadows) to be finished.
DEPRECATED. DO NOT USE.
</xsd:documentation>
<xsd:appinfo>
<a:deprecated>true</a:deprecated>
<a:deprecatedSince>3.8</a:deprecatedSince>
<jaxb:typesafeEnumMember name="WORKFLOW"/>
</xsd:appinfo>
</xsd:annotation>
Expand All @@ -2591,6 +2618,51 @@
</xsd:restriction>
</xsd:simpleType>

<xsd:simpleType name="TaskUnpauseActionType">
<xsd:annotation>
<xsd:documentation>
What to do after task is unpaused (i.e. stopped waiting)?
EXPERIMENTAL
</xsd:documentation>
<xsd:appinfo>
<a:since>3.8</a:since>
</xsd:appinfo>
</xsd:annotation>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="executeImmediately">
<xsd:annotation>
<xsd:documentation>
Execute current handler immediately (if there is any). This is the default for single-run tasks.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumMember name="EXECUTE_IMMEDIATELY"/>
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
<xsd:enumeration value="reschedule">
<xsd:annotation>
<xsd:documentation>
Execute current handler (if there is any) on next scheduled time. This is the default for recurring tasks.
For single-run tasks this means the task is closed, as there is no next scheduled time.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumMember name="RESCHEDULE"/>
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
<xsd:enumeration value="close">
<xsd:annotation>
<xsd:documentation>
The task will be closed.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumMember name="CLOSE"/>
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
</xsd:restriction>
</xsd:simpleType>

<xsd:simpleType name="ThreadStopActionType">
<xsd:annotation>
<xsd:documentation>
Expand Down
Expand Up @@ -194,11 +194,21 @@ public static void displayWhen(String testName) {
LOGGER.info(TEST_LOG_SECTION_PREFIX + " WHEN " + testName + TEST_LOG_SECTION_SUFFIX);
}

public static void displayWhen(String testName, String part) {
System.out.println(TEST_OUT_SECTION_PREFIX + " WHEN " + testName + " (" + part + ")" + TEST_OUT_SECTION_SUFFIX);
LOGGER.info(TEST_LOG_SECTION_PREFIX + " WHEN " + testName + " (" + part + ")" + TEST_LOG_SECTION_SUFFIX);
}

public static void displayThen(String testName) {
System.out.println(TEST_OUT_SECTION_PREFIX + " THEN " + testName + TEST_OUT_SECTION_SUFFIX);
LOGGER.info(TEST_LOG_SECTION_PREFIX + " THEN " + testName + TEST_LOG_SECTION_SUFFIX);
}

public static void displayThen(String testName, String part) {
System.out.println(TEST_OUT_SECTION_PREFIX + " THEN " + testName + " (" + part + ")" + TEST_OUT_SECTION_SUFFIX);
LOGGER.info(TEST_LOG_SECTION_PREFIX + " THEN " + testName + " (" + part + ")" + TEST_LOG_SECTION_SUFFIX);
}

public static void displayCleanup(String testName) {
System.out.println(TEST_OUT_SECTION_PREFIX + " CLEANUP " + testName + TEST_OUT_SECTION_SUFFIX);
LOGGER.info(TEST_LOG_SECTION_PREFIX + " CLEANUP " + testName + TEST_LOG_SECTION_SUFFIX);
Expand Down
Expand Up @@ -396,7 +396,7 @@ public Task createTask(WfTaskController taskController, Task parentTask, WfConfi
task.pushHandlerUri(WfProcessInstanceShadowTaskHandler.HANDLER_URI, schedule, TaskBinding.LOOSE);
} else {
task.pushHandlerUri(WfProcessInstanceShadowTaskHandler.HANDLER_URI, new ScheduleType(), null); // note that this handler will not be actively used (at least for now)
task.makeWaiting();
task.makeWaiting(TaskWaitingReason.OTHER);
}
}

Expand Down
Expand Up @@ -612,6 +612,11 @@ public void makeWaiting(TaskWaitingReason reason) {
throw new UnsupportedOperationException("not implemented yet.");
}

@Override
public void makeWaiting(TaskWaitingReason reason, TaskUnpauseActionType unpauseAction) {
throw new UnsupportedOperationException("not implemented yet.");
}

@Override
public void pushWaitForTasksHandlerUri() {
throw new UnsupportedOperationException("not implemented yet.");
Expand Down Expand Up @@ -884,4 +889,14 @@ public TaskWorkStateConfigurationType getWorkStateConfiguration() {
public TaskWorkStateType getWorkState() {
return null;
}

@Override
public TaskUnpauseActionType getUnpauseAction() {
return null;
}

@Override
public TaskExecutionStatusType getStateBeforeSuspend() {
return null;
}
}
Expand Up @@ -27,13 +27,14 @@ public enum RTaskWaitingReason implements SchemaEnum<TaskWaitingReasonType> {

OTHER_TASKS(TaskWaitingReasonType.OTHER_TASKS),

// DEPRECATED
WORKFLOW(TaskWaitingReasonType.WORKFLOW),

OTHER(TaskWaitingReasonType.OTHER);

private TaskWaitingReasonType reason;

private RTaskWaitingReason(TaskWaitingReasonType reason) {
RTaskWaitingReason(TaskWaitingReasonType reason) {
this.reason = reason;
}

Expand Down
Expand Up @@ -376,11 +376,11 @@ public static String getAttributeValue(ShadowType repoShadow, QName name) {
return values.iterator().next();
}

public static void waitFor(String message, Checker checker, int timeoutInterval) throws CommonException {
public static void waitFor(String message, Checker checker, long timeoutInterval) throws CommonException {
waitFor(message, checker, timeoutInterval, WAIT_FOR_LOOP_SLEEP_MILIS);
}

public static void waitFor(String message, Checker checker, int timeoutInterval, long sleepInterval) throws CommonException {
public static void waitFor(String message, Checker checker, long timeoutInterval, long sleepInterval) throws CommonException {
System.out.println(message);
LOGGER.debug(LOG_MESSAGE_PREFIX + message);
long startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -523,9 +523,7 @@ public static void display(String title, Throwable e) {
String stackTrace = ExceptionUtils.getStackTrace(e);
System.out.println(OBJECT_TITLE_OUT_PREFIX + title + ": "+e.getClass() + " " + e.getMessage());
System.out.println(stackTrace);
LOGGER.debug("{}{}: {} {}\n{}", new Object[]{
OBJECT_TITLE_LOG_PREFIX, title, e.getClass(), e.getMessage(),
stackTrace});
LOGGER.debug("{}{}: {} {}\n{}", OBJECT_TITLE_LOG_PREFIX, title, e.getClass(), e.getMessage(), stackTrace);
}

public static void displayPrismValuesCollection(String message, Collection<? extends PrismValue> collection) {
Expand Down
Expand Up @@ -170,6 +170,8 @@ void setNameImmediate(PolyStringType value, OperationResult parentResult)
*/
void makeWaiting(TaskWaitingReason reason);

void makeWaiting(TaskWaitingReason reason, TaskUnpauseActionType unpauseAction);

/**
* Status-changing method. It changes task's execution status to RUNNABLE.
* Currently use ONLY on transient tasks.
Expand Down Expand Up @@ -1032,4 +1034,8 @@ void savePendingModifications(OperationResult parentResult) throws ObjectNotFoun
TaskWorkStateConfigurationType getWorkStateConfiguration();

TaskWorkStateType getWorkState();

TaskUnpauseActionType getUnpauseAction();

TaskExecutionStatusType getStateBeforeSuspend();
}
Expand Up @@ -458,7 +458,18 @@ public void modifyTask(String oid, Collection<? extends ItemDelta> modifications
*/
void resumeTasks(Collection<String> taskOids, OperationResult parentResult);

/**
boolean suspendTaskTree(String coordinatorOid, long waitTime, OperationResult parentResult)
throws SchemaException, ObjectNotFoundException;

void resumeTaskTree(String coordinatorOid, OperationResult parentResult)
throws SchemaException, ObjectNotFoundException;

/**
* TODO is this method really necessary?
*/
void scheduleCoordinatorAndWorkersNow(String coordinatorOid, OperationResult parentResult) throws SchemaException, ObjectNotFoundException;

/**
* Puts a runnable/running task into WAITING state.
*
* @param task a runnable/running task
Expand Down
Expand Up @@ -30,11 +30,6 @@ public enum TaskWaitingReason {
*/
OTHER_TASKS,

/**
* The task is waiting for a workflow process (that it monitors/shadows) to be finished.
*/
WORKFLOW,

/**
* The task is waiting because of other reason.
*/
Expand All @@ -48,7 +43,7 @@ public static TaskWaitingReason fromTaskType(TaskWaitingReasonType xmlValue) {
}
switch (xmlValue) {
case OTHER_TASKS: return OTHER_TASKS;
case WORKFLOW: return WORKFLOW;
case WORKFLOW: return OTHER;
case OTHER: return OTHER;
default: throw new IllegalArgumentException("Unknown waiting reason type " + xmlValue);
}
Expand All @@ -58,7 +53,6 @@ public TaskWaitingReasonType toTaskType() {

switch (this) {
case OTHER_TASKS: return TaskWaitingReasonType.OTHER_TASKS;
case WORKFLOW: return TaskWaitingReasonType.WORKFLOW;
case OTHER: return TaskWaitingReasonType.OTHER;
default: throw new IllegalArgumentException("Unknown execution status type "+this);
}
Expand Down

0 comments on commit a7549d2

Please sign in to comment.