Skip to content

Commit

Permalink
Configurable wf execution tasks serialization scope.
Browse files Browse the repository at this point in the history
  • Loading branch information
mederly committed Oct 2, 2017
1 parent d403251 commit 0b7fe52
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 29 deletions.
Expand Up @@ -703,6 +703,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="scope" type="tns:WfExecutionTasksSerializationScopeType" minOccurs="0" maxOccurs="unbounded">
<xsd:annotation>
<xsd:documentation>
Scope of serialization. The default is "object". If multiple scopes are defined, serialization occurs on each one.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="retryAfter" type="xsd:duration" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand All @@ -720,6 +727,61 @@
</xsd:sequence>
</xsd:complexType>

<xsd:simpleType name="WfExecutionTasksSerializationScopeType">
<xsd:annotation>
<xsd:documentation>
Scope of execution task serialization.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumClass/>
</xsd:appinfo>
</xsd:annotation>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="operation">
<xsd:annotation>
<xsd:documentation>
No two workflow execution tasks from a single operation are allowed to execute at once.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumMember name="OPERATION"/>
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
<xsd:enumeration value="object">
<xsd:annotation>
<xsd:documentation>
No two workflow execution tasks on a given object are allowed to execute at once.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumMember name="OBJECT"/>
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
<xsd:enumeration value="target">
<xsd:annotation>
<xsd:documentation>
No two workflow execution tasks related to give target are allowed to execute at once.
Note that the information on target is not always available (e.g. when executing changes
that do not require approval), so this may not be absolutely reliable.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumMember name="TARGET"/>
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
<xsd:enumeration value="global">
<xsd:annotation>
<xsd:documentation>
No two workflow execution tasks are allowed to execute at once.
</xsd:documentation>
<xsd:appinfo>
<jaxb:typesafeEnumMember name="GLOBAL"/>
</xsd:appinfo>
</xsd:annotation>
</xsd:enumeration>
</xsd:restriction>
</xsd:simpleType>

