Skip to content

Commit

Permalink
merge: #13497
Browse files Browse the repository at this point in the history
13497: Resolve flaky EmbeddedSubProcessTest r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
This test contained a race condition as it was depending on a specific timing of message correlations. This commit changes this test to use error events instead. The behavior is the same, but this should make it deterministic.

In other versions the test was moved to a different class. I will make a separate PR for this with back ports. This only for stable/8.0.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #11844



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Jul 14, 2023
2 parents d446906 + 6a64617 commit 117aba3
Showing 1 changed file with 29 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
import io.camunda.zeebe.model.bpmn.builder.EndEventBuilder;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.SubProcessBuilder;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.record.Assertions;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.VariableIntent;
Expand Down Expand Up @@ -581,28 +579,21 @@ public void shouldNotTriggerBoundaryEventWhenFlowscopeIsInterrupted() {
subprocess ->
subprocess
.startEvent()
.parallelGateway()
.serviceTask("task", b -> b.zeebeJobType("task"))
.moveToLastGateway()
.serviceTask("task2", b -> b.zeebeJobType("task2"))
.endEvent()
.moveToActivity("subProcess")
.boundaryEvent(
"msgBoundary",
boundary ->
boundary
.cancelActivity(true)
.message(
msg ->
msg.name("boundary")
.zeebeCorrelationKeyExpression("correlationKey")))
"errorBoundary",
boundary -> boundary.cancelActivity(true).error("boundaryError"))
.endEvent()
.done();

final Consumer<EventSubProcessBuilder> eventSubProcessBuilder =
eventSubProcess ->
eventSubProcess
.startEvent("eventSubProcessStartEvent")
.message(
m -> m.name("eventSubProcess").zeebeCorrelationKeyExpression("correlationKey"))
.endEvent();
eventSubProcess.startEvent("eventSubProcessStartEvent").error("espError").endEvent();

ENGINE
.deployment()
Expand All @@ -617,47 +608,33 @@ public void shouldNotTriggerBoundaryEventWhenFlowscopeIsInterrupted() {
.done())
.deploy();

final var processInstanceKey =
ENGINE
.processInstance()
.ofBpmnProcessId(PROCESS_ID)
.withVariable("correlationKey", "correlationKey")
.create();
final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

assertThat(
RecordingExporter.messageSubscriptionRecords(MessageSubscriptionIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.limit(2))
.describedAs(
"The 2 message subscriptions must be created before we publish the "
+ "messages. As the messages have a TTL of 0 seconds")
.describedAs("")
.hasSize(2);
final var jobs =
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.limit(2)
.toList();
final var firstJob = jobs.get(0);
final var secondJob = jobs.get(1);

// when
// We need to make sure no records are written in between the publish commands. This could
// We need to make sure no records are written in between throw error commands. This could
// cause the test to become flaky.
final var boundaryJobRecord = new JobRecord();
boundaryJobRecord.wrapWithoutVariables((JobRecord) firstJob.getValue());
boundaryJobRecord.setErrorCode(BufferUtil.wrapString("boundaryError"));
final var espJobRecord = new JobRecord();
espJobRecord.wrapWithoutVariables((JobRecord) secondJob.getValue());
espJobRecord.setErrorCode(BufferUtil.wrapString("espError"));
ENGINE.writeRecords(
// First we publish a message to try and trigger the boundary event
RecordToWrite.command()
.message(
MessageIntent.PUBLISH,
new MessageRecord()
.setName("boundary")
.setTimeToLive(0L)
.setCorrelationKey("correlationKey")
.setVariables(BufferUtil.wrapString(""))),
// Next we publish a message to trigger the event sub process. This will interrupt the
// flow scope of the sub process, whilst the subprocess is being terminated because of the
// boundary event
// First we trigger the boundary event
RecordToWrite.command()
.message(
MessageIntent.PUBLISH,
new MessageRecord()
.setName("eventSubProcess")
.setTimeToLive(0L)
.setCorrelationKey("correlationKey")
.setVariables(BufferUtil.wrapString(""))));
.key(firstJob.getKey())
.job(JobIntent.THROW_ERROR, boundaryJobRecord),
// Next we trigger the event sub process. This will interrupt the flow scope of the sub
// process, whilst the subprocess is being terminated because of the boundary event
RecordToWrite.command().key(secondJob.getKey()).job(JobIntent.THROW_ERROR, espJobRecord));

// then
assertThat(
Expand All @@ -666,7 +643,7 @@ public void shouldNotTriggerBoundaryEventWhenFlowscopeIsInterrupted() {
.filter(r -> r.getValueType() == ValueType.PROCESS_EVENT)
.withIntent(ProcessEventIntent.TRIGGERING))
.extracting(r -> ((ProcessEventRecordValue) r.getValue()).getTargetElementId())
.containsExactly("msgBoundary", "eventSubProcessStartEvent");
.containsExactly("errorBoundary", "eventSubProcessStartEvent");

// No event should be TRIGGERED. We don't want to trigger the boundary event.
// The event sub process does not write a TRIGGERED event.
Expand Down

0 comments on commit 117aba3

Please sign in to comment.