diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java index bc1f6eef704d..0c7668ca3445 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/ProcessEventProcessors.java @@ -15,6 +15,7 @@ import io.camunda.zeebe.engine.processing.message.ProcessMessageSubscriptionCreateProcessor; import io.camunda.zeebe.engine.processing.message.ProcessMessageSubscriptionDeleteProcessor; import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender; +import io.camunda.zeebe.engine.processing.processinstance.ActivateProcessInstanceBatchProcessor; import io.camunda.zeebe.engine.processing.processinstance.CreateProcessInstanceProcessor; import io.camunda.zeebe.engine.processing.processinstance.CreateProcessInstanceWithResultProcessor; import io.camunda.zeebe.engine.processing.processinstance.ProcessInstanceCommandProcessor; @@ -225,12 +226,21 @@ private static void addProcessInstanceBatchStreamProcessors( final TypedRecordProcessors typedRecordProcessors, final MutableProcessingState processingState, final Writers writers) { - final TerminateProcessInstanceBatchProcessor terminateBatchProcessor = - new TerminateProcessInstanceBatchProcessor( - writers, processingState.getKeyGenerator(), processingState.getElementInstanceState()); - typedRecordProcessors.onCommand( - ValueType.PROCESS_INSTANCE_BATCH, - ProcessInstanceBatchIntent.TERMINATE, - terminateBatchProcessor); + typedRecordProcessors + .onCommand( + ValueType.PROCESS_INSTANCE_BATCH, + ProcessInstanceBatchIntent.TERMINATE, + new TerminateProcessInstanceBatchProcessor( + writers, + processingState.getKeyGenerator(), + processingState.getElementInstanceState())) + .onCommand( + ValueType.PROCESS_INSTANCE_BATCH, + ProcessInstanceBatchIntent.ACTIVATE, + new ActivateProcessInstanceBatchProcessor( + writers, + processingState.getKeyGenerator(), + processingState.getElementInstanceState(), + processingState.getProcessState())); } } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateTransitionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateTransitionBehavior.java index b7bddce12657..74ddc2dd3b3a 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateTransitionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateTransitionBehavior.java @@ -304,6 +304,23 @@ public long activateChildInstanceWithKey( return childInstanceKey; } + /** + * Activate a given amount of children of a multi-instance element. + * + * @param context the context of the multi-instance element + * @param amount the amount of children for which we will write an activate command + */ + public void activateChildInstancesInBatches(final BpmnElementContext context, final int amount) { + final var record = + new ProcessInstanceBatchRecord() + .setProcessInstanceKey(context.getProcessInstanceKey()) + .setBatchElementInstanceKey(context.getElementInstanceKey()) + .setIndex(amount); + + final var key = keyGenerator.nextKey(); + commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.ACTIVATE, record); + } + public void activateElementInstanceInFlowScope( final BpmnElementContext context, final ExecutableFlowElement element) { diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java index 789588cd2cb3..7be0e8f2c017 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/container/MultiInstanceBodyProcessor.java @@ -274,7 +274,7 @@ private void activate( if (loopCharacteristics.isSequential()) { createInnerInstance(element, activated); } else { - inputCollection.forEach(item -> createInnerInstance(element, activated)); + stateTransitionBehavior.activateChildInstancesInBatches(context, inputCollection.size()); } } diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/ActivateProcessInstanceBatchProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/ActivateProcessInstanceBatchProcessor.java new file mode 100644 index 000000000000..a36d1aa732e5 --- /dev/null +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/processinstance/ActivateProcessInstanceBatchProcessor.java @@ -0,0 +1,104 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.engine.processing.processinstance; + +import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMultiInstanceBody; +import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter; +import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; +import io.camunda.zeebe.engine.state.immutable.ElementInstanceState; +import io.camunda.zeebe.engine.state.immutable.ProcessState; +import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord; +import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord; +import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent; +import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; +import io.camunda.zeebe.stream.api.records.TypedRecord; +import io.camunda.zeebe.stream.api.state.KeyGenerator; + +public final class ActivateProcessInstanceBatchProcessor + implements TypedRecordProcessor { + private final TypedCommandWriter commandWriter; + private final KeyGenerator keyGenerator; + private final ElementInstanceState elementInstanceState; + private final ProcessState processState; + + public ActivateProcessInstanceBatchProcessor( + final Writers writers, + final KeyGenerator keyGenerator, + final ElementInstanceState elementInstanceState, + final ProcessState processState) { + commandWriter = writers.command(); + this.keyGenerator = keyGenerator; + this.elementInstanceState = elementInstanceState; + this.processState = processState; + } + + @Override + public void processRecord(final TypedRecord record) { + final var recordValue = record.getValue(); + + final ProcessInstanceRecord childInstanceRecord = createChildInstanceRecord(recordValue); + var amountOfChildInstancesToActivate = recordValue.getIndex(); + while (amountOfChildInstancesToActivate > 0) { + if (canWriteCommands(record, childInstanceRecord)) { + final long childInstanceKey = keyGenerator.nextKey(); + commandWriter.appendFollowUpCommand( + childInstanceKey, ProcessInstanceIntent.ACTIVATE_ELEMENT, childInstanceRecord); + amountOfChildInstancesToActivate--; + } else { + writeFollowupBatchCommand(recordValue, amountOfChildInstancesToActivate); + break; + } + } + } + + private ProcessInstanceRecord createChildInstanceRecord( + final ProcessInstanceBatchRecord recordValue) { + final var parentElementInstance = + elementInstanceState.getInstance(recordValue.getBatchElementInstanceKey()); + final var processDefinition = + processState + .getProcessByKey(parentElementInstance.getValue().getProcessDefinitionKey()) + .getProcess(); + + final var parentElement = + processDefinition.getElementById(parentElementInstance.getValue().getElementId()); + final var childElement = ((ExecutableMultiInstanceBody) parentElement).getInnerActivity(); + + final var childInstanceRecord = new ProcessInstanceRecord(); + childInstanceRecord.wrap(parentElementInstance.getValue()); + childInstanceRecord + .setFlowScopeKey(parentElementInstance.getKey()) + .setElementId(childElement.getId()) + .setBpmnElementType(childElement.getElementType()) + .setBpmnEventType(childElement.getEventType()); + return childInstanceRecord; + } + + private void writeFollowupBatchCommand( + final ProcessInstanceBatchRecord recordValue, final long index) { + final var nextBatchRecord = + new ProcessInstanceBatchRecord() + .setProcessInstanceKey(recordValue.getProcessInstanceKey()) + .setBatchElementInstanceKey(recordValue.getBatchElementInstanceKey()) + .setIndex(index); + final long key = keyGenerator.nextKey(); + commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.ACTIVATE, nextBatchRecord); + } + + private boolean canWriteCommands( + final TypedRecord record, + final ProcessInstanceRecord childInstanceRecord) { + // We must have space in the batch to write both the ACTIVATE command as the potential + // follow-up batch command. An excessive 8Kb is added to account for metadata. This is way + // more than will be necessary. + final var expectedCommandLength = + record.getLength() + childInstanceRecord.getLength() + (1024 * 8); + return commandWriter.canWriteCommandOfLength(expectedCommandLength); + } +} diff --git a/protocol-impl/src/main/java/io/camunda/zeebe/protocol/impl/record/value/processinstance/ProcessInstanceBatchRecord.java b/protocol-impl/src/main/java/io/camunda/zeebe/protocol/impl/record/value/processinstance/ProcessInstanceBatchRecord.java index 1fca3b8535d1..3a40b0b96e9f 100644 --- a/protocol-impl/src/main/java/io/camunda/zeebe/protocol/impl/record/value/processinstance/ProcessInstanceBatchRecord.java +++ b/protocol-impl/src/main/java/io/camunda/zeebe/protocol/impl/record/value/processinstance/ProcessInstanceBatchRecord.java @@ -19,9 +19,17 @@ public final class ProcessInstanceBatchRecord extends UnifiedRecordValue new LongProperty("batchElementInstanceKey"); /** - * The index is used to determine the beginning of the next batch. If the index equals -1 it means - * there won't be another batch. For the TERMINATE intent this index will be the element instance - * key of the first child instance of the next batch. + * The index is used to keep track of the position in the batch. When the index is -1, there won't + * be another batch. + * + *

Depending on the Intent the index is used differently: + * + *

    + *
  • TERMINATE - The index is the element instance key of the first child instance of the next + * batch. + *
  • ACTIVATE - The index is a counter, indicating how many more child instances need to be + * activated. + *
*/ private final LongProperty indexProperty = new LongProperty("index", -1L); diff --git a/protocol/src/main/java/io/camunda/zeebe/protocol/record/intent/ProcessInstanceBatchIntent.java b/protocol/src/main/java/io/camunda/zeebe/protocol/record/intent/ProcessInstanceBatchIntent.java index e68ad6585bf0..bdf61bb1c5cb 100644 --- a/protocol/src/main/java/io/camunda/zeebe/protocol/record/intent/ProcessInstanceBatchIntent.java +++ b/protocol/src/main/java/io/camunda/zeebe/protocol/record/intent/ProcessInstanceBatchIntent.java @@ -16,7 +16,8 @@ package io.camunda.zeebe.protocol.record.intent; public enum ProcessInstanceBatchIntent implements ProcessInstanceRelatedIntent { - TERMINATE(0); + TERMINATE(0), + ACTIVATE(1); private final short value; private final boolean shouldBlacklist; @@ -34,6 +35,8 @@ public static Intent from(final short value) { switch (value) { case 0: return TERMINATE; + case 1: + return ACTIVATE; default: return UNKNOWN; } diff --git a/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/CancelProcessInstanceInBatchesTest.java b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/CancelProcessInstanceInBatchesTest.java index 040bbfd95006..8bb91f326ae4 100644 --- a/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/CancelProcessInstanceInBatchesTest.java +++ b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/CancelProcessInstanceInBatchesTest.java @@ -179,7 +179,9 @@ private void hasTerminatedInMultipleBatches( .withProcessInstanceKey(processInstance.getProcessInstanceKey()) .withBatchElementInstanceKey(batchElementInstanceKey) .limit(2)) - .describedAs("Has terminated in multiple batches") + .describedAs( + "Has terminated in multiple batches. If this assertion fails please decrease " + + "the message size, or increase the amount of element instances.") .hasSize(2); } } diff --git a/qa/integration-tests/src/test/java/io/camunda/zeebe/it/processing/MultiInstanceLargeInputCollectionTest.java b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/processing/MultiInstanceLargeInputCollectionTest.java new file mode 100644 index 000000000000..f5e56b9b8085 --- /dev/null +++ b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/processing/MultiInstanceLargeInputCollectionTest.java @@ -0,0 +1,151 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.it.processing; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.camunda.zeebe.broker.test.EmbeddedBrokerRule; +import io.camunda.zeebe.client.api.response.ActivateJobsResponse; +import io.camunda.zeebe.client.impl.ZeebeObjectMapper; +import io.camunda.zeebe.it.util.GrpcClientRule; +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.protocol.record.intent.JobIntent; +import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; +import io.camunda.zeebe.protocol.record.value.BpmnElementType; +import io.camunda.zeebe.test.util.BrokerClassRuleHelper; +import io.camunda.zeebe.test.util.record.RecordingExporter; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.springframework.util.unit.DataSize; + +public final class MultiInstanceLargeInputCollectionTest { + private static final int INPUT_COLLECTION_SIZE = 100; + private static final String INPUT_ELEMENT = "inputElement"; + private static final int MAX_MESSAGE_SIZE_KB = 16; + private static final ZeebeObjectMapper OBJECT_MAPPER = new ZeebeObjectMapper(); + private static final EmbeddedBrokerRule BROKER_RULE = + new EmbeddedBrokerRule( + cfg -> { + cfg.getNetwork().setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB)); + cfg.getProcessing().setMaxCommandsInBatch(1); + }); + private static final GrpcClientRule CLIENT_RULE = new GrpcClientRule(BROKER_RULE); + + @ClassRule + public static RuleChain ruleChain = RuleChain.outerRule(BROKER_RULE).around(CLIENT_RULE); + + @Rule public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper(); + + @Test + public void shouldCompleteParallelMultiInstanceWithLargeInputCollection() { + // given + final long processKey = + CLIENT_RULE.deployProcess( + Bpmn.createExecutableProcess("PROCESS") + .startEvent() + .serviceTask( + "task", + t -> + t.zeebeJobType("task") + .multiInstance( + mi -> + mi.parallel() + .zeebeInputCollectionExpression( + createInputCollection(INPUT_COLLECTION_SIZE)) + .zeebeInputElement(INPUT_ELEMENT))) + .endEvent() + .done()); + + // when + final var processInstanceKey = CLIENT_RULE.createProcessInstance(processKey); + RecordingExporter.jobRecords(JobIntent.CREATED) + .withProcessInstanceKey(processInstanceKey) + .withElementId("task") + .limit(INPUT_COLLECTION_SIZE) + .collect(Collectors.toList()); + final var actualInputElements = completeJobs(); + + // then + hasCreatedJobForEachInputElement(actualInputElements); + hasCompletedElementsAndProcessInCorrectSequence(processInstanceKey); + hasActivatedElementInBatches(processInstanceKey); + } + + private String createInputCollection(final int size) { + final var inputCollection = new ArrayList(); + for (int i = 0; i < size; i++) { + inputCollection.add(i); + } + return OBJECT_MAPPER.toJson(inputCollection); + } + + private Set completeJobs() { + final ActivateJobsResponse activatedJobs = + CLIENT_RULE + .getClient() + .newActivateJobsCommand() + .jobType("task") + .maxJobsToActivate(INPUT_COLLECTION_SIZE) + .fetchVariables(INPUT_ELEMENT) + .send() + .join(); + + final var inputElements = new HashSet(); + activatedJobs + .getJobs() + .forEach( + job -> { + inputElements.add((Integer) job.getVariablesAsMap().get(INPUT_ELEMENT)); + CLIENT_RULE.getClient().newCompleteCommand(job.getKey()).send().join(); + }); + return inputElements; + } + + private void hasCreatedJobForEachInputElement(final Set actualInputElements) { + final var expectedInputElements = new HashSet(); + for (int i = 0; i < INPUT_COLLECTION_SIZE; i++) { + expectedInputElements.add(i); + } + assertThat(actualInputElements).containsAll(expectedInputElements); + } + + private void hasCompletedElementsAndProcessInCorrectSequence(final long processInstanceKey) { + assertThat( + RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED) + .withProcessInstanceKey(processInstanceKey) + .limitToProcessInstanceCompleted()) + .map(record -> record.getValue().getBpmnElementType()) + .describedAs("Has completed process and all jobs") + // Plus 4 for the start event, multi instance, end event and process + .hasSize(INPUT_COLLECTION_SIZE + 4) + .describedAs("Completed in correct sequence") + .startsWith(BpmnElementType.START_EVENT, BpmnElementType.SERVICE_TASK) + .endsWith( + BpmnElementType.SERVICE_TASK, + BpmnElementType.MULTI_INSTANCE_BODY, + BpmnElementType.END_EVENT, + BpmnElementType.PROCESS); + } + + private void hasActivatedElementInBatches(final long processInstanceKey) { + assertThat( + RecordingExporter.processInstanceBatchRecords() + .withProcessInstanceKey(processInstanceKey) + .limit(2)) + .describedAs( + "Has activated in multiple batches. If this assertion fails please decrease " + + "the message size, or increase the input collection.") + .hasSize(2); + } +}