Skip to content

Commit

Permalink
Serializing wf execution tasks on object OID (not on request id).
Browse files Browse the repository at this point in the history
(cherry picked from commit 83ffdaf)
  • Loading branch information
mederly committed Oct 2, 2017
1 parent 2814088 commit d403251
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 20 deletions.
Expand Up @@ -80,7 +80,7 @@ public class BaseModelInvocationProcessingHelper {
*/
public WfTaskCreationInstruction createInstructionForRoot(ChangeProcessor changeProcessor, ModelContext modelContext, Task taskFromModel, ModelContext contextForRoot, OperationResult result) throws SchemaException {

WfTaskCreationInstruction instruction;
WfTaskCreationInstruction<?, ?> instruction;
if (contextForRoot != null) {
instruction = WfTaskCreationInstruction.createModelOnly(changeProcessor, contextForRoot);
} else {
Expand All @@ -92,6 +92,7 @@ public WfTaskCreationInstruction createInstructionForRoot(ChangeProcessor change
instruction.setTaskOwner(taskFromModel.getOwner());
instruction.setCreateTaskAsWaiting();

instruction.setObjectRef(modelContext, result);
instruction.setRequesterRef(getRequester(taskFromModel, result));
return instruction;
}
Expand Down
Expand Up @@ -271,8 +271,9 @@ private WfTask submitTask0(ModelContext context, ObjectTreeDeltas changesWithout
WfConfigurationType wfConfigurationType, OperationResult result) throws SchemaException, ObjectNotFoundException {
if (changesWithoutApproval != null && !changesWithoutApproval.isEmpty() && executionMode != ALL_AFTERWARDS) {
ModelContext task0context = contextCopyWithDeltasReplaced(context, changesWithoutApproval);
WfTaskCreationInstruction instruction0 = WfTaskCreationInstruction.createModelOnly(rootWfTask.getChangeProcessor(), task0context);
WfTaskCreationInstruction<?, ?> instruction0 = WfTaskCreationInstruction.createModelOnly(rootWfTask.getChangeProcessor(), task0context);
instruction0.setTaskName("Executing changes that do not require approval");
instruction0.setObjectRef(context, result);
return wfTaskController.submitWfTask(instruction0, rootWfTask, wfConfigurationType, result);
} else {
return null;
Expand Down
Expand Up @@ -93,6 +93,8 @@ public class AbstractWfTestPolicy extends AbstractModelImplementationIntegration
public static final File USER_ADMINISTRATOR_FILE = new File(TEST_RESOURCE_DIR, "user-administrator.xml");

protected static final File USER_JACK_FILE = new File(TEST_RESOURCE_DIR, "user-jack.xml");
protected static final File USER_BOB_FILE = new File(TEST_RESOURCE_DIR, "user-bob.xml");
protected static final File USER_CHUCK_FILE = new File(TEST_RESOURCE_DIR, "user-chuck.xml");
protected static final File USER_LEAD1_FILE = new File(TEST_RESOURCE_DIR, "user-lead1.xml");
protected static final File USER_LEAD1_DEPUTY_1_FILE = new File(TEST_RESOURCE_DIR, "user-lead1-deputy1.xml");
protected static final File USER_LEAD1_DEPUTY_2_FILE = new File(TEST_RESOURCE_DIR, "user-lead1-deputy2.xml");
Expand Down Expand Up @@ -134,6 +136,8 @@ public class AbstractWfTestPolicy extends AbstractModelImplementationIntegration
protected static final String USER_ADMINISTRATOR_OID = SystemObjectsType.USER_ADMINISTRATOR.value();

protected String userJackOid;
protected String userBobOid;
protected String userChuckOid;
protected String userLead1Oid;
protected String userLead1Deputy1Oid;
protected String userLead1Deputy2Oid;
Expand Down Expand Up @@ -202,6 +206,8 @@ public void initSystem(Task initTask, OperationResult initResult) throws Excepti
metaroleApproveUnassign = repoAddObjectFromFile(METAROLE_APPROVE_UNASSIGN_FILE, initResult).getOid();

userJackOid = repoAddObjectFromFile(USER_JACK_FILE, initResult).getOid();
userBobOid = repoAddObjectFromFile(USER_BOB_FILE, initResult).getOid();
userChuckOid = repoAddObjectFromFile(USER_CHUCK_FILE, initResult).getOid();
roleRole1Oid = repoAddObjectFromFile(ROLE_ROLE1_FILE, initResult).getOid();
roleRole1aOid = repoAddObjectFromFile(ROLE_ROLE1A_FILE, initResult).getOid();
roleRole1bOid = repoAddObjectFromFile(ROLE_ROLE1B_FILE, initResult).getOid();
Expand Down
Expand Up @@ -41,11 +41,16 @@
import org.testng.annotations.Test;

import java.io.File;
import java.util.Collections;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static com.evolveum.midpoint.model.api.ModelExecuteOptions.createExecuteImmediatelyAfterApproval;
import static com.evolveum.midpoint.xml.ns._public.common.common_3.TaskExecutionStatusType.CLOSED;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;

/**
* @author mederly
Expand All @@ -54,11 +59,12 @@
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class TestParallelApprovals extends AbstractWfTestPolicy {

private static final File ROLE_ROLE50A_FILE = new File(TEST_RESOURCE_DIR, "role-role50a-slow.xml");
private static final File ROLE_ROLE51A_FILE = new File(TEST_RESOURCE_DIR, "role-role51a-slow.xml");
private static final File ROLE_ROLE52A_FILE = new File(TEST_RESOURCE_DIR, "role-role52a-slow.xml");
private static final File ROLE_ROLE53A_FILE = new File(TEST_RESOURCE_DIR, "role-role53a-slow.xml");

private String roleRole51aOid, roleRole52aOid, roleRole53aOid;
private String roleRole50aOid, roleRole51aOid, roleRole52aOid, roleRole53aOid;

@Override
protected PrismObject<UserType> getDefaultActor() {
Expand All @@ -69,6 +75,7 @@ protected PrismObject<UserType> getDefaultActor() {
public void initSystem(Task initTask, OperationResult initResult) throws Exception {
super.initSystem(initTask, initResult);

roleRole50aOid = repoAddObjectFromFile(ROLE_ROLE50A_FILE, initResult).getOid();
roleRole51aOid = repoAddObjectFromFile(ROLE_ROLE51A_FILE, initResult).getOid();
roleRole52aOid = repoAddObjectFromFile(ROLE_ROLE52A_FILE, initResult).getOid();
roleRole53aOid = repoAddObjectFromFile(ROLE_ROLE53A_FILE, initResult).getOid();
Expand All @@ -85,6 +92,8 @@ protected void updateSystemConfiguration(SystemConfigurationType systemConfigura
.retryAfter(XmlTypeConverter.createDuration(1000)); // makes tests run faster
}

private CheckingTaskListener listener;

@Test
public void test100ParallelApprovals() throws Exception {
final String TEST_NAME = "test100ParallelApprovals";
Expand All @@ -98,6 +107,7 @@ public void test100ParallelApprovals() throws Exception {
displayWhen(TEST_NAME);
ObjectDelta<UserType> assignDelta = DeltaBuilder.deltaFor(UserType.class, prismContext)
.item(UserType.F_ASSIGNMENT).add(
ObjectTypeUtil.createAssignmentTo(roleRole50aOid, ObjectTypes.ROLE, prismContext),
ObjectTypeUtil.createAssignmentTo(roleRole51aOid, ObjectTypes.ROLE, prismContext),
ObjectTypeUtil.createAssignmentTo(roleRole52aOid, ObjectTypes.ROLE, prismContext),
ObjectTypeUtil.createAssignmentTo(roleRole53aOid, ObjectTypes.ROLE, prismContext))
Expand All @@ -111,66 +121,266 @@ public void test100ParallelApprovals() throws Exception {
String rootTaskOid = wfTaskUtil.getRootTaskOid(task);
display("root task", getTask(rootTaskOid));

CheckingTaskListener listener = new CheckingTaskListener(rootTaskOid);
if (listener != null) {
taskManager.unregisterTaskListener(listener);
}
listener = new CheckingTaskListener(singleton(rootTaskOid));
taskManager.registerTaskListener(listener);

approveAllWorkItems(task, result);

waitForTaskCloseOrSuspend(rootTaskOid, 120000, 1000);

// THEN

PrismObject<TaskType> rootTask = getTask(rootTaskOid);
assertNull("Exception has occurred " + listener.getException(), listener.getException());
assertEquals("Wrong root task1 status", CLOSED, rootTask.asObjectable().getExecutionStatus());

PrismObject<UserType> jack = getUser(userJackOid);
assertAssignedRole(jack, roleRole50aOid);
assertAssignedRole(jack, roleRole51aOid);
assertAssignedRole(jack, roleRole52aOid);
assertAssignedRole(jack, roleRole53aOid);
}

@Test
public void test110ParallelApprovalsAdd() throws Exception {
final String TEST_NAME = "test110ParallelApprovalsAdd";
TestUtil.displayTestTitle(this, TEST_NAME);
login(userAdministrator);

Task task = createTask(TEST_NAME);
OperationResult result = task.getResult();

if (listener != null) {
taskManager.unregisterTaskListener(listener);
}
listener = new CheckingTaskListener();
taskManager.registerTaskListener(listener);

// WHEN
displayWhen(TEST_NAME);
UserType alice = prismContext.createObjectable(UserType.class)
.name("alice")
.assignment(ObjectTypeUtil.createAssignmentTo(roleRole50aOid, ObjectTypes.ROLE, prismContext))
.assignment(ObjectTypeUtil.createAssignmentTo(roleRole51aOid, ObjectTypes.ROLE, prismContext))
.assignment(ObjectTypeUtil.createAssignmentTo(roleRole52aOid, ObjectTypes.ROLE, prismContext))
.assignment(ObjectTypeUtil.createAssignmentTo(roleRole53aOid, ObjectTypes.ROLE, prismContext));
executeChanges(ObjectDelta.createAddDelta(alice.asPrismObject()), createExecuteImmediatelyAfterApproval(), task, result); // should start approval processes

display("Task after operation", task);
String rootTaskOid = wfTaskUtil.getRootTaskOid(task);
display("root task", getTask(rootTaskOid));

listener.setTasksToSuspendOnError(singleton(rootTaskOid));

approveAllWorkItems(task, result);
waitForTaskCloseOrSuspend(rootTaskOid, 120000, 1000);

// THEN

PrismObject<TaskType> rootTask = getTask(rootTaskOid);
assertNull("Exception has occurred " + listener.getException(), listener.getException());
assertEquals("Wrong root task1 status", CLOSED, rootTask.asObjectable().getExecutionStatus());

PrismObject<UserType> aliceAfter = findUserByUsername("alice");
assertAssignedRole(aliceAfter, roleRole50aOid);
assertAssignedRole(aliceAfter, roleRole51aOid);
assertAssignedRole(aliceAfter, roleRole52aOid);
assertAssignedRole(aliceAfter, roleRole53aOid);
}

public void approveAllWorkItems(Task task, OperationResult result) throws Exception {
List<WorkItemType> workItems = getWorkItems(task, result);
display("work items", workItems);
display("approving work items");
for (WorkItemType workItem : workItems) {
workflowManager.completeWorkItem(workItem.getExternalId(), true, null, null, null, result);
}
}

waitForTaskCloseOrSuspend(rootTaskOid, 120000, 1000);
@Test
public void test120ParallelApprovalsInTwoOperations() throws Exception {
final String TEST_NAME = "test120ParallelApprovalsInTwoOperations";
TestUtil.displayTestTitle(this, TEST_NAME);
login(userAdministrator);

Task task = createTask(TEST_NAME);
Task task1 = createTask(TEST_NAME);
Task task2 = createTask(TEST_NAME);
OperationResult result = task.getResult();

if (listener != null) {
taskManager.unregisterTaskListener(listener);
}
listener = new CheckingTaskListener();
taskManager.registerTaskListener(listener);

// WHEN
displayWhen(TEST_NAME);
ObjectDelta<UserType> assignDelta1 = DeltaBuilder.deltaFor(UserType.class, prismContext)
.item(UserType.F_ASSIGNMENT).add(
ObjectTypeUtil.createAssignmentTo(roleRole50aOid, ObjectTypes.ROLE, prismContext),
ObjectTypeUtil.createAssignmentTo(roleRole51aOid, ObjectTypes.ROLE, prismContext))
.asObjectDeltaCast(userBobOid);
executeChanges(assignDelta1, createExecuteImmediatelyAfterApproval(), task1, result); // should start approval processes
ObjectDelta<UserType> assignDelta2 = DeltaBuilder.deltaFor(UserType.class, prismContext)
.item(UserType.F_ASSIGNMENT).add(
ObjectTypeUtil.createAssignmentTo(roleRole50aOid, ObjectTypes.ROLE, prismContext),
ObjectTypeUtil.createAssignmentTo(roleRole52aOid, ObjectTypes.ROLE, prismContext),
ObjectTypeUtil.createAssignmentTo(roleRole53aOid, ObjectTypes.ROLE, prismContext))
.asObjectDeltaCast(userBobOid);
executeChanges(assignDelta2, createExecuteImmediatelyAfterApproval(), task2, result); // should start approval processes
assertNotAssignedRole(userBobOid, roleRole51aOid, task, result);
assertNotAssignedRole(userBobOid, roleRole52aOid, task, result);
assertNotAssignedRole(userBobOid, roleRole53aOid, task, result);

display("Task1 after operation", task1);
display("Task2 after operation", task2);
String rootTask1Oid = wfTaskUtil.getRootTaskOid(task1);
String rootTask2Oid = wfTaskUtil.getRootTaskOid(task2);
display("root task 1", getTask(rootTask1Oid));
display("root task 2", getTask(rootTask2Oid));

assertNull("Exception has occurred " + listener.getException(), listener.getException());
listener.setTasksToSuspendOnError(Arrays.asList(rootTask1Oid, rootTask2Oid));

approveAllWorkItems(task, result);

waitForTaskCloseOrSuspend(rootTask1Oid, 120000, 1000);
waitForTaskCloseOrSuspend(rootTask2Oid, 120000, 1000);

// THEN

PrismObject<TaskType> rootTask = getTask(rootTaskOid);
if (listener.getException() != null || rootTask.asObjectable().getExecutionStatus() != CLOSED) {
fail("root task has not completed; recorded exception = " + listener.getException());
PrismObject<TaskType> rootTask1 = getTask(rootTask1Oid);
PrismObject<TaskType> rootTask2 = getTask(rootTask2Oid);
assertNull("Exception has occurred " + listener.getException(), listener.getException());
assertEquals("Wrong root task1 status", CLOSED, rootTask1.asObjectable().getExecutionStatus());
assertEquals("Wrong root task2 status", CLOSED, rootTask2.asObjectable().getExecutionStatus());

PrismObject<UserType> bob = getUser(userBobOid);
assertAssignedRole(bob, roleRole50aOid);
assertAssignedRole(bob, roleRole51aOid);
assertAssignedRole(bob, roleRole52aOid);
assertAssignedRole(bob, roleRole53aOid);
}

@Test
public void test130ParallelApprovalsInThreeSummarizingOperations() throws Exception {
final String TEST_NAME = "test130ParallelApprovalsInThreeSummarizingOperations";
TestUtil.displayTestTitle(this, TEST_NAME);
login(userAdministrator);

Task task = createTask(TEST_NAME);
Task task1 = createTask(TEST_NAME);
Task task2 = createTask(TEST_NAME);
Task task3 = createTask(TEST_NAME);
OperationResult result = task.getResult();

if (listener != null) {
taskManager.unregisterTaskListener(listener);
}
listener = new CheckingTaskListener();
taskManager.registerTaskListener(listener);

PrismObject<UserType> jack = getUser(userJackOid);
assertAssignedRole(jack, roleRole51aOid);
assertAssignedRole(jack, roleRole52aOid);
assertAssignedRole(jack, roleRole53aOid);
// WHEN
displayWhen(TEST_NAME);
// three separate approval contexts, "summarizing" as the deltas are executed after all approvals
assignRole(userChuckOid, roleRole51aOid, task1, result);
assignRole(userChuckOid, roleRole52aOid, task2, result);
assignRole(userChuckOid, roleRole53aOid, task3, result);
assertNotAssignedRole(userChuckOid, roleRole51aOid, task, result);
assertNotAssignedRole(userChuckOid, roleRole52aOid, task, result);
assertNotAssignedRole(userChuckOid, roleRole53aOid, task, result);

display("Task1 after operation", task1);
display("Task2 after operation", task2);
display("Task3 after operation", task3);
String rootTask1Oid = wfTaskUtil.getRootTaskOid(task1);
String rootTask2Oid = wfTaskUtil.getRootTaskOid(task2);
String rootTask3Oid = wfTaskUtil.getRootTaskOid(task3);
display("root task 1", getTask(rootTask1Oid));
display("root task 2", getTask(rootTask2Oid));
display("root task 3", getTask(rootTask3Oid));

assertNull("Exception has occurred " + listener.getException(), listener.getException());
listener.setTasksToSuspendOnError(Arrays.asList(rootTask1Oid, rootTask2Oid, rootTask3Oid));

approveAllWorkItems(task, result);

waitForTaskCloseOrSuspend(rootTask1Oid, 120000, 1000);
waitForTaskCloseOrSuspend(rootTask2Oid, 120000, 1000);
waitForTaskCloseOrSuspend(rootTask3Oid, 120000, 1000);

// THEN

PrismObject<TaskType> rootTask1 = getTask(rootTask1Oid);
PrismObject<TaskType> rootTask2 = getTask(rootTask2Oid);
PrismObject<TaskType> rootTask3 = getTask(rootTask3Oid);
assertNull("Exception has occurred " + listener.getException(), listener.getException());
assertEquals("Wrong root task1 status", CLOSED, rootTask1.asObjectable().getExecutionStatus());
assertEquals("Wrong root task2 status", CLOSED, rootTask2.asObjectable().getExecutionStatus());
assertEquals("Wrong root task3 status", CLOSED, rootTask3.asObjectable().getExecutionStatus());

PrismObject<UserType> chuck = getUser(userChuckOid);
assertAssignedRole(chuck, roleRole51aOid);
assertAssignedRole(chuck, roleRole52aOid);
assertAssignedRole(chuck, roleRole53aOid);
}

private class CheckingTaskListener implements TaskListener {

private String rootTaskOid;
private Collection<String> tasksToSuspendOnError;
private Task executing;
private RuntimeException exception;

public CheckingTaskListener(String rootTaskOid) {
this.rootTaskOid = rootTaskOid;
public CheckingTaskListener() {
this.tasksToSuspendOnError = emptySet();
}

public CheckingTaskListener(Collection<String> tasksToSuspendOnError) {
this.tasksToSuspendOnError = tasksToSuspendOnError;
}

public RuntimeException getException() {
return exception;
}

public void setTasksToSuspendOnError(Collection<String> tasksToSuspendOnError) {
this.tasksToSuspendOnError = tasksToSuspendOnError;
if (exception != null) {
suspendTasks();
}
}

@Override
public synchronized void onTaskStart(Task task) {
if (!ModelOperationTaskHandler.MODEL_OPERATION_TASK_URI.equals(task.getHandlerUri())) {
return;
}
System.out.println("Starting " + task + ", handler uri " + task.getHandlerUri());
System.out.println(Thread.currentThread().getName() + ": Starting " + task + ", handler uri " + task.getHandlerUri() + ", group " + task.getGroup());
if (executing != null) {
exception = new IllegalStateException("Started task " + task + " but another one is already executing: " + executing);
System.out.println(exception.getMessage());
// suspend root task in order to fail faster
taskManager.suspendTasks(Collections.singleton(rootTaskOid), TaskManager.DO_NOT_WAIT, new OperationResult("dummy"));
display("newly started task", task);
display("already executing task", executing);
suspendTasks();
}
executing = task;
}

public void suspendTasks() {
// suspend root task in order to fail faster
taskManager.suspendTasks(tasksToSuspendOnError, TaskManager.DO_NOT_WAIT, new OperationResult("dummy"));
}

@Override
public synchronized void onTaskFinish(Task task, TaskRunResult runResult) {
if (!ModelOperationTaskHandler.MODEL_OPERATION_TASK_URI.equals(task.getHandlerUri())) {
return;
}
System.out.println("Finishing " + task + ", handler uri " + task.getHandlerUri());
System.out.println(Thread.currentThread().getName() + ": Finishing " + task + ", handler uri " + task.getHandlerUri());
assert executing.getOid().equals(task.getOid());
executing = null;
}
Expand Down

0 comments on commit d403251

Please sign in to comment.