Skip to content

Commit

Permalink
Option to serialize wf execution tasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
mederly committed Sep 30, 2017
1 parent 5ae6e0d commit 4fe7741
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 2 deletions.
Expand Up @@ -478,12 +478,53 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="executionTasksSerialization" minOccurs="0" type="tns:WfExecutionTasksSerializationType">
<xsd:annotation>
<xsd:documentation>
Whether and how to serialize execution tasks (if "execute after all approvals" is set to false).
EXPERIMENTAL
</xsd:documentation>
<xsd:appinfo>
<a:experimental>true</a:experimental>
<a:since>3.6.1</a:since>
</xsd:appinfo>
</xsd:annotation>
</xsd:element>
<xsd:element name="primaryChangeProcessor" minOccurs="0" type="tns:PrimaryChangeProcessorConfigurationType" />
<xsd:element name="generalChangeProcessor" minOccurs="0" type="tns:GeneralChangeProcessorConfigurationType" />
</xsd:sequence>
</xsd:complexType>

<xsd:simpleType name="LegacyApproversSpecificationUsageType">
<xsd:complexType name="WfExecutionTasksSerializationType">
<xsd:annotation>
<xsd:documentation>
Whether and how to serialize execution tasks (if "execute after all approvals" is set to false).
EXPERIMENTAL
</xsd:documentation>
<xsd:appinfo>
<a:experimental>true</a:experimental>
<a:since>3.6.1</a:since>
</xsd:appinfo>
</xsd:annotation>
<xsd:sequence>
<xsd:element name="enabled" type="xsd:boolean" minOccurs="0" default="true">
<xsd:annotation>
<xsd:documentation>
Whether this feature is enabled. Default is true if executionTasksSerialization element is present; false otherwise.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="retryInterval" type="xsd:duration" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Interval after which the execution task is to be rescheduled in case of conflict. Default is 10 seconds.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>

<xsd:simpleType name="LegacyApproversSpecificationUsageType">
<xsd:annotation>
<xsd:documentation>
How to deal with legacy approvers specifications, i.e. approvalRef, approvalExpression, approvalSchema,
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.delta.ObjectDelta;
import com.evolveum.midpoint.prism.xml.XmlTypeConverter;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.task.api.*;
Expand All @@ -42,6 +43,7 @@
import com.evolveum.prism.xml.ns._public.types_3.PolyStringType;
import org.apache.commons.lang.Validate;

import javax.xml.datatype.Duration;
import java.util.*;

import static com.evolveum.midpoint.prism.xml.XmlTypeConverter.createXMLGregorianCalendar;
Expand All @@ -58,6 +60,8 @@ public class WfTaskCreationInstruction<PRC extends ProcessorSpecificContent, PCS

private static final Trace LOGGER = TraceManager.getTrace(WfTaskCreationInstruction.class);
private static final Integer DEFAULT_PROCESS_CHECK_INTERVAL = 30;
private static final String DEFAULT_EXECUTION_GROUP_PREFIX_FOR_SERIALIZATION = "$approval-task-group$:";
private static final long DEFAULT_SERIALIZATION_RETRY_TIME = 10000L;

private final ChangeProcessor changeProcessor;

Expand Down Expand Up @@ -396,6 +400,23 @@ public Task createTask(WfTaskController taskController, Task parentTask, WfConfi
}
task.setWorkflowContext(wfContext);

WfExecutionTasksSerializationType serialization = wfConfigurationType != null ? wfConfigurationType.getExecutionTasksSerialization() : null;
if (parentTask != null && executeModelOperationHandler && serialization != null && !Boolean.FALSE.equals(serialization.isEnabled())) {
TaskType taskBean = task.getTaskPrismObject().asObjectable();
// TODO think about 3.7 (setting the group influences also the local execution possibilities)
// String groupPrefix = serialization.getExecutionGroupPrefix() != null ?
// serialization.getExecutionGroupPrefix() : DEFAULT_EXECUTION_GROUP_PREFIX_FOR_SERIALIZATION;
String groupName = DEFAULT_EXECUTION_GROUP_PREFIX_FOR_SERIALIZATION + parentTask.getTaskIdentifier();
Duration retryAfter = serialization.getRetryInterval() != null ?
serialization.getRetryInterval() : XmlTypeConverter.createDuration(DEFAULT_SERIALIZATION_RETRY_TIME);
taskBean.setExecutionConstraints(
new TaskExecutionConstraintsType()
.group(groupName)
.groupTaskLimit(1)
.retryAfter(retryAfter));
LOGGER.trace("Setting group '{}' with a limit of 1 for task {}", groupName, task);
}

