-
Notifications
You must be signed in to change notification settings - Fork 556
/
DeploymentDistributionBehavior.java
84 lines (72 loc) · 3.61 KB
/
DeploymentDistributionBehavior.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentDistributionRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
/**
 * Drives the distribution of a deployment to every partition except {@link
 * Protocol#DEPLOYMENT_PARTITION}.
 *
 * <p>For each target partition a {@link DeploymentDistributionIntent#DISTRIBUTING} follow-up event
 * is appended and a side effect is queued which hands a copy of the deployment to the {@link
 * DeploymentDistributionCommandSender}. When there is no other partition at all, a {@link
 * DeploymentIntent#FULLY_DISTRIBUTED} event is appended right away.
 */
public final class DeploymentDistributionBehavior {

  /** Reused for every DISTRIBUTING event; only its partition is updated between appends. */
  private final DeploymentDistributionRecord deploymentDistributionRecord =
      new DeploymentDistributionRecord();

  /** Payload-free record used as the value of the FULLY_DISTRIBUTED event. */
  private final DeploymentRecord emptyDeploymentRecord = new DeploymentRecord();

  /** Ids of all partitions in the configured range, minus the deployment partition. */
  private final List<Integer> otherPartitions;

  private final DeploymentDistributionCommandSender deploymentDistributionCommandSender;
  private final StateWriter stateWriter;

  /**
   * @param writers source of the {@link StateWriter} used to append follow-up events
   * @param partitionsCount number of partition ids, counted from {@link
   *     Protocol#START_PARTITION_ID}, that make up the id range
   * @param deploymentDistributionCommandSender sender used to push the deployment to a partition
   */
  public DeploymentDistributionBehavior(
      final Writers writers,
      final int partitionsCount,
      final DeploymentDistributionCommandSender deploymentDistributionCommandSender) {
    final int firstPartitionId = Protocol.START_PARTITION_ID;
    final int endPartitionIdExclusive = firstPartitionId + partitionsCount;
    otherPartitions =
        IntStream.range(firstPartitionId, endPartitionIdExclusive)
            .filter(candidateId -> candidateId != Protocol.DEPLOYMENT_PARTITION)
            .boxed()
            .collect(Collectors.toList());

    this.deploymentDistributionCommandSender = deploymentDistributionCommandSender;
    stateWriter = writers.state();
  }

  /**
   * Appends a DISTRIBUTING event for each other partition and queues one side effect per partition
   * that performs the actual send.
   *
   * @param deploymentEvent the deployment to distribute; it is copied once via {@link
   *     BufferUtil#createCopy} and that copy is shared by all queued side effects
   * @param key the key used for all appended events and passed along to the sender
   * @param sideEffectQueue queue receiving one sending side effect per target partition
   */
  public void distributeDeployment(
      final DeploymentRecord deploymentEvent,
      final long key,
      final SideEffectQueue sideEffectQueue) {
    final var deploymentSnapshot = BufferUtil.createCopy(deploymentEvent);

    for (final int targetPartition : otherPartitions) {
      deploymentDistributionRecord.setPartition(targetPartition);
      stateWriter.appendFollowUpEvent(
          key, DeploymentDistributionIntent.DISTRIBUTING, deploymentDistributionRecord);
      sideEffectQueue.add(
          () -> {
            distributeDeploymentToPartition(key, targetPartition, deploymentSnapshot);
            return true;
          });
    }

    if (!otherPartitions.isEmpty()) {
      return;
    }

    // todo(zell): https://github.com/zeebe-io/zeebe/issues/6314
    // Always writing the complete deployment record makes it easy to hit the record limit.
    // Nobody currently consumes anything but the key of FULLY_DISTRIBUTED, so an empty record is
    // written instead.
    stateWriter.appendFollowUpEvent(
        key, DeploymentIntent.FULLY_DISTRIBUTED, emptyDeploymentRecord);
  }

  /**
   * Wraps the given buffer in a fresh {@link DeploymentRecord} and passes it to the command sender
   * for the given partition.
   *
   * @param key the key forwarded to the sender
   * @param partitionId id of the partition that should receive the deployment
   * @param copiedDeploymentBuffer buffer holding the copied deployment record
   */
  public void distributeDeploymentToPartition(
      final long key, final int partitionId, final DirectBuffer copiedDeploymentBuffer) {
    final DeploymentRecord recordToSend = new DeploymentRecord();
    recordToSend.wrap(copiedDeploymentBuffer);
    deploymentDistributionCommandSender.distributeToPartition(key, partitionId, recordToSend);
  }
}