Skip to content

Commit

Permalink
merge: #10779
Browse files Browse the repository at this point in the history
10779: Fix activation of Parallel Gateway through Process Instance Modification r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
Activating a Parallel Gateway was not possible due to a failing check if the correct amount of active sequence have been taken. This PR will increment the amount of taken sequence upon when the activate instructions contain a parallel gateway. This is done in the `ProcessInstance.MODIFIED` event applier.

One edge case that is not supported with this change is the following scenario:
<img width="684" alt="image" src="https://user-images.githubusercontent.com/5787702/196986426-bdff96b0-e47f-4063-956e-c007e1aa2a13.png">
If I want to modify this process by putting an activation on task A and on the joining gateway, the gateway will be activated immediately. Task A will be executed and will start waiting at the gateway again, meaning the process will be stuck). The gateway will not wait for task A before continuing. I could not think of a simple solution to solve this problem.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10518



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Oct 26, 2022
2 parents b73753d + dc329c0 commit c5f0f0a
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationActivateInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationVariableInstruction;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand Down Expand Up @@ -167,6 +168,9 @@ public void processRecord(
return;
}

final var extendedRecord = new ProcessInstanceModificationRecord();
extendedRecord.setProcessInstanceKey(value.getProcessInstanceKey());

final var requiredKeysForActivation =
value.getActivateInstructions().stream()
.flatMap(
Expand All @@ -186,9 +190,9 @@ public void processRecord(
process,
instruction));

activatedElementKeys
.getFlowScopeKeys()
.forEach(value::addActivatedElementInstanceKey);
extendedRecord.addActivateInstruction(
((ProcessInstanceModificationActivateInstruction) instruction)
.addAncestorScopeKeys(activatedElementKeys.getFlowScopeKeys()));

return activatedElementKeys.getFlowScopeKeys().stream();
})
Expand All @@ -201,6 +205,7 @@ public void processRecord(
.getTerminateInstructions()
.forEach(
instruction -> {
extendedRecord.addTerminateInstruction(instruction);
final var elementInstance =
elementInstanceState.getInstance(instruction.getElementInstanceKey());
if (elementInstance == null) {
Expand All @@ -215,10 +220,11 @@ public void processRecord(
terminateFlowScopes(flowScopeKey, sideEffectQueue, requiredKeysForActivation);
});

stateWriter.appendFollowUpEvent(eventKey, ProcessInstanceModificationIntent.MODIFIED, value);
stateWriter.appendFollowUpEvent(
eventKey, ProcessInstanceModificationIntent.MODIFIED, extendedRecord);

responseWriter.writeEventOnCommand(
eventKey, ProcessInstanceModificationIntent.MODIFIED, value, command);
eventKey, ProcessInstanceModificationIntent.MODIFIED, extendedRecord, command);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ private void registerProcessInstanceCreationAppliers(final MutableZeebeState sta
private void registerProcessInstanceModificationAppliers(final MutableZeebeState state) {
register(
ProcessInstanceModificationIntent.MODIFIED,
new ProcessInstanceModifiedEventApplier(state.getElementInstanceState()));
new ProcessInstanceModifiedEventApplier(
state.getElementInstanceState(), state.getProcessState()));
}

private void registerJobIntentEventAppliers(final MutableZeebeState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,75 @@
*/
package io.camunda.zeebe.engine.state.appliers;

import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowNode;
import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.engine.state.mutable.MutableElementInstanceState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import java.util.Collections;

final class ProcessInstanceModifiedEventApplier
implements TypedEventApplier<
ProcessInstanceModificationIntent, ProcessInstanceModificationRecord> {

private final MutableElementInstanceState elementInstanceState;
private final MutableProcessState processState;

public ProcessInstanceModifiedEventApplier(
final MutableElementInstanceState elementInstanceState) {
final MutableElementInstanceState elementInstanceState,
final MutableProcessState processState) {
this.elementInstanceState = elementInstanceState;
this.processState = processState;
}

@Override
public void applyState(final long key, final ProcessInstanceModificationRecord value) {
value.getActivatedElementInstanceKeys().stream()
if (value.hasActivateInstructions()) {
clearInterruptedState(value);
incrementNumberOfTakenSequenceFlows(value);
}
}

private void clearInterruptedState(final ProcessInstanceModificationRecord value) {
value.getAncestorScopeKeys().stream()
.map(elementInstanceState::getInstance)
.filter(ElementInstance::isInterrupted)
.forEach(
instance ->
elementInstanceState.updateInstance(
instance.getKey(), ElementInstance::clearInterruptedState));
}

private void incrementNumberOfTakenSequenceFlows(final ProcessInstanceModificationRecord value) {
final ElementInstance processInstance =
elementInstanceState.getInstance(value.getProcessInstanceKey());
final var process =
processState
.getProcessByKey(processInstance.getValue().getProcessDefinitionKey())
.getProcess();
value
.getActivateInstructions()
.forEach(
instruction -> {
final var element =
process.getElementById(instruction.getElementId(), ExecutableFlowNode.class);
if (!element.getElementType().equals(BpmnElementType.PARALLEL_GATEWAY)) {
return;
}

// Parent scopes are created in order from outer scope to inner scope. This means that
// the parent flow scope of the element that must be activated will always be the
// highest key in the set.
final var parentFlowScopeKey = Collections.max(instruction.getAncestorScopeKeys());
element
.getIncoming()
.forEach(
incoming ->
elementInstanceState.incrementNumberOfTakenSequenceFlows(
parentFlowScopeKey, element.getId(), incoming.getId()));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -867,4 +867,42 @@ record -> record.getValue().getBpmnElementType(),
.describedAs("Expected to create the timer only once")
.containsOnlyOnce(TimerIntent.CREATED);
}

@Test
public void shouldActivateParallelGateway() {
// Given
ENGINE
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent()
.parallelGateway("fork")
.userTask("A")
.moveToLastGateway()
.userTask("B")
.done())
.deploy();

// When
final long processInstanceKey =
ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withStartInstruction("fork").create();

// Then
Assertions.assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limit("fork", ProcessInstanceIntent.ELEMENT_COMPLETED))
.extracting(record -> record.getValue().getBpmnElementType(), Record::getIntent)
.describedAs("Expected to start process instance at parallel gateway")
.containsSequence(
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATING),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.PARALLEL_GATEWAY, ProcessInstanceIntent.ACTIVATE_ELEMENT))
.containsSubsequence(
tuple(BpmnElementType.PARALLEL_GATEWAY, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.PARALLEL_GATEWAY, ProcessInstanceIntent.ELEMENT_COMPLETED))
.doesNotContain(
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.START_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,47 @@ public void shouldActivateElementInInterruptedFlowScope() {
.isEqualTo(2);
}

@Test
public void shouldActivateParallelGateway() {
// given
ENGINE
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent()
.parallelGateway("fork")
.serviceTask("A", a -> a.zeebeJobType("A"))
.endEvent()
.moveToNode("fork")
.serviceTask("B", b -> b.zeebeJobType("B"))
.endEvent()
.done())
.deploy();

final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

Assertions.assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.limit(2))
.hasSize(2);

// when
ENGINE
.processInstance()
.withInstanceKey(processInstanceKey)
.modification()
.activateElement("fork")
.modify();

// then
Assertions.assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.limit(4))
.hasSize(4);
}

private static void verifyThatRootElementIsActivated(
final long processInstanceKey, final String elementId, final BpmnElementType elementType) {
verifyThatElementIsActivated(processInstanceKey, elementId, elementType, processInstanceKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@
"type": "keyword"
}
}
},
"ancestorScopeKeys": {
"type": "long"
}
}
},
"activatedElementInstanceKeys": {
"ancestorScopeKeys": {
"type": "long"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
import io.camunda.zeebe.msgpack.property.ArrayProperty;
import io.camunda.zeebe.msgpack.property.LongProperty;
import io.camunda.zeebe.msgpack.property.StringProperty;
import io.camunda.zeebe.msgpack.value.LongValue;
import io.camunda.zeebe.msgpack.value.ObjectValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue.ProcessInstanceModificationActivateInstructionValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue.ProcessInstanceModificationVariableInstructionValue;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;

@JsonIgnoreProperties({
Expand All @@ -34,10 +37,14 @@ public final class ProcessInstanceModificationActivateInstruction extends Object
new ArrayProperty<>(
"variableInstructions", new ProcessInstanceModificationVariableInstruction());

private final ArrayProperty<LongValue> ancestorScopeKeysProperty =
new ArrayProperty<>("ancestorScopeKeys", new LongValue());

public ProcessInstanceModificationActivateInstruction() {
declareProperty(elementIdProperty)
.declareProperty(ancestorScopeKeyProperty)
.declareProperty(variableInstructionsProperty);
.declareProperty(variableInstructionsProperty)
.declareProperty(ancestorScopeKeysProperty);
}

@Override
Expand Down Expand Up @@ -68,6 +75,11 @@ public List<ProcessInstanceModificationVariableInstructionValue> getVariableInst
.toList();
}

@Override
public Set<Long> getAncestorScopeKeys() {
return ancestorScopeKeysProperty.stream().map(LongValue::getValue).collect(Collectors.toSet());
}

public ProcessInstanceModificationActivateInstruction setAncestorScopeKey(
final long ancestorScopeKey) {
ancestorScopeKeyProperty.setValue(ancestorScopeKey);
Expand All @@ -91,6 +103,12 @@ public ProcessInstanceModificationActivateInstruction addVariableInstruction(
return this;
}

public ProcessInstanceModificationActivateInstruction addAncestorScopeKeys(
final Set<Long> flowScopeKeys) {
flowScopeKeys.forEach(key -> ancestorScopeKeysProperty.add().setValue(key));
return this;
}

@JsonIgnore
public DirectBuffer getElementIdBuffer() {
return elementIdProperty.getValue();
Expand All @@ -102,6 +120,7 @@ public void copy(final ProcessInstanceModificationActivateInstruction object) {
object.getVariableInstructions().stream()
.map(ProcessInstanceModificationVariableInstruction.class::cast)
.forEach(this::addVariableInstruction);
addAncestorScopeKeys(object.getAncestorScopeKeys());
}

/** hashCode relies on implementation provided by {@link ObjectValue#hashCode()} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public final class ProcessInstanceModificationRecord extends UnifiedRecordValue
activateInstructionsProperty =
new ArrayProperty<>(
"activateInstructions", new ProcessInstanceModificationActivateInstruction());

@Deprecated(since = "8.1.3")
private final ArrayProperty<LongValue> activatedElementInstanceKeys =
new ArrayProperty<>("activatedElementInstanceKeys", new LongValue());

Expand Down Expand Up @@ -81,10 +83,20 @@ public List<ProcessInstanceModificationActivateInstructionValue> getActivateInst
}

@Override
public Set<Long> getActivatedElementInstanceKeys() {
return activatedElementInstanceKeys.stream()
.map(LongValue::getValue)
.collect(Collectors.toSet());
public Set<Long> getAncestorScopeKeys() {
final Set<Long> activatedElementInstanceKeys =
this.activatedElementInstanceKeys.stream()
.map(LongValue::getValue)
.collect(Collectors.toSet());
// For backwards compatibility's sake we have to add the ancestor scope keys of all activate
// instructions, as from version 8.1.3 on the activatedElementInstanceKeys property is no longer
// filled.
activatedElementInstanceKeys.addAll(
getActivateInstructions().stream()
.map(ProcessInstanceModificationActivateInstructionValue::getAncestorScopeKeys)
.flatMap(Set::stream)
.collect(Collectors.toSet()));
return activatedElementInstanceKeys;
}

/** Returns true if this record has terminate instructions, otherwise false. */
Expand All @@ -111,11 +123,6 @@ public ProcessInstanceModificationRecord addActivateInstruction(
return this;
}

public ProcessInstanceModificationRecord addActivatedElementInstanceKey(final long key) {
activatedElementInstanceKeys.add().setValue(key);
return this;
}

@Override
public long getProcessInstanceKey() {
return processInstanceKeyProperty.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
Expand Down Expand Up @@ -771,7 +772,8 @@ final var record = new DeploymentDistributionRecord();
.addVariableInstruction(
new ProcessInstanceModificationVariableInstruction()
.setVariables(VARIABLES_MSGPACK)
.setElementId(variableInstructionElementId)));
.setElementId(variableInstructionElementId))
.addAncestorScopeKeys(Set.of(key, ancestorScopeKey)));
},
"""
{
Expand All @@ -787,9 +789,10 @@ final var record = new DeploymentDistributionRecord();
"foo": "bar"
}
}],
"elementId": "activity"
"elementId": "activity",
"ancestorScopeKeys": [1,3]
}],
"activatedElementInstanceKeys": []
"ancestorScopeKeys": [1,3]
}
"""
},
Expand All @@ -806,7 +809,7 @@ final var record = new DeploymentDistributionRecord();
"processInstanceKey": 1,
"terminateInstructions": [],
"activateInstructions": [],
"activatedElementInstanceKeys": []
"ancestorScopeKeys": []
}
"""
},
Expand Down

0 comments on commit c5f0f0a

Please sign in to comment.