merge: #10689
10689: Distribute deployment in post commit tasks r=korthout a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
With this change we no longer distribute a deployment while processing the `CREATE` command. Instead, distribution runs as a post-commit task after this command has been processed.

The reasoning behind this change is that the deployment distribution events/commands could be written in an unexpected order because of event buffering. When we write an event, it is buffered first. When we notify a different partition about the deployment, that partition writes (and possibly processes) the command immediately. As a result, the other partition could write its commands/events before the commands/events of the `CREATE` command have been written, making the ordering seem backwards.

E.g.:
1. We receive a `Deployment.CREATE` command
2. During processing we will write the `DeploymentDistribution.DISTRIBUTING` event. During processing we also send a message to the other partitions in order to distribute the deployment.
3. The `DeploymentDistribution.DISTRIBUTING` event is written to the buffer. Nothing is written to the log stream yet. The other partition receives the message and writes a `Deployment.DISTRIBUTE` command. At this point this partition is idle so it will immediately start processing this command.
4. Here we run into a race condition. If the second partition sends its response back before the first partition has finished processing the `Deployment.CREATE` command, the first partition will write the `DeploymentDistribution.COMPLETE` command before it writes the buffered events.
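
The steps above can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions, not engine code: `DistributionOrderingSketch`, `processCreate`, and the string-based log are hypothetical names invented for illustration, and the other partition is assumed to respond instantly (the worst case for the race).

```java
import java.util.ArrayList;
import java.util.List;

public class DistributionOrderingSketch {

  /**
   * Simulates processing Deployment.CREATE on partition 1 and returns the order in which
   * records end up on its log. All names here are hypothetical, not Zeebe APIs.
   */
  public static List<String> processCreate(final boolean distributeAfterCommit) {
    final List<String> log = new ArrayList<>(); // records written to the log stream
    final List<String> buffer = new ArrayList<>(); // follow-up events buffered during processing
    final List<Runnable> postCommitTasks = new ArrayList<>();

    log.add("Command Deployment.CREATE");

    // Follow-up events are only buffered while the command is being processed.
    buffer.add("Event Deployment.CREATED");
    buffer.add("Event DeploymentDistribution.DISTRIBUTING");

    // Assumption: the other partition answers immediately, so sending the
    // distribution directly results in the COMPLETE command being written.
    final Runnable distribute = () -> log.add("Command DeploymentDistribution.COMPLETE");

    if (distributeAfterCommit) {
      postCommitTasks.add(distribute); // new behavior: defer until after the commit
    } else {
      distribute.run(); // old behavior: send while still processing
    }

    log.addAll(buffer); // commit: the buffered events reach the log
    postCommitTasks.forEach(Runnable::run); // post-commit tasks run last

    return log;
  }

  public static void main(final String[] args) {
    System.out.println("during processing: " + processCreate(false));
    System.out.println("post commit:       " + processCreate(true));
  }
}
```

With `distributeAfterCommit` set to `false`, the `COMPLETE` command lands on the simulated log before the `DISTRIBUTING` event. With `true`, it can only appear after the buffered events have been written.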


With this change the order of commands/events should always be the same. As a result, this fixes the flaky test that is referenced as a related issue. The flow will be as follows:

On partition 1
```
Command     Deployment.CREATE                      
Event       Process.CREATED                        
Event       Deployment.CREATED                     
Event       DeploymentDistribution.DISTRIBUTING     (store distribution to partition N in state)
<Post-commit tasks trigger at this point, causing the other partitions to start processing the deployment>
Command     DeploymentDistribution.COMPLETE         
Event       DeploymentDistribution.COMPLETED        
Event       Deployment.FULLY_DISTRIBUTED            (when all partitions have triggered a complete command)

```

On other partitions
```
Command     Deployment.DISTRIBUTE       (causes the DeploymentDistribution.COMPLETE command to be written on partition 1)
Event       Deployment.DISTRIBUTED      
```
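
The deferral behind this flow can be sketched as a minimal queue of boolean-returning tasks, one per target partition. This is a hedged illustration only: `PostCommitQueueSketch` and `TaskQueue` are hypothetical stand-ins and do not reproduce the real `SideEffectQueue` API. The meaning of the boolean return value (here: "the task succeeded") is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

public class PostCommitQueueSketch {

  /** Hypothetical stand-in for a side-effect queue; not the real Zeebe class. */
  static final class TaskQueue {
    private final List<BooleanSupplier> tasks = new ArrayList<>();

    void add(final BooleanSupplier task) {
      tasks.add(task);
    }

    /** Runs every queued task once and returns true only if all of them reported success. */
    boolean flush() {
      boolean allSucceeded = true;
      for (final BooleanSupplier task : tasks) {
        // Non-short-circuit '&' so every task runs even after a failure.
        allSucceeded &= task.getAsBoolean();
      }
      tasks.clear();
      return allSucceeded;
    }
  }

  /** Returns a trace of what happens, in order, for the given target partitions. */
  public static List<String> run(final List<Integer> otherPartitions) {
    final List<String> trace = new ArrayList<>();
    final TaskQueue queue = new TaskQueue();

    for (final int partitionId : otherPartitions) {
      // The event is appended during processing ...
      trace.add("append DISTRIBUTING for partition " + partitionId);
      // ... while the actual send is only queued.
      queue.add(
          () -> {
            trace.add("send deployment to partition " + partitionId);
            return true;
          });
    }

    trace.add("commit"); // buffered events are written here
    queue.flush(); // only now are the other partitions notified
    return trace;
  }

  public static void main(final String[] args) {
    run(List.of(2, 3)).forEach(System.out::println);
  }
}
```

In this sketch every send is recorded after the `commit` entry, regardless of how many partitions are targeted, which mirrors the ordering shown in the flow above.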

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9964 



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
zeebe-bors-camunda[bot] and remcowesterhoud committed Oct 13, 2022
2 parents 17161f6 + 33070a4 commit aff630e
Showing 2 changed files with 11 additions and 3 deletions.
@@ -112,7 +112,7 @@ public void processRecord(

stateWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent);

-deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key);
+deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key, sideEffects);
messageStartEventSubscriptionManager.tryReOpenMessageStartEventSubscription(
deploymentEvent, stateWriter);

@@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;

+import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.protocol.Protocol;
@@ -45,7 +46,10 @@ public DeploymentDistributionBehavior(
stateWriter = writers.state();
}

-public void distributeDeployment(final DeploymentRecord deploymentEvent, final long key) {
+public void distributeDeployment(
+    final DeploymentRecord deploymentEvent,
+    final long key,
+    final SideEffectQueue sideEffectQueue) {
final var copiedDeploymentBuffer = BufferUtil.createCopy(deploymentEvent);

otherPartitions.forEach(
@@ -54,7 +58,11 @@ public void distributeDeployment(final DeploymentRecord deploymentEvent, final l
stateWriter.appendFollowUpEvent(
key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord);

-distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
+sideEffectQueue.add(
+    () -> {
+      distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
+      return true;
+    });
});

if (otherPartitions.isEmpty()) {