From 54283d88cd34f184832bd6e2c224081aaed97552 Mon Sep 17 00:00:00 2001 From: pihme Date: Tue, 19 Apr 2022 13:20:17 +0200 Subject: [PATCH 01/14] refactor(engine): extract logic regarding output collections --- .../container/MultiInstanceBodyProcessor.java | 177 ++++++++++-------- 1 file changed, 98 insertions(+), 79 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java index 45b12f2ece58..b6b9f7436a41 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java @@ -42,16 +42,14 @@ public final class MultiInstanceBodyProcessor new UnsafeBuffer(new byte[Long.BYTES + 1]); private final DirectBuffer loopCounterVariableView = new UnsafeBuffer(0, 0); - private final MsgPackReader variableReader = new MsgPackReader(); private final MsgPackWriter variableWriter = new MsgPackWriter(); - private final ExpandableArrayBuffer variableBuffer = new ExpandableArrayBuffer(); - private final DirectBuffer resultBuffer = new UnsafeBuffer(0, 0); private final ExpressionProcessor expressionBehavior; private final BpmnStateTransitionBehavior stateTransitionBehavior; private final BpmnEventSubscriptionBehavior eventSubscriptionBehavior; private final BpmnStateBehavior stateBehavior; private final BpmnIncidentBehavior incidentBehavior; + private final OutputCollectionBehavior outputCollectionBehavior; public MultiInstanceBodyProcessor(final BpmnBehaviors bpmnBehaviors) { stateTransitionBehavior = bpmnBehaviors.stateTransitionBehavior(); @@ -59,6 +57,8 @@ public MultiInstanceBodyProcessor(final BpmnBehaviors bpmnBehaviors) { stateBehavior = bpmnBehaviors.stateBehavior(); expressionBehavior = bpmnBehaviors.expressionBehavior(); incidentBehavior = bpmnBehaviors.incidentBehavior(); + outputCollectionBehavior = + new OutputCollectionBehavior(stateBehavior, bpmnBehaviors.expressionBehavior()); } @Override @@ -150,7 +150,8 @@ public void onTerminate( final ExecutableMultiInstanceBody element, final BpmnElementContext flowScopeContext, final BpmnElementContext childContext) { - final var updatedOrFailure = updateOutputCollection(element, childContext, flowScopeContext); + final var updatedOrFailure = + outputCollectionBehavior.updateOutputCollection(element, childContext, flowScopeContext); if (updatedOrFailure.isLeft()) { return updatedOrFailure; } @@ -247,7 +248,8 @@ private void activate( .getOutputCollection() .ifPresent( variableName -> - initializeOutputCollection(activated, variableName, inputCollection.size())); + outputCollectionBehavior.initializeOutputCollection( + activated, variableName, inputCollection.size())); if (inputCollection.isEmpty()) { // complete the multi-instance body immediately @@ -337,99 +339,116 @@ private DirectBuffer wrapLoopCounter(final int loopCounter) { return loopCounterVariableView; } - private void initializeOutputCollection( - final BpmnElementContext context, final DirectBuffer variableName, final int size) { - - variableWriter.wrap(variableBuffer, 0); + private Either satisfiesCompletionCondition( + final ExecutableMultiInstanceBody element, final BpmnElementContext context) { + final Optional completionCondition = + element.getLoopCharacteristics().getCompletionCondition(); - // initialize the array with nil - variableWriter.writeArrayHeader(size); - for (var i = 0; i < size; i++) { - variableWriter.writeNil(); + if (completionCondition.isPresent()) { + return expressionBehavior.evaluateBooleanExpression( + completionCondition.get(), context.getElementInstanceKey()); } + return Either.right(false); + } - final var length = variableWriter.getOffset(); + private static final class OutputCollectionBehavior { - stateBehavior.setLocalVariable(context, variableName, variableBuffer, 0, length); - } + private final MsgPackReader variableReader = new MsgPackReader(); + private final MsgPackWriter variableWriter = new MsgPackWriter(); + private final ExpandableArrayBuffer variableBuffer = new ExpandableArrayBuffer(); + private final DirectBuffer resultBuffer = new UnsafeBuffer(0, 0); - private Either updateOutputCollection( - final ExecutableMultiInstanceBody element, - final BpmnElementContext childContext, - final BpmnElementContext flowScopeContext) { + private final BpmnStateBehavior stateBehavior; + private final ExpressionProcessor expressionProcessor; - return element - .getLoopCharacteristics() - .getOutputCollection() - .map( - variableName -> - updateOutputCollection(element, childContext, flowScopeContext, variableName)) - .orElse(Either.right(null)); - } + private OutputCollectionBehavior( + final BpmnStateBehavior stateBehavior, final ExpressionProcessor expressionProcessor) { + this.stateBehavior = stateBehavior; + this.expressionProcessor = expressionProcessor; + } - private Either updateOutputCollection( - final ExecutableMultiInstanceBody element, - final BpmnElementContext childContext, - final BpmnElementContext flowScopeContext, - final DirectBuffer variableName) { + private void initializeOutputCollection( + final BpmnElementContext context, final DirectBuffer variableName, final int size) { - final var loopCounter = - stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter(); + variableWriter.wrap(variableBuffer, 0); - return readOutputElementVariable(element, childContext) - .map( - elementVariable -> { - // we need to read the output element variable before the current collection - // is read, because readOutputElementVariable(Context) uses the same - // buffer as getVariableLocal this could also be avoided by cloning the current - // collection, but that is slower. - final var currentCollection = - stateBehavior.getLocalVariable(flowScopeContext, variableName); - final var updatedCollection = - insertAt(currentCollection, loopCounter, elementVariable); - stateBehavior.setLocalVariable(flowScopeContext, variableName, updatedCollection); + // initialize the array with nil + variableWriter.writeArrayHeader(size); + for (var i = 0; i < size; i++) { + variableWriter.writeNil(); + } - return null; - }); - } + final var length = variableWriter.getOffset(); - private Either readOutputElementVariable( - final ExecutableMultiInstanceBody element, final BpmnElementContext context) { - final var expression = element.getLoopCharacteristics().getOutputElement().orElseThrow(); - return expressionBehavior.evaluateAnyExpression(expression, context.getElementInstanceKey()); - } + stateBehavior.setLocalVariable(context, variableName, variableBuffer, 0, length); + } + + private Either updateOutputCollection( + final ExecutableMultiInstanceBody element, + final BpmnElementContext childContext, + final BpmnElementContext flowScopeContext) { + + return element + .getLoopCharacteristics() + .getOutputCollection() + .map( + variableName -> + updateOutputCollection(element, childContext, flowScopeContext, variableName)) + .orElse(Either.right(null)); + } + + private Either updateOutputCollection( + final ExecutableMultiInstanceBody element, + final BpmnElementContext childContext, + final BpmnElementContext flowScopeContext, + final DirectBuffer variableName) { - private DirectBuffer insertAt( - final DirectBuffer array, final int index, final DirectBuffer element) { + final var loopCounter = + stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter(); + + return readOutputElementVariable(element, childContext) + .map( + elementVariable -> { + // we need to read the output element variable before the current collection + // is read, because readOutputElementVariable(Context) uses the same + // buffer as getVariableLocal this could also be avoided by cloning the current + // collection, but that is slower. + final var currentCollection = + stateBehavior.getLocalVariable(flowScopeContext, variableName); + final var updatedCollection = + insertAt(currentCollection, loopCounter, elementVariable); + stateBehavior.setLocalVariable(flowScopeContext, variableName, updatedCollection); + + return null; + }); + } - variableReader.wrap(array, 0, array.capacity()); - variableReader.readArrayHeader(); - variableReader.skipValues((long) index - 1L); + private Either readOutputElementVariable( + final ExecutableMultiInstanceBody element, final BpmnElementContext context) { + final var expression = element.getLoopCharacteristics().getOutputElement().orElseThrow(); + return expressionProcessor.evaluateAnyExpression(expression, context.getElementInstanceKey()); + } - final var offsetBefore = variableReader.getOffset(); - variableReader.skipValue(); - final var offsetAfter = variableReader.getOffset(); + private DirectBuffer insertAt( + final DirectBuffer array, final int index, final DirectBuffer element) { - variableWriter.wrap(variableBuffer, 0); - variableWriter.writeRaw(array, 0, offsetBefore); - variableWriter.writeRaw(element); - variableWriter.writeRaw(array, offsetAfter, array.capacity() - offsetAfter); + variableReader.wrap(array, 0, array.capacity()); + variableReader.readArrayHeader(); + variableReader.skipValues((long) index - 1L); - final var length = variableWriter.getOffset(); + final var offsetBefore = variableReader.getOffset(); + variableReader.skipValue(); + final var offsetAfter = variableReader.getOffset(); - resultBuffer.wrap(variableBuffer, 0, length); - return resultBuffer; - } + variableWriter.wrap(variableBuffer, 0); + variableWriter.writeRaw(array, 0, offsetBefore); + variableWriter.writeRaw(element); + variableWriter.writeRaw(array, offsetAfter, array.capacity() - offsetAfter); - private Either satisfiesCompletionCondition( - final ExecutableMultiInstanceBody element, final BpmnElementContext context) { - final Optional completionCondition = - element.getLoopCharacteristics().getCompletionCondition(); + final var length = variableWriter.getOffset(); - if (completionCondition.isPresent()) { - return expressionBehavior.evaluateBooleanExpression( - completionCondition.get(), context.getElementInstanceKey()); + resultBuffer.wrap(variableBuffer, 0, length); + return resultBuffer; } - return Either.right(false); } } From 70b1006c21e6cac4ff5f2fd70f2b37825bfca313 Mon Sep 17 00:00:00 2001 From: pihme Date: Tue, 19 Apr 2022 13:22:55 +0200 Subject: [PATCH 02/14] refactor(engine): rename variable --- .../bpmn/container/MultiInstanceBodyProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java index b6b9f7436a41..27e8aad1cf9f 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java @@ -42,7 +42,7 @@ public final class MultiInstanceBodyProcessor new UnsafeBuffer(new byte[Long.BYTES + 1]); private final DirectBuffer loopCounterVariableView = new UnsafeBuffer(0, 0); - private final MsgPackWriter variableWriter = new MsgPackWriter(); + private final MsgPackWriter loopCounterWriter = new MsgPackWriter(); private final ExpressionProcessor expressionBehavior; private final BpmnStateTransitionBehavior stateTransitionBehavior; @@ -330,10 +330,10 @@ private void createInnerInstance( } private DirectBuffer wrapLoopCounter(final int loopCounter) { - variableWriter.wrap(loopCounterVariableBuffer, 0); + loopCounterWriter.wrap(loopCounterVariableBuffer, 0); - variableWriter.writeInteger(loopCounter); - final var length = variableWriter.getOffset(); + loopCounterWriter.writeInteger(loopCounter); + final var length = loopCounterWriter.getOffset(); loopCounterVariableView.wrap(loopCounterVariableBuffer, 0, length); return loopCounterVariableView; From 85bd71361b40802527a925caba62b0c3f0f9c4a2 Mon Sep 17 00:00:00 2001 From: pihme Date: Tue, 19 Apr 2022 13:33:53 +0200 Subject: [PATCH 03/14] refactor(engine): move private class up - also rename variables to be more descriptive --- .../behavior/OutputCollectionBehavior.java | 120 ++++++++++++++++++ .../container/MultiInstanceBodyProcessor.java | 104 +-------------- 2 files changed, 121 insertions(+), 103 deletions(-) create mode 100644 engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java new file mode 100644 index 000000000000..276c62ff7f01 --- /dev/null +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java @@ -0,0 +1,120 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.engine.processing.bpmn.behavior; + +import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext; +import io.camunda.zeebe.engine.processing.common.ExpressionProcessor; +import io.camunda.zeebe.engine.processing.common.Failure; +import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; +import io.camunda.zeebe.msgpack.spec.MsgPackReader; +import io.camunda.zeebe.msgpack.spec.MsgPackWriter; +import io.camunda.zeebe.util.Either; +import org.agrona.DirectBuffer; +import org.agrona.ExpandableArrayBuffer; +import org.agrona.concurrent.UnsafeBuffer; + +public final class OutputCollectionBehavior { + + private final MsgPackReader outputCollectionReader = new MsgPackReader(); + private final MsgPackWriter outputCollectionWriter = new MsgPackWriter(); + private final ExpandableArrayBuffer outputCollectionBuffer = new ExpandableArrayBuffer(); + private final DirectBuffer updatedOutputCollectionBuffer = new UnsafeBuffer(0, 0); + + private final BpmnStateBehavior stateBehavior; + private final ExpressionProcessor expressionProcessor; + + public OutputCollectionBehavior( + final BpmnStateBehavior stateBehavior, final ExpressionProcessor expressionProcessor) { + this.stateBehavior = stateBehavior; + this.expressionProcessor = expressionProcessor; + } + + public void initializeOutputCollection( + final BpmnElementContext context, final DirectBuffer variableName, final int size) { + + outputCollectionWriter.wrap(outputCollectionBuffer, 0); + + // initialize the array with nil + outputCollectionWriter.writeArrayHeader(size); + for (var i = 0; i < size; i++) { + outputCollectionWriter.writeNil(); + } + + final var length = outputCollectionWriter.getOffset(); + + stateBehavior.setLocalVariable(context, variableName, outputCollectionBuffer, 0, length); + } + + public Either updateOutputCollection( + final ExecutableMultiInstanceBody element, + final BpmnElementContext childContext, + final BpmnElementContext flowScopeContext) { + + return element + .getLoopCharacteristics() + .getOutputCollection() + .map( + variableName -> + updateOutputCollection(element, childContext, flowScopeContext, variableName)) + .orElse(Either.right(null)); + } + + private Either updateOutputCollection( + final ExecutableMultiInstanceBody element, + final BpmnElementContext childContext, + final BpmnElementContext flowScopeContext, + final DirectBuffer variableName) { + + final var loopCounter = + stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter(); + + return readOutputElementVariable(element, childContext) + .map( + elementVariable -> { + // we need to read the output element variable before the current collection + // is read, because readOutputElementVariable(Context) uses the same + // buffer as getVariableLocal this could also be avoided by cloning the current + // collection, but that is slower. + final var currentCollection = + stateBehavior.getLocalVariable(flowScopeContext, variableName); + final var updatedCollection = + insertAt(currentCollection, loopCounter, elementVariable); + stateBehavior.setLocalVariable(flowScopeContext, variableName, updatedCollection); + + return null; + }); + } + + private Either readOutputElementVariable( + final ExecutableMultiInstanceBody element, final BpmnElementContext context) { + final var expression = element.getLoopCharacteristics().getOutputElement().orElseThrow(); + return expressionProcessor.evaluateAnyExpression(expression, context.getElementInstanceKey()); + } + + private DirectBuffer insertAt( + final DirectBuffer array, final int index, final DirectBuffer element) { + + outputCollectionReader.wrap(array, 0, array.capacity()); + outputCollectionReader.readArrayHeader(); + outputCollectionReader.skipValues((long) index - 1L); + + final var offsetBefore = outputCollectionReader.getOffset(); + outputCollectionReader.skipValue(); + final var offsetAfter = outputCollectionReader.getOffset(); + + outputCollectionWriter.wrap(outputCollectionBuffer, 0); + outputCollectionWriter.writeRaw(array, 0, offsetBefore); + outputCollectionWriter.writeRaw(element); + outputCollectionWriter.writeRaw(array, offsetAfter, array.capacity() - offsetAfter); + + final var length = outputCollectionWriter.getOffset(); + + updatedOutputCollectionBuffer.wrap(outputCollectionBuffer, 0, length); + return updatedOutputCollectionBuffer; + } +} diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java index 27e8aad1cf9f..b01b50d26979 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java @@ -15,11 +15,11 @@ import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnIncidentBehavior; import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior; import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior; +import io.camunda.zeebe.engine.processing.bpmn.behavior.OutputCollectionBehavior; import io.camunda.zeebe.engine.processing.common.ExpressionProcessor; import io.camunda.zeebe.engine.processing.common.Failure; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; import io.camunda.zeebe.msgpack.spec.MsgPackHelper; -import io.camunda.zeebe.msgpack.spec.MsgPackReader; import io.camunda.zeebe.msgpack.spec.MsgPackWriter; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.value.ErrorType; @@ -28,7 +28,6 @@ import java.util.List; import java.util.Optional; import org.agrona.DirectBuffer; -import org.agrona.ExpandableArrayBuffer; import org.agrona.MutableDirectBuffer; import org.agrona.concurrent.UnsafeBuffer; @@ -350,105 +349,4 @@ private Either satisfiesCompletionCondition( } return Either.right(false); } - - private static final class OutputCollectionBehavior { - - private final MsgPackReader variableReader = new MsgPackReader(); - private final MsgPackWriter variableWriter = new MsgPackWriter(); - private final ExpandableArrayBuffer variableBuffer = new ExpandableArrayBuffer(); - private final DirectBuffer resultBuffer = new UnsafeBuffer(0, 0); - - private final BpmnStateBehavior stateBehavior; - private final ExpressionProcessor expressionProcessor; - - private OutputCollectionBehavior( - final BpmnStateBehavior stateBehavior, final ExpressionProcessor expressionProcessor) { - this.stateBehavior = stateBehavior; - this.expressionProcessor = expressionProcessor; - } - - private void initializeOutputCollection( - final BpmnElementContext context, final DirectBuffer variableName, final int size) { - - variableWriter.wrap(variableBuffer, 0); - - // initialize the array with nil - variableWriter.writeArrayHeader(size); - for (var i = 0; i < size; i++) { - variableWriter.writeNil(); - } - - final var length = variableWriter.getOffset(); - - stateBehavior.setLocalVariable(context, variableName, variableBuffer, 0, length); - } - - private Either updateOutputCollection( - final ExecutableMultiInstanceBody element, - final BpmnElementContext childContext, - final BpmnElementContext flowScopeContext) { - - return element - .getLoopCharacteristics() - .getOutputCollection() - .map( - variableName -> - updateOutputCollection(element, childContext, flowScopeContext, variableName)) - .orElse(Either.right(null)); - } - - private Either updateOutputCollection( - final ExecutableMultiInstanceBody element, - final BpmnElementContext childContext, - final BpmnElementContext flowScopeContext, - final DirectBuffer variableName) { - - final var loopCounter = - stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter(); - - return readOutputElementVariable(element, childContext) - .map( - elementVariable -> { - // we need to read the output element variable before the current collection - // is read, because readOutputElementVariable(Context) uses the same - // buffer as getVariableLocal this could also be avoided by cloning the current - // collection, but that is slower. - final var currentCollection = - stateBehavior.getLocalVariable(flowScopeContext, variableName); - final var updatedCollection = - insertAt(currentCollection, loopCounter, elementVariable); - stateBehavior.setLocalVariable(flowScopeContext, variableName, updatedCollection); - - return null; - }); - } - - private Either readOutputElementVariable( - final ExecutableMultiInstanceBody element, final BpmnElementContext context) { - final var expression = element.getLoopCharacteristics().getOutputElement().orElseThrow(); - return expressionProcessor.evaluateAnyExpression(expression, context.getElementInstanceKey()); - } - - private DirectBuffer insertAt( - final DirectBuffer array, final int index, final DirectBuffer element) { - - variableReader.wrap(array, 0, array.capacity()); - variableReader.readArrayHeader(); - variableReader.skipValues((long) index - 1L); - - final var offsetBefore = variableReader.getOffset(); - variableReader.skipValue(); - final var offsetAfter = variableReader.getOffset(); - - variableWriter.wrap(variableBuffer, 0); - variableWriter.writeRaw(array, 0, offsetBefore); - variableWriter.writeRaw(element); - variableWriter.writeRaw(array, offsetAfter, array.capacity() - offsetAfter); - - final var length = variableWriter.getOffset(); - - resultBuffer.wrap(variableBuffer, 0, length); - return resultBuffer; - } - } } From 5791041955e405a9494b4a2bd8a2efed00c39742 Mon Sep 17 00:00:00 2001 From: pihme Date: Tue, 19 Apr 2022 13:44:41 +0200 Subject: [PATCH 04/14] refactor(engine): change construction of class --- .../engine/processing/bpmn/behavior/BpmnBehaviors.java | 2 ++ .../processing/bpmn/behavior/BpmnBehaviorsImpl.java | 9 +++++++++ .../bpmn/behavior/OutputCollectionBehavior.java | 2 +- .../bpmn/container/MultiInstanceBodyProcessor.java | 3 +-- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java index cd382b9d9c3a..80a1a651bd75 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java @@ -35,4 +35,6 @@ public interface BpmnBehaviors { BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior(); BpmnJobBehavior jobBehavior(); + + OutputCollectionBehavior outputCollectionBehavior(); } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java index b4f2f76ce2bd..620cec3ab945 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java @@ -39,6 +39,8 @@ public final class BpmnBehaviorsImpl implements BpmnBehaviors { private final BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior; private final BpmnJobBehavior jobBehavior; + private final OutputCollectionBehavior outputCollectionBehavior; + public BpmnBehaviorsImpl( final ExpressionProcessor expressionBehavior, final SideEffects sideEffects, @@ -105,6 +107,8 @@ public BpmnBehaviorsImpl( stateBehavior, incidentBehavior, jobMetrics); + + outputCollectionBehavior = new OutputCollectionBehavior(stateBehavior, expressionBehavior()); } @Override @@ -166,4 +170,9 @@ public BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior() public BpmnJobBehavior jobBehavior() { return jobBehavior; } + + @Override + public OutputCollectionBehavior outputCollectionBehavior() { + return outputCollectionBehavior; + } } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java index 276c62ff7f01..3cdf17093d61 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java @@ -28,7 +28,7 @@ public final class OutputCollectionBehavior { private final BpmnStateBehavior stateBehavior; private final ExpressionProcessor expressionProcessor; - public OutputCollectionBehavior( + OutputCollectionBehavior( final BpmnStateBehavior stateBehavior, final ExpressionProcessor expressionProcessor) { this.stateBehavior = stateBehavior; this.expressionProcessor = expressionProcessor; diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java index b01b50d26979..8f9225edd065 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java @@ -56,8 +56,7 @@ public MultiInstanceBodyProcessor(final BpmnBehaviors bpmnBehaviors) { stateBehavior = bpmnBehaviors.stateBehavior(); expressionBehavior = bpmnBehaviors.expressionBehavior(); incidentBehavior = bpmnBehaviors.incidentBehavior(); - outputCollectionBehavior = - new OutputCollectionBehavior(stateBehavior, bpmnBehaviors.expressionBehavior()); + outputCollectionBehavior = bpmnBehaviors.outputCollectionBehavior(); } @Override From c786b01b9b7832565194b695b7844696ef9e905b Mon Sep 17 00:00:00 2001 From: pihme Date: Tue, 19 Apr 2022 15:31:47 +0200 Subject: [PATCH 05/14] test(engine): implement test for #9143 --- engine/pom.xml | 6 + .../OutputCollectionBehaviorTest.java | 103 ++++++++++++++++++ .../org.mockito.plugins.MockMaker | 1 + 3 files changed, 110 insertions(+) create mode 100644 engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java create mode 100644 engine/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/engine/pom.xml b/engine/pom.xml index 9e7940be74a6..ce0f3af31f50 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -152,6 +152,12 @@ test + + org.mockito + mockito-junit-jupiter + test + + org.awaitility awaitility diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java new file mode 100644 index 000000000000..618ff399579b --- /dev/null +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java @@ -0,0 +1,103 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.engine.processing.bpmn.behavior; + +import static io.camunda.zeebe.util.buffer.BufferUtil.cloneBuffer; +import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.camunda.zeebe.el.Expression; +import io.camunda.zeebe.el.impl.StaticExpression; +import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext; +import io.camunda.zeebe.engine.processing.common.ExpressionProcessor; +import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableLoopCharacteristics; +import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; +import io.camunda.zeebe.msgpack.spec.MsgPackWriter; +import io.camunda.zeebe.util.Either; +import java.util.Optional; +import org.agrona.DirectBuffer; +import org.agrona.ExpandableArrayBuffer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Answers; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class OutputCollectionBehaviorTest { + + @Test // regression test for #9143 + void shouldRaiseIncidentWhenOverwritingOutputCollectionWithLessElements() { + // given + final var collectionWithSize1 = createCollection(1); + final var elementToAdd = wrapString("element to add"); + final var indexThatIsOutOfBounds = 2; + final var outputElementsExpression = new StaticExpression("OUTPUT_COLLECTION"); + final var outputElementName = wrapString("OUTPUT_ELEMENT"); + final var loopCharacteristics = + createLoopCharacteristics(outputElementName, outputElementsExpression); + + final var mockStateBehavior = mock(BpmnStateBehavior.class, Answers.RETURNS_DEEP_STUBS); + when(mockStateBehavior.getLocalVariable(any(), eq(outputElementName))) + .thenReturn(collectionWithSize1); + + final var mockExpressionProcessor = mock(ExpressionProcessor.class); + when(mockExpressionProcessor.evaluateAnyExpression(eq(outputElementsExpression), anyLong())) + .thenReturn(Either.right(elementToAdd)); + + final var mockElement = mock(ExecutableMultiInstanceBody.class); + when(mockElement.getLoopCharacteristics()).thenReturn(loopCharacteristics); + + final var mockChildContext = mock(BpmnElementContext.class); + when(mockStateBehavior.getElementInstance(mockChildContext).getMultiInstanceLoopCounter()) + .thenReturn(indexThatIsOutOfBounds); + + final var sut = new OutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); + + // when + final var result = sut.updateOutputCollection(mockElement, mockChildContext, null); + + // then + assertThat(result.isLeft()).isTrue(); + + // final var failure = result.getLeft(); + // TODO ass assertions on failure + } + + private ExecutableLoopCharacteristics createLoopCharacteristics( + final DirectBuffer outputElementName, final Expression outputElementExpression) { + return new ExecutableLoopCharacteristics( + false, + Optional.empty(), + null, + Optional.empty(), + Optional.of(outputElementName), + Optional.of(outputElementExpression)); + } + + private DirectBuffer createCollection(final int size) { + final var writer = new MsgPackWriter(); + final var buffer = new ExpandableArrayBuffer(); + + writer.wrap(buffer, 0); + + // initialize the array with nil + writer.writeArrayHeader(size); + for (var i = 0; i < size; i++) { + writer.writeNil(); + } + + final var length = writer.getOffset(); + + return cloneBuffer(buffer, 0, length); + } +} diff --git a/engine/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/engine/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000000..1f0955d450f0 --- /dev/null +++ b/engine/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline From e42977f5e4d48cf6f9c804958597eb104b931f88 Mon Sep 17 00:00:00 2001 From: pihme Date: Wed, 20 Apr 2022 07:59:56 +0200 Subject: [PATCH 06/14] refactor(engine): rename method --- .../processing/bpmn/behavior/OutputCollectionBehavior.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java index 3cdf17093d61..5c1eb143c836 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java @@ -83,7 +83,7 @@ private Either updateOutputCollection( final var currentCollection = stateBehavior.getLocalVariable(flowScopeContext, variableName); final var updatedCollection = - insertAt(currentCollection, loopCounter, elementVariable); + replaceAt(currentCollection, loopCounter, elementVariable); stateBehavior.setLocalVariable(flowScopeContext, variableName, updatedCollection); return null; @@ -96,7 +96,7 @@ private Either readOutputElementVariable( return expressionProcessor.evaluateAnyExpression(expression, context.getElementInstanceKey()); } - private DirectBuffer insertAt( + private DirectBuffer replaceAt( final DirectBuffer array, final int index, final DirectBuffer element) { outputCollectionReader.wrap(array, 0, array.capacity()); From 036e1d9718831b451681055c3943f905f2f4f2a8 Mon Sep 17 00:00:00 2001 From: pihme Date: Wed, 20 Apr 2022 08:40:15 +0200 Subject: [PATCH 07/14] fix(engine): detect out of bounds access --- .../behavior/OutputCollectionBehavior.java | 42 ++++++++++++++----- .../OutputCollectionBehaviorTest.java | 18 ++++++-- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java index 5c1eb143c836..176fc031ca11 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java @@ -7,12 +7,15 @@ */ package io.camunda.zeebe.engine.processing.bpmn.behavior; +import static io.camunda.zeebe.util.buffer.BufferUtil.bufferAsString; + import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext; import io.camunda.zeebe.engine.processing.common.ExpressionProcessor; import io.camunda.zeebe.engine.processing.common.Failure; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; import io.camunda.zeebe.msgpack.spec.MsgPackReader; import io.camunda.zeebe.msgpack.spec.MsgPackWriter; +import io.camunda.zeebe.protocol.record.value.ErrorType; import io.camunda.zeebe.util.Either; import org.agrona.DirectBuffer; import org.agrona.ExpandableArrayBuffer; @@ -74,7 +77,7 @@ private Either updateOutputCollection( stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter(); return readOutputElementVariable(element, childContext) - .map( + .flatMap( elementVariable -> { // we need to read the output element variable before the current collection // is read, because readOutputElementVariable(Context) uses the same @@ -82,11 +85,18 @@ private Either updateOutputCollection( // collection, but that is slower. final var currentCollection = stateBehavior.getLocalVariable(flowScopeContext, variableName); - final var updatedCollection = - replaceAt(currentCollection, loopCounter, elementVariable); - stateBehavior.setLocalVariable(flowScopeContext, variableName, updatedCollection); - - return null; + return replaceAt( + currentCollection, + loopCounter, + elementVariable, + flowScopeContext.getFlowScopeKey(), + variableName) + .map( + updatedCollection -> { + stateBehavior.setLocalVariable( + flowScopeContext, variableName, updatedCollection); + return null; + }); }); } @@ -96,11 +106,23 @@ private Either readOutputElementVariable( return expressionProcessor.evaluateAnyExpression(expression, context.getElementInstanceKey()); } - private DirectBuffer replaceAt( - final DirectBuffer array, final int index, final DirectBuffer element) { + private Either replaceAt( + final DirectBuffer array, + final int index, + final DirectBuffer element, + final long variableScopeKey, + final DirectBuffer variableName) { outputCollectionReader.wrap(array, 0, array.capacity()); - outputCollectionReader.readArrayHeader(); + final int size = outputCollectionReader.readArrayHeader(); + if (index > size) { + return Either.left( + new Failure( + "Unable to update item in output collection '%s' at position %d because the size of the collection is: %d. This happens when multiple BPMN elements write to the same variable." + .formatted(bufferAsString(variableName), index, size), + ErrorType.IO_MAPPING_ERROR, + variableScopeKey)); + } outputCollectionReader.skipValues((long) index - 1L); final var offsetBefore = outputCollectionReader.getOffset(); @@ -115,6 +137,6 @@ private DirectBuffer replaceAt( final var length = outputCollectionWriter.getOffset(); updatedOutputCollectionBuffer.wrap(outputCollectionBuffer, 0, length); - return updatedOutputCollectionBuffer; + return Either.right(updatedOutputCollectionBuffer); } } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java index 618ff399579b..f7c98f285ebb 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java @@ -23,6 +23,7 @@ import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableLoopCharacteristics; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; import io.camunda.zeebe.msgpack.spec.MsgPackWriter; +import io.camunda.zeebe.protocol.record.value.ErrorType; import io.camunda.zeebe.util.Either; import java.util.Optional; import org.agrona.DirectBuffer; @@ -36,7 +37,7 @@ public class OutputCollectionBehaviorTest { @Test // regression test for #9143 - void shouldRaiseIncidentWhenOverwritingOutputCollectionWithLessElements() { + void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { // given final var collectionWithSize1 = createCollection(1); final var elementToAdd = wrapString("element to add"); @@ -45,6 +46,7 @@ void shouldRaiseIncidentWhenOverwritingOutputCollectionWithLessElements() { final var outputElementName = wrapString("OUTPUT_ELEMENT"); final var loopCharacteristics = createLoopCharacteristics(outputElementName, outputElementsExpression); + final var flowScopeContextKey = 12345L; final var mockStateBehavior = mock(BpmnStateBehavior.class, Answers.RETURNS_DEEP_STUBS); when(mockStateBehavior.getLocalVariable(any(), eq(outputElementName))) @@ -61,16 +63,24 @@ void shouldRaiseIncidentWhenOverwritingOutputCollectionWithLessElements() { when(mockStateBehavior.getElementInstance(mockChildContext).getMultiInstanceLoopCounter()) .thenReturn(indexThatIsOutOfBounds); + final var mockFlowScopeContext = mock(BpmnElementContext.class); + when(mockFlowScopeContext.getFlowScopeKey()).thenReturn(flowScopeContextKey); + final var sut = new OutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); // when - final var result = sut.updateOutputCollection(mockElement, mockChildContext, null); + final var result = + sut.updateOutputCollection(mockElement, mockChildContext, mockFlowScopeContext); // then assertThat(result.isLeft()).isTrue(); - // final var failure = result.getLeft(); - // TODO ass assertions on failure + final var failure = result.getLeft(); + assertThat(failure.getErrorType()).isEqualTo(ErrorType.IO_MAPPING_ERROR); + assertThat(failure.getMessage()) + .isEqualTo( + "Unable to update item in output collection 'OUTPUT_ELEMENT' at position 2 because the size of the collection is: 1. This happens when multiple BPMN elements write to the same variable."); + assertThat(failure.getVariableScopeKey()).isEqualTo(flowScopeContextKey); } private ExecutableLoopCharacteristics createLoopCharacteristics( From d97a5f3998a821f7e6ce14e253641cf11f30e2a4 Mon Sep 17 00:00:00 2001 From: pihme Date: Wed, 20 Apr 2022 10:32:13 +0200 Subject: [PATCH 08/14] refactor(engine): change error type --- .../processing/bpmn/behavior/OutputCollectionBehavior.java | 2 +- .../processing/bpmn/behavior/OutputCollectionBehaviorTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java index 176fc031ca11..8d9d85be8e33 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java @@ -120,7 +120,7 @@ private Either replaceAt( new Failure( "Unable to update item in output collection '%s' at position %d because the size of the collection is: %d. This happens when multiple BPMN elements write to the same variable." .formatted(bufferAsString(variableName), index, size), - ErrorType.IO_MAPPING_ERROR, + ErrorType.EXTRACT_VALUE_ERROR, variableScopeKey)); } outputCollectionReader.skipValues((long) index - 1L); diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java index f7c98f285ebb..c435a7e3a256 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java @@ -76,7 +76,7 @@ void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { assertThat(result.isLeft()).isTrue(); final var failure = result.getLeft(); - assertThat(failure.getErrorType()).isEqualTo(ErrorType.IO_MAPPING_ERROR); + assertThat(failure.getErrorType()).isEqualTo(ErrorType.EXTRACT_VALUE_ERROR); assertThat(failure.getMessage()) .isEqualTo( "Unable to update item in output collection 'OUTPUT_ELEMENT' at position 2 because the size of the collection is: 1. This happens when multiple BPMN elements write to the same variable."); From 8302ec9e51ddb3257c2558038cb628f71e789cc2 Mon Sep 17 00:00:00 2001 From: pihme Date: Wed, 20 Apr 2022 11:03:38 +0200 Subject: [PATCH 09/14] fix(engine): detect writing to wrong value type - also change variables in tests, which had been confused --- .../behavior/OutputCollectionBehavior.java | 13 ++- .../OutputCollectionBehaviorTest.java | 82 ++++++++++++++++--- 2 files changed, 84 insertions(+), 11 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java index 8d9d85be8e33..9aa00d1cb1af 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java @@ -14,6 +14,7 @@ import io.camunda.zeebe.engine.processing.common.Failure; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; import io.camunda.zeebe.msgpack.spec.MsgPackReader; +import io.camunda.zeebe.msgpack.spec.MsgPackType; import io.camunda.zeebe.msgpack.spec.MsgPackWriter; import io.camunda.zeebe.protocol.record.value.ErrorType; import io.camunda.zeebe.util.Either; @@ -114,7 +115,17 @@ private Either replaceAt( final DirectBuffer variableName) { outputCollectionReader.wrap(array, 0, array.capacity()); - final int size = outputCollectionReader.readArrayHeader(); + final var token = outputCollectionReader.readToken(); + if (token.getType() != MsgPackType.ARRAY) { + return Either.left( + new Failure( + "Unable to update item in output collection '%s' because the type of the variable is: %s. This happens when multiple BPMN elements write to the same variable." + .formatted(bufferAsString(variableName), token.getType()), + ErrorType.EXTRACT_VALUE_ERROR, + variableScopeKey)); + } + + final int size = token.getSize(); if (index > size) { return Either.left( new Failure( diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java index c435a7e3a256..81d7150c6f88 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java @@ -25,6 +25,7 @@ import io.camunda.zeebe.msgpack.spec.MsgPackWriter; import io.camunda.zeebe.protocol.record.value.ErrorType; import io.camunda.zeebe.util.Either; +import io.camunda.zeebe.util.buffer.BufferUtil; import java.util.Optional; import org.agrona.DirectBuffer; import org.agrona.ExpandableArrayBuffer; @@ -40,20 +41,20 @@ public class OutputCollectionBehaviorTest { void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { // given final var collectionWithSize1 = createCollection(1); - final var elementToAdd = wrapString("element to add"); + final var elementToAdd = createMsgPackString("element to add"); final var indexThatIsOutOfBounds = 2; - final var outputElementsExpression = new StaticExpression("OUTPUT_COLLECTION"); - final var outputElementName = wrapString("OUTPUT_ELEMENT"); + final var outputElementExpression = new StaticExpression("dummy expression"); + final var outputCollectionName = wrapString("OUTPUT_COLLECTION"); final var loopCharacteristics = - createLoopCharacteristics(outputElementName, outputElementsExpression); + createLoopCharacteristics(outputCollectionName, outputElementExpression); final var flowScopeContextKey = 12345L; final var mockStateBehavior = mock(BpmnStateBehavior.class, Answers.RETURNS_DEEP_STUBS); - when(mockStateBehavior.getLocalVariable(any(), eq(outputElementName))) + when(mockStateBehavior.getLocalVariable(any(), eq(outputCollectionName))) .thenReturn(collectionWithSize1); final var mockExpressionProcessor = mock(ExpressionProcessor.class); - when(mockExpressionProcessor.evaluateAnyExpression(eq(outputElementsExpression), anyLong())) + when(mockExpressionProcessor.evaluateAnyExpression(eq(outputElementExpression), anyLong())) .thenReturn(Either.right(elementToAdd)); final var mockElement = mock(ExecutableMultiInstanceBody.class); @@ -79,19 +80,66 @@ void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { assertThat(failure.getErrorType()).isEqualTo(ErrorType.EXTRACT_VALUE_ERROR); assertThat(failure.getMessage()) .isEqualTo( - "Unable to update item in output collection 'OUTPUT_ELEMENT' at position 2 because the size of the collection is: 1. This happens when multiple BPMN elements write to the same variable."); + "Unable to update item in output collection 'OUTPUT_COLLECTION' at position 2 because the size of the collection is: 1. This happens when multiple BPMN elements write to the same variable."); + assertThat(failure.getVariableScopeKey()).isEqualTo(flowScopeContextKey); + } + + @Test + void shouldReturnFailureWhenWritingToOutputCollectionWhichIsNotArray() { + // given + final var unexpectedValueType = createMsgPackString("lorem ipsum"); + final var elementToAdd = createMsgPackString("element to add"); + final var index = 1; + final var outputElementExpression = new StaticExpression("dummy expression"); + final var outputCollectionName = wrapString("OUTPUT_COLLECTION"); + final var loopCharacteristics = + createLoopCharacteristics(outputCollectionName, outputElementExpression); + final var flowScopeContextKey = 12345L; + + final var mockStateBehavior = mock(BpmnStateBehavior.class, Answers.RETURNS_DEEP_STUBS); + when(mockStateBehavior.getLocalVariable(any(), eq(outputCollectionName))) + .thenReturn(unexpectedValueType); + + final var mockExpressionProcessor = mock(ExpressionProcessor.class); + when(mockExpressionProcessor.evaluateAnyExpression(eq(outputElementExpression), anyLong())) + .thenReturn(Either.right(elementToAdd)); + + final var mockElement = mock(ExecutableMultiInstanceBody.class); + when(mockElement.getLoopCharacteristics()).thenReturn(loopCharacteristics); + + final var mockChildContext = mock(BpmnElementContext.class); + when(mockStateBehavior.getElementInstance(mockChildContext).getMultiInstanceLoopCounter()) + .thenReturn(index); + + final var mockFlowScopeContext = mock(BpmnElementContext.class); + when(mockFlowScopeContext.getFlowScopeKey()).thenReturn(flowScopeContextKey); + + final var sut = new OutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); + + // when + final var result = + sut.updateOutputCollection(mockElement, mockChildContext, mockFlowScopeContext); + + // then + assertThat(result.isLeft()).isTrue(); + + final var failure = result.getLeft(); + assertThat(failure.getErrorType()).isEqualTo(ErrorType.EXTRACT_VALUE_ERROR); + assertThat(failure.getMessage()) + .isEqualTo( + "Unable to update item in output collection 'OUTPUT_COLLECTION' because the type of the variable is: STRING. This happens when multiple BPMN elements write to the same variable."); assertThat(failure.getVariableScopeKey()).isEqualTo(flowScopeContextKey); } private ExecutableLoopCharacteristics createLoopCharacteristics( - final DirectBuffer outputElementName, final Expression outputElementExpression) { + final DirectBuffer outputCollection, final Expression outputElement) { return new ExecutableLoopCharacteristics( false, Optional.empty(), null, Optional.empty(), - Optional.of(outputElementName), - Optional.of(outputElementExpression)); + Optional.of(outputCollection), + Optional.of(outputElement)); } private DirectBuffer createCollection(final int size) { @@ -110,4 +158,18 @@ private DirectBuffer createCollection(final int size) { return cloneBuffer(buffer, 0, length); } + + private DirectBuffer createMsgPackString(final String input) { + final var writer = new MsgPackWriter(); + final var buffer = new ExpandableArrayBuffer(); + + writer.wrap(buffer, 0); + + // initialize the array with nil + writer.writeString(BufferUtil.wrapString(input)); + + final var length = writer.getOffset(); + + return cloneBuffer(buffer, 0, length); + } } From 7765c5ce8cb152308fbc9ba090a3e1b8640010a7 Mon Sep 17 00:00:00 2001 From: pihme Date: Wed, 20 Apr 2022 12:01:45 +0200 Subject: [PATCH 10/14] test(engine): test incident is raised/resolvable for out of bounds access to outbound collection --- .../behavior/OutputCollectionBehavior.java | 2 +- .../incident/MultiInstanceIncidentTest.java | 106 ++++++++++++++++++ 2 files changed, 107 insertions(+), 1 deletion(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java index 9aa00d1cb1af..be8297bea571 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java @@ -90,7 +90,7 @@ private Either updateOutputCollection( currentCollection, loopCounter, elementVariable, - flowScopeContext.getFlowScopeKey(), + flowScopeContext.getElementInstanceKey(), variableName) .map( updatedCollection -> { diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java index 4efa2f8d57ca..2d9a58b6a97d 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java @@ -16,6 +16,7 @@ import io.camunda.zeebe.engine.util.Records; import io.camunda.zeebe.engine.util.client.IncidentClient.ResolveIncidentClient; import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.model.bpmn.BpmnModelInstance; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord; import io.camunda.zeebe.protocol.record.Assertions; import io.camunda.zeebe.protocol.record.Record; @@ -627,6 +628,111 @@ public void shouldResolveIncidentDueToCompletionCondition() { .isTrue(); } + @Test // regression test for #9143 + /* + This test covers the scenario where a multi instance cannot be completed, because updating the + output collection fails due to an index out of bounds. The index out of bounds is caused + because a) the output collection is initialized with the cardinality of the multi instance when + the multi instance is activated, and b) the collection is modified and shrunk to a smaller + size. + */ + public void shouldCreateIncidentIfOutputElementCannotBeReplacedInOutputCollection() { + // given + final var processId = "index-out-of-bounds-in-output-collection"; + final var collectionWithThreeElements = "=[1,2,3]"; + final var collectionWithNoElements = "=[]"; + final var outputCollectionName = "outputItems"; + + final var process = + createProcessThatCausesIndexOutOfBoundsAccessToOutputCollection( + processId, collectionWithThreeElements, collectionWithNoElements, outputCollectionName); + + ENGINE.deployment().withXmlResource(process).deploy(); + + // when + final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create(); + + // then + final Record incidentEvent = + RecordingExporter.incidentRecords(IncidentIntent.CREATED) + .withProcessInstanceKey(processInstanceKey) + .getFirst(); + + Assertions.assertThat(incidentEvent.getValue()) + .hasErrorType(ErrorType.EXTRACT_VALUE_ERROR) + .hasErrorMessage( + "Unable to update item in output collection 'outputItems' at position 1 because the size of the collection is: 0. This happens when multiple BPMN elements write to the same variable.") + .hasProcessInstanceKey(processInstanceKey); + } + + @Test + public void shouldResolveIncidentRaisedByOutputElementCannotBeReplacedInOutputCollection() { + // given + final var processId = "index-out-of-bounds-in-output-collection"; + final var collectionWithOneElement = "=[1]"; + final var collectionWithNoElements = "=[]"; + final var outputCollectionName = "outputItems"; + + final var process = + createProcessThatCausesIndexOutOfBoundsAccessToOutputCollection( + processId, collectionWithOneElement, collectionWithNoElements, outputCollectionName); + + ENGINE.deployment().withXmlResource(process).deploy(); + final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create(); + + final Record incidentEvent = + RecordingExporter.incidentRecords(IncidentIntent.CREATED) + .withProcessInstanceKey(processInstanceKey) + .getFirst(); + // when + ENGINE + .variables() + .ofScope(incidentEvent.getValue().getVariableScopeKey()) + .withDocument(Maps.of(entry(outputCollectionName, List.of(1)))) + .update(); + ENGINE.incident().ofInstance(processInstanceKey).withKey(incidentEvent.getKey()).resolve(); + + // then the process is able to complete + assertThat( + RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED) + .withElementType(BpmnElementType.PROCESS) + .withProcessInstanceKey(processInstanceKey) + .limitToProcessInstanceCompleted() + .exists()) + .describedAs("the process has completed") + .isTrue(); + } + + private BpmnModelInstance createProcessThatCausesIndexOutOfBoundsAccessToOutputCollection( + final String processId, + final String collectionWithThreeElements, + final String collectionWithNoElements, + final String outputCollectionName) { + return Bpmn.createExecutableProcess(processId) + .startEvent() + .zeebeOutput( + collectionWithThreeElements, // initializes input collection with three elements + INPUT_COLLECTION) + .subProcess() + .multiInstance( + mi -> + mi.parallel() + .zeebeInputCollectionExpression(INPUT_COLLECTION) + .zeebeInputElement(INPUT_ELEMENT) + .zeebeOutputCollection( + outputCollectionName) // initialize output collection with three elements + .zeebeOutputElementExpression(INPUT_ELEMENT)) + .embeddedSubProcess() + .startEvent() + .zeebeOutput( + collectionWithNoElements, + outputCollectionName) // overwrite output collection with empty list + .endEvent() + .subProcessDone() + .endEvent() + .done(); + } + private static void completeNthJob(final long processInstanceKey, final int n) { final var nthJob = findNthJob(processInstanceKey, n); ENGINE.job().withKey(nthJob.getKey()).complete(); From 76f0fa9cdc3d7007c81fbceb14d202886f2c0f7e Mon Sep 17 00:00:00 2001 From: pihme Date: Wed, 20 Apr 2022 12:12:41 +0200 Subject: [PATCH 11/14] test(engine): test incident is raised/resolvable for outbound collection not an array --- .../incident/MultiInstanceIncidentTest.java | 105 +++++++++++++++--- 1 file changed, 88 insertions(+), 17 deletions(-) diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java index 2d9a58b6a97d..1af90e1d34fa 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java @@ -628,14 +628,14 @@ public void shouldResolveIncidentDueToCompletionCondition() { .isTrue(); } + /** + * This test covers the scenario where a multi instance cannot be completed, because updating the + * output collection fails due to an index out of bounds. The index out of bounds is caused + * because a) the output collection is initialized with the cardinality of the multi instance when + * the multi instance is activated, and b) the collection is modified and shrunk to a smaller + * size. + */ @Test // regression test for #9143 - /* - This test covers the scenario where a multi instance cannot be completed, because updating the - output collection fails due to an index out of bounds. The index out of bounds is caused - because a) the output collection is initialized with the cardinality of the multi instance when - the multi instance is activated, and b) the collection is modified and shrunk to a smaller - size. - */ public void shouldCreateIncidentIfOutputElementCannotBeReplacedInOutputCollection() { // given final var processId = "index-out-of-bounds-in-output-collection"; @@ -644,7 +644,7 @@ public void shouldCreateIncidentIfOutputElementCannotBeReplacedInOutputCollectio final var outputCollectionName = "outputItems"; final var process = - createProcessThatCausesIndexOutOfBoundsAccessToOutputCollection( + createProcessThatModifiesOutputCollection( processId, collectionWithThreeElements, collectionWithNoElements, outputCollectionName); ENGINE.deployment().withXmlResource(process).deploy(); @@ -674,7 +674,7 @@ public void shouldResolveIncidentRaisedByOutputElementCannotBeReplacedInOutputCo final var outputCollectionName = "outputItems"; final var process = - createProcessThatCausesIndexOutOfBoundsAccessToOutputCollection( + createProcessThatModifiesOutputCollection( processId, collectionWithOneElement, collectionWithNoElements, outputCollectionName); ENGINE.deployment().withXmlResource(process).deploy(); @@ -703,15 +703,87 @@ public void shouldResolveIncidentRaisedByOutputElementCannotBeReplacedInOutputCo .isTrue(); } - private BpmnModelInstance createProcessThatCausesIndexOutOfBoundsAccessToOutputCollection( + /** + * This test covers the scenario where a multi instance cannot be completed, because updating the + * output collection fails because the output collection is not an array. + */ + @Test + public void shouldCreateIncidentIfOutputCollectionHasWrongType() { + // given + final var processId = "output-collection-is-overwritten-by-string"; + final var collectionWithThreeElements = "=[1,2,3]"; + final var overwriteWithString = "=\"String overwrite\""; + final var outputCollectionName = "outputItems"; + + final var process = + createProcessThatModifiesOutputCollection( + processId, collectionWithThreeElements, overwriteWithString, outputCollectionName); + + ENGINE.deployment().withXmlResource(process).deploy(); + + // when + final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create(); + + // then + final Record incidentEvent = + RecordingExporter.incidentRecords(IncidentIntent.CREATED) + .withProcessInstanceKey(processInstanceKey) + .getFirst(); + + Assertions.assertThat(incidentEvent.getValue()) + .hasErrorType(ErrorType.EXTRACT_VALUE_ERROR) + .hasErrorMessage( + "Unable to update item in output collection 'outputItems' because the type of the variable is: STRING. This happens when multiple BPMN elements write to the same variable.") + .hasProcessInstanceKey(processInstanceKey); + } + + @Test + public void shouldResolveIncidentRaisedByOutputCollectionHasWrongType() { + // given + final var processId = "output-collection-is-overwritten-by-string"; + final var collectionWithOneElement = "=[1]"; + final var overwriteWithString = "=\"String overwrite\""; + final var outputCollectionName = "outputItems"; + + final var process = + createProcessThatModifiesOutputCollection( + processId, collectionWithOneElement, overwriteWithString, outputCollectionName); + + ENGINE.deployment().withXmlResource(process).deploy(); + final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create(); + + final Record incidentEvent = + RecordingExporter.incidentRecords(IncidentIntent.CREATED) + .withProcessInstanceKey(processInstanceKey) + .getFirst(); + // when + ENGINE + .variables() + .ofScope(incidentEvent.getValue().getVariableScopeKey()) + .withDocument(Maps.of(entry(outputCollectionName, List.of(1)))) + .update(); + ENGINE.incident().ofInstance(processInstanceKey).withKey(incidentEvent.getKey()).resolve(); + + // then the process is able to complete + assertThat( + RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED) + .withElementType(BpmnElementType.PROCESS) + .withProcessInstanceKey(processInstanceKey) + .limitToProcessInstanceCompleted() + .exists()) + .describedAs("the process has completed") + .isTrue(); + } + + private BpmnModelInstance createProcessThatModifiesOutputCollection( final String processId, - final String collectionWithThreeElements, - final String collectionWithNoElements, + final String initialValueForCollection, + final String overwrittenValue, final String outputCollectionName) { return Bpmn.createExecutableProcess(processId) .startEvent() .zeebeOutput( - collectionWithThreeElements, // initializes input collection with three elements + initialValueForCollection, // initializes input collection INPUT_COLLECTION) .subProcess() .multiInstance( @@ -720,13 +792,12 @@ private BpmnModelInstance createProcessThatCausesIndexOutOfBoundsAccessToOutputC .zeebeInputCollectionExpression(INPUT_COLLECTION) .zeebeInputElement(INPUT_ELEMENT) .zeebeOutputCollection( - outputCollectionName) // initialize output collection with three elements + outputCollectionName) // initialize output collection with size in input + // collection .zeebeOutputElementExpression(INPUT_ELEMENT)) .embeddedSubProcess() .startEvent() - .zeebeOutput( - collectionWithNoElements, - outputCollectionName) // overwrite output collection with empty list + .zeebeOutput(overwrittenValue, outputCollectionName) // overwrite output collection .endEvent() .subProcessDone() .endEvent() From 594787ea01a840674b265c6e5a47d51cd5b74f41 Mon Sep 17 00:00:00 2001 From: pihme Date: Wed, 20 Apr 2022 13:18:44 +0200 Subject: [PATCH 12/14] fixup --- .../bpmn/behavior/OutputCollectionBehaviorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java index 81d7150c6f88..ca3f13d988ce 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java @@ -65,7 +65,7 @@ void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { .thenReturn(indexThatIsOutOfBounds); final var mockFlowScopeContext = mock(BpmnElementContext.class); - when(mockFlowScopeContext.getFlowScopeKey()).thenReturn(flowScopeContextKey); + when(mockFlowScopeContext.getElementInstanceKey()).thenReturn(flowScopeContextKey); final var sut = new OutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); @@ -112,7 +112,7 @@ void shouldReturnFailureWhenWritingToOutputCollectionWhichIsNotArray() { .thenReturn(index); final var mockFlowScopeContext = mock(BpmnElementContext.class); - when(mockFlowScopeContext.getFlowScopeKey()).thenReturn(flowScopeContextKey); + when(mockFlowScopeContext.getElementInstanceKey()).thenReturn(flowScopeContextKey); final var sut = new OutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); From 310053cf0b06bafa9529dae384577bc3b5d44466 Mon Sep 17 00:00:00 2001 From: pihme Date: Mon, 25 Apr 2022 19:13:39 +0200 Subject: [PATCH 13/14] refactor(engine): rename class --- .../processing/bpmn/behavior/BpmnBehaviors.java | 2 +- .../processing/bpmn/behavior/BpmnBehaviorsImpl.java | 9 +++++---- ...ava => MultiInstanceOutputCollectionBehavior.java} | 4 ++-- .../bpmn/container/MultiInstanceBodyProcessor.java | 11 ++++++----- ...=> MultiInstanceOutputCollectionBehaviorTest.java} | 8 +++++--- 5 files changed, 19 insertions(+), 15 deletions(-) rename engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/{OutputCollectionBehavior.java => MultiInstanceOutputCollectionBehavior.java} (98%) rename engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/{OutputCollectionBehaviorTest.java => MultiInstanceOutputCollectionBehaviorTest.java} (96%) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java index 80a1a651bd75..d558eef9ccbb 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviors.java @@ -36,5 +36,5 @@ public interface BpmnBehaviors { BpmnJobBehavior jobBehavior(); - OutputCollectionBehavior outputCollectionBehavior(); + MultiInstanceOutputCollectionBehavior outputCollectionBehavior(); } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java index 620cec3ab945..2895daf33848 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnBehaviorsImpl.java @@ -39,7 +39,7 @@ public final class BpmnBehaviorsImpl implements BpmnBehaviors { private final BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior; private final BpmnJobBehavior jobBehavior; - private final OutputCollectionBehavior outputCollectionBehavior; + private final MultiInstanceOutputCollectionBehavior multiInstanceOutputCollectionBehavior; public BpmnBehaviorsImpl( final ExpressionProcessor expressionBehavior, @@ -108,7 +108,8 @@ public BpmnBehaviorsImpl( incidentBehavior, jobMetrics); - outputCollectionBehavior = new OutputCollectionBehavior(stateBehavior, expressionBehavior()); + multiInstanceOutputCollectionBehavior = + new MultiInstanceOutputCollectionBehavior(stateBehavior, expressionBehavior()); } @Override @@ -172,7 +173,7 @@ public BpmnJobBehavior jobBehavior() { } @Override - public OutputCollectionBehavior outputCollectionBehavior() { - return outputCollectionBehavior; + public MultiInstanceOutputCollectionBehavior outputCollectionBehavior() { + return multiInstanceOutputCollectionBehavior; } } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java similarity index 98% rename from engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java rename to engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java index be8297bea571..8a3d39dc8623 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java @@ -22,7 +22,7 @@ import org.agrona.ExpandableArrayBuffer; import org.agrona.concurrent.UnsafeBuffer; -public final class OutputCollectionBehavior { +public final class MultiInstanceOutputCollectionBehavior { private final MsgPackReader outputCollectionReader = new MsgPackReader(); private final MsgPackWriter outputCollectionWriter = new MsgPackWriter(); @@ -32,7 +32,7 @@ public final class OutputCollectionBehavior { private final BpmnStateBehavior stateBehavior; private final ExpressionProcessor expressionProcessor; - OutputCollectionBehavior( + MultiInstanceOutputCollectionBehavior( final BpmnStateBehavior stateBehavior, final ExpressionProcessor expressionProcessor) { this.stateBehavior = stateBehavior; this.expressionProcessor = expressionProcessor; diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java index 8f9225edd065..0d7439d2946d 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java @@ -15,7 +15,7 @@ import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnIncidentBehavior; import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior; import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior; -import io.camunda.zeebe.engine.processing.bpmn.behavior.OutputCollectionBehavior; +import io.camunda.zeebe.engine.processing.bpmn.behavior.MultiInstanceOutputCollectionBehavior; import io.camunda.zeebe.engine.processing.common.ExpressionProcessor; import io.camunda.zeebe.engine.processing.common.Failure; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; @@ -48,7 +48,7 @@ public final class MultiInstanceBodyProcessor private final BpmnEventSubscriptionBehavior eventSubscriptionBehavior; private final BpmnStateBehavior stateBehavior; private final BpmnIncidentBehavior incidentBehavior; - private final OutputCollectionBehavior outputCollectionBehavior; + private final MultiInstanceOutputCollectionBehavior multiInstanceOutputCollectionBehavior; public MultiInstanceBodyProcessor(final BpmnBehaviors bpmnBehaviors) { stateTransitionBehavior = bpmnBehaviors.stateTransitionBehavior(); @@ -56,7 +56,7 @@ public MultiInstanceBodyProcessor(final BpmnBehaviors bpmnBehaviors) { stateBehavior = bpmnBehaviors.stateBehavior(); expressionBehavior = bpmnBehaviors.expressionBehavior(); incidentBehavior = bpmnBehaviors.incidentBehavior(); - outputCollectionBehavior = bpmnBehaviors.outputCollectionBehavior(); + multiInstanceOutputCollectionBehavior = bpmnBehaviors.outputCollectionBehavior(); } @Override @@ -149,7 +149,8 @@ public void onTerminate( final BpmnElementContext flowScopeContext, final BpmnElementContext childContext) { final var updatedOrFailure = - outputCollectionBehavior.updateOutputCollection(element, childContext, flowScopeContext); + multiInstanceOutputCollectionBehavior.updateOutputCollection( + element, childContext, flowScopeContext); if (updatedOrFailure.isLeft()) { return updatedOrFailure; } @@ -246,7 +247,7 @@ private void activate( .getOutputCollection() .ifPresent( variableName -> - outputCollectionBehavior.initializeOutputCollection( + multiInstanceOutputCollectionBehavior.initializeOutputCollection( activated, variableName, inputCollection.size())); if (inputCollection.isEmpty()) { diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java similarity index 96% rename from engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java rename to engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java index ca3f13d988ce..54354bc25e05 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/OutputCollectionBehaviorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java @@ -35,7 +35,7 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -public class OutputCollectionBehaviorTest { +public class MultiInstanceOutputCollectionBehaviorTest { @Test // regression test for #9143 void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { @@ -67,7 +67,8 @@ void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { final var mockFlowScopeContext = mock(BpmnElementContext.class); when(mockFlowScopeContext.getElementInstanceKey()).thenReturn(flowScopeContextKey); - final var sut = new OutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); + final var sut = + new MultiInstanceOutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); // when final var result = @@ -114,7 +115,8 @@ void shouldReturnFailureWhenWritingToOutputCollectionWhichIsNotArray() { final var mockFlowScopeContext = mock(BpmnElementContext.class); when(mockFlowScopeContext.getElementInstanceKey()).thenReturn(flowScopeContextKey); - final var sut = new OutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); + final var sut = + new MultiInstanceOutputCollectionBehavior(mockStateBehavior, mockExpressionProcessor); // when final var result = From 84a5c24b70e11a86720adae5d735da286a627991 Mon Sep 17 00:00:00 2001 From: pihme Date: Mon, 25 Apr 2022 19:36:36 +0200 Subject: [PATCH 14/14] refactor(engine): incorporate review feedback --- ...MultiInstanceOutputCollectionBehavior.java | 49 +++++++---- ...iInstanceOutputCollectionBehaviorTest.java | 12 ++- .../incident/MultiInstanceIncidentTest.java | 83 +++++++------------ 3 files changed, 72 insertions(+), 72 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java index 8a3d39dc8623..839d19f92560 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehavior.java @@ -14,10 +14,12 @@ import io.camunda.zeebe.engine.processing.common.Failure; import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; import io.camunda.zeebe.msgpack.spec.MsgPackReader; +import io.camunda.zeebe.msgpack.spec.MsgPackToken; import io.camunda.zeebe.msgpack.spec.MsgPackType; import io.camunda.zeebe.msgpack.spec.MsgPackWriter; import io.camunda.zeebe.protocol.record.value.ErrorType; import io.camunda.zeebe.util.Either; +import java.util.Optional; import org.agrona.DirectBuffer; import org.agrona.ExpandableArrayBuffer; import org.agrona.concurrent.UnsafeBuffer; @@ -116,24 +118,13 @@ private Either replaceAt( outputCollectionReader.wrap(array, 0, array.capacity()); final var token = outputCollectionReader.readToken(); - if (token.getType() != MsgPackType.ARRAY) { - return Either.left( - new Failure( - "Unable to update item in output collection '%s' because the type of the variable is: %s. This happens when multiple BPMN elements write to the same variable." - .formatted(bufferAsString(variableName), token.getType()), - ErrorType.EXTRACT_VALUE_ERROR, - variableScopeKey)); - } - final int size = token.getSize(); - if (index > size) { - return Either.left( - new Failure( - "Unable to update item in output collection '%s' at position %d because the size of the collection is: %d. This happens when multiple BPMN elements write to the same variable." - .formatted(bufferAsString(variableName), index, size), - ErrorType.EXTRACT_VALUE_ERROR, - variableScopeKey)); + final var optValidationFailure = + validateIsCollectionAndHasAppropriateSIze(index, variableScopeKey, variableName, token); + if (optValidationFailure.isPresent()) { + return Either.left(optValidationFailure.get()); } + outputCollectionReader.skipValues((long) index - 1L); final var offsetBefore = outputCollectionReader.getOffset(); @@ -150,4 +141,30 @@ private Either replaceAt( updatedOutputCollectionBuffer.wrap(outputCollectionBuffer, 0, length); return Either.right(updatedOutputCollectionBuffer); } + + private Optional validateIsCollectionAndHasAppropriateSIze( + final int index, + final long variableScopeKey, + final DirectBuffer variableName, + final MsgPackToken token) { + if (token.getType() != MsgPackType.ARRAY) { + return Optional.of( + new Failure( + "Unable to update an item in output collection '%s' because the type of the output collection is: %s. This may happen when multiple BPMN elements write to the same variable." + .formatted(bufferAsString(variableName), token.getType()), + ErrorType.EXTRACT_VALUE_ERROR, + variableScopeKey)); + } + + final int size = token.getSize(); + if (index > size) { + return Optional.of( + new Failure( + "Unable to update an item in output collection '%s' at position %d because the size of the collection is: %d. This may happen when multiple BPMN elements write to the same variable." + .formatted(bufferAsString(variableName), index, size), + ErrorType.EXTRACT_VALUE_ERROR, + variableScopeKey)); + } + return Optional.empty(); + } } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java index 54354bc25e05..da5bef8efe9e 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/behavior/MultiInstanceOutputCollectionBehaviorTest.java @@ -36,6 +36,14 @@ @ExtendWith(MockitoExtension.class) public class MultiInstanceOutputCollectionBehaviorTest { + /* + * Note: this test class caused some controversy between the author and the reviewer. The reviewer + * feared that it will too costly to maintain and doesn't provide much value in return. The author + * felt it will be alright. We didn't manage to come to an agreement, so we leave it to you, future + * developer. If you had to maintain the test and feel it was too cumbersome and didn't add value, + * feel free to remove this test. If you felt maintaining it was ok, and the test does have value, + * feel free to remove this comment. + */ @Test // regression test for #9143 void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { @@ -81,7 +89,7 @@ void shouldReturnFailureWhenWritingToOutputCollectionOutOfBounds() { assertThat(failure.getErrorType()).isEqualTo(ErrorType.EXTRACT_VALUE_ERROR); assertThat(failure.getMessage()) .isEqualTo( - "Unable to update item in output collection 'OUTPUT_COLLECTION' at position 2 because the size of the collection is: 1. This happens when multiple BPMN elements write to the same variable."); + "Unable to update an item in output collection 'OUTPUT_COLLECTION' at position 2 because the size of the collection is: 1. This may happen when multiple BPMN elements write to the same variable."); assertThat(failure.getVariableScopeKey()).isEqualTo(flowScopeContextKey); } @@ -129,7 +137,7 @@ void shouldReturnFailureWhenWritingToOutputCollectionWhichIsNotArray() { assertThat(failure.getErrorType()).isEqualTo(ErrorType.EXTRACT_VALUE_ERROR); assertThat(failure.getMessage()) .isEqualTo( - "Unable to update item in output collection 'OUTPUT_COLLECTION' because the type of the variable is: STRING. This happens when multiple BPMN elements write to the same variable."); + "Unable to update an item in output collection 'OUTPUT_COLLECTION' because the type of the output collection is: STRING. This may happen when multiple BPMN elements write to the same variable."); assertThat(failure.getVariableScopeKey()).isEqualTo(flowScopeContextKey); } diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java index 1af90e1d34fa..b2542a9d969f 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/incident/MultiInstanceIncidentTest.java @@ -636,10 +636,10 @@ public void shouldResolveIncidentDueToCompletionCondition() { * size. */ @Test // regression test for #9143 - public void shouldCreateIncidentIfOutputElementCannotBeReplacedInOutputCollection() { + public void shouldCreateAndResolveIncidentIfOutputElementCannotBeReplacedInOutputCollection() { // given final var processId = "index-out-of-bounds-in-output-collection"; - final var collectionWithThreeElements = "=[1,2,3]"; + final var collectionWithThreeElements = "=[1]"; final var collectionWithNoElements = "=[]"; final var outputCollectionName = "outputItems"; @@ -649,10 +649,10 @@ public void shouldCreateIncidentIfOutputElementCannotBeReplacedInOutputCollectio ENGINE.deployment().withXmlResource(process).deploy(); - // when + // when (raise incident) final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create(); - // then + // then (incident is raised) final Record incidentEvent = RecordingExporter.incidentRecords(IncidentIntent.CREATED) .withProcessInstanceKey(processInstanceKey) @@ -661,30 +661,10 @@ public void shouldCreateIncidentIfOutputElementCannotBeReplacedInOutputCollectio Assertions.assertThat(incidentEvent.getValue()) .hasErrorType(ErrorType.EXTRACT_VALUE_ERROR) .hasErrorMessage( - "Unable to update item in output collection 'outputItems' at position 1 because the size of the collection is: 0. This happens when multiple BPMN elements write to the same variable.") + "Unable to update an item in output collection 'outputItems' at position 1 because the size of the collection is: 0. This may happen when multiple BPMN elements write to the same variable.") .hasProcessInstanceKey(processInstanceKey); - } - @Test - public void shouldResolveIncidentRaisedByOutputElementCannotBeReplacedInOutputCollection() { - // given - final var processId = "index-out-of-bounds-in-output-collection"; - final var collectionWithOneElement = "=[1]"; - final var collectionWithNoElements = "=[]"; - final var outputCollectionName = "outputItems"; - - final var process = - createProcessThatModifiesOutputCollection( - processId, collectionWithOneElement, collectionWithNoElements, outputCollectionName); - - ENGINE.deployment().withXmlResource(process).deploy(); - final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create(); - - final Record incidentEvent = - RecordingExporter.incidentRecords(IncidentIntent.CREATED) - .withProcessInstanceKey(processInstanceKey) - .getFirst(); - // when + // when (resolve incident) ENGINE .variables() .ofScope(incidentEvent.getValue().getVariableScopeKey()) @@ -692,7 +672,7 @@ public void shouldResolveIncidentRaisedByOutputElementCannotBeReplacedInOutputCo .update(); ENGINE.incident().ofInstance(processInstanceKey).withKey(incidentEvent.getKey()).resolve(); - // then the process is able to complete + // then (incident is resolved) assertThat( RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED) .withElementType(BpmnElementType.PROCESS) @@ -701,6 +681,13 @@ public void shouldResolveIncidentRaisedByOutputElementCannotBeReplacedInOutputCo .exists()) .describedAs("the process has completed") .isTrue(); + org.assertj.core.api.Assertions.assertThat( + RecordingExporter.variableRecords(VariableIntent.UPDATED) + .withProcessInstanceKey(processInstanceKey) + .withName(outputCollectionName) + .withValue("[1]") + .exists()) + .isTrue(); } /** @@ -708,10 +695,10 @@ public void shouldResolveIncidentRaisedByOutputElementCannotBeReplacedInOutputCo * output collection fails because the output collection is not an array. */ @Test - public void shouldCreateIncidentIfOutputCollectionHasWrongType() { + public void shouldCreateAndResolveIncidentIfOutputCollectionHasWrongType() { // given final var processId = "output-collection-is-overwritten-by-string"; - final var collectionWithThreeElements = "=[1,2,3]"; + final var collectionWithThreeElements = "=[1]"; final var overwriteWithString = "=\"String overwrite\""; final var outputCollectionName = "outputItems"; @@ -721,10 +708,10 @@ public void shouldCreateIncidentIfOutputCollectionHasWrongType() { ENGINE.deployment().withXmlResource(process).deploy(); - // when + // when (raise incident) final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create(); - // then + // then (incident is raised) final Record incidentEvent = RecordingExporter.incidentRecords(IncidentIntent.CREATED) .withProcessInstanceKey(processInstanceKey) @@ -733,30 +720,10 @@ public void shouldCreateIncidentIfOutputCollectionHasWrongType() { Assertions.assertThat(incidentEvent.getValue()) .hasErrorType(ErrorType.EXTRACT_VALUE_ERROR) .hasErrorMessage( - "Unable to update item in output collection 'outputItems' because the type of the variable is: STRING. This happens when multiple BPMN elements write to the same variable.") + "Unable to update an item in output collection 'outputItems' because the type of the output collection is: STRING. This may happen when multiple BPMN elements write to the same variable.") .hasProcessInstanceKey(processInstanceKey); - } - - @Test - public void shouldResolveIncidentRaisedByOutputCollectionHasWrongType() { - // given - final var processId = "output-collection-is-overwritten-by-string"; - final var collectionWithOneElement = "=[1]"; - final var overwriteWithString = "=\"String overwrite\""; - final var outputCollectionName = "outputItems"; - - final var process = - createProcessThatModifiesOutputCollection( - processId, collectionWithOneElement, overwriteWithString, outputCollectionName); - - ENGINE.deployment().withXmlResource(process).deploy(); - final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(processId).create(); - final Record incidentEvent = - RecordingExporter.incidentRecords(IncidentIntent.CREATED) - .withProcessInstanceKey(processInstanceKey) - .getFirst(); - // when + // when (resolve incident) ENGINE .variables() .ofScope(incidentEvent.getValue().getVariableScopeKey()) @@ -764,7 +731,7 @@ public void shouldResolveIncidentRaisedByOutputCollectionHasWrongType() { .update(); ENGINE.incident().ofInstance(processInstanceKey).withKey(incidentEvent.getKey()).resolve(); - // then the process is able to complete + // then (incident is resolved) assertThat( RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED) .withElementType(BpmnElementType.PROCESS) @@ -773,6 +740,14 @@ public void shouldResolveIncidentRaisedByOutputCollectionHasWrongType() { .exists()) .describedAs("the process has completed") .isTrue(); + + org.assertj.core.api.Assertions.assertThat( + RecordingExporter.variableRecords(VariableIntent.UPDATED) + .withProcessInstanceKey(processInstanceKey) + .withName(outputCollectionName) + .withValue("[1]") + .exists()) + .isTrue(); } private BpmnModelInstance createProcessThatModifiesOutputCollection(