diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java index 3a10967f690..167cd712967 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/DeploymentCreateProcessor.java @@ -112,7 +112,7 @@ public void processRecord( stateWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent); - deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key); + deploymentDistributionBehavior.distributeDeployment(deploymentEvent, key, sideEffects); messageStartEventSubscriptionManager.tryReOpenMessageStartEventSubscription( deploymentEvent, stateWriter); diff --git a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java index c3c378a12d6..a3ff8d7ff68 100644 --- a/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java +++ b/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentDistributionBehavior.java @@ -7,6 +7,7 @@ */ package io.camunda.zeebe.engine.processing.deployment.distribute; +import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue; import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter; import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers; import io.camunda.zeebe.protocol.Protocol; @@ -45,7 +46,10 @@ public DeploymentDistributionBehavior( stateWriter = writers.state(); } - public void distributeDeployment(final DeploymentRecord deploymentEvent, final long key) { + public void distributeDeployment( + final DeploymentRecord deploymentEvent, + final long key, + final SideEffectQueue sideEffectQueue) { final var copiedDeploymentBuffer = BufferUtil.createCopy(deploymentEvent); otherPartitions.forEach( @@ -54,7 +58,11 @@ public void distributeDeployment(final DeploymentRecord deploymentEvent, final l stateWriter.appendFollowUpEvent( key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord); - distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer); + sideEffectQueue.add( + () -> { + distributeDeploymentToPartition(key, partitionId, copiedDeploymentBuffer); + return true; + }); }); if (otherPartitions.isEmpty()) {