From 33070a437e5beb2d69b02db1e3a1c50e072739ba Mon Sep 17 00:00:00 2001 From: Remco Westerhoud Date: Wed, 12 Oct 2022 15:02:53 +0200 Subject: [PATCH] feat(engine): distribute deployment in post commit tasks This change makes it so that we do not distribute a deployment during the processing of the CREATE command, but as post commit task afters this command has been processed. The reasoning behind this change is that the writing of the deployment distribution events/commands could happen in an unexpected order. This is because of the event buffering. Once we write an event this gets buffered. When we notify a different partition about the deployment it will write (and possibly process) the command immediately. This results in a situation where the different partition could write it's commands/events before we commands/events of the CREATE command have been written, making the ordering seem backwards. E.g.: 1. We receive a Deployment.CREATE command 2. During processing we will write the DeploymentDistribution.DISTRIBUTING event. During processing we also send a message to the other partitions in order to distribute the deployment. 3. The DeploymentDistribution.DISTRIBUTING event is written to the buffer. Nothing is written to the log stream yet. The other partition receives the message and writes a Deployment.DISTRIBUTE command. At this point this partition is idle so it will immediately start processing this command. 4. Here we run into a race condition. If the second partition sends the response back to the first partition before the first partition has finished processing the Deployment.CREATE command it will write the DeploymentDistribution.COMPLETE before it writes the buffered events. --- .../deployment/DeploymentCreateProcessor.java | 2 +- .../distribute/DeploymentDistributionBehavior.java | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java index 3a10967f6906..167cd7129674 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java @@ -112,7 +112,7 @@ public void processRecord( stateWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent); - deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key); + deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key, sideEffects); messageStartEventSubscriptionManager.tryReOpenMessageStartEventSubscription( deploymentEvent, stateWriter); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java index c3c378a12d6a..a3ff8d7ff680 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java @@ -7,6 +7,7 @@ */ package io.camunda.zeebe.engine.processing.deployment.distribute; +import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.protocol.Protocol; @@ -45,7 +46,10 @@ public DeploymentDistributionBehavior( stateWriter = writers.state(); } - public void distributeDeployment(final DeploymentRecord deploymentEvent, final long key) { + public void distributeDeployment( + final DeploymentRecord deploymentEvent, + final long key, + final SideEffectQueue sideEffectQueue) { final var copiedDeploymentBuffer = BufferUtil.createCopy(deploymentEvent); otherPartitions.forEach( @@ -54,7 +58,11 @@ public void distributeDeployment(final DeploymentRecord deploymentEvent, final l stateWriter.appendFollowUpEvent( key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord); - distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer); + sideEffectQueue.add( + () -> { + distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer); + return true; + }); }); if (otherPartitions.isEmpty()) {