diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateBehavior.java index 18ecc8ab1fcb..f92515937076 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateBehavior.java @@ -21,7 +21,6 @@ import io.camunda.zeebe.protocol.record.value.BpmnElementType; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import org.agrona.DirectBuffer; public final class BpmnStateBehavior { @@ -99,15 +98,6 @@ public ElementInstance getFlowScopeInstance(final BpmnElementContext context) { return elementInstanceState.getInstance(context.getFlowScopeKey()); } - public List getChildInstances(final BpmnElementContext context) { - return elementInstanceState.getChildren(context.getElementInstanceKey()).stream() - .map( - childInstance -> - context.copy( - childInstance.getKey(), childInstance.getValue(), childInstance.getState())) - .collect(Collectors.toList()); - } - public BpmnElementContext getFlowScopeContext(final BpmnElementContext context) { final var flowScope = getFlowScopeInstance(context); return context.copy(flowScope.getKey(), flowScope.getValue(), flowScope.getState()); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateTransitionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateTransitionBehavior.java index 7d3f04b9f126..50744d5462e8 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateTransitionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/bpmn/behavior/BpmnStateTransitionBehavior.java @@ -22,7 +22,9 @@ import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.engine.state.KeyGenerator; import io.camunda.zeebe.engine.state.deployment.DeployedProcess; +import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord; import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord; +import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.value.BpmnElementType; import io.camunda.zeebe.util.Either; @@ -306,26 +308,27 @@ public void activateElementInstanceInFlowScope( } /** - * Terminate all child instances of the given scope. + * Terminate all child instances of the given scope. Terminating is done in batches. It is + * triggered by writing the ProcessInstanceBatch TERMINATE command. * * @param context the scope to terminate the child instances of * @return {@code true} if the scope has no active child instances */ public boolean terminateChildInstances(final BpmnElementContext context) { - - stateBehavior.getChildInstances(context).stream() - .filter(child -> ProcessInstanceLifecycle.canTerminate(child.getIntent())) - .forEach( - childInstanceContext -> - commandWriter.appendFollowUpCommand( - childInstanceContext.getElementInstanceKey(), - ProcessInstanceIntent.TERMINATE_ELEMENT, - childInstanceContext.getRecordValue())); - final var elementInstance = stateBehavior.getElementInstance(context); final var activeChildInstances = elementInstance.getNumberOfActiveElementInstances(); - return activeChildInstances == 0; + if (activeChildInstances == 0) { + return true; + } else { + final var batchRecord = + new ProcessInstanceBatchRecord() + .setProcessInstanceKey(context.getProcessInstanceKey()) + .setBatchElementInstanceKey(context.getElementInstanceKey()); + final var key = keyGenerator.nextKey(); + commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.TERMINATE, batchRecord); + return false; + } } public void takeOutgoingSequenceFlows( diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/boundary/BoundaryEventTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/boundary/BoundaryEventTest.java index 045d40836f00..5cf5b6a29e63 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/boundary/BoundaryEventTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/bpmn/boundary/BoundaryEventTest.java @@ -23,6 +23,7 @@ import io.camunda.zeebe.protocol.record.intent.IncidentIntent; import io.camunda.zeebe.protocol.record.intent.JobIntent; import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent; +import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent; import io.camunda.zeebe.protocol.record.intent.TimerIntent; @@ -256,6 +257,7 @@ public void shouldTerminateSubProcessBeforeTriggeringBoundaryEvent() { tuple(ValueType.PROCESS_EVENT, ProcessEventIntent.TRIGGERING), tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.TERMINATE_ELEMENT), tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING), + tuple(ValueType.PROCESS_INSTANCE_BATCH, ProcessInstanceBatchIntent.TERMINATE), tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.TERMINATE_ELEMENT), tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING), tuple(ValueType.JOB, JobIntent.CANCELED), diff --git a/engine/src/test/java/io/camunda/zeebe/engine/processing/processinstance/ProcessInstanceCommandRejectionTest.java b/engine/src/test/java/io/camunda/zeebe/engine/processing/processinstance/ProcessInstanceCommandRejectionTest.java index 6fd688f7cdcd..18578eb014d3 100644 --- a/engine/src/test/java/io/camunda/zeebe/engine/processing/processinstance/ProcessInstanceCommandRejectionTest.java +++ b/engine/src/test/java/io/camunda/zeebe/engine/processing/processinstance/ProcessInstanceCommandRejectionTest.java @@ -584,9 +584,10 @@ public void shouldRejectTerminateIfElementIsTerminated() { .endEvent() .done()); - final var timerCreated = - RecordingExporter.timerRecords(TimerIntent.CREATED) + final var serviceTaskActivated = + RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED) .withProcessInstanceKey(processInstanceKey) + .withElementId("a") .getFirst(); final var jobCreated = @@ -596,7 +597,8 @@ public void shouldRejectTerminateIfElementIsTerminated() { // when engine.writeRecords( - cancelProcessInstanceCommand(processInstanceKey), triggerTimerCommand(timerCreated)); + terminateElementCommand(serviceTaskActivated), + terminateElementCommand(serviceTaskActivated)); // then final var rejectedCommand = @@ -632,6 +634,12 @@ private RecordToWrite triggerTimerCommand(final Record timer) return RecordToWrite.command().timer(TimerIntent.TRIGGER, timer.getValue()).key(timer.getKey()); } + private RecordToWrite terminateElementCommand(final Record record) { + return RecordToWrite.command() + .processInstance(ProcessInstanceIntent.TERMINATE_ELEMENT, record.getValue()) + .key(record.getKey()); + } + private void assertThatCommandIsRejected( final Record command, final String rejectionReason) { diff --git a/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/CancelProcessInstanceInBatchesTest.java b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/CancelProcessInstanceInBatchesTest.java new file mode 100644 index 000000000000..acf1c2d7ac2e --- /dev/null +++ b/qa/integration-tests/src/test/java/io/camunda/zeebe/it/client/command/CancelProcessInstanceInBatchesTest.java @@ -0,0 +1,177 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.it.client.command; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.camunda.zeebe.broker.test.EmbeddedBrokerRule; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import io.camunda.zeebe.it.util.GrpcClientRule; +import io.camunda.zeebe.model.bpmn.Bpmn; +import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; +import io.camunda.zeebe.protocol.record.value.BpmnElementType; +import io.camunda.zeebe.test.util.BrokerClassRuleHelper; +import io.camunda.zeebe.test.util.record.RecordingExporter; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.RuleChain; +import org.springframework.util.unit.DataSize; + +public final class CancelProcessInstanceInBatchesTest { + + private static final int AMOUNT_OF_ELEMENT_INSTANCES = 100; + private static final int MAX_MESSAGE_SIZE_KB = 32; + private static final EmbeddedBrokerRule BROKER_RULE = + new EmbeddedBrokerRule( + cfg -> { + cfg.getNetwork().setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB)); + // Note: this is a bout the batch for writing to the logstream, not the terminate batch + cfg.getProcessing().setMaxCommandsInBatch(1); + }); + private static final GrpcClientRule CLIENT_RULE = new GrpcClientRule(BROKER_RULE); + + @ClassRule + public static RuleChain ruleChain = RuleChain.outerRule(BROKER_RULE).around(CLIENT_RULE); + + @Rule public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper(); + + @Test + public void shouldCancelInstanceWithMoreChildrenThanTheBatchSizeCanHandle() { + // given + CLIENT_RULE.deployProcess( + Bpmn.createExecutableProcess("PROCESS") + .startEvent() + .zeebeOutputExpression("0", "count") + .exclusiveGateway("joining") + .parallelGateway("split") + .userTask("userTask") + .endEvent() + .moveToLastGateway() + .exclusiveGateway("forking") + .sequenceFlowId("sequenceToEnd") + .conditionExpression("count > " + (AMOUNT_OF_ELEMENT_INSTANCES - 2)) + .endEvent("endEvent") + .moveToLastExclusiveGateway() + .defaultFlow() + .intermediateThrowEvent( + "increment", task -> task.zeebeOutputExpression("count + 1", "count")) + .connectTo("joining") + .done()); + + // when + final ProcessInstanceEvent processInstance = startProcessInstance(); + cancelProcessInstance(processInstance); + + // then + hasTerminatedProcessInstance(processInstance); + hasTerminatedAllUserTasks(processInstance); + hasTerminatedInMultipleBatches(processInstance, processInstance.getProcessInstanceKey()); + } + + @Test + public void shouldCancelSubprocessWithMoreNestedChildrenThanTheBatchSizeCanHandle() { + // given + CLIENT_RULE.deployProcess( + Bpmn.createExecutableProcess("PROCESS") + .startEvent() + .subProcess( + "subprocess", + sp -> + sp.embeddedSubProcess() + .startEvent() + .zeebeOutputExpression("0", "count") + .exclusiveGateway("joining") + .parallelGateway("split") + .userTask("userTask") + .endEvent() + .moveToLastGateway() + .exclusiveGateway("forking") + .sequenceFlowId("sequenceToEnd") + .conditionExpression("count > " + (AMOUNT_OF_ELEMENT_INSTANCES - 2)) + .endEvent("endEvent") + .moveToLastExclusiveGateway() + .defaultFlow() + .intermediateThrowEvent( + "increment", task -> task.zeebeOutputExpression("count + 1", "count")) + .connectTo("joining")) + .endEvent() + .done()); + + // when + final ProcessInstanceEvent processInstance = startProcessInstance(); + final var subProcess = + RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED) + .withProcessInstanceKey(processInstance.getProcessInstanceKey()) + .withElementId("subprocess") + .getFirst(); + cancelProcessInstance(processInstance); + + // then + hasTerminatedProcessInstance(processInstance); + hasTerminatedAllUserTasks(processInstance); + hasTerminatedInMultipleBatches(processInstance, subProcess.getKey()); + } + + private ProcessInstanceEvent startProcessInstance() { + final var processInstance = + CLIENT_RULE + .getClient() + .newCreateInstanceCommand() + .bpmnProcessId("PROCESS") + .latestVersion() + .send() + .join(); + return processInstance; + } + + private void cancelProcessInstance(final ProcessInstanceEvent processInstance) { + RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED) + .withProcessInstanceKey(processInstance.getProcessInstanceKey()) + .withElementId("endEvent") + .limit(1) + .await(); + + CLIENT_RULE + .getClient() + .newCancelInstanceCommand(processInstance.getProcessInstanceKey()) + .send(); + } + + private void hasTerminatedProcessInstance(final ProcessInstanceEvent processInstance) { + assertThat( + RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_TERMINATED) + .withProcessInstanceKey(processInstance.getProcessInstanceKey()) + .withElementType(BpmnElementType.PROCESS) + .limitToProcessInstanceTerminated() + .exists()) + .describedAs("Has terminated process instance") + .isTrue(); + } + + private void hasTerminatedAllUserTasks(final ProcessInstanceEvent processInstance) { + assertThat( + RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_TERMINATED) + .withProcessInstanceKey(processInstance.getProcessInstanceKey()) + .withElementId("userTask") + .limit(AMOUNT_OF_ELEMENT_INSTANCES)) + .describedAs("Has terminated all " + AMOUNT_OF_ELEMENT_INSTANCES + " user tasks") + .hasSize(AMOUNT_OF_ELEMENT_INSTANCES); + } + + private void hasTerminatedInMultipleBatches( + final ProcessInstanceEvent processInstance, final long batchElementInstanceKey) { + assertThat( + RecordingExporter.processInstanceBatchRecords() + .withProcessInstanceKey(processInstance.getProcessInstanceKey()) + .withBatchElementInstanceKey(batchElementInstanceKey) + .limit(2)) + .describedAs("Has terminated in multiple batches") + .hasSize(2); + } +} diff --git a/test-util/src/main/java/io/camunda/zeebe/test/util/record/ProcessInstanceBatchRecordStream.java b/test-util/src/main/java/io/camunda/zeebe/test/util/record/ProcessInstanceBatchRecordStream.java new file mode 100644 index 000000000000..0c0152d392bc --- /dev/null +++ b/test-util/src/main/java/io/camunda/zeebe/test/util/record/ProcessInstanceBatchRecordStream.java @@ -0,0 +1,41 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.camunda.zeebe.test.util.record; + +import io.camunda.zeebe.protocol.record.Record; +import io.camunda.zeebe.protocol.record.value.ProcessInstanceBatchRecordValue; +import java.util.stream.Stream; + +public class ProcessInstanceBatchRecordStream + extends ExporterRecordStream< + ProcessInstanceBatchRecordValue, ProcessInstanceBatchRecordStream> { + + public ProcessInstanceBatchRecordStream( + final Stream> wrappedStream) { + super(wrappedStream); + } + + @Override + protected ProcessInstanceBatchRecordStream supply( + final Stream> wrappedStream) { + return new ProcessInstanceBatchRecordStream(wrappedStream); + } + + public ProcessInstanceBatchRecordStream withProcessInstanceKey(final long processInstanceKey) { + return valueFilter(v -> v.getProcessInstanceKey() == processInstanceKey); + } + + public ProcessInstanceBatchRecordStream withBatchElementInstanceKey( + final long batchElementInstanceKey) { + return valueFilter(v -> v.getBatchElementInstanceKey() == batchElementInstanceKey); + } + + public ProcessInstanceBatchRecordStream withIndex(final long index) { + return valueFilter(v -> v.getIndex() == index); + } +} diff --git a/test-util/src/main/java/io/camunda/zeebe/test/util/record/RecordingExporter.java b/test-util/src/main/java/io/camunda/zeebe/test/util/record/RecordingExporter.java index 46a1e8281ecc..6b1bf070cb4e 100644 --- a/test-util/src/main/java/io/camunda/zeebe/test/util/record/RecordingExporter.java +++ b/test-util/src/main/java/io/camunda/zeebe/test/util/record/RecordingExporter.java @@ -34,6 +34,7 @@ import io.camunda.zeebe.protocol.record.value.MessageRecordValue; import io.camunda.zeebe.protocol.record.value.MessageStartEventSubscriptionRecordValue; import io.camunda.zeebe.protocol.record.value.MessageSubscriptionRecordValue; +import io.camunda.zeebe.protocol.record.value.ProcessInstanceBatchRecordValue; import io.camunda.zeebe.protocol.record.value.ProcessInstanceCreationRecordValue; import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue; import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue; @@ -227,6 +228,11 @@ public static ProcessInstanceRecordStream processInstanceRecords( return processInstanceRecords().withIntent(intent); } + public static ProcessInstanceBatchRecordStream processInstanceBatchRecords() { + return new ProcessInstanceBatchRecordStream( + records(ValueType.PROCESS_INSTANCE_BATCH, ProcessInstanceBatchRecordValue.class)); + } + public static TimerRecordStream timerRecords() { return new TimerRecordStream(records(ValueType.TIMER, TimerRecordValue.class)); }