Skip to content

Commit

Permalink
Reimplement workers recon & state deletion
Browse files Browse the repository at this point in the history
This is a preliminary reimplementation of GUI actions
of workers reconciliation + and worker and/or activity state deletion.
  • Loading branch information
mederly committed Jul 7, 2021
1 parent d593641 commit fab7dc9
Show file tree
Hide file tree
Showing 11 changed files with 409 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public abstract class TaskTablePanel extends MainObjectListPanel<TaskType> {
public static final String OPERATION_RESUME_TASK = DOT_CLASS + "resumeTask";
public static final String OPERATION_DELETE_TASKS = DOT_CLASS + "deleteTasks";
public static final String OPERATION_RECONCILE_WORKERS = DOT_CLASS + "reconcileWorkers";
public static final String OPERATION_DELETE_WORKERS_AND_WORK_STATE = DOT_CLASS + "deleteWorkersAndWorkState";
public static final String OPERATION_DELETE_ACTIVITY_STATE_AND_WORKERS = DOT_CLASS + "deleteActivityStateAndWorkers";
public static final String OPERATION_DELETE_WORK_STATE = DOT_CLASS + "deleteWorkState";
public static final String OPERATION_DELETE_ALL_CLOSED_TASKS = DOT_CLASS + "deleteAllClosedTasks";
public static final String OPERATION_SCHEDULE_TASKS = DOT_CLASS + "scheduleTasks";
Expand Down Expand Up @@ -460,7 +460,7 @@ private InlineMenuItem createResumeRootOnlyMenuAction() {

private InlineMenuItem createDeleteWorkStateAndWorkersMenuAction() {
InlineMenuItem deleteWorkStateAndWorkers = createTaskInlineMenuItem("pageTasks.button.deleteWorkersAndWorkState",
this::deleteWorkersAndWorkState,
this::deleteActivityStateAndWorkers,
"pageTasks.message.deleteWorkersAndWorkState",
(task) -> true,
false);
Expand Down Expand Up @@ -687,11 +687,11 @@ private void resumeRootOnly(AjaxRequestTarget target, @NotNull IModel<Selectable
clearCache();
}

private void deleteWorkersAndWorkState(AjaxRequestTarget target, @NotNull IModel<SelectableBean<TaskType>> task) {
Task opTask = createSimpleTask(OPERATION_DELETE_WORKERS_AND_WORK_STATE);
private void deleteActivityStateAndWorkers(AjaxRequestTarget target, @NotNull IModel<SelectableBean<TaskType>> task) {
Task opTask = createSimpleTask(OPERATION_DELETE_ACTIVITY_STATE_AND_WORKERS);
OperationResult result = opTask.getResult();
try {
getTaskService().deleteWorkersAndWorkState(task.getObject().getValue().getOid(), true, WAIT_FOR_TASK_STOP, opTask, result);
getTaskService().deleteActivityStateAndWorkers(task.getObject().getValue().getOid(), true, WAIT_FOR_TASK_STOP, opTask, result);
result.computeStatus();
} catch (Throwable e) {
result.recordFatalError(createStringResource("pageTasks.message.deleteWorkersAndWorkState.fatalError").getString(),
Expand All @@ -715,7 +715,7 @@ private void deleteWorkState(AjaxRequestTarget target, @NotNull TaskType task) {
Task opTask = createSimpleTask(OPERATION_DELETE_WORK_STATE);
OperationResult result = opTask.getResult();
try {
getTaskService().deleteWorkersAndWorkState(task.getOid(), false, WAIT_FOR_TASK_STOP, opTask, result);
getTaskService().deleteActivityStateAndWorkers(task.getOid(), false, WAIT_FOR_TASK_STOP, opTask, result);
result.computeStatus();
} catch (Throwable e) {
result.recordFatalError(createStringResource("pageTasks.message.deleteWorkState.fatalError").getString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Objects.requireNonNull;

/**
* Utility methods for navigating throughout activity trees, potentially distributed throughout a task tree.
*/
Expand All @@ -33,16 +34,31 @@ public class ActivityTreeUtil {

/**
* Transforms activity state objects into custom ones, organized into a tree.
* Delegation states are ignored. Distribution states are considered, and their workers' states are (currently) ignored.
* Delegation states are ignored. Distribution states are considered, and their workers' states
* are packed along them.
*/
public static <X> @NotNull TreeNode<X> transformStates(@NotNull TaskType rootTask,
@NotNull TaskResolver resolver,
@NotNull ActivityStateTransformer<X> transformer) {
TreeNode<X> root = new TreeNode<>();
transformStates(root, getLocalRootPath(rootTask), getLocalRootState(rootTask), rootTask, resolver, transformer);
processStates(rootTask, resolver, new TreeTransformingProcessor<>(transformer, root));
return root;
}

/**
* Processes activity state objects using the same rules as in {@link #transformStates(TaskType, TaskResolver, ActivityStateTransformer)}:
* delegation states are ignored, distribution states are considered, along with all their workers' states.
*/
public static void processStates(@NotNull TaskType rootTask,
@NotNull TaskResolver resolver,
@NotNull ActivityStateProcessor processor) {
processStates(getLocalRootPath(rootTask), getLocalRootState(rootTask), rootTask, resolver, processor);
}

/**
* Special case of {@link #transformStates(TaskType, TaskResolver, ActivityStateTransformer)}: creates a {@link TreeNode}
* of {@link ActivityStateInContext} objects.
*/
public static @NotNull TreeNode<ActivityStateInContext> toStateTree(@NotNull TaskType rootTask,
@NotNull TaskResolver resolver) {
return ActivityTreeUtil.transformStates(rootTask, resolver, ActivityStateInContext::new);
Expand All @@ -58,28 +74,28 @@ private static ActivityStateType getLocalRootState(TaskType task) {
task.getActivityState().getActivity() : null;
}

private static <X> void transformStates(@NotNull TreeNode<X> transformed, @NotNull ActivityPath path,
@Nullable ActivityStateType state, @NotNull TaskType task, @NotNull TaskResolver resolver,
@NotNull ActivityTreeUtil.ActivityStateTransformer<X> transformer) {
private static void processStates(@NotNull ActivityPath path, @Nullable ActivityStateType state,
@NotNull TaskType task, @NotNull TaskResolver resolver,
@NotNull ActivityStateProcessor processor) {
if (state != null && ActivityStateUtil.isDelegated(state)) {
processDelegatedState(transformed, path, state, task, resolver, transformer);
processDelegatedState(path, state, task, resolver, processor);
} else {
processNonDelegatedState(transformed, path, state, task, resolver, transformer);
processNonDelegatedState(path, state, task, resolver, processor);
}
}

private static <X> void processNonDelegatedState(@NotNull TreeNode<X> transformed, @NotNull ActivityPath path,
private static void processNonDelegatedState(@NotNull ActivityPath path,
@Nullable ActivityStateType state, @NotNull TaskType task, @NotNull TaskResolver resolver,
@NotNull ActivityStateTransformer<X> transformer) {
@NotNull ActivityStateProcessor processor) {

List<ActivityStateType> workerStates = collectWorkerStates(path, state, task, resolver);
transformed.setUserObject(transformer.transform(path, state, workerStates, task));
processor.process(path, state, workerStates, task);

if (state != null) {
for (ActivityStateType childState : state.getActivity()) {
TreeNode<X> child = new TreeNode<>();
transformed.add(child);
transformStates(child, path.append(childState.getIdentifier()), childState, task, resolver, transformer);
processor.toNewChild(childState);
processStates(path.append(childState.getIdentifier()), childState, task, resolver, processor);
processor.toParent();
}
}
}
Expand All @@ -95,15 +111,15 @@ private static List<ActivityStateType> collectWorkerStates(@NotNull ActivityPath
}
}

private static <X> void processDelegatedState(@NotNull TreeNode<X> transformed, @NotNull ActivityPath path,
@NotNull ActivityStateType state, @NotNull TaskType task, @NotNull TaskResolver resolver,
@NotNull ActivityTreeUtil.ActivityStateTransformer<X> transformer) {
private static void processDelegatedState(@NotNull ActivityPath path, @NotNull ActivityStateType state,
@NotNull TaskType task, @NotNull TaskResolver resolver,
@NotNull ActivityTreeUtil.ActivityStateProcessor processor) {
ObjectReferenceType delegateTaskRef = getDelegatedTaskRef(state);
TaskType delegateTask = getSubtask(delegateTaskRef, path, task, resolver);
if (delegateTask != null) {
transformStates(transformed, path, getLocalRootState(delegateTask), delegateTask, resolver, transformer);
processStates(path, getLocalRootState(delegateTask), delegateTask, resolver, processor);
} else {
// nothing to report
// nothing to process
}
}

Expand Down Expand Up @@ -131,6 +147,19 @@ private static TaskType getSubtask(ObjectReferenceType subtaskRef, ActivityPath
}
}

/**
* We know this by comparing task OIDs (except for worker ones): all must be the same to be non-delegated.
*/
public static boolean hasDelegatedActivity(TreeNode<ActivityStateInContext> node) {
Set<String> oids = new HashSet<>();
node.acceptDepthFirst(n ->
oids.add(
requireNonNull(n.getUserObject())
.getTask()
.getOid()));
return oids.size() > 1;
}

@Experimental
@FunctionalInterface
public interface ActivityStateTransformer<X> {
Expand All @@ -142,6 +171,33 @@ X transform(@NotNull ActivityPath path, @Nullable ActivityStateType state,
@Nullable List<ActivityStateType> workerStates, @NotNull TaskType task);
}

@Experimental
@FunctionalInterface
public interface ActivityStateProcessor {

/**
* Called when relevant state is found.
*
* Worker states are present in the case of distributed coordinator-workers scenario.
*/
void process(@NotNull ActivityPath path, @Nullable ActivityStateType state,
@Nullable List<ActivityStateType> workerStates, @NotNull TaskType task);

/**
* Called when new child is entered into.
*
* @param childState State of the child. Often can be ignored.
*/
default void toNewChild(@NotNull ActivityStateType childState) {
}

/**
* Called when new child is processed, and we want to return to the parent.
*/
default void toParent() {
}
}

public static @NotNull List<TaskType> getSubtasksForPath(TaskType task, ActivityPath activityPath,
TaskResolver taskResolver) {
return TaskTreeUtil.getResolvedSubtasks(task, taskResolver).stream()
Expand Down Expand Up @@ -208,5 +264,17 @@ public static class ActivityStateInContext {
Stream.ofNullable(activityState),
workerStates != null ? workerStates.stream() : Stream.empty());
}

public boolean isCoordinator() {
return getWorkerStates() != null;
}

@Override
public String toString() {
return "ActivityStateInContext{" +
"activityPath=" + activityPath +
", task=" + task +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public static boolean hasLimitations(WorkBucketType bucket) {

@SuppressWarnings("WeakerAccess")
public static boolean isCoordinator(ActivityStateType state) {
return state.getBucketing() != null &&
return state != null &&
state.getBucketing() != null &&
state.getBucketing().getBucketsProcessingRole() == BucketsProcessingRoleType.COORDINATOR;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/

package com.evolveum.midpoint.schema.util.task;

import com.evolveum.midpoint.schema.util.task.ActivityTreeUtil.ActivityStateTransformer;
import com.evolveum.midpoint.util.TreeNode;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ActivityStateType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.List;

class TreeTransformingProcessor<X> implements ActivityTreeUtil.ActivityStateProcessor {

@NotNull private final ActivityStateTransformer<X> transformer;
@NotNull private TreeNode<X> currentNode;

TreeTransformingProcessor(@NotNull ActivityStateTransformer<X> transformer, @NotNull TreeNode<X> root) {
this.transformer = transformer;
this.currentNode = root;
}

@Override
public void process(@NotNull ActivityPath path, @Nullable ActivityStateType state,
@Nullable List<ActivityStateType> workerStates, @NotNull TaskType task) {
currentNode.setUserObject(
transformer.transform(path, state, workerStates, task));
}

@Override
public void toNewChild(@NotNull ActivityStateType childState) {
TreeNode<X> child = new TreeNode<>();
currentNode.add(child);
currentNode = child;
}

@Override
public void toParent() {
currentNode = currentNode.getParent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void reconcileWorkers(String oid, Task opTask, OperationResult result)
throws CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException,
ConfigurationException, ExpressionEvaluationException, ObjectAlreadyExistsException;

void deleteWorkersAndWorkState(String rootTaskOid, boolean deleteWorkers, long subtasksWaitTime, Task operationTask,
void deleteActivityStateAndWorkers(String rootTaskOid, boolean deleteWorkers, long subtasksWaitTime, Task operationTask,
OperationResult parentResult)
throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException,
CommunicationException, ConfigurationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.*;
import javax.xml.namespace.QName;

import com.evolveum.midpoint.repo.common.activity.TaskActivityManager;
import com.evolveum.midpoint.util.exception.IndestructibilityViolationException;

import org.apache.commons.lang.Validate;
Expand Down Expand Up @@ -55,7 +56,6 @@
import com.evolveum.midpoint.provisioning.api.ProvisioningOperationOptions;
import com.evolveum.midpoint.provisioning.api.ProvisioningService;
import com.evolveum.midpoint.provisioning.api.ExternalResourceEvent;
import com.evolveum.midpoint.repo.api.PreconditionViolationException;
import com.evolveum.midpoint.repo.api.RepoAddOptions;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.repo.cache.RepositoryCache;
Expand Down Expand Up @@ -130,6 +130,7 @@ public class ModelController implements ModelService, TaskService, WorkflowServi
@Autowired private ObjectImporter objectImporter;
@Autowired private HookRegistry hookRegistry;
@Autowired private TaskManager taskManager;
@Autowired private TaskActivityManager activityManager;
@Autowired private ScriptingExpressionEvaluator scriptingExpressionEvaluator;
@Autowired private AuditHelper auditHelper;
@Autowired private SecurityEnforcer securityEnforcer;
Expand Down Expand Up @@ -1945,18 +1946,16 @@ public void reconcileWorkers(String oid, Task opTask, OperationResult result)
throws CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException,
ConfigurationException, ExpressionEvaluationException, ObjectAlreadyExistsException {
securityEnforcer.authorize(AuthorizationConstants.AUTZ_ALL_URL, null, AuthorizationParameters.EMPTY, null, opTask, result);
// taskManager.reconcileWorkers(oid, null, result);
throw new UnsupportedOperationException();
activityManager.reconcileWorkers(oid, result);
}

@Override
public void deleteWorkersAndWorkState(String rootTaskOid, boolean deleteWorkers, long subtasksWaitTime, Task operationTask,
public void deleteActivityStateAndWorkers(String rootTaskOid, boolean deleteWorkers, long subtasksWaitTime, Task operationTask,
OperationResult parentResult)
throws SecurityViolationException, ObjectNotFoundException, SchemaException, ExpressionEvaluationException,
CommunicationException, ConfigurationException {
securityEnforcer.authorize(AuthorizationConstants.AUTZ_ALL_URL, null, AuthorizationParameters.EMPTY, null, operationTask, parentResult);
// taskManager.deleteWorkersAndWorkState(rootTaskOid, deleteWorkers, subtasksWaitTime, parentResult);
throw new UnsupportedOperationException();
activityManager.deleteActivityStateAndWorkers(rootTaskOid, deleteWorkers, subtasksWaitTime, parentResult);
}

private List<PrismObject<TaskType>> preprocessTaskOperation(String oid, ModelAuthorizationAction action, AuditEventType event, Task task, OperationResult result) throws CommunicationException, ObjectNotFoundException, SchemaException, SecurityViolationException, ConfigurationException, ExpressionEvaluationException {
Expand Down

0 comments on commit fab7dc9

Please sign in to comment.