Skip to content

Commit

Permalink
merge: #13500
Browse files Browse the repository at this point in the history
13500: [Backport stable/8.1] Resolve flaky EmbeddedSubProcessConcurrencyTest r=remcowesterhoud a=backport-action

# Description
Backport of #13498 to `stable/8.1`.

relates to #11844

Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Jul 14, 2023
2 parents 9c2a8c0 + 4a5a25e commit bb4f922
Showing 1 changed file with 30 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.EmbeddedSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
Expand Down Expand Up @@ -53,28 +52,21 @@ public void shouldNotTriggerBoundaryEventWhenFlowscopeIsInterrupted() {
subprocess ->
subprocess
.startEvent()
.parallelGateway()
.serviceTask("task", b -> b.zeebeJobType("task"))
.moveToLastGateway()
.serviceTask("task2", b -> b.zeebeJobType("task2"))
.endEvent()
.moveToActivity("subProcess")
.boundaryEvent(
"msgBoundary",
boundary ->
boundary
.cancelActivity(true)
.message(
msg ->
msg.name("boundary")
.zeebeCorrelationKeyExpression("correlationKey")))
"errorBoundary",
boundary -> boundary.cancelActivity(true).error("boundaryError"))
.endEvent()
.done();

final Consumer<EventSubProcessBuilder> eventSubProcessBuilder =
eventSubProcess ->
eventSubProcess
.startEvent("eventSubProcessStartEvent")
.message(
m -> m.name("eventSubProcess").zeebeCorrelationKeyExpression("correlationKey"))
.endEvent();
eventSubProcess.startEvent("eventSubProcessStartEvent").error("espError").endEvent();

ENGINE
.deployment()
Expand All @@ -89,47 +81,33 @@ public void shouldNotTriggerBoundaryEventWhenFlowscopeIsInterrupted() {
.done())
.deploy();

final var processInstanceKey =
ENGINE
.processInstance()
.ofBpmnProcessId(PROCESS_ID)
.withVariable("correlationKey", "correlationKey")
.create();
final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

assertThat(
RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.limit(2))
.describedAs(
"The 2 message subscriptions must be created before we publish the "
+ "messages. As the messages have a TTL of 0 seconds")
.describedAs("")
.hasSize(2);
final var jobs =
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.limit(2)
.toList();
final var firstJob = jobs.get(0);
final var secondJob = jobs.get(1);

// when
// We need to make sure no records are written in between the publish commands. This could
// We need to make sure no records are written in between throw error commands. This could
// cause the test to become flaky.
final var boundaryJobRecord = new JobRecord();
boundaryJobRecord.wrapWithoutVariables((JobRecord) firstJob.getValue());
boundaryJobRecord.setErrorCode(BufferUtil.wrapString("boundaryError"));
final var espJobRecord = new JobRecord();
espJobRecord.wrapWithoutVariables((JobRecord) secondJob.getValue());
espJobRecord.setErrorCode(BufferUtil.wrapString("espError"));
ENGINE.writeRecords(
// First we publish a message to try and trigger the boundary event
RecordToWrite.command()
.message(
MessageIntent.PUBLISH,
new MessageRecord()
.setName("boundary")
.setTimeToLive(0L)
.setCorrelationKey("correlationKey")
.setVariables(BufferUtil.wrapString(""))),
// Next we publish a message to trigger the event sub process. This will interrupt the
// flow scope of the sub process, whilst the subprocess is being terminated because of the
// boundary event
// First we trigger the boundary event
RecordToWrite.command()
.message(
MessageIntent.PUBLISH,
new MessageRecord()
.setName("eventSubProcess")
.setTimeToLive(0L)
.setCorrelationKey("correlationKey")
.setVariables(BufferUtil.wrapString(""))));
.key(firstJob.getKey())
.job(JobIntent.THROW_ERROR, boundaryJobRecord),
// Next we trigger the event sub process. This will interrupt the flow scope of the sub
// process, whilst the subprocess is being terminated because of the boundary event
RecordToWrite.command().key(secondJob.getKey()).job(JobIntent.THROW_ERROR, espJobRecord));

// then
assertThat(
Expand All @@ -138,7 +116,7 @@ public void shouldNotTriggerBoundaryEventWhenFlowscopeIsInterrupted() {
.filter(r -> r.getValueType() == ValueType.PROCESS_EVENT)
.withIntent(ProcessEventIntent.TRIGGERING))
.extracting(r -> ((ProcessEventRecordValue) r.getValue()).getTargetElementId())
.containsExactly("msgBoundary", "eventSubProcessStartEvent");
.containsExactly("errorBoundary", "eventSubProcessStartEvent");

// No event should be TRIGGERED. We don't want to trigger the boundary event.
// The event sub process does not write a TRIGGERED event.
Expand Down

0 comments on commit bb4f922

Please sign in to comment.