Skip to content

Commit

Permalink
feat(engine): distribute deployment in post commit tasks
Browse files Browse the repository at this point in the history
This change makes it so that we do not distribute a deployment during the processing of the CREATE command, but as post commit task afters this command has been processed.

The reasoning behind this change is that the writing of the deployment distribution events/commands could happen in an unexpected order. This is because of the event buffering. Once we write an event this gets buffered. When we notify a different partition about the deployment it will write (and possibly process) the command immediately. This results in a situation where the different partition could write it's commands/events before we commands/events of the CREATE command have been written, making the ordering seem backwards.

E.g.:
1. We receive a Deployment.CREATE command

2. During processing we will write the DeploymentDistribution.DISTRIBUTING event. During processing we also send a message to the other partitions in order to distribute the deployment.

3. The DeploymentDistribution.DISTRIBUTING event is written to the buffer. Nothing is written to the log stream yet. The other partition receives the message and writes a Deployment.DISTRIBUTE command. At this point this partition is idle so it will immediately start processing this command.

4. Here we run into a race condition. If the second partition sends the response back to the first partition before the first partition has finished processing the Deployment.CREATE command it will write the DeploymentDistribution.COMPLETE before it writes the buffered events.
  • Loading branch information
remcowesterhoud committed Oct 12, 2022
1 parent 8778dea commit 33070a4
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void processRecord(

stateWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent);

deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key);
deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key, sideEffects);
messageStartEventSubscriptionManager.tryReOpenMessageStartEventSubscription(
deploymentEvent, stateWriter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.protocol.Protocol;
Expand Down Expand Up @@ -45,7 +46,10 @@ public DeploymentDistributionBehavior(
stateWriter = writers.state();
}

public void distributeDeployment(final DeploymentRecord deploymentEvent, final long key) {
public void distributeDeployment(
final DeploymentRecord deploymentEvent,
final long key,
final SideEffectQueue sideEffectQueue) {
final var copiedDeploymentBuffer = BufferUtil.createCopy(deploymentEvent);

otherPartitions.forEach(
Expand All @@ -54,7 +58,11 @@ public void distributeDeployment(final DeploymentRecord deploymentEvent, final l
stateWriter.appendFollowUpEvent(
key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord);

distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
sideEffectQueue.add(
() -> {
distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
return true;
});
});

if (otherPartitions.isEmpty()) {
Expand Down

0 comments on commit 33070a4

Please sign in to comment.