Skip to content

Commit

Permalink
merge: #10711
Browse files Browse the repository at this point in the history
10711: [Backport stable/8.1] Distribute deployment in post commit tasks r=remcowesterhoud a=backport-action

# Description
Backport of #10689 to `stable/8.1`.

relates to #9964

Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Oct 13, 2022
2 parents f9e2d3d + 918f864 commit 533e22d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Expand Up @@ -112,7 +112,7 @@ public void processRecord(

stateWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent);

deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key);
deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key, sideEffects);
messageStartEventSubscriptionManager.tryReOpenMessageStartEventSubscription(
deploymentEvent, stateWriter);

Expand Down
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.protocol.Protocol;
Expand Down Expand Up @@ -45,7 +46,10 @@ public DeploymentDistributionBehavior(
stateWriter = writers.state();
}

public void distributeDeployment(final DeploymentRecord deploymentEvent, final long key) {
public void distributeDeployment(
final DeploymentRecord deploymentEvent,
final long key,
final SideEffectQueue sideEffectQueue) {
final var copiedDeploymentBuffer = BufferUtil.createCopy(deploymentEvent);

otherPartitions.forEach(
Expand All @@ -54,7 +58,11 @@ public void distributeDeployment(final DeploymentRecord deploymentEvent, final l
stateWriter.appendFollowUpEvent(
key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord);

distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
sideEffectQueue.add(
() -> {
distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);
return true;
});
});

if (otherPartitions.isEmpty()) {
Expand Down

0 comments on commit 533e22d

Please sign in to comment.