Skip to content

Commit

Permalink
feat(engine): distribute deployment in post commit tasks
Browse files Browse the repository at this point in the history
This change makes it so that we do not distribute a deployment during the processing of the CREATE command, but as post commit task afters this command has been processed.

The reasoning behind this change is that the writing of the deployment distribution events/commands could happen in an unexpected order. This is because of the event buffering. Once we write an event this gets buffered. When we notify a different partition about the deployment it will write (and possibly process) the command immediately. This results in a situation where the different partition could write it's commands/events before we commands/events of the CREATE command have been written, making the ordering seem backwards.

E.g.:
1. We receive a Deployment.CREATE command

2. During processing we will write the DeploymentDistribution.DISTRIBUTING event. During processing we also send a message to the other partitions in order to distribute the deployment.

3. The DeploymentDistribution.DISTRIBUTING event is written to the buffer. Nothing is written to the log stream yet. The other partition receives the message and writes a Deployment.DISTRIBUTE command. At this point this partition is idle so it will immediately start processing this command.

4. Here we run into a race condition. If the second partition sends the response back to the first partition before the first partition has finished processing the Deployment.CREATE command it will write the DeploymentDistribution.COMPLETE before it writes the buffered events.

(cherry picked from commit 33070a4)
  • Loading branch information
remcowesterhoud authored and github-actions[bot] committed Oct 13, 2022
1 parent f9e2d3d commit 918f864
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Expand Up @@ -112,7 +112,7 @@ public void processRecord(

stateWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent);

deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key);
deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key, sideEffects);
messageStartEventSubscriptionManager.tryReOpenMessageStartEventSubscription(
deploymentEvent, stateWriter);

Expand Down
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.protocol.Protocol;
Expand Down Expand Up @@ -45,7 +46,10 @@ public DeploymentDistributionBehavior(
stateWriter = writers.state();
}

public void distributeDeployment(final DeploymentRecord deploymentEvent, final long key) {
public void distributeDeployment(
final DeploymentRecord deploymentEvent,
final long key,
final SideEffectQueue sideEffectQueue) {
final var copiedDeploymentBuffer = BufferUtil.createCopy(deploymentEvent);

otherPartitions.forEach(
Expand All @@ -54,7 +58,11 @@ public void distributeDeployment(final DeploymentRecord deploymentEvent, final l
stateWriter.appendFollowUpEvent(
key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord);

distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
sideEffectQueue.add(
() -> {
distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
return true;
});
});

if (otherPartitions.isEmpty()) {
Expand Down

0 comments on commit 918f864

Please sign in to comment.