Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Terminate children using the new ProcessInstanceBatch command #12604

Merged
merged 4 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;

public final class BpmnStateBehavior {
Expand Down Expand Up @@ -99,15 +98,6 @@ public ElementInstance getFlowScopeInstance(final BpmnElementContext context) {
return elementInstanceState.getInstance(context.getFlowScopeKey());
}

public List<BpmnElementContext> getChildInstances(final BpmnElementContext context) {
return elementInstanceState.getChildren(context.getElementInstanceKey()).stream()
.map(
childInstance ->
context.copy(
childInstance.getKey(), childInstance.getValue(), childInstance.getState()))
.collect(Collectors.toList());
}

public BpmnElementContext getFlowScopeContext(final BpmnElementContext context) {
final var flowScope = getFlowScopeInstance(context);
return context.copy(flowScope.getKey(), flowScope.getValue(), flowScope.getState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.deployment.DeployedProcess;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceBatchRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.stream.api.state.KeyGenerator;
Expand Down Expand Up @@ -318,26 +320,27 @@ public void activateElementInstanceInFlowScope(
}

/**
* Terminate all child instances of the given scope.
* Terminate all child instances of the given scope. Terminating is done in batches. It is
* triggered by writing the ProcessInstanceBatch TERMINATE command.
*
* @param context the scope to terminate the child instances of
* @return {@code true} if the scope has no active child instances
*/
public boolean terminateChildInstances(final BpmnElementContext context) {

stateBehavior.getChildInstances(context).stream()
.filter(child -> ProcessInstanceLifecycle.canTerminate(child.getIntent()))
.forEach(
childInstanceContext ->
commandWriter.appendFollowUpCommand(
childInstanceContext.getElementInstanceKey(),
ProcessInstanceIntent.TERMINATE_ELEMENT,
childInstanceContext.getRecordValue()));

final var elementInstance = stateBehavior.getElementInstance(context);
final var activeChildInstances = elementInstance.getNumberOfActiveElementInstances();

return activeChildInstances == 0;
if (activeChildInstances == 0) {
return true;
} else {
final var batchRecord =
new ProcessInstanceBatchRecord()
.setProcessInstanceKey(context.getProcessInstanceKey())
.setBatchElementInstanceKey(context.getElementInstanceKey());
final var key = keyGenerator.nextKey();
commandWriter.appendFollowUpCommand(key, ProcessInstanceBatchIntent.TERMINATE, batchRecord);
return false;
}
}

public <T extends ExecutableFlowNode> void takeOutgoingSequenceFlows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceBatchIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
Expand Down Expand Up @@ -256,6 +257,7 @@ public void shouldTerminateSubProcessBeforeTriggeringBoundaryEvent() {
tuple(ValueType.PROCESS_EVENT, ProcessEventIntent.TRIGGERING),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.TERMINATE_ELEMENT),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(ValueType.PROCESS_INSTANCE_BATCH, ProcessInstanceBatchIntent.TERMINATE),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.TERMINATE_ELEMENT),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(ValueType.JOB, JobIntent.CANCELED),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,9 +584,10 @@ public void shouldRejectTerminateIfElementIsTerminated() {
.endEvent()
.done());

final var timerCreated =
RecordingExporter.timerRecords(TimerIntent.CREATED)
final var serviceTaskActivated =
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("a")
.getFirst();

final var jobCreated =
Expand All @@ -596,7 +597,8 @@ public void shouldRejectTerminateIfElementIsTerminated() {

// when
engine.writeRecords(
cancelProcessInstanceCommand(processInstanceKey), triggerTimerCommand(timerCreated));
terminateElementCommand(serviceTaskActivated),
terminateElementCommand(serviceTaskActivated));

// then
final var rejectedCommand =
Expand Down Expand Up @@ -632,6 +634,12 @@ private RecordToWrite triggerTimerCommand(final Record<TimerRecordValue> timer)
return RecordToWrite.command().timer(TimerIntent.TRIGGER, timer.getValue()).key(timer.getKey());
}

private RecordToWrite terminateElementCommand(final Record<ProcessInstanceRecordValue> record) {
return RecordToWrite.command()
.processInstance(ProcessInstanceIntent.TERMINATE_ELEMENT, record.getValue())
.key(record.getKey());
}

private void assertThatCommandIsRejected(
final Record<ProcessInstanceRecordValue> command, final String rejectionReason) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.it.client.command;

import static io.camunda.zeebe.client.impl.util.DataSizeUtil.ONE_KB;
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.broker.test.EmbeddedBrokerRule;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.springframework.util.unit.DataSize;

public final class CancelProcessInstanceInBatchesTest {

private static final int AMOUNT_OF_ELEMENT_INSTANCES = 100;
private static final int MAX_MESSAGE_SIZE_KB = 32;
private static final EmbeddedBrokerRule BROKER_RULE =
new EmbeddedBrokerRule(
cfg -> {
cfg.getNetwork().setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB));
cfg.getGateway()
.getNetwork()
.setMaxMessageSize(DataSize.ofKilobytes(MAX_MESSAGE_SIZE_KB));
// Note: this is a bout the batch for writing to the logstream, not the terminate batch
cfg.getProcessing().setMaxCommandsInBatch(1);
});
private static final GrpcClientRule CLIENT_RULE =
new GrpcClientRule(
BROKER_RULE,
zeebeClientBuilder -> zeebeClientBuilder.maxMessageSize(MAX_MESSAGE_SIZE_KB * ONE_KB));

@ClassRule
public static RuleChain ruleChain = RuleChain.outerRule(BROKER_RULE).around(CLIENT_RULE);

@Rule public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper();

@Test
public void shouldCancelInstanceWithMoreChildrenThanTheBatchSizeCanHandle() {
// given
CLIENT_RULE.deployProcess(
Bpmn.createExecutableProcess("PROCESS")
.startEvent()
.zeebeOutputExpression("0", "count")
.exclusiveGateway("joining")
.parallelGateway("split")
.userTask("userTask")
.endEvent()
.moveToLastGateway()
.exclusiveGateway("forking")
.sequenceFlowId("sequenceToEnd")
.conditionExpression("count > " + (AMOUNT_OF_ELEMENT_INSTANCES - 2))
.endEvent("endEvent")
.moveToLastExclusiveGateway()
.defaultFlow()
.scriptTask(
"increment", task -> task.zeebeExpression("count + 1").zeebeResultVariable("count"))
.connectTo("joining")
.done());

// when
final ProcessInstanceEvent processInstance = startProcessInstance();
cancelProcessInstance(processInstance);

// then
hasTerminatedProcessInstance(processInstance);
hasTerminatedAllUserTasks(processInstance);
hasTerminatedInMultipleBatches(processInstance, processInstance.getProcessInstanceKey());
}

@Test
public void shouldCancelSubprocessWithMoreNestedChildrenThanTheBatchSizeCanHandle() {
// given
CLIENT_RULE.deployProcess(
Bpmn.createExecutableProcess("PROCESS")
.startEvent()
.subProcess(
"subprocess",
sp ->
sp.embeddedSubProcess()
.startEvent()
.zeebeOutputExpression("0", "count")
.exclusiveGateway("joining")
.parallelGateway("split")
.userTask("userTask")
.endEvent()
.moveToLastGateway()
.exclusiveGateway("forking")
.sequenceFlowId("sequenceToEnd")
.conditionExpression("count > " + (AMOUNT_OF_ELEMENT_INSTANCES - 2))
.endEvent("endEvent")
.moveToLastExclusiveGateway()
.defaultFlow()
.scriptTask(
"increment",
task -> task.zeebeExpression("count + 1").zeebeResultVariable("count"))
.connectTo("joining"))
.endEvent()
.done());

// when
final ProcessInstanceEvent processInstance = startProcessInstance();
final var subProcess =
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withElementId("subprocess")
.getFirst();
cancelProcessInstance(processInstance);

// then
hasTerminatedProcessInstance(processInstance);
hasTerminatedAllUserTasks(processInstance);
hasTerminatedInMultipleBatches(processInstance, subProcess.getKey());
}

private ProcessInstanceEvent startProcessInstance() {
final var processInstance =
CLIENT_RULE
.getClient()
.newCreateInstanceCommand()
.bpmnProcessId("PROCESS")
.latestVersion()
.send()
.join();
return processInstance;
}

private void cancelProcessInstance(final ProcessInstanceEvent processInstance) {
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withElementId("endEvent")
.limit(1)
.await();

CLIENT_RULE
.getClient()
.newCancelInstanceCommand(processInstance.getProcessInstanceKey())
.send();
}

private void hasTerminatedProcessInstance(final ProcessInstanceEvent processInstance) {
assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_TERMINATED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withElementType(BpmnElementType.PROCESS)
.limitToProcessInstanceTerminated()
.exists())
.describedAs("Has terminated process instance")
.isTrue();
}

private void hasTerminatedAllUserTasks(final ProcessInstanceEvent processInstance) {
assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_TERMINATED)
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withElementId("userTask")
.limit(AMOUNT_OF_ELEMENT_INSTANCES))
.describedAs("Has terminated all " + AMOUNT_OF_ELEMENT_INSTANCES + " user tasks")
.hasSize(AMOUNT_OF_ELEMENT_INSTANCES);
}

private void hasTerminatedInMultipleBatches(
final ProcessInstanceEvent processInstance, final long batchElementInstanceKey) {
assertThat(
RecordingExporter.processInstanceBatchRecords()
.withProcessInstanceKey(processInstance.getProcessInstanceKey())
.withBatchElementInstanceKey(batchElementInstanceKey)
.limit(2))
.describedAs("Has terminated in multiple batches")
.hasSize(2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.test.util.record;

import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceBatchRecordValue;
import java.util.stream.Stream;

public class ProcessInstanceBatchRecordStream
extends ExporterRecordStream<
ProcessInstanceBatchRecordValue, ProcessInstanceBatchRecordStream> {

public ProcessInstanceBatchRecordStream(
final Stream<Record<ProcessInstanceBatchRecordValue>> wrappedStream) {
super(wrappedStream);
}

@Override
protected ProcessInstanceBatchRecordStream supply(
final Stream<Record<ProcessInstanceBatchRecordValue>> wrappedStream) {
return new ProcessInstanceBatchRecordStream(wrappedStream);
}

public ProcessInstanceBatchRecordStream withProcessInstanceKey(final long processInstanceKey) {
return valueFilter(v -> v.getProcessInstanceKey() == processInstanceKey);
}

public ProcessInstanceBatchRecordStream withBatchElementInstanceKey(
final long batchElementInstanceKey) {
return valueFilter(v -> v.getBatchElementInstanceKey() == batchElementInstanceKey);
}

public ProcessInstanceBatchRecordStream withIndex(final long index) {
return valueFilter(v -> v.getIndex() == index);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.camunda.zeebe.protocol.record.value.MessageRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageStartEventSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.MessageSubscriptionRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceBatchRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceCreationRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
Expand Down Expand Up @@ -236,6 +237,11 @@ public static ProcessInstanceRecordStream processInstanceRecords(
return processInstanceRecords().withIntent(intent);
}

public static ProcessInstanceBatchRecordStream processInstanceBatchRecords() {
return new ProcessInstanceBatchRecordStream(
records(ValueType.PROCESS_INSTANCE_BATCH, ProcessInstanceBatchRecordValue.class));
}

public static TimerRecordStream timerRecords() {
return new TimerRecordStream(records(ValueType.TIMER, TimerRecordValue.class));
}
Expand Down