Skip to content

Commit

Permalink
Fix suspend/resume of multithreaded task trees
Browse files Browse the repository at this point in the history
If a task tree contains multithreaded workers, suspend/resume
(including suspend+delete) of the whole tree failed; it is now fixed.

Also added a new action, "Delete workers and work state", that brings
a multinode task into a more-or-less initial state.
  • Loading branch information
mederly committed May 3, 2018
1 parent 0e1a1a6 commit d8a9de2
Show file tree
Hide file tree
Showing 15 changed files with 227 additions and 39 deletions.
Expand Up @@ -120,6 +120,7 @@ public class PageTasks extends PageAdminTasks implements Refreshable {
private static final String OPERATION_RESUME_TASK = DOT_CLASS + "resumeTask";
private static final String OPERATION_DELETE_TASKS = DOT_CLASS + "deleteTasks";
private static final String OPERATION_RECONCILE_WORKERS = DOT_CLASS + "reconcileWorkers";
private static final String OPERATION_DELETE_WORKERS_AND_WORK_STATE = DOT_CLASS + "deleteWorkersAndWorkState";
private static final String OPERATION_DELETE_ALL_CLOSED_TASKS = DOT_CLASS + "deleteAllClosedTasks";
private static final String OPERATION_SCHEDULE_TASKS = DOT_CLASS + "scheduleTasks";
private static final String OPERATION_DELETE_NODES = DOT_CLASS + "deleteNodes";
Expand Down Expand Up @@ -956,6 +957,33 @@ public IModel<String> getConfirmationMessageModel() {
return PageTasks.this.getTaskConfirmationMessageModel((ColumnMenuAction) getAction(), actionName);
}
});
items.add(new InlineMenuItem(createStringResource("pageTasks.button.deleteWorkersAndWorkState"), false,
new ColumnMenuAction<TaskDto>() {

@Override
public void onClick(AjaxRequestTarget target) {
    // This action works on a single row; without a row model it is not supported.
    if (getRowModel() == null) {
        throw new UnsupportedOperationException();
    }
    TaskDto selectedTask = getRowModel().getObject();
    deleteWorkersAndWorkState(target, selectedTask);
}
}) {

private static final long serialVersionUID = 1L;

/**
 * Decides whether a confirmation dialog is shown for this menu item by delegating to
 * the enclosing page's {@code isTaskShowConfirmationDialog}, passing this item's action.
 */
@Override
public boolean isShowConfirmationDialog() {
return PageTasks.this.isTaskShowConfirmationDialog((ColumnMenuAction) getAction());
}

/**
 * Builds the confirmation message model: resolves the action name from the string
 * resource and hands it, together with this item's action, to the enclosing page's
 * {@code getTaskConfirmationMessageModel}.
 */
@Override
public IModel<String> getConfirmationMessageModel() {
// NOTE(review): neighbouring message keys carry an "Action" suffix (e.g.
// pageTasks.message.reconcileWorkersAction); this one does not. It matches the
// properties entry as written, so a rename would have to touch both places.
String actionName = createStringResource("pageTasks.message.deleteWorkersAndWorkState").getString();
return PageTasks.this.getTaskConfirmationMessageModel((ColumnMenuAction) getAction(), actionName);
}
});
}
}
if (isHeader) {
Expand Down Expand Up @@ -1794,6 +1822,22 @@ private void resumeCoordinatorOnly(AjaxRequestTarget target, @NotNull TaskDto ta
refreshTables(target);
}

/**
 * Asks the task service to delete the workers and the work state of the given task,
 * shows the operation result to the user and refreshes the task tables.
 *
 * @param target Ajax request target handed on to the table refresh
 * @param task   row DTO of the task whose OID is sent to the task service
 */
private void deleteWorkersAndWorkState(AjaxRequestTarget target, @NotNull TaskDto task) {
    Task operationTask = createSimpleTask(OPERATION_DELETE_WORKERS_AND_WORK_STATE);
    OperationResult operationResult = operationTask.getResult();
    try {
        getTaskService().deleteWorkersAndWorkState(task.getOid(), WAIT_FOR_TASK_STOP, operationTask, operationResult);
        operationResult.computeStatus();
    } catch (ObjectNotFoundException | SchemaException | SecurityViolationException | ExpressionEvaluationException
            | CommunicationException | ConfigurationException | RuntimeException e) {
        // Failures are reported through the operation result instead of being rethrown.
        operationResult.recordFatalError("Couldn't delete workers and the work state of the coordinator", e); // todo i18n
    }
    showResult(operationResult);

    // Clear the data provider's cache before the tables are refreshed.
    ((TaskDtoProvider) getTaskTable().getDataTable().getDataProvider()).clearCache();
    refreshTables(target);
}

private static class SearchFragment extends Fragment {

public SearchFragment(String id, String markupId, MarkupContainer markupProvider,
Expand Down
Expand Up @@ -881,6 +881,8 @@ operation.com.evolveum.midpoint.web.page.admin.server.PageTasks.stopSchedulersAn
operation.com.evolveum.midpoint.web.page.admin.server.PageTasks.stopSchedulers=Stop schedulers (Gui)
operation.com.evolveum.midpoint.web.page.admin.server.PageTasks.suspendTasks=Suspend tasks (Gui)
operation.com.evolveum.midpoint.web.page.admin.server.PageTasks.synchronizeTasks=Synchronize tasks (Gui)
operation.com.evolveum.midpoint.web.page.admin.server.PageTasks.deleteWorkersAndWorkState=Delete workers and work state (Gui)
operation.com.evolveum.midpoint.web.page.admin.server.PageTasks.reconcileWorkers=Reconcile workers (Gui)
operation.com.evolveum.midpoint.web.page.admin.users.component.TreeTablePanel.deleteObject=Delete object (Gui)
operation.com.evolveum.midpoint.web.page.admin.users.component.TreeTablePanel.deleteObjects=Delete objects (Gui)
operation.com.evolveum.midpoint.web.page.admin.users.component.TreeTablePanel.moveObject=Move object (Gui)
Expand Down Expand Up @@ -2139,6 +2141,7 @@ pageTasks.button.deactivateServiceThreads=Stop all threads
pageTasks.button.deleteNode=Delete
pageTasks.button.deleteTask=Delete
pageTasks.button.reconcileWorkers=Reconcile workers
pageTasks.button.deleteWorkersAndWorkState=Delete workers and work state
pageTasks.button.suspendCoordinatorOnly=Suspend (coordinator only)
pageTasks.button.resumeCoordinatorOnly=Resume (coordinator only)
pageTasks.button.deleteAllClosedTasks=Delete all closed tasks
Expand Down Expand Up @@ -2176,6 +2179,7 @@ pageTasks.message.resumeAction=resume
pageTasks.message.runNowAction=run now
pageTasks.message.deleteAction=delete
pageTasks.message.reconcileWorkersAction=reconcile workers of
pageTasks.message.deleteWorkersAndWorkState=delete workers and work state of
pageTasks.message.deleteAllClosedTasksAction=delete all closed tasks
pageTasks.message.startAction=start
pageTasks.message.scheduleTaskAction=schedule task
Expand Down
Expand Up @@ -198,5 +198,11 @@ public interface TaskService {
void reconcileWorkers(String oid, Task opTask, OperationResult result)
throws CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException,
ConfigurationException, ExpressionEvaluationException, ObjectAlreadyExistsException;


/**
 * Deletes the workers and the work state of a coordinator task.
 *
 * @param coordinatorOid   OID of the coordinator task
 * @param subtasksWaitTime wait time applied to the subtasks -- NOTE(review): the exact
 *                         meaning and unit are not visible from this interface; confirm
 *                         against the task manager implementation
 * @param operationTask    task in whose context the operation is executed
 * @param parentResult     operation result into which the outcome is recorded
 */
void deleteWorkersAndWorkState(String coordinatorOid, long subtasksWaitTime, Task operationTask, OperationResult parentResult)
throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException,
CommunicationException, ConfigurationException;

//endregion
}
Expand Up @@ -1903,6 +1903,14 @@ public void reconcileWorkers(String oid, Task opTask, OperationResult result)
taskManager.reconcileWorkers(oid, null, result);
}

/**
 * First authorizes the request against {@code AuthorizationConstants.AUTZ_ALL_URL}, then
 * hands it over to {@code taskManager.deleteWorkersAndWorkState}.
 * <p>
 * {@code operationTask} is used for the authorization call only; it is not passed on to
 * the task manager.
 */
@Override
public void deleteWorkersAndWorkState(String coordinatorOid, long subtasksWaitTime, Task operationTask, OperationResult parentResult)
throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException,
CommunicationException, ConfigurationException {
// Authorization has to come before the delegation below.
securityEnforcer.authorize(AuthorizationConstants.AUTZ_ALL_URL, null, AuthorizationParameters.EMPTY, null, operationTask, parentResult);
taskManager.deleteWorkersAndWorkState(coordinatorOid, subtasksWaitTime, parentResult);
}

@Override
public List<String> getAllTaskCategories() {
return taskManager.getAllTaskCategories();
Expand Down
Expand Up @@ -563,7 +563,7 @@ public void finishHandler(OperationResult parentResult) throws ObjectNotFoundExc

@NotNull
@Override
public List<Task> listSubtasks(OperationResult parentResult) throws SchemaException {
public List<Task> listSubtasks(boolean persistentOnly, OperationResult parentResult) throws SchemaException {
throw new UnsupportedOperationException("not implemented yet.");
}

Expand Down Expand Up @@ -649,7 +649,7 @@ public void addExtensionReference(PrismReference reference) throws SchemaExcepti
}

@Override
public List<Task> listSubtasksDeeply(OperationResult result) throws SchemaException {
public List<Task> listSubtasksDeeply(boolean persistentOnly, OperationResult result) throws SchemaException {
throw new UnsupportedOperationException("not implemented yet.");
}

Expand Down
Expand Up @@ -838,7 +838,12 @@ void setResultImmediate(OperationResult result, OperationResult parentResult)
* @throws SchemaException
*/
@NotNull
List<Task> listSubtasks(OperationResult parentResult) throws SchemaException;
default List<Task> listSubtasks(OperationResult parentResult) throws SchemaException {
// Convenience overload: identical to listSubtasks(false, parentResult).
return listSubtasks(false, parentResult);
}

/**
 * Lists the subtasks of this task.
 *
 * @param persistentOnly when {@code true}, only persistent subtasks are returned; in
 *                       {@code TaskQuartzImpl}, {@code false} additionally includes the
 *                       transient subtasks obtained from the task manager
 * @param parentResult   parent operation result
 * @return the list of subtasks
 * @throws SchemaException
 */
@NotNull
List<Task> listSubtasks(boolean persistentOnly, OperationResult parentResult) throws SchemaException;

/**
* List all the subtasks of a given task, i.e. whole task tree rooted at the current task.
Expand All @@ -848,7 +853,11 @@ void setResultImmediate(OperationResult result, OperationResult parentResult)
* @return
* @throws SchemaException
*/
List<Task> listSubtasksDeeply(OperationResult result) throws SchemaException;
default List<Task> listSubtasksDeeply(OperationResult result) throws SchemaException {
// Convenience overload: identical to listSubtasksDeeply(false, result).
return listSubtasksDeeply(false, result);
}

/**
 * Lists the subtasks of this task recursively, i.e. the subtasks, their subtasks and so on.
 *
 * @param persistentOnly when {@code true}, only persistent subtasks are returned; in
 *                       {@code TaskQuartzImpl}, {@code false} additionally includes
 *                       transient subtasks at every level
 * @param result         parent operation result
 * @return the collected subtasks
 * @throws SchemaException
 */
List<Task> listSubtasksDeeply(boolean persistentOnly, OperationResult result) throws SchemaException;

/**
* Lists all explicit dependents, i.e. tasks that wait for the completion of this tasks (that depend on it).
Expand Down
Expand Up @@ -418,6 +418,9 @@ void resumeTaskTree(String coordinatorOid, OperationResult parentResult)
void reconcileWorkers(String coordinatorOid, WorkersReconciliationOptions options, OperationResult parentResult)
throws SchemaException, ObjectNotFoundException, ObjectAlreadyExistsException;

/**
 * Deletes the workers and the work state of a coordinator task.
 *
 * @param coordinatorOid   OID of the coordinator task
 * @param subtasksWaitTime wait time applied to the subtasks -- NOTE(review): exact meaning
 *                         and unit are not visible here; confirm against the workers manager
 * @param parentResult     operation result into which the outcome is recorded
 */
void deleteWorkersAndWorkState(String coordinatorOid, long subtasksWaitTime, OperationResult parentResult)
throws SchemaException, ObjectNotFoundException;

/**
* TODO is this method really necessary?
*/
Expand Down
Expand Up @@ -375,7 +375,7 @@ public boolean suspendTaskTree(String rootTaskOid, long waitTime, OperationResul

try {
TaskQuartzImpl root = getTask(rootTaskOid, result);
List<Task> subtasks = root.listSubtasksDeeply(parentResult);
List<Task> subtasks = root.listSubtasksDeeply(true, parentResult);
List<String> oidsToSuspend = new ArrayList<>(subtasks.size() + 1);
oidsToSuspend.add(rootTaskOid);
for (Task subtask : subtasks) {
Expand All @@ -397,7 +397,7 @@ public void resumeTaskTree(String rootTaskOid, OperationResult parentResult) thr

try {
TaskQuartzImpl root = getTask(rootTaskOid, result);
List<Task> subtasks = root.listSubtasks(parentResult);
List<Task> subtasks = root.listSubtasks(true, parentResult);
List<String> oidsToResume = new ArrayList<>(subtasks.size() + 1);
if (root.getExecutionStatus() == TaskExecutionStatus.SUSPENDED) {
oidsToResume.add(rootTaskOid);
Expand Down Expand Up @@ -432,6 +432,22 @@ public void reconcileWorkers(String coordinatorTaskOid, WorkersReconciliationOpt
}
}

/**
 * Runs {@code workersManager.deleteWorkersAndWorkState} under a dedicated subresult of
 * {@code parentResult}. Any failure is recorded in that subresult as a fatal error and
 * then rethrown to the caller.
 */
@Override
public void deleteWorkersAndWorkState(String coordinatorTaskOid, long subtasksWaitTime, OperationResult parentResult)
        throws SchemaException, ObjectNotFoundException {
    OperationResult opResult = parentResult.createSubresult(DOT_INTERFACE + "deleteWorkersAndWorkState");
    opResult.addParam("coordinatorTaskOid", coordinatorTaskOid);
    opResult.addParam("subtasksWaitTime", subtasksWaitTime);
    try {
        workersManager.deleteWorkersAndWorkState(coordinatorTaskOid, subtasksWaitTime, opResult);
    } catch (Throwable failure) {
        // Record first, then propagate unchanged; precise rethrow keeps the declared exception types.
        opResult.recordFatalError("Couldn't delete workers and work state", failure);
        throw failure;
    } finally {
        // Compute the status only if nothing has set it yet.
        opResult.computeStatusIfUnknown();
    }
}

@Override
public void scheduleCoordinatorAndWorkersNow(String coordinatorOid, OperationResult parentResult) throws SchemaException, ObjectNotFoundException {
OperationResult result = parentResult.createSubresult(DOT_INTERFACE + "scheduleCoordinatorAndWorkersNow");
Expand Down Expand Up @@ -921,7 +937,7 @@ public void suspendAndDeleteTasks(Collection<String> taskOids, long suspendTimeo
Task task = getTask(oid, result);
tasksToBeDeleted.add(task);
if (alsoSubtasks) {
tasksToBeDeleted.addAll(task.listSubtasksDeeply(result));
tasksToBeDeleted.addAll(task.listSubtasksDeeply(true, result));
}
} catch (ObjectNotFoundException e) {
// just skip suspending/deleting this task. As for the error, it should be already put into result.
Expand Down Expand Up @@ -1231,7 +1247,7 @@ private void fillInSubtasks(TaskType task, ClusterStatusInformation clusterStatu
boolean retrieveRetryTime = SelectorOptions.hasToLoadPath(new ItemPath(TaskType.F_NEXT_RETRY_TIMESTAMP), options);
boolean retrieveNodeAsObserved = SelectorOptions.hasToLoadPath(new ItemPath(TaskType.F_NODE_AS_OBSERVED), options);

List<PrismObject<TaskType>> subtasks = listSubtasksForTask(task.getTaskIdentifier(), result);
List<PrismObject<TaskType>> subtasks = listPersistentSubtasksForTask(task.getTaskIdentifier(), result);

for (PrismObject<TaskType> subtask : subtasks) {

Expand All @@ -1249,7 +1265,7 @@ private void fillInSubtasks(TaskType task, ClusterStatusInformation clusterStatu
}
}

public List<PrismObject<TaskType>> listSubtasksForTask(String taskIdentifier, OperationResult result) throws SchemaException {
public List<PrismObject<TaskType>> listPersistentSubtasksForTask(String taskIdentifier, OperationResult result) throws SchemaException {

if (StringUtils.isEmpty(taskIdentifier)) {
return new ArrayList<>();
Expand Down Expand Up @@ -2007,7 +2023,7 @@ public void cleanupTasks(CleanupPolicyType policy, Task executionTask, Operation
try {
// get whole tree
Task rootTask = createTaskInstance(rootTaskPrism, result);
List<Task> taskTreeMembers = rootTask.listSubtasksDeeply(result);
List<Task> taskTreeMembers = rootTask.listSubtasksDeeply(true, result);
taskTreeMembers.add(rootTask);

LOGGER.trace("Removing task {} along with its {} children.", rootTask, taskTreeMembers.size() - 1);
Expand Down
Expand Up @@ -2659,8 +2659,8 @@ public TaskRunResult waitForSubtasks(Integer interval, Collection<ItemDelta<?, ?
// return true;
// }

public List<PrismObject<TaskType>> listSubtasksRaw(OperationResult parentResult) throws SchemaException {
OperationResult result = parentResult.createMinorSubresult(DOT_INTERFACE + "listSubtasksRaw");
public List<PrismObject<TaskType>> listPersistentSubtasksRaw(OperationResult parentResult) throws SchemaException {
OperationResult result = parentResult.createMinorSubresult(DOT_INTERFACE + "listPersistentSubtasksRaw");
result.addContext(OperationResult.CONTEXT_OID, getOid());
result.addContext(OperationResult.CONTEXT_IMPLEMENTATION_CLASS, TaskQuartzImpl.class);

Expand All @@ -2669,7 +2669,7 @@ public List<PrismObject<TaskType>> listSubtasksRaw(OperationResult parentResult)
return new ArrayList<>(0);
}

return taskManager.listSubtasksForTask(getTaskIdentifier(), result);
return taskManager.listPersistentSubtasksForTask(getTaskIdentifier(), result);
}

public List<PrismObject<TaskType>> listPrerequisiteTasksRaw(OperationResult parentResult) throws SchemaException {
Expand All @@ -2688,41 +2688,42 @@ public List<PrismObject<TaskType>> listPrerequisiteTasksRaw(OperationResult pare

@NotNull
@Override
public List<Task> listSubtasks(OperationResult parentResult) throws SchemaException {

public List<Task> listSubtasks(boolean persistentOnly, OperationResult parentResult) throws SchemaException {
OperationResult result = parentResult.createMinorSubresult(DOT_INTERFACE + "listSubtasks");
result.addParam("persistentOnly", persistentOnly);
result.addContext(OperationResult.CONTEXT_OID, getOid());
result.addContext(OperationResult.CONTEXT_IMPLEMENTATION_CLASS, TaskQuartzImpl.class);

return listSubtasksInternal(result);
return listSubtasksInternal(persistentOnly, result);
}

@NotNull
private List<Task> listSubtasksInternal(OperationResult result) throws SchemaException {
List<Task> retval = new ArrayList<>();
private List<Task> listSubtasksInternal(boolean persistentOnly, OperationResult result) throws SchemaException {
// persistent subtasks
retval.addAll(taskManager.resolveTasksFromTaskTypes(listSubtasksRaw(result), result));
List<Task> retval = new ArrayList<>(taskManager.resolveTasksFromTaskTypes(listPersistentSubtasksRaw(result), result));
// transient asynchronous subtasks - must be taken from the running task instance!
retval.addAll(taskManager.getTransientSubtasks(this));
if (!persistentOnly) {
retval.addAll(taskManager.getTransientSubtasks(this));
}
return retval;
}

@Override
public List<Task> listSubtasksDeeply(OperationResult parentResult) throws SchemaException {
public List<Task> listSubtasksDeeply(boolean persistentOnly, OperationResult parentResult) throws SchemaException {

OperationResult result = parentResult.createMinorSubresult(DOT_INTERFACE + "listSubtasksDeeply");
result.addParam("persistentOnly", persistentOnly);
result.addContext(OperationResult.CONTEXT_OID, getOid());
result.addContext(OperationResult.CONTEXT_IMPLEMENTATION_CLASS, TaskQuartzImpl.class);

ArrayList<Task> retval = new ArrayList<>();
addSubtasks(retval, this, result);
addSubtasks(retval, this, persistentOnly, result);
return retval;
}

private void addSubtasks(ArrayList<Task> tasks, TaskQuartzImpl taskToProcess, OperationResult result) throws SchemaException {
for (Task task : taskToProcess.listSubtasksInternal(result)) {
private void addSubtasks(ArrayList<Task> tasks, TaskQuartzImpl taskToProcess, boolean persistentOnly, OperationResult result) throws SchemaException {
for (Task task : taskToProcess.listSubtasksInternal(persistentOnly, result)) {
tasks.add(task);
addSubtasks(tasks, (TaskQuartzImpl) task, result);
addSubtasks(tasks, (TaskQuartzImpl) task, persistentOnly, result);
}
}

Expand Down
Expand Up @@ -22,7 +22,6 @@
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskHandler;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.api.TaskRunResult;
import com.evolveum.midpoint.task.api.TaskRunResult.TaskRunResultStatus;
import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl;
Expand Down Expand Up @@ -66,7 +65,7 @@ public TaskRunResult run(Task task) {

List<PrismObject<TaskType>> subtasks = null;
try {
subtasks = ((TaskQuartzImpl) task).listSubtasksRaw(opResult);
subtasks = ((TaskQuartzImpl) task).listPersistentSubtasksRaw(opResult);
} catch (SchemaException e) {
throw new SystemException("Couldn't list subtasks of " + task + " due to schema exception", e);
}
Expand Down
Expand Up @@ -66,7 +66,7 @@ public TaskRunResult run(Task task) {

try {
setOrCheckTaskKind(task, opResult);
List<Task> workers = task.listSubtasks(opResult);
List<Task> workers = task.listSubtasks(true, opResult);
boolean clean = task.getWorkState() == null || Boolean.TRUE.equals(task.getWorkState().isAllWorkComplete());
// todo consider checking that the subtask is really a worker (workStateConfiguration.taskKind)
if (clean) {
Expand Down Expand Up @@ -98,7 +98,7 @@ public TaskRunResult run(Task task) {
taskManager.reconcileWorkers(task.getOid(), options, opResult);
task.makeWaiting(TaskWaitingReason.OTHER_TASKS, TaskUnpauseActionType.RESCHEDULE); // i.e. close for single-run tasks
task.savePendingModifications(opResult);
taskManager.resumeTasks(TaskUtil.tasksToOids(task.listSubtasks(opResult)), opResult);
taskManager.resumeTasks(TaskUtil.tasksToOids(task.listSubtasks(true, opResult)), opResult);
LOGGER.info("Worker tasks were successfully created for coordinator {}", task);
} catch (SchemaException | ObjectNotFoundException | ObjectAlreadyExistsException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Couldn't (re)create workers for {}", e, task);
Expand Down Expand Up @@ -133,7 +133,7 @@ private boolean deleteWorkersAndWorkState(List<Task> workers, Task task, Operati
deleteWorkState(task, opResult);
for (Task worker : workers) {
try {
List<Task> workerSubtasks = worker.listSubtasks(opResult);
List<Task> workerSubtasks = worker.listSubtasks(true, opResult);
if (!workerSubtasks.isEmpty()) {
LOGGER.warn("Couldn't recreate worker task {} because it has its own subtasks: {}", worker, workerSubtasks);
opResult.recordFatalError("Couldn't recreate worker task " + worker + " because it has its own subtasks: " + workerSubtasks);
Expand Down
Expand Up @@ -67,7 +67,7 @@ public TaskRunResult run(Task task) {
runResult.setOperationResult(opResult);

try {
List<Task> workers = task.listSubtasks(opResult);
List<Task> workers = task.listSubtasks(true, opResult);
List<Task> workersNotClosed = workers.stream()
.filter(w -> w.getExecutionStatus() != TaskExecutionStatus.CLOSED)
.collect(Collectors.toList());
Expand Down

0 comments on commit d8a9de2

Please sign in to comment.