Skip to content

Commit

Permalink
merge: #12604
Browse files Browse the repository at this point in the history
12604: Terminate children using the new `ProcessInstanceBatch` command r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->

This PR switches the termination of child instances to use the new `ProcessInstanceBatch` command.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #12538 
closes #11355 



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed May 1, 2023
2 parents b5fde1c + 12439f7 commit e3b025a
Show file tree
Hide file tree
Showing 7 changed files with 260 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;

public final class BpmnStateBehavior {
Expand Down Expand Up @@ -99,15 +98,6 @@ public ElementInstance getFlowScopeInstance(final BpmnElementContext context) {
return elementInstanceState.getInstance(context.getFlowScopeKey());
}

public List<BpmnElementContext> getChildInstances(final BpmnElementContext context) {
return elementInstanceState.getChildren(context.getElementInstanceKey()).stream()
.map(
childInstance ->
context.copy(
childInstance.getKey(), childInstance.getValue(), childInstance.getState()))
.collect(Collectors.toList());
}

public BpmnElementContext getFlowScopeContext(final BpmnElementContext context) {
final var flowScope = getFlowScopeInstance(context);
return context.copy(flowScope.getKey(), flowScope.getValue(), flowScope.getState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.deployment.DeployedProcess;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
Expand Down Expand Up @@ -318,26 +320,27 @@ public void activateElementInstanceInFlowScope(
}

/**
* Terminate all child instances of the given scope.
* Terminate all child instances of the given scope. Terminating is done in batches. It is
* triggered by writing the ProcessInstanceBatch TERMINATE command.
*
* @param context the scope to terminate the child instances of
* @return {@code true} if the scope has no active child instances
*/
public boolean terminateChildInstances(final BpmnElementContext context) {

stateBehavior.getChildInstances(context).stream()
.filter(child -> ProcessInstanceLifecycle.canTerminate(child.getIntent()))
.forEach(
childInstanceContext ->
commandWriter.appendFollowUpCommand(
childInstanceContext.getElementInstanceKey(),
ProcessInstanceIntent.TERMINATE_ELEMENT,
childInstanceContext.getRecordValue()));

final var elementInstance = stateBehavior.getElementInstance(context);
final var activeChildInstances = elementInstance.getNumberOfActiveElementInstances();

return activeChildInstances == 0;
if (activeChildInstances == 0) {
return true;
} else {
final var batchRecord =
new ProcessInstanceBatchRecord()
.setProcessInstanceKey(context.getProcessInstanceKey())
.setBatchElementInstanceKey(context.getElementInstanceKey());
final var key = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.TERMINATE, batchRecord);
return false;
}
}

public <T extends ExecutableFlowNode> void takeOutgoingSequenceFlows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
Expand Down Expand Up @@ -256,6 +257,7 @@ public void shouldTerminateSubProcessBeforeTriggeringBoundaryEvent() {
tuple(ValueType.PROCESS_EVENT, ProcessEventIntent.TRIGGERING),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.TERMINATE_ELEMENT),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(ValueType.PROCESS_INSTANCE_BATCH, ProcessInstanceBatchIntent.TERMINATE),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.TERMINATE_ELEMENT),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(ValueType.JOB, JobIntent.CANCELED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,10 @@ public void shouldRejectTerminateIfElementIsTerminated() {
.endEvent()
.done());

final var timerCreated =
RecordingExporter.timerRecords(TimerIntent.CREATED)
final var serviceTaskActivated =
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("a")
.getFirst();

final var jobCreated =
Expand All @@ -596,7 +597,8 @@ public void shouldRejectTerminateIfElementIsTerminated() {

// when
engine.writeRecords(
cancelProcessInstanceCommand(processInstanceKey), triggerTimerCommand(timerCreated));
terminateElementCommand(serviceTaskActivated),
terminateElementCommand(serviceTaskActivated));

// then
final var rejectedCommand =
Expand Down Expand Up @@ -632,6 +634,12 @@ private RecordToWrite triggerTimerCommand(final Record<TimerRecordValue> timer)
return RecordToWrite.command().timer(TimerIntent.TRIGGER, timer.getValue()).key(timer.getKey());
}

private RecordToWrite terminateElementCommand(final Record<ProcessInstanceRecordValue> record) {
return RecordToWrite.command()
.processInstance(ProcessInstanceIntent.TERMINATE_ELEMENT, record.getValue())
.key(record.getKey());
}

private void assertThatCommandIsRejected(
final Record<ProcessInstanceRecordValue> command, final String rejectionReason) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.it.client.command;

import static io.camunda.zeebe.client.impl.util.DataSizeUtil.ONE_KB;
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.springframework.util.unit.DataSize;

public final class CancelProcessInstanceInBatchesTest {

private static final int AMOUNT_OF_ELEMENT_INSTANCES = 100;
private static final int MAX_MESSAGE_SIZE_KB = 32;
private static final EmbeddedBrokerRule BROKER_RULE =
new EmbeddedBrokerRule(
cfg -> {
cfg.getNetwork().setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB));
cfg.getGateway()
.getNetwork()
.setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB));
// Note: this is a bout the batch for writing to the logstream, not the terminate batch
cfg.getProcessing().setMaxCommandsInBatch(1);
});
private static final GrpcClientRule CLIENT_RULE =
new GrpcClientRule(
BROKER_RULE,
zeebeClientBuilder -> zeebeClientBuilder.maxMessageSize(MAX_MESSAGE_SIZE_KB * ONE_KB));

@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(BROKER_RULE).around(CLIENT_RULE);

@Rule public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper();

@Test
public void shouldCancelInstanceWithMoreChildrenThanTheBatchSizeCanHandle() {
// given
CLIENT_RULE.deployProcess(
Bpmn.createExecutableProcess("PROCESS")
.startEvent()
.zeebeOutputExpression("0", "count")
.exclusiveGateway("joining")
.parallelGateway("split")
.userTask("userTask")
.endEvent()
.moveToLastGateway()
.exclusiveGateway("forking")
.sequenceFlowId("sequenceToEnd")
.conditionExpression("count > " + (AMOUNT_OF_ELEMENT_INSTANCES - 2))
.endEvent("endEvent")
.moveToLastExclusiveGateway()
.defaultFlow()
.scriptTask(
"increment", task -> task.zeebeExpression("count + 1").zeebeResultVariable("count"))
.connectTo("joining")
.done());

// when
final ProcessInstanceEvent processInstance = startProcessInstance();
cancelProcessInstance(processInstance);

// then
hasTerminatedProcessInstance(processInstance);
hasTerminatedAllUserTasks(processInstance);
hasTerminatedInMultipleBatches(processInstance, processInstance.getProcessInstanceKey());
}

@Test
public void shouldCancelSubprocessWithMoreNestedChildrenThanTheBatchSizeCanHandle() {
// given
CLIENT_RULE.deployProcess(
Bpmn.createExecutableProcess("PROCESS")
.startEvent()
.subProcess(
"subprocess",
sp ->
sp.embeddedSubProcess()
.startEvent()
.zeebeOutputExpression("0", "count")
.exclusiveGateway("joining")
.parallelGateway("split")
.userTask("userTask")
.endEvent()
.moveToLastGateway()
.exclusiveGateway("forking")
.sequenceFlowId("sequenceToEnd")
.conditionExpression("count > " + (AMOUNT_OF_ELEMENT_INSTANCES - 2))
.endEvent("endEvent")
.moveToLastExclusiveGateway()
.defaultFlow()
.scriptTask(
"increment",
task -> task.zeebeExpression("count + 1").zeebeResultVariable("count"))
.connectTo("joining"))
.endEvent()
.done());

// when
final ProcessInstanceEvent processInstance = startProcessInstance();
final var subProcess =
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withElementId("subprocess")
.getFirst();
cancelProcessInstance(processInstance);

// then
hasTerminatedProcessInstance(processInstance);
hasTerminatedAllUserTasks(processInstance);
hasTerminatedInMultipleBatches(processInstance, subProcess.getKey());
}

private ProcessInstanceEvent startProcessInstance() {
final var processInstance =
CLIENT_RULE
.getClient()
.newCreateInstanceCommand()
.bpmnProcessId("PROCESS")
.latestVersion()
.send()
.join();
return processInstance;
}

private void cancelProcessInstance(final ProcessInstanceEvent processInstance) {
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withElementId("endEvent")
.limit(1)
.await();

CLIENT_RULE
.getClient()
.newCancelInstanceCommand(processInstance.getProcessInstanceKey())
.send();
}

private void hasTerminatedProcessInstance(final ProcessInstanceEvent processInstance) {
assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_TERMINATED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withElementType(BpmnElementType.PROCESS)
.limitToProcessInstanceTerminated()
.exists())
.describedAs("Has terminated process instance")
.isTrue();
}

private void hasTerminatedAllUserTasks(final ProcessInstanceEvent processInstance) {
assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_TERMINATED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withElementId("userTask")
.limit(AMOUNT_OF_ELEMENT_INSTANCES))
.describedAs("Has terminated all " + AMOUNT_OF_ELEMENT_INSTANCES + " user tasks")
.hasSize(AMOUNT_OF_ELEMENT_INSTANCES);
}

private void hasTerminatedInMultipleBatches(
final ProcessInstanceEvent processInstance, final long batchElementInstanceKey) {
assertThat(
RecordingExporter.processInstanceBatchRecords()
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withBatchElementInstanceKey(batchElementInstanceKey)
.limit(2))
.describedAs("Has terminated in multiple batches")
.hasSize(2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.test.util.record;

import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceBatchRecordValue;
import java.util.stream.Stream;

public class ProcessInstanceBatchRecordStream
extends ExporterRecordStream<
ProcessInstanceBatchRecordValue, ProcessInstanceBatchRecordStream> {

public ProcessInstanceBatchRecordStream(
final Stream<Record<ProcessInstanceBatchRecordValue>> wrappedStream) {
super(wrappedStream);
}

@Override
protected ProcessInstanceBatchRecordStream supply(
final Stream<Record<ProcessInstanceBatchRecordValue>> wrappedStream) {
return new ProcessInstanceBatchRecordStream(wrappedStream);
}

public ProcessInstanceBatchRecordStream withProcessInstanceKey(final long processInstanceKey) {
return valueFilter(v -> v.getProcessInstanceKey() == processInstanceKey);
}

public ProcessInstanceBatchRecordStream withBatchElementInstanceKey(
final long batchElementInstanceKey) {
return valueFilter(v -> v.getBatchElementInstanceKey() == batchElementInstanceKey);
}

public ProcessInstanceBatchRecordStream withIndex(final long index) {
return valueFilter(v -> v.getIndex() == index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageStartEventSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceCreationRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
Expand Down Expand Up @@ -236,6 +237,11 @@ public static ProcessInstanceRecordStream processInstanceRecords(
return processInstanceRecords().withIntent(intent);
}

public static ProcessInstanceBatchRecordStream processInstanceBatchRecords() {
return new ProcessInstanceBatchRecordStream(
records(ValueType.PROCESS_INSTANCE_BATCH, ProcessInstanceBatchRecordValue.class));
}

public static TimerRecordStream timerRecords() {
return new TimerRecordStream(records(ValueType.TIMER, TimerRecordValue.class));
}
Expand Down

0 comments on commit e3b025a

Please sign in to comment.