Skip to content

Commit

Permalink
merge: #10715
Browse files Browse the repository at this point in the history
10715: Backport 10689 to 8.0 r=korthout a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
Backport #10689 to stable 8.0

## Related issues

<!-- Which issues are closed by this PR or are related -->




Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Oct 13, 2022
2 parents b4b8802 + 32ef5df commit 74d2e1f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void processRecord(

stateWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent);

deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key);
deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key, sideEffects);
messageStartEventSubscriptionManager.tryReOpenMessageStartEventSubscription(
deploymentEvent, stateWriter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
Expand Down Expand Up @@ -53,7 +54,10 @@ public DeploymentDistributionBehavior(
commandWriter = writers.command();
}

public void distributeDeployment(final DeploymentRecord deploymentEvent, final long key) {
public void distributeDeployment(
final DeploymentRecord deploymentEvent,
final long key,
final SideEffectQueue sideEffectQueue) {
final var copiedDeploymentBuffer = BufferUtil.createCopy(deploymentEvent);

otherPartitions.forEach(
Expand All @@ -62,7 +66,11 @@ public void distributeDeployment(final DeploymentRecord deploymentEvent, final l
stateWriter.appendFollowUpEvent(
key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord);

distributeDeploymentToPartition(partitionId, key, copiedDeploymentBuffer);
sideEffectQueue.add(
() -> {
distributeDeploymentToPartition(partitionId, key, copiedDeploymentBuffer);
return true;
});
});

if (otherPartitions.isEmpty()) {
Expand Down

0 comments on commit 74d2e1f

Please sign in to comment.