Skip to content

Commit

Permalink
merge: #9178
Browse files Browse the repository at this point in the history
9178: Fix ArrayOutOfBounds / collection has wrong type errors by raising an incident r=pihme a=pihme

## Description

* Move updating of outbound collection into behavior class
* Some refactorings (mostly renames)
* Check for index out of bounds, implement tests for it
* Check for outbound collection is not an array, implement tests for it

## Related issues

closes #9143



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Apr 26, 2022
2 parents b0d7781 + 84a5c24 commit 8c8974a
Show file tree
Hide file tree
Showing 8 changed files with 538 additions and 95 deletions.
6 changes: 6 additions & 0 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public interface BpmnBehaviors {
BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior();

BpmnJobBehavior jobBehavior();

MultiInstanceOutputCollectionBehavior outputCollectionBehavior();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public final class BpmnBehaviorsImpl implements BpmnBehaviors {
private final BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior;
private final BpmnJobBehavior jobBehavior;

private final MultiInstanceOutputCollectionBehavior multiInstanceOutputCollectionBehavior;

public BpmnBehaviorsImpl(
final ExpressionProcessor expressionBehavior,
final SideEffects sideEffects,
Expand Down Expand Up @@ -105,6 +107,9 @@ public BpmnBehaviorsImpl(
stateBehavior,
incidentBehavior,
jobMetrics);

multiInstanceOutputCollectionBehavior =
new MultiInstanceOutputCollectionBehavior(stateBehavior, expressionBehavior());
}

@Override
Expand Down Expand Up @@ -166,4 +171,9 @@ public BpmnBufferedMessageStartEventBehavior bufferedMessageStartEventBehavior()
public BpmnJobBehavior jobBehavior() {
return jobBehavior;
}

@Override
public MultiInstanceOutputCollectionBehavior outputCollectionBehavior() {
return multiInstanceOutputCollectionBehavior;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.bpmn.behavior;

import static io.camunda.zeebe.util.buffer.BufferUtil.bufferAsString;

import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext;
import io.camunda.zeebe.engine.processing.common.ExpressionProcessor;
import io.camunda.zeebe.engine.processing.common.Failure;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody;
import io.camunda.zeebe.msgpack.spec.MsgPackReader;
import io.camunda.zeebe.msgpack.spec.MsgPackToken;
import io.camunda.zeebe.msgpack.spec.MsgPackType;
import io.camunda.zeebe.msgpack.spec.MsgPackWriter;
import io.camunda.zeebe.protocol.record.value.ErrorType;
import io.camunda.zeebe.util.Either;
import java.util.Optional;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.concurrent.UnsafeBuffer;

public final class MultiInstanceOutputCollectionBehavior {

private final MsgPackReader outputCollectionReader = new MsgPackReader();
private final MsgPackWriter outputCollectionWriter = new MsgPackWriter();
private final ExpandableArrayBuffer outputCollectionBuffer = new ExpandableArrayBuffer();
private final DirectBuffer updatedOutputCollectionBuffer = new UnsafeBuffer(0, 0);

private final BpmnStateBehavior stateBehavior;
private final ExpressionProcessor expressionProcessor;

MultiInstanceOutputCollectionBehavior(
final BpmnStateBehavior stateBehavior, final ExpressionProcessor expressionProcessor) {
this.stateBehavior = stateBehavior;
this.expressionProcessor = expressionProcessor;
}

public void initializeOutputCollection(
final BpmnElementContext context, final DirectBuffer variableName, final int size) {

outputCollectionWriter.wrap(outputCollectionBuffer, 0);

// initialize the array with nil
outputCollectionWriter.writeArrayHeader(size);
for (var i = 0; i < size; i++) {
outputCollectionWriter.writeNil();
}

final var length = outputCollectionWriter.getOffset();

stateBehavior.setLocalVariable(context, variableName, outputCollectionBuffer, 0, length);
}

public Either<Failure, Void> updateOutputCollection(
final ExecutableMultiInstanceBody element,
final BpmnElementContext childContext,
final BpmnElementContext flowScopeContext) {

return element
.getLoopCharacteristics()
.getOutputCollection()
.map(
variableName ->
updateOutputCollection(element, childContext, flowScopeContext, variableName))
.orElse(Either.right(null));
}

private Either<Failure, Void> updateOutputCollection(
final ExecutableMultiInstanceBody element,
final BpmnElementContext childContext,
final BpmnElementContext flowScopeContext,
final DirectBuffer variableName) {

final var loopCounter =
stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter();

return readOutputElementVariable(element, childContext)
.flatMap(
elementVariable -> {
// we need to read the output element variable before the current collection
// is read, because readOutputElementVariable(Context) uses the same
// buffer as getVariableLocal this could also be avoided by cloning the current
// collection, but that is slower.
final var currentCollection =
stateBehavior.getLocalVariable(flowScopeContext, variableName);
return replaceAt(
currentCollection,
loopCounter,
elementVariable,
flowScopeContext.getElementInstanceKey(),
variableName)
.map(
updatedCollection -> {
stateBehavior.setLocalVariable(
flowScopeContext, variableName, updatedCollection);
return null;
});
});
}

private Either<Failure, DirectBuffer> readOutputElementVariable(
final ExecutableMultiInstanceBody element, final BpmnElementContext context) {
final var expression = element.getLoopCharacteristics().getOutputElement().orElseThrow();
return expressionProcessor.evaluateAnyExpression(expression, context.getElementInstanceKey());
}

private Either<Failure, DirectBuffer> replaceAt(
final DirectBuffer array,
final int index,
final DirectBuffer element,
final long variableScopeKey,
final DirectBuffer variableName) {

outputCollectionReader.wrap(array, 0, array.capacity());
final var token = outputCollectionReader.readToken();

final var optValidationFailure =
validateIsCollectionAndHasAppropriateSIze(index, variableScopeKey, variableName, token);
if (optValidationFailure.isPresent()) {
return Either.left(optValidationFailure.get());
}

outputCollectionReader.skipValues((long) index - 1L);

final var offsetBefore = outputCollectionReader.getOffset();
outputCollectionReader.skipValue();
final var offsetAfter = outputCollectionReader.getOffset();

outputCollectionWriter.wrap(outputCollectionBuffer, 0);
outputCollectionWriter.writeRaw(array, 0, offsetBefore);
outputCollectionWriter.writeRaw(element);
outputCollectionWriter.writeRaw(array, offsetAfter, array.capacity() - offsetAfter);

final var length = outputCollectionWriter.getOffset();

updatedOutputCollectionBuffer.wrap(outputCollectionBuffer, 0, length);
return Either.right(updatedOutputCollectionBuffer);
}

private Optional<Failure> validateIsCollectionAndHasAppropriateSIze(
final int index,
final long variableScopeKey,
final DirectBuffer variableName,
final MsgPackToken token) {
if (token.getType() != MsgPackType.ARRAY) {
return Optional.of(
new Failure(
"Unable to update an item in output collection '%s' because the type of the output collection is: %s. This may happen when multiple BPMN elements write to the same variable."
.formatted(bufferAsString(variableName), token.getType()),
ErrorType.EXTRACT_VALUE_ERROR,
variableScopeKey));
}

final int size = token.getSize();
if (index > size) {
return Optional.of(
new Failure(
"Unable to update an item in output collection '%s' at position %d because the size of the collection is: %d. This may happen when multiple BPMN elements write to the same variable."
.formatted(bufferAsString(variableName), index, size),
ErrorType.EXTRACT_VALUE_ERROR,
variableScopeKey));
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnIncidentBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.MultiInstanceOutputCollectionBehavior;
import io.camunda.zeebe.engine.processing.common.ExpressionProcessor;
import io.camunda.zeebe.engine.processing.common.Failure;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody;
import io.camunda.zeebe.msgpack.spec.MsgPackHelper;
import io.camunda.zeebe.msgpack.spec.MsgPackReader;
import io.camunda.zeebe.msgpack.spec.MsgPackWriter;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.ErrorType;
Expand All @@ -28,7 +28,6 @@
import java.util.List;
import java.util.Optional;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

Expand All @@ -42,23 +41,22 @@ public final class MultiInstanceBodyProcessor
new UnsafeBuffer(new byte[Long.BYTES + 1]);
private final DirectBuffer loopCounterVariableView = new UnsafeBuffer(0, 0);

private final MsgPackReader variableReader = new MsgPackReader();
private final MsgPackWriter variableWriter = new MsgPackWriter();
private final ExpandableArrayBuffer variableBuffer = new ExpandableArrayBuffer();
private final DirectBuffer resultBuffer = new UnsafeBuffer(0, 0);
private final MsgPackWriter loopCounterWriter = new MsgPackWriter();

private final ExpressionProcessor expressionBehavior;
private final BpmnStateTransitionBehavior stateTransitionBehavior;
private final BpmnEventSubscriptionBehavior eventSubscriptionBehavior;
private final BpmnStateBehavior stateBehavior;
private final BpmnIncidentBehavior incidentBehavior;
private final MultiInstanceOutputCollectionBehavior multiInstanceOutputCollectionBehavior;

public MultiInstanceBodyProcessor(final BpmnBehaviors bpmnBehaviors) {
stateTransitionBehavior = bpmnBehaviors.stateTransitionBehavior();
eventSubscriptionBehavior = bpmnBehaviors.eventSubscriptionBehavior();
stateBehavior = bpmnBehaviors.stateBehavior();
expressionBehavior = bpmnBehaviors.expressionBehavior();
incidentBehavior = bpmnBehaviors.incidentBehavior();
multiInstanceOutputCollectionBehavior = bpmnBehaviors.outputCollectionBehavior();
}

@Override
Expand Down Expand Up @@ -150,7 +148,9 @@ public void onTerminate(
final ExecutableMultiInstanceBody element,
final BpmnElementContext flowScopeContext,
final BpmnElementContext childContext) {
final var updatedOrFailure = updateOutputCollection(element, childContext, flowScopeContext);
final var updatedOrFailure =
multiInstanceOutputCollectionBehavior.updateOutputCollection(
element, childContext, flowScopeContext);
if (updatedOrFailure.isLeft()) {
return updatedOrFailure;
}
Expand Down Expand Up @@ -247,7 +247,8 @@ private void activate(
.getOutputCollection()
.ifPresent(
variableName ->
initializeOutputCollection(activated, variableName, inputCollection.size()));
multiInstanceOutputCollectionBehavior.initializeOutputCollection(
activated, variableName, inputCollection.size()));

if (inputCollection.isEmpty()) {
// complete the multi-instance body immediately
Expand Down Expand Up @@ -328,99 +329,15 @@ private void createInnerInstance(
}

private DirectBuffer wrapLoopCounter(final int loopCounter) {
variableWriter.wrap(loopCounterVariableBuffer, 0);
loopCounterWriter.wrap(loopCounterVariableBuffer, 0);

variableWriter.writeInteger(loopCounter);
final var length = variableWriter.getOffset();
loopCounterWriter.writeInteger(loopCounter);
final var length = loopCounterWriter.getOffset();

loopCounterVariableView.wrap(loopCounterVariableBuffer, 0, length);
return loopCounterVariableView;
}

private void initializeOutputCollection(
final BpmnElementContext context, final DirectBuffer variableName, final int size) {

variableWriter.wrap(variableBuffer, 0);

// initialize the array with nil
variableWriter.writeArrayHeader(size);
for (var i = 0; i < size; i++) {
variableWriter.writeNil();
}

final var length = variableWriter.getOffset();

stateBehavior.setLocalVariable(context, variableName, variableBuffer, 0, length);
}

private Either<Failure, Void> updateOutputCollection(
final ExecutableMultiInstanceBody element,
final BpmnElementContext childContext,
final BpmnElementContext flowScopeContext) {

return element
.getLoopCharacteristics()
.getOutputCollection()
.map(
variableName ->
updateOutputCollection(element, childContext, flowScopeContext, variableName))
.orElse(Either.right(null));
}

private Either<Failure, Void> updateOutputCollection(
final ExecutableMultiInstanceBody element,
final BpmnElementContext childContext,
final BpmnElementContext flowScopeContext,
final DirectBuffer variableName) {

final var loopCounter =
stateBehavior.getElementInstance(childContext).getMultiInstanceLoopCounter();

return readOutputElementVariable(element, childContext)
.map(
elementVariable -> {
// we need to read the output element variable before the current collection
// is read, because readOutputElementVariable(Context) uses the same
// buffer as getVariableLocal this could also be avoided by cloning the current
// collection, but that is slower.
final var currentCollection =
stateBehavior.getLocalVariable(flowScopeContext, variableName);
final var updatedCollection =
insertAt(currentCollection, loopCounter, elementVariable);
stateBehavior.setLocalVariable(flowScopeContext, variableName, updatedCollection);

return null;
});
}

private Either<Failure, DirectBuffer> readOutputElementVariable(
final ExecutableMultiInstanceBody element, final BpmnElementContext context) {
final var expression = element.getLoopCharacteristics().getOutputElement().orElseThrow();
return expressionBehavior.evaluateAnyExpression(expression, context.getElementInstanceKey());
}

private DirectBuffer insertAt(
final DirectBuffer array, final int index, final DirectBuffer element) {

variableReader.wrap(array, 0, array.capacity());
variableReader.readArrayHeader();
variableReader.skipValues((long) index - 1L);

final var offsetBefore = variableReader.getOffset();
variableReader.skipValue();
final var offsetAfter = variableReader.getOffset();

variableWriter.wrap(variableBuffer, 0);
variableWriter.writeRaw(array, 0, offsetBefore);
variableWriter.writeRaw(element);
variableWriter.writeRaw(array, offsetAfter, array.capacity() - offsetAfter);

final var length = variableWriter.getOffset();

resultBuffer.wrap(variableBuffer, 0, length);
return resultBuffer;
}

private Either<Failure, Boolean> satisfiesCompletionCondition(
final ExecutableMultiInstanceBody element, final BpmnElementContext context) {
final Optional<Expression> completionCondition =
Expand Down

0 comments on commit 8c8974a

Please sign in to comment.