return task;
}

Expand Down
Expand Up @@ -202,7 +202,10 @@ public void initSystem(Task initTask, OperationResult initResult) throws Excepti
super.initSystem(initTask, initResult);
modelService.postInit(initResult);

repoAddObjectFromFile(getSystemConfigurationFile(), initResult);
PrismObject<SystemConfigurationType> sysconfig = prismContext.parseObject(getSystemConfigurationFile());
updateSystemConfiguration(sysconfig.asObjectable());
repoAddObject(sysconfig, initResult);

repoAddObjectFromFile(ROLE_SUPERUSER_FILE, initResult);
userAdministrator = repoAddObjectFromFile(USER_ADMINISTRATOR_FILE, initResult);
login(userAdministrator);
Expand Down Expand Up @@ -245,6 +248,10 @@ public void initSystem(Task initTask, OperationResult initResult) throws Excepti
userTemplateAssigningRole1aOidAfter = repoAddObjectFromFile(USER_TEMPLATE_ASSIGNING_ROLE_1A_AFTER, initResult).getOid();
}

protected void updateSystemConfiguration(SystemConfigurationType systemConfiguration) {
// nothing to do by default
}

protected File getSystemConfigurationFile() {
return SYSTEM_CONFIGURATION_FILE;
}
Expand Down
@@ -0,0 +1,187 @@
/*
* Copyright (c) 2010-2017 Evolveum
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.evolveum.midpoint.wf.impl.policy.other;

import com.evolveum.midpoint.model.impl.controller.ModelOperationTaskHandler;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.delta.ObjectDelta;
import com.evolveum.midpoint.prism.delta.builder.DeltaBuilder;
import com.evolveum.midpoint.prism.xml.XmlTypeConverter;
import com.evolveum.midpoint.schema.constants.ObjectTypes;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskListener;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.api.TaskRunResult;
import com.evolveum.midpoint.test.util.TestUtil;
import com.evolveum.midpoint.util.DebugUtil;
import com.evolveum.midpoint.wf.impl.policy.AbstractWfTestPolicy;
import com.evolveum.midpoint.xml.ns._public.common.common_3.SystemConfigurationType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.UserType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.WorkItemType;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.testng.annotations.Test;

import java.io.File;
import java.util.Collections;
import java.util.List;

import static com.evolveum.midpoint.model.api.ModelExecuteOptions.createExecuteImmediatelyAfterApproval;
import static com.evolveum.midpoint.xml.ns._public.common.common_3.TaskExecutionStatusType.CLOSED;

/**
* @author mederly
*/
@ContextConfiguration(locations = {"classpath:ctx-workflow-test-main.xml"})
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class TestParallelApprovals extends AbstractWfTestPolicy {

private static final File ROLE_ROLE51A_FILE = new File(TEST_RESOURCE_DIR, "role-role51a-slow.xml");
private static final File ROLE_ROLE52A_FILE = new File(TEST_RESOURCE_DIR, "role-role52a-slow.xml");
private static final File ROLE_ROLE53A_FILE = new File(TEST_RESOURCE_DIR, "role-role53a-slow.xml");

private String roleRole51aOid, roleRole52aOid, roleRole53aOid;

@Override
protected PrismObject<UserType> getDefaultActor() {
return userAdministrator;
}

@Override
public void initSystem(Task initTask, OperationResult initResult) throws Exception {
super.initSystem(initTask, initResult);

roleRole51aOid = repoAddObjectFromFile(ROLE_ROLE51A_FILE, initResult).getOid();
roleRole52aOid = repoAddObjectFromFile(ROLE_ROLE52A_FILE, initResult).getOid();
roleRole53aOid = repoAddObjectFromFile(ROLE_ROLE53A_FILE, initResult).getOid();

DebugUtil.setPrettyPrintBeansAs(PrismContext.LANG_YAML);
}

@Override
protected void updateSystemConfiguration(SystemConfigurationType systemConfiguration) {
super.updateSystemConfiguration(systemConfiguration);
systemConfiguration.getWorkflowConfiguration()
.beginExecutionTasksSerialization()
.retryInterval(XmlTypeConverter.createDuration(1000)); // makes tests run faster
}

@Test
public void test100ParallelApprovals() throws Exception {
final String TEST_NAME = "test100ParallelApprovals";
TestUtil.displayTestTitle(this, TEST_NAME);
login(userAdministrator);

Task task = createTask(TEST_NAME);
OperationResult result = task.getResult();

// WHEN
displayWhen(TEST_NAME);
ObjectDelta<UserType> assignDelta = DeltaBuilder.deltaFor(UserType.class, prismContext)
.item(UserType.F_ASSIGNMENT).add(
ObjectTypeUtil.createAssignmentTo(roleRole51aOid, ObjectTypes.ROLE, prismContext),
ObjectTypeUtil.createAssignmentTo(roleRole52aOid, ObjectTypes.ROLE, prismContext),
ObjectTypeUtil.createAssignmentTo(roleRole53aOid, ObjectTypes.ROLE, prismContext))
.asObjectDeltaCast(userJackOid);
executeChanges(assignDelta, createExecuteImmediatelyAfterApproval(), task, result); // should start approval processes
assertNotAssignedRole(userJackOid, roleRole51aOid, task, result);
assertNotAssignedRole(userJackOid, roleRole52aOid, task, result);
assertNotAssignedRole(userJackOid, roleRole53aOid, task, result);

display("Task after operation", task);
String rootTaskOid = wfTaskUtil.getRootTaskOid(task);
display("root task", getTask(rootTaskOid));

CheckingTaskListener listener = new CheckingTaskListener(rootTaskOid);
taskManager.registerTaskListener(listener);

List<WorkItemType> workItems = getWorkItems(task, result);
display("work items", workItems);
display("approving work items");
for (WorkItemType workItem : workItems) {
workflowManager.completeWorkItem(workItem.getExternalId(), true, null, null, null, result);
}

waitForTaskCloseOrSuspend(rootTaskOid, 120000, 1000);

// THEN

PrismObject<TaskType> rootTask = getTask(rootTaskOid);
if (listener.getException() != null || rootTask.asObjectable().getExecutionStatus() != CLOSED) {
fail("root task has not completed; recorded exception = " + listener.getException());
}

PrismObject<UserType> jack = getUser(userJackOid);
assertAssignedRole(jack, roleRole51aOid);
assertAssignedRole(jack, roleRole52aOid);
assertAssignedRole(jack, roleRole53aOid);
}

private class CheckingTaskListener implements TaskListener {

private String rootTaskOid;
private Task executing;
private RuntimeException exception;

public CheckingTaskListener(String rootTaskOid) {
this.rootTaskOid = rootTaskOid;
}

public RuntimeException getException() {
return exception;
}

@Override
public synchronized void onTaskStart(Task task) {
if (!ModelOperationTaskHandler.MODEL_OPERATION_TASK_URI.equals(task.getHandlerUri())) {
return;
}
System.out.println("Starting " + task + ", handler uri " + task.getHandlerUri());
if (executing != null) {
exception = new IllegalStateException("Started task " + task + " but another one is already executing: " + executing);
System.out.println(exception.getMessage());
// suspend root task in order to fail faster
taskManager.suspendTasks(Collections.singleton(rootTaskOid), TaskManager.DO_NOT_WAIT, new OperationResult("dummy"));
}
executing = task;
}

@Override
public synchronized void onTaskFinish(Task task, TaskRunResult runResult) {
if (!ModelOperationTaskHandler.MODEL_OPERATION_TASK_URI.equals(task.getHandlerUri())) {
return;
}
System.out.println("Finishing " + task + ", handler uri " + task.getHandlerUri());
assert executing.getOid().equals(task.getOid());
executing = null;
}

@Override
public void onTaskThreadStart(Task task, boolean isRecovering) {
// ignoring
}

@Override
public void onTaskThreadFinish(Task task) {
// ignoring
}
}
}
@@ -0,0 +1,43 @@
<!--
~ Copyright (c) 2010-2017 Evolveum
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<role xmlns="http://midpoint.evolveum.com/xml/ns/public/common/common-3"
oid="00000001-d34d-b33f-f00d-00000000051a">
<name>Role51a-slow</name>
<inducement>
<focusMappings>
<mapping>
<source>
<path>description</path>
</source>
<expression>
<script>
<code>
System.out.println('Sleeping for 1 second (51a)...')
Thread.sleep(1000)
System.out.println('Sleeping done (51a)...')
return description
</code>
</script>
</expression>
<target>
<path>description</path>
</target>
</mapping>
</focusMappings>
</inducement>
<approverRef oid="00000000-0000-0000-0000-000000000002" type="UserType"/> <!-- administrator -->
</role>

0 comments on commit 4fe7741

Please sign in to comment.