<xsd:simpleType name="LegacyApproversSpecificationUsageType">
<xsd:annotation>
<xsd:documentation>
Expand Down
Expand Up @@ -410,38 +410,74 @@ public Task createTask(WfTaskController taskController, Task parentTask, WfConfi
}
// serialization
WfExecutionTasksSerializationType serialization = tasksConfig.getSerialization();
if (serialization != null && !Boolean.FALSE.equals(serialization.isEnabled()) && parentTask != null) {
String groupPrefix = serialization.getGroupPrefix() != null ?
serialization.getGroupPrefix() : DEFAULT_EXECUTION_GROUP_PREFIX_FOR_SERIALIZATION;
String groupName = groupPrefix + parentTask.getTaskIdentifier();
Duration retryAfter;
if (serialization.getRetryAfter() != null) {
if (constraints != null && constraints.getRetryAfter() != null && !constraints.getRetryAfter().equals(serialization.getRetryAfter())) {
LOGGER.warn(
"Workflow configuration: task constraints retryAfter ({}) is different from serialization retryAfter ({}) -- using the latter",
constraints.getRetryAfter(), serialization.getRetryAfter());
if (serialization != null && !Boolean.FALSE.equals(serialization.isEnabled())) {
List<WfExecutionTasksSerializationScopeType> scopes = new ArrayList<>(serialization.getScope());
if (scopes.isEmpty()) {
scopes.add(WfExecutionTasksSerializationScopeType.OBJECT);
}
List<String> groups = new ArrayList<>(scopes.size());
for (WfExecutionTasksSerializationScopeType scope : scopes) {
String groupPrefix = serialization.getGroupPrefix() != null
? serialization.getGroupPrefix() : DEFAULT_EXECUTION_GROUP_PREFIX_FOR_SERIALIZATION;
String groupSuffix = getGroupSuffix(scope, wfContext, parentTask, task);
if (groupSuffix == null) {
continue;
}
retryAfter = serialization.getRetryAfter();
} else if (constraints != null && constraints.getRetryAfter() != null) {
retryAfter = constraints.getRetryAfter();
} else {
retryAfter = XmlTypeConverter.createDuration(DEFAULT_SERIALIZATION_RETRY_TIME);
groups.add(groupPrefix + scope.value() + ":" + groupSuffix);
}
if (taskBean.getExecutionConstraints() == null) {
taskBean.setExecutionConstraints(new TaskExecutionConstraintsType());
if (!groups.isEmpty()) {
Duration retryAfter;
if (serialization.getRetryAfter() != null) {
if (constraints != null && constraints.getRetryAfter() != null && !constraints.getRetryAfter()
.equals(serialization.getRetryAfter())) {
LOGGER.warn(
"Workflow configuration: task constraints retryAfter ({}) is different from serialization retryAfter ({}) -- using the latter",
constraints.getRetryAfter(), serialization.getRetryAfter());
}
retryAfter = serialization.getRetryAfter();
} else if (constraints != null && constraints.getRetryAfter() != null) {
retryAfter = constraints.getRetryAfter();
} else {
retryAfter = XmlTypeConverter.createDuration(DEFAULT_SERIALIZATION_RETRY_TIME);
}
TaskExecutionConstraintsType executionConstraints = taskBean.getExecutionConstraints();
if (executionConstraints == null) {
executionConstraints = new TaskExecutionConstraintsType();
taskBean.setExecutionConstraints(executionConstraints);
}
for (String group : groups) {
executionConstraints
.beginSecondaryGroup()
.group(group)
.groupTaskLimit(1);
}
executionConstraints.setRetryAfter(retryAfter);
LOGGER.trace("Setting groups {} with a limit of 1 for task {}", groups, task);
}
taskBean.getExecutionConstraints()
.beginSecondaryGroup()
.group(groupName)
.groupTaskLimit(1)
.<TaskExecutionConstraintsType>end()
.retryAfter(retryAfter);
LOGGER.trace("Setting group '{}' with a limit of 1 for task {}", groupName, task);
}
}
return task;
}

private String getGroupSuffix(WfExecutionTasksSerializationScopeType scope, WfContextType wfContext, Task parentTask, Task task) {
switch (scope) {
case GLOBAL: return "";
case OBJECT:
String oid = wfContext.getObjectRef() != null ? wfContext.getObjectRef().getOid() : null;
if (oid == null) {
LOGGER.warn("No object OID present, synchronization with the scope of {} couldn't be set up for task {}", scope, task);
return null;
}
return oid;
case TARGET:
return wfContext.getTargetRef() != null ? wfContext.getTargetRef().getOid() : null; // null can occur so let's be silent then
case OPERATION:
return parentTask != null ? parentTask.getTaskIdentifier() : null; // null can occur so let's be silent then
default:
throw new AssertionError("Unknown scope: " + scope);
}
}

// FIXME brutal hack because of objectDelta should be in wfContext when evaluating auto completion expression
public void createProcessorContent() {
if (processorContent != null) {
Expand Down
Expand Up @@ -32,10 +32,7 @@
import com.evolveum.midpoint.test.util.TestUtil;
import com.evolveum.midpoint.util.DebugUtil;
import com.evolveum.midpoint.wf.impl.policy.AbstractWfTestPolicy;
import com.evolveum.midpoint.xml.ns._public.common.common_3.SystemConfigurationType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.UserType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.WorkItemType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -359,7 +356,7 @@ public synchronized void onTaskStart(Task task) {
if (!ModelOperationTaskHandler.MODEL_OPERATION_TASK_URI.equals(task.getHandlerUri())) {
return;
}
System.out.println(Thread.currentThread().getName() + ": Starting " + task + ", handler uri " + task.getHandlerUri() + ", group " + task.getGroup());
System.out.println(Thread.currentThread().getName() + ": Starting " + task + ", handler uri " + task.getHandlerUri() + ", groups " + task.getGroups());
if (executing != null) {
exception = new IllegalStateException("Started task " + task + " but another one is already executing: " + executing);
System.out.println(exception.getMessage());
Expand Down

0 comments on commit 0b7fe52

Please sign in to comment.