Skip to content

Commit

Permalink
merge: #10653
Browse files Browse the repository at this point in the history
10653: [Backport stable/8.1] Remove interrupted state on event subprocess activation r=remcowesterhoud a=backport-action

# Description
Backport of #10609 to `stable/8.1`.

relates to #10477

Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Oct 10, 2022
2 parents aa574c4 + 5092849 commit bdfd16b
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ public void processRecord(
processInstance,
process,
instruction));

activatedElementKeys
.getFlowScopeKeys()
.forEach(value::addActivatedElementInstanceKey);

return activatedElementKeys.getFlowScopeKeys().stream();
})
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
Expand All @@ -50,6 +51,7 @@ public final class EventAppliers implements EventApplier {
public EventAppliers(final MutableZeebeState state) {
registerProcessInstanceEventAppliers(state);
registerProcessInstanceCreationAppliers(state);
registerProcessInstanceModificationAppliers(state);

register(ProcessIntent.CREATED, new ProcessCreatedApplier(state));
register(ErrorIntent.CREATED, new ErrorCreatedApplier(state.getBlackListState()));
Expand Down Expand Up @@ -147,6 +149,12 @@ private void registerProcessInstanceCreationAppliers(final MutableZeebeState sta
new ProcessInstanceCreationCreatedApplier(processState, elementInstanceState));
}

private void registerProcessInstanceModificationAppliers(final MutableZeebeState state) {
register(
ProcessInstanceModificationIntent.MODIFIED,
new ProcessInstanceModifiedEventApplier(state.getElementInstanceState()));
}

private void registerJobIntentEventAppliers(final MutableZeebeState state) {
register(JobIntent.CANCELED, new JobCanceledApplier(state));
register(JobIntent.COMPLETED, new JobCompletedApplier(state));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.appliers;

import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.engine.state.mutable.MutableElementInstanceState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;

final class ProcessInstanceModifiedEventApplier
implements TypedEventApplier<
ProcessInstanceModificationIntent, ProcessInstanceModificationRecord> {

private final MutableElementInstanceState elementInstanceState;

public ProcessInstanceModifiedEventApplier(
final MutableElementInstanceState elementInstanceState) {
this.elementInstanceState = elementInstanceState;
}

@Override
public void applyState(final long key, final ProcessInstanceModificationRecord value) {
value.getActivatedElementInstanceKeys().stream()
.map(elementInstanceState::getInstance)
.filter(ElementInstance::isInterrupted)
.forEach(
instance ->
elementInstanceState.updateInstance(
instance.getKey(), ElementInstance::clearInterruptedState));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public boolean isInterrupted() {
return getInterruptingElementId().capacity() > 0;
}

public void clearInterruptedState() {
interruptingEventKeyProp.setValue("");
}

public long getParentKey() {
return parentKeyProp.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.SubProcessBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -26,6 +27,7 @@
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -734,6 +736,79 @@ public void shouldTerminateAndActivateElementInTheSameScope() {
verifyThatProcessInstanceIsCompleted(processInstanceKey);
}

@Test
public void shouldActivateElementInInterruptedFlowScope() {
// given
final Consumer<EventSubProcessBuilder> eventSubProcess =
eventSubprocess ->
eventSubprocess
.startEvent()
.interrupting(true)
.message(message -> message.name("interrupt").zeebeCorrelationKeyExpression("key"))
.userTask("A")
.endEvent();

final Consumer<SubProcessBuilder> subProcess =
subprocess -> subprocess.embeddedSubProcess().startEvent().userTask("C").endEvent();

ENGINE
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess(PROCESS_ID)
.eventSubProcess("event-subprocess", eventSubProcess)
.startEvent()
.userTask("B")
.subProcess("subprocess", subProcess)
.endEvent()
.done())
.deploy();

final var processInstanceKey =
ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "key-1").create();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("B")
.await();

ENGINE
.message()
.withName("interrupt")
.withCorrelationKey("key-1")
.withTimeToLive(Duration.ofMinutes(1))
.publish();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("A")
.await();

// when
ENGINE
.processInstance()
.withInstanceKey(processInstanceKey)
.modification()
.activateElement("C")
.activateElement("subprocess")
.modify();

// then
Assertions.assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("C")
.exists())
.isTrue();

Assertions.assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("subprocess")
.limit(2)
.count())
.isEqualTo(2);
}

private static void verifyThatRootElementIsActivated(
final long processInstanceKey, final String elementId, final BpmnElementType elementType) {
verifyThatElementIsActivated(processInstanceKey, elementId, elementType, processInstanceKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.camunda.zeebe.msgpack.property.ArrayProperty;
import io.camunda.zeebe.msgpack.property.LongProperty;
import io.camunda.zeebe.msgpack.value.LongValue;
import io.camunda.zeebe.msgpack.value.ObjectValue;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class ProcessInstanceModificationRecord extends UnifiedRecordValue
implements ProcessInstanceModificationRecordValue {
Expand All @@ -27,11 +30,14 @@ public final class ProcessInstanceModificationRecord extends UnifiedRecordValue
activateInstructionsProperty =
new ArrayProperty<>(
"activateInstructions", new ProcessInstanceModificationActivateInstruction());
private final ArrayProperty<LongValue> activatedElementInstanceKeys =
new ArrayProperty<>("activatedElementInstanceKeys", new LongValue());

public ProcessInstanceModificationRecord() {
declareProperty(processInstanceKeyProperty)
.declareProperty(terminateInstructionsProperty)
.declareProperty(activateInstructionsProperty);
.declareProperty(activateInstructionsProperty)
.declareProperty(activatedElementInstanceKeys);
}

/**
Expand Down Expand Up @@ -98,6 +104,17 @@ public ProcessInstanceModificationRecord addActivateInstruction(
return this;
}

public Set<Long> getActivatedElementInstanceKeys() {
return activatedElementInstanceKeys.stream()
.map(LongValue::getValue)
.collect(Collectors.toSet());
}

public ProcessInstanceModificationRecord addActivatedElementInstanceKey(final long key) {
activatedElementInstanceKeys.add().setValue(key);
return this;
}

@Override
public long getProcessInstanceKey() {
return processInstanceKeyProperty.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,8 @@ final var record = new DeploymentDistributionRecord();
}
}],
"elementId": "activity"
}]
}],
"activatedElementInstanceKeys": []
}
"""
},
Expand All @@ -803,7 +804,8 @@ final var record = new DeploymentDistributionRecord();
{
"processInstanceKey": 1,
"terminateInstructions": [],
"activateInstructions": []
"activateInstructions": [],
"activatedElementInstanceKeys": []
}
"""
},
Expand Down

0 comments on commit bdfd16b

Please sign in to comment.