Skip to content

Commit

Permalink
Not subscribe to new subscription if element is in ACTIVATING state (#…
Browse files Browse the repository at this point in the history
…19256)

## Description

During PIM (Process Instance Migration) an element shouldn't subscribe
into a new subscription if the element state is `ACTIVATING`, due to a
previous incident occurred

## Related issues

closes #19212
  • Loading branch information
korthout committed Jun 12, 2024
2 parents cae0e53 + 60ae4d0 commit 83bd020
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.msgpack.spec.MsgPackHelper;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
Expand Down Expand Up @@ -284,6 +285,37 @@ private void tryMigrateElementInstance(
.setBpmnProcessId(targetProcessDefinition.getBpmnProcessId())
.setTenantId(elementInstance.getValue().getTenantId())));

if (ProcessInstanceIntent.ELEMENT_ACTIVATING != elementInstance.getState()) {
// Elements in ACTIVATING state haven't subscribed to events yet. We shouldn't subscribe such
// elements to events during migration either. For elements that have been ACTIVATED, a
// subscription would already exist if needed. So, we want to deal with the expected event
// subscriptions. See: https://github.com/camunda/camunda/issues/19212
handleCatchEvents(
elementInstance,
targetProcessDefinition,
sourceElementIdToTargetElementId,
elementInstanceRecord,
targetElementId,
processInstanceKey,
elementId);
}
}

/**
* Unsubscribes the element instance from unmapped catch events in the source process, and
* subscribes it to unmapped catch events in the target process.
*
* <p>In the future, this method will also migrate event subscriptions if mappings are provided
* for the associated catch events.
*/
private void handleCatchEvents(
final ElementInstance elementInstance,
final DeployedProcess targetProcessDefinition,
final Map<String, String> sourceElementIdToTargetElementId,
final ProcessInstanceRecord elementInstanceRecord,
final String targetElementId,
final long processInstanceKey,
final String elementId) {
final var context = new BpmnElementContextImpl();
context.init(elementInstance.getKey(), elementInstanceRecord, elementInstance.getState());
final var targetElement =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.processinstance.migration;

import static io.camunda.zeebe.engine.processing.processinstance.migration.MigrationTestUtil.extractProcessDefinitionKeyByProcessId;
import static io.camunda.zeebe.protocol.record.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.IncidentIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.IncidentRecordValue;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;

public class MigrateProcessInstanceRegressionTest {
@ClassRule public static final EngineRule ENGINE = EngineRule.singlePartition();

@Rule public final TestWatcher watcher = new RecordingExporterTestWatcher();
@Rule public final BrokerClassRuleHelper helper = new BrokerClassRuleHelper();

// https://github.com/camunda/camunda/issues/19212
@Test
public void shouldResolveIncidentAfterMigratingActivatingElementWithMessageBoundaryEvent() {
// given
final String sourceProcessId = helper.getBpmnProcessId();
final String targetProcessId = helper.getBpmnProcessId() + "2";
final var deployment =
ENGINE
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess(sourceProcessId)
.startEvent()
.serviceTask("A", t -> t.zeebeJobType("task"))
.boundaryEvent(
"msg",
b ->
b.message(
m -> m.name("msg").zeebeCorrelationKeyExpression("missing_var")))
.endEvent()
.moveToNode("A")
.endEvent()
.done())
.withXmlResource(
Bpmn.createExecutableProcess(targetProcessId)
.startEvent()
.serviceTask("B", t -> t.zeebeJobType("task"))
.boundaryEvent(
"msg",
b ->
b.message(
m -> m.name("msg").zeebeCorrelationKeyExpression("existing_var")))
.endEvent()
.moveToNode("B")
.endEvent("target_process_end")
.done())
.deploy();
final long processInstanceKey =
ENGINE
.processInstance()
.ofBpmnProcessId(sourceProcessId)
.withVariable("existing_var", "key")
.create();

final Record<IncidentRecordValue> incident =
RecordingExporter.incidentRecords(IncidentIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.getFirst();

final long targetProcessDefinitionKey =
extractProcessDefinitionKeyByProcessId(deployment, targetProcessId);

ENGINE
.processInstance()
.withInstanceKey(processInstanceKey)
.migration()
.withTargetProcessDefinitionKey(targetProcessDefinitionKey)
.addMappingInstruction("A", "B")
.migrate();

// when
ENGINE.incident().ofInstance(processInstanceKey).withKey(incident.getKey()).resolve();

// then
assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.exists())
.describedAs("Expect that the problem was resolved, so we could create the job")
.isTrue();
}
}

0 comments on commit 83bd020

Please sign in to comment.