diff --git a/engine/pom.xml b/engine/pom.xml
index 20d643ac30fc..28898cc26f0d 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -158,6 +158,12 @@
test
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+
+
org.awaitility
awaitility
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java
index cd382b9d9c3a..d558eef9ccbb 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java
@@ -35,4 +35,6 @@ public interface BpmnBehaviors {
BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior();
BpmnJobBehavior jobBehavior();
+
+ MultiInstanceOutputCollectionBehavior outputCollectionBehavior();
}
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java
index b4f2f76ce2bd..2895daf33848 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java
@@ -39,6 +39,8 @@ public final class BpmnBehaviorsImpl implements BpmnBehaviors {
private final BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior;
private final BpmnJobBehavior jobBehavior;
+ private final MultiInstanceOutputCollectionBehavior multiInstanceOutputCollectionBehavior;
+
public BpmnBehaviorsImpl(
final ExpressionProcessor expressionBehavior,
final SideEffects sideEffects,
@@ -105,6 +107,9 @@ public BpmnBehaviorsImpl(
stateBehavior,
incidentBehavior,
jobMetrics);
+
+ multiInstanceOutputCollectionBehavior =
+ new MultiInstanceOutputCollectionBehavior(stateBehavior, expressionBehavior());
}
@Override
@@ -166,4 +171,9 @@ public BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior()
public BpmnJobBehavior jobBehavior() {
return jobBehavior;
}
+
+ @Override
+ public MultiInstanceOutputCollectionBehavior outputCollectionBehavior() {
+ return multiInstanceOutputCollectionBehavior;
+ }
}
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java
new file mode 100644
index 000000000000..839d19f92560
--- /dev/null
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
+ * one or more contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright ownership.
+ * Licensed under the Zeebe Community License 1.1. You may not use this file
+ * except in compliance with the Zeebe Community License 1.1.
+ */
+package io.camunda.zeebe.engine.processing.bpmn.behavior;
+
+import static io.camunda.zeebe.util.buffer.BufferUtil.bufferAsString;
+
+import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext;
+import io.camunda.zeebe.engine.processing.common.ExpressionProcessor;
+import io.camunda.zeebe.engine.processing.common.Failure;
+import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody;
+import io.camunda.zeebe.msgpack.spec.MsgPackReader;
+import io.camunda.zeebe.msgpack.spec.MsgPackToken;
+import io.camunda.zeebe.msgpack.spec.MsgPackType;
+import io.camunda.zeebe.msgpack.spec.MsgPackWriter;
+import io.camunda.zeebe.protocol.record.value.ErrorType;
+import io.camunda.zeebe.util.Either;
+import java.util.Optional;
+import org.agrona.DirectBuffer;
+import org.agrona.ExpandableArrayBuffer;
+import org.agrona.concurrent.UnsafeBuffer;
+
+public final class MultiInstanceOutputCollectionBehavior {
+
+ private final MsgPackReader outputCollectionReader = new MsgPackReader();
+ private final MsgPackWriter outputCollectionWriter = new MsgPackWriter();
+ private final ExpandableArrayBuffer outputCollectionBuffer = new ExpandableArrayBuffer();
+ private final DirectBuffer updatedOutputCollectionBuffer = new UnsafeBuffer(0, 0);
+
+ private final BpmnStateBehavior stateBehavior;
+ private final ExpressionProcessor expressionProcessor;
+
+ MultiInstanceOutputCollectionBehavior(
+ final BpmnStateBehavior stateBehavior, final ExpressionProcessor expressionProcessor) {
+ this.stateBehavior = stateBehavior;
+ this.expressionProcessor = expressionProcessor;
+ }
+
+ public void initializeOutputCollection(
+ final BpmnElementContext context, final DirectBuffer variableName, final int size) {
+
+ outputCollectionWriter.wrap(outputCollectionBuffer, 0);
+
+ // initialize the array with nil
+ outputCollectionWriter.writeArrayHeader(size);
+ for (var i = 0; i < size; i++) {
+ outputCollectionWriter.writeNil();
+ }
+
+ final var length = outputCollectionWriter.getOffset();
+
+ stateBehavior.setLocalVariable(context, variableName, outputCollectionBuffer, 0, length);
+ }
+
+ public Either updateOutputCollection(
+ final ExecutableMultiInstanceBody element,
+ final BpmnElementContext childContext,
+ final BpmnElementContext flowScopeContext) {
+
+ return element
+ .getLoopCharacteristics()
+ .getOutputCollection()
+ .map(
+ variableName ->
+ updateOutputCollection(element, childContext, flowScopeContext, variableName))
+ .orElse(Either.right(null));
+ }
+
+ private Either updateOutputCollection(
+ final ExecutableMultiInstanceBody element,
+ final BpmnElementContext childContext,
+ final BpmnElementContext flowScopeContext,
+ final DirectBuffer variableName) {
+
+ final var loopCounter =
+ stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter();
+
+ return readOutputElementVariable(element, childContext)
+ .flatMap(
+ elementVariable -> {
+ // we need to read the output element variable before the current collection
+ // is read, because readOutputElementVariable(Context) uses the same
+ // buffer as getVariableLocal this could also be avoided by cloning the current
+ // collection, but that is slower.
+ final var currentCollection =
+ stateBehavior.getLocalVariable(flowScopeContext, variableName);
+ return replaceAt(
+ currentCollection,
+ loopCounter,
+ elementVariable,
+ flowScopeContext.getElementInstanceKey(),
+ variableName)
+ .map(
+ updatedCollection -> {
+ stateBehavior.setLocalVariable(
+ flowScopeContext, variableName, updatedCollection);
+ return null;
+ });
+ });
+ }
+
+ private Either readOutputElementVariable(
+ final ExecutableMultiInstanceBody element, final BpmnElementContext context) {
+ final var expression = element.getLoopCharacteristics().getOutputElement().orElseThrow();
+ return expressionProcessor.evaluateAnyExpression(expression, context.getElementInstanceKey());
+ }
+
+ private Either replaceAt(
+ final DirectBuffer array,
+ final int index,
+ final DirectBuffer element,
+ final long variableScopeKey,
+ final DirectBuffer variableName) {
+
+ outputCollectionReader.wrap(array, 0, array.capacity());
+ final var token = outputCollectionReader.readToken();
+
+ final var optValidationFailure =
+ validateIsCollectionAndHasAppropriateSIze(index, variableScopeKey, variableName, token);
+ if (optValidationFailure.isPresent()) {
+ return Either.left(optValidationFailure.get());
+ }
+
+ outputCollectionReader.skipValues((long) index - 1L);
+
+ final var offsetBefore = outputCollectionReader.getOffset();
+ outputCollectionReader.skipValue();
+ final var offsetAfter = outputCollectionReader.getOffset();
+
+ outputCollectionWriter.wrap(outputCollectionBuffer, 0);
+ outputCollectionWriter.writeRaw(array, 0, offsetBefore);
+ outputCollectionWriter.writeRaw(element);
+ outputCollectionWriter.writeRaw(array, offsetAfter, array.capacity() - offsetAfter);
+
+ final var length = outputCollectionWriter.getOffset();
+
+ updatedOutputCollectionBuffer.wrap(outputCollectionBuffer, 0, length);
+ return Either.right(updatedOutputCollectionBuffer);
+ }
+
+ private Optional validateIsCollectionAndHasAppropriateSIze(
+ final int index,
+ final long variableScopeKey,
+ final DirectBuffer variableName,
+ final MsgPackToken token) {
+ if (token.getType() != MsgPackType.ARRAY) {
+ return Optional.of(
+ new Failure(
+ "Unable to update an item in output collection '%s' because the type of the output collection is: %s. This may happen when multiple BPMN elements write to the same variable."
+ .formatted(bufferAsString(variableName), token.getType()),
+ ErrorType.EXTRACT_VALUE_ERROR,
+ variableScopeKey));
+ }
+
+ final int size = token.getSize();
+ if (index > size) {
+ return Optional.of(
+ new Failure(
+ "Unable to update an item in output collection '%s' at position %d because the size of the collection is: %d. This may happen when multiple BPMN elements write to the same variable."
+ .formatted(bufferAsString(variableName), index, size),
+ ErrorType.EXTRACT_VALUE_ERROR,
+ variableScopeKey));
+ }
+ return Optional.empty();
+ }
+}
diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java
index 45b12f2ece58..0d7439d2946d 100644
--- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java
+++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java
@@ -15,11 +15,11 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnIncidentBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior;
+import io.camunda.zeebe.engine.processing.bpmn.behavior.MultiInstanceOutputCollectionBehavior;
import io.camunda.zeebe.engine.processing.common.ExpressionProcessor;
import io.camunda.zeebe.engine.processing.common.Failure;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody;
import io.camunda.zeebe.msgpack.spec.MsgPackHelper;
-import io.camunda.zeebe.msgpack.spec.MsgPackReader;
import io.camunda.zeebe.msgpack.spec.MsgPackWriter;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.ErrorType;
@@ -28,7 +28,6 @@
import java.util.List;
import java.util.Optional;
import org.agrona.DirectBuffer;
-import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
@@ -42,16 +41,14 @@ public final class MultiInstanceBodyProcessor
new UnsafeBuffer(new byte[Long.BYTES + 1]);
private final DirectBuffer loopCounterVariableView = new UnsafeBuffer(0, 0);
- private final MsgPackReader variableReader = new MsgPackReader();
- private final MsgPackWriter variableWriter = new MsgPackWriter();
- private final ExpandableArrayBuffer variableBuffer = new ExpandableArrayBuffer();
- private final DirectBuffer resultBuffer = new UnsafeBuffer(0, 0);
+ private final MsgPackWriter loopCounterWriter = new MsgPackWriter();
private final ExpressionProcessor expressionBehavior;
private final BpmnStateTransitionBehavior stateTransitionBehavior;
private final BpmnEventSubscriptionBehavior eventSubscriptionBehavior;
private final BpmnStateBehavior stateBehavior;
private final BpmnIncidentBehavior incidentBehavior;
+ private final MultiInstanceOutputCollectionBehavior multiInstanceOutputCollectionBehavior;
public MultiInstanceBodyProcessor(final BpmnBehaviors bpmnBehaviors) {
stateTransitionBehavior = bpmnBehaviors.stateTransitionBehavior();
@@ -59,6 +56,7 @@ public MultiInstanceBodyProcessor(final BpmnBehaviors bpmnBehaviors) {
stateBehavior = bpmnBehaviors.stateBehavior();
expressionBehavior = bpmnBehaviors.expressionBehavior();
incidentBehavior = bpmnBehaviors.incidentBehavior();
+ multiInstanceOutputCollectionBehavior = bpmnBehaviors.outputCollectionBehavior();
}
@Override
@@ -150,7 +148,9 @@ public void onTerminate(
final ExecutableMultiInstanceBody element,
final BpmnElementContext flowScopeContext,
final BpmnElementContext childContext) {
- final var updatedOrFailure = updateOutputCollection(element, childContext, flowScopeContext);
+ final var updatedOrFailure =
+ multiInstanceOutputCollectionBehavior.updateOutputCollection(
+ element, childContext, flowScopeContext);
if (updatedOrFailure.isLeft()) {
return updatedOrFailure;
}
@@ -247,7 +247,8 @@ private void activate(
.getOutputCollection()
.ifPresent(
variableName ->
- initializeOutputCollection(activated, variableName, inputCollection.size()));
+ multiInstanceOutputCollectionBehavior.initializeOutputCollection(
+ activated, variableName, inputCollection.size()));
if (inputCollection.isEmpty()) {
// complete the multi-instance body immediately
@@ -328,99 +329,15 @@ private void createInnerInstance(
}
private DirectBuffer wrapLoopCounter(final int loopCounter) {
- variableWriter.wrap(loopCounterVariableBuffer, 0);
+ loopCounterWriter.wrap(loopCounterVariableBuffer, 0);
- variableWriter.writeInteger(loopCounter);
- final var length = variableWriter.getOffset();
+ loopCounterWriter.writeInteger(loopCounter);
+ final var length = loopCounterWriter.getOffset();
loopCounterVariableView.wrap(loopCounterVariableBuffer, 0, length);
return loopCounterVariableView;
}
- private void initializeOutputCollection(
- final BpmnElementContext context, final DirectBuffer variableName, final int size) {
-
- variableWriter.wrap(variableBuffer, 0);
-
- // initialize the array with nil
- variableWriter.writeArrayHeader(size);
- for (var i = 0; i < size; i++) {
- variableWriter.writeNil();
- }
-
- final var length = variableWriter.getOffset();
-
- stateBehavior.setLocalVariable(context, variableName, variableBuffer, 0, length);
- }
-
- private Either updateOutputCollection(
- final ExecutableMultiInstanceBody element,
- final BpmnElementContext childContext,
- final BpmnElementContext flowScopeContext) {
-
- return element
- .getLoopCharacteristics()
- .getOutputCollection()
- .map(
- variableName ->
- updateOutputCollection(element, childContext, flowScopeContext, variableName))
- .orElse(Either.right(null));
- }
-
- private Either updateOutputCollection(
- final ExecutableMultiInstanceBody element,
- final BpmnElementContext childContext,
- final BpmnElementContext flowScopeContext,
- final DirectBuffer variableName) {
-
- final var loopCounter =
- stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter();
-
- return readOutputElementVariable(element, childContext)
- .map(
- elementVariable -> {
- // we need to read the output element variable before the current collection
- // is read, because readOutputElementVariable(Context) uses the same
- // buffer as getVariableLocal this could also be avoided by cloning the current
- // collection, but that is slower.
- final var currentCollection =
- stateBehavior.getLocalVariable(flowScopeContext, variableName);
- final var updatedCollection =
- insertAt(currentCollection, loopCounter, elementVariable);
- stateBehavior.setLocalVariable(flowScopeContext, variableName, updatedCollection);
-
- return null;
- });
- }
-
- private Either readOutputElementVariable(
- final ExecutableMultiInstanceBody element, final BpmnElementContext context) {
- final var expression = element.getLoopCharacteristics().getOutputElement().orElseThrow();
- return expressionBehavior.evaluateAnyExpression(expression, context.getElementInstanceKey());
- }
-
- private DirectBuffer insertAt(
- final DirectBuffer array, final int index, final DirectBuffer element) {
-
- variableReader.wrap(array, 0, array.capacity());
- variableReader.readArrayHeader();
- variableReader.skipValues((long) index - 1L);
-
- final var offsetBefore = variableReader.getOffset();
- variableReader.skipValue();
- final var offsetAfter = variableReader.getOffset();
-
- variableWriter.wrap(variableBuffer, 0);
- variableWriter.writeRaw(array, 0, offsetBefore);
- variableWriter.writeRaw(element);
- variableWriter.writeRaw(array, offsetAfter, array.capacity() - offsetAfter);
-
- final var length = variableWriter.getOffset();
-
- resultBuffer.wrap(variableBuffer, 0, length);
- return resultBuffer;
- }
-
private Either satisfiesCompletionCondition(
final ExecutableMultiInstanceBody element, final BpmnElementContext context) {
final Optional completionCondition =
diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java
new file mode 100644
index 000000000000..da5bef8efe9e
--- /dev/null
+++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
+ * one or more contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright ownership.
+ * Licensed under the Zeebe Community License 1.1. You may not use this file
+ * except in compliance with the Zeebe Community License 1.1.
+ */
+package io.camunda.zeebe.engine.processing.bpmn.behavior;
+
+import static io.camunda.zeebe.util.buffer.BufferUtil.cloneBuffer;
+import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.camunda.zeebe.el.Expression;
+import io.camunda.zeebe.el.impl.StaticExpression;
+import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext;
+import io.camunda.zeebe.engine.processing.common.ExpressionProcessor;
+import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableLoopCharacteristics;
+import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody;
+import io.camunda.zeebe.msgpack.spec.MsgPackWriter;
+import io.camunda.zeebe.protocol.record.value.ErrorType;
+import io.camunda.zeebe.util.Either;
+import io.camunda.zeebe.util.buffer.BufferUtil;
+import java.util.Optional;
+import org.agrona.DirectBuffer;
+import org.agrona.ExpandableArrayBuffer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class MultiInstanceOutputCollectionBehaviorTest {
+ /*
+ * Note: this test class caused some controversy between the author and the reviewer. The reviewer
+ * feared that it will too costly to maintain and doesn't provide much value in return. The author
+ * felt it will be alright. We didn't manage to come to an agreement, so we leave it to you, future
+ * developer. If you had to maintain the test and feel it was too cumbersome and didn't add value,
+ * feel free to remove this test. If you felt maintaining it was ok, and the test does have value,
+ * feel free to remove this comment.
+ */
+
+ @Test // regression test for #9143
+ void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() {
+ // given
+ final var collectionWithSize1 = createCollection(1);
+ final var elementToAdd = createMsgPackString("element to add");
+ final var indexThatIsOutOfBounds = 2;
+ final var outputElementExpression = new StaticExpression("dummy expression");
+ final var outputCollectionName = wrapString("OUTPUT_COLLECTION");
+ final var loopCharacteristics =
+ createLoopCharacteristics(outputCollectionName, outputElementExpression);
+ final var flowScopeContextKey = 12345L;
+
+ final var mockStateBehavior = mock(BpmnStateBehavior.class, Answers.RETURNS_DEEP_STUBS);
+ when(mockStateBehavior.getLocalVariable(any(), eq(outputCollectionName)))
+ .thenReturn(collectionWithSize1);
+
+ final var mockExpressionProcessor = mock(ExpressionProcessor.class);
+ when(mockExpressionProcessor.evaluateAnyExpression(eq(outputElementExpression), anyLong()))
+ .thenReturn(Either.right(elementToAdd));
+
+ final var mockElement = mock(ExecutableMultiInstanceBody.class);
+ when(mockElement.getLoopCharacteristics()).thenReturn(loopCharacteristics);
+
+ final var mockChildContext = mock(BpmnElementContext.class);
+ when(mockStateBehavior.getElementInstance(mockChildContext).getMultiInstanceLoopCounter())
+ .thenReturn(indexThatIsOutOfBounds);
+
+ final var mockFlowScopeContext = mock(BpmnElementContext.class);
+ when(mockFlowScopeContext.getElementInstanceKey()).thenReturn(flowScopeContextKey);
+
+ final var sut =
+ new MultiInstanceOutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor);
+
+ // when
+ final var result =
+ sut.updateOutputCollection(mockElement, mockChildContext, mockFlowScopeContext);
+
+ // then
+ assertThat(result.isLeft()).isTrue();
+
+ final var failure = result.getLeft();
+ assertThat(failure.getErrorType()).isEqualTo(ErrorType.EXTRACT_VALUE_ERROR);
+ assertThat(failure.getMessage())
+ .isEqualTo(
+ "Unable to update an item in output collection 'OUTPUT_COLLECTION' at position 2 because the size of the collection is: 1. This may happen when multiple BPMN elements write to the same variable.");
+ assertThat(failure.getVariableScopeKey()).isEqualTo(flowScopeContextKey);
+ }
+
+ @Test
+ void shouldReturnFailureWhenWritingToOutputCollectionWhichIsNotArray() {
+ // given
+ final var unexpectedValueType = createMsgPackString("lorem ipsum");
+ final var elementToAdd = createMsgPackString("element to add");
+ final var index = 1;
+ final var outputElementExpression = new StaticExpression("dummy expression");
+ final var outputCollectionName = wrapString("OUTPUT_COLLECTION");
+ final var loopCharacteristics =
+ createLoopCharacteristics(outputCollectionName, outputElementExpression);
+ final var flowScopeContextKey = 12345L;
+
+ final var mockStateBehavior = mock(BpmnStateBehavior.class, Answers.RETURNS_DEEP_STUBS);
+ when(mockStateBehavior.getLocalVariable(any(), eq(outputCollectionName)))
+ .thenReturn(unexpectedValueType);
+
+ final var mockExpressionProcessor = mock(ExpressionProcessor.class);
+ when(mockExpressionProcessor.evaluateAnyExpression(eq(outputElementExpression), anyLong()))
+ .thenReturn(Either.right(elementToAdd));
+
+ final var mockElement = mock(ExecutableMultiInstanceBody.class);
+ when(mockElement.getLoopCharacteristics()).thenReturn(loopCharacteristics);
+
+ final var mockChildContext = mock(BpmnElementContext.class);
+ when(mockStateBehavior.getElementInstance(mockChildContext).getMultiInstanceLoopCounter())
+ .thenReturn(index);
+
+ final var mockFlowScopeContext = mock(BpmnElementContext.class);
+ when(mockFlowScopeContext.getElementInstanceKey()).thenReturn(flowScopeContextKey);
+
+ final var sut =
+ new MultiInstanceOutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor);
+
+ // when
+ final var result =
+ sut.updateOutputCollection(mockElement, mockChildContext, mockFlowScopeContext);
+
+ // then
+ assertThat(result.isLeft()).isTrue();
+
+ final var failure = result.getLeft();
+ assertThat(failure.getErrorType()).isEqualTo(ErrorType.EXTRACT_VALUE_ERROR);
+ assertThat(failure.getMessage())
+ .isEqualTo(
+ "Unable to update an item in output collection 'OUTPUT_COLLECTION' because the type of the output collection is: STRING. This may happen when multiple BPMN elements write to the same variable.");
+ assertThat(failure.getVariableScopeKey()).isEqualTo(flowScopeContextKey);
+ }
+
+ private ExecutableLoopCharacteristics createLoopCharacteristics(
+ final DirectBuffer outputCollection, final Expression outputElement) {
+ return new ExecutableLoopCharacteristics(
+ false,
+ Optional.empty(),
+ null,
+ Optional.empty(),
+ Optional.of(outputCollection),
+ Optional.of(outputElement));
+ }
+
+ private DirectBuffer createCollection(final int size) {
+ final var writer = new MsgPackWriter();
+ final var buffer = new ExpandableArrayBuffer();
+
+ writer.wrap(buffer, 0);
+
+ // initialize the array with nil
+ writer.writeArrayHeader(size);
+ for (var i = 0; i < size; i++) {
+ writer.writeNil();
+ }
+
+ final var length = writer.getOffset();
+
+ return cloneBuffer(buffer, 0, length);
+ }
+
+ private DirectBuffer createMsgPackString(final String input) {
+ final var writer = new MsgPackWriter();
+ final var buffer = new ExpandableArrayBuffer();
+
+ writer.wrap(buffer, 0);
+
+ // initialize the array with nil
+ writer.writeString(BufferUtil.wrapString(input));
+
+ final var length = writer.getOffset();
+
+ return cloneBuffer(buffer, 0, length);
+ }
+}
diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java
index 4efa2f8d57ca..b2542a9d969f 100644
--- a/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java
+++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java
@@ -16,6 +16,7 @@
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.client.IncidentClient.ResolveIncidentClient;
import io.camunda.zeebe.model.bpmn.Bpmn;
+import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.Assertions;
import io.camunda.zeebe.protocol.record.Record;
@@ -627,6 +628,157 @@ public void shouldResolveIncidentDueToCompletionCondition() {
.isTrue();
}
+ /**
+ * This test covers the scenario where a multi instance cannot be completed, because updating the
+ * output collection fails due to an index out of bounds. The index out of bounds is caused
+ * because a) the output collection is initialized with the cardinality of the multi instance when
+ * the multi instance is activated, and b) the collection is modified and shrunk to a smaller
+ * size.
+ */
+ @Test // regression test for #9143
+ public void shouldCreateAndResolveIncidentIfOutputElementCannotBeReplacedInOutputCollection() {
+ // given
+ final var processId = "index-out-of-bounds-in-output-collection";
+ final var collectionWithThreeElements = "=[1]";
+ final var collectionWithNoElements = "=[]";
+ final var outputCollectionName = "outputItems";
+
+ final var process =
+ createProcessThatModifiesOutputCollection(
+ processId, collectionWithThreeElements, collectionWithNoElements, outputCollectionName);
+
+ ENGINE.deployment().withXmlResource(process).deploy();
+
+ // when (raise incident)
+ final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create();
+
+ // then (incident is raised)
+ final Record incidentEvent =
+ RecordingExporter.incidentRecords(IncidentIntent.CREATED)
+ .withProcessInstanceKey(processInstanceKey)
+ .getFirst();
+
+ Assertions.assertThat(incidentEvent.getValue())
+ .hasErrorType(ErrorType.EXTRACT_VALUE_ERROR)
+ .hasErrorMessage(
+ "Unable to update an item in output collection 'outputItems' at position 1 because the size of the collection is: 0. This may happen when multiple BPMN elements write to the same variable.")
+ .hasProcessInstanceKey(processInstanceKey);
+
+ // when (resolve incident)
+ ENGINE
+ .variables()
+ .ofScope(incidentEvent.getValue().getVariableScopeKey())
+ .withDocument(Maps.of(entry(outputCollectionName, List.of(1))))
+ .update();
+ ENGINE.incident().ofInstance(processInstanceKey).withKey(incidentEvent.getKey()).resolve();
+
+ // then (incident is resolved)
+ assertThat(
+ RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
+ .withElementType(BpmnElementType.PROCESS)
+ .withProcessInstanceKey(processInstanceKey)
+ .limitToProcessInstanceCompleted()
+ .exists())
+ .describedAs("the process has completed")
+ .isTrue();
+ org.assertj.core.api.Assertions.assertThat(
+ RecordingExporter.variableRecords(VariableIntent.UPDATED)
+ .withProcessInstanceKey(processInstanceKey)
+ .withName(outputCollectionName)
+ .withValue("[1]")
+ .exists())
+ .isTrue();
+ }
+
+ /**
+ * This test covers the scenario where a multi instance cannot be completed, because updating the
+ * output collection fails because the output collection is not an array.
+ */
+ @Test
+ public void shouldCreateAndResolveIncidentIfOutputCollectionHasWrongType() {
+ // given
+ final var processId = "output-collection-is-overwritten-by-string";
+ final var collectionWithThreeElements = "=[1]";
+ final var overwriteWithString = "=\"String overwrite\"";
+ final var outputCollectionName = "outputItems";
+
+ final var process =
+ createProcessThatModifiesOutputCollection(
+ processId, collectionWithThreeElements, overwriteWithString, outputCollectionName);
+
+ ENGINE.deployment().withXmlResource(process).deploy();
+
+ // when (raise incident)
+ final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create();
+
+ // then (incident is raised)
+ final Record incidentEvent =
+ RecordingExporter.incidentRecords(IncidentIntent.CREATED)
+ .withProcessInstanceKey(processInstanceKey)
+ .getFirst();
+
+ Assertions.assertThat(incidentEvent.getValue())
+ .hasErrorType(ErrorType.EXTRACT_VALUE_ERROR)
+ .hasErrorMessage(
+ "Unable to update an item in output collection 'outputItems' because the type of the output collection is: STRING. This may happen when multiple BPMN elements write to the same variable.")
+ .hasProcessInstanceKey(processInstanceKey);
+
+ // when (resolve incident)
+ ENGINE
+ .variables()
+ .ofScope(incidentEvent.getValue().getVariableScopeKey())
+ .withDocument(Maps.of(entry(outputCollectionName, List.of(1))))
+ .update();
+ ENGINE.incident().ofInstance(processInstanceKey).withKey(incidentEvent.getKey()).resolve();
+
+ // then (incident is resolved)
+ assertThat(
+ RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
+ .withElementType(BpmnElementType.PROCESS)
+ .withProcessInstanceKey(processInstanceKey)
+ .limitToProcessInstanceCompleted()
+ .exists())
+ .describedAs("the process has completed")
+ .isTrue();
+
+ org.assertj.core.api.Assertions.assertThat(
+ RecordingExporter.variableRecords(VariableIntent.UPDATED)
+ .withProcessInstanceKey(processInstanceKey)
+ .withName(outputCollectionName)
+ .withValue("[1]")
+ .exists())
+ .isTrue();
+ }
+
+ private BpmnModelInstance createProcessThatModifiesOutputCollection(
+ final String processId,
+ final String initialValueForCollection,
+ final String overwrittenValue,
+ final String outputCollectionName) {
+ return Bpmn.createExecutableProcess(processId)
+ .startEvent()
+ .zeebeOutput(
+ initialValueForCollection, // initializes input collection
+ INPUT_COLLECTION)
+ .subProcess()
+ .multiInstance(
+ mi ->
+ mi.parallel()
+ .zeebeInputCollectionExpression(INPUT_COLLECTION)
+ .zeebeInputElement(INPUT_ELEMENT)
+ .zeebeOutputCollection(
+ outputCollectionName) // initialize output collection with size in input
+ // collection
+ .zeebeOutputElementExpression(INPUT_ELEMENT))
+ .embeddedSubProcess()
+ .startEvent()
+ .zeebeOutput(overwrittenValue, outputCollectionName) // overwrite output collection
+ .endEvent()
+ .subProcessDone()
+ .endEvent()
+ .done();
+ }
+
private static void completeNthJob(final long processInstanceKey, final int n) {
final var nthJob = findNthJob(processInstanceKey, n);
ENGINE.job().withKey(nthJob.getKey()).complete();
diff --git a/engine/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/engine/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 000000000000..1f0955d450f0
--- /dev/null
+++ b/engine/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline