Skip to content

Commit

Permalink
merge: #8648
Browse files Browse the repository at this point in the history
8648: Assign versions for deployed decisions and DRGs r=saig0 a=saig0

## Description

* check the DMN resources of a deployment if the DRGs/decisions were deployed before
  * a DRG/decision is a duplicate if the latest DRG/decision in the state has the same resource name and the same checksum (similar to the check of BPMN processes)
  * mark duplicated DRG/decisions and assign the latest versions from the state
  * increase the version if the DRG/decision is different from the latest DRG/decision in the state
* reject deploy command if two DRGs or decisions have the same ID (similar to the check of BPMN process ids)

## Related issues

closes #8174



Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
  • Loading branch information
zeebe-bors-cloud[bot] and saig0 committed Jan 27, 2022
2 parents ddf8e5c + aacca62 commit 1ab92a6
Show file tree
Hide file tree
Showing 5 changed files with 526 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public DeploymentTransformer(
zeebeState.getProcessState(),
expressionProcessor);
final var dmnResourceTransformer =
new DmnResourceTransformer(keyGenerator, stateWriter, this::getChecksum);
new DmnResourceTransformer(
keyGenerator, stateWriter, this::getChecksum, zeebeState.getDecisionState());

resourceTransformers =
Map.ofEntries(
Expand Down Expand Up @@ -141,7 +142,7 @@ public String getRejectionReason() {
return rejectionReason;
}

private DeploymentResourceTransformer getResourceTransformer(String resourceName) {
private DeploymentResourceTransformer getResourceTransformer(final String resourceName) {
return resourceTransformers.entrySet().stream()
.filter(entry -> resourceName.endsWith(entry.getKey()))
.map(Entry::getValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,54 @@
*/
package io.camunda.zeebe.engine.processing.deployment.transform;

import static io.camunda.zeebe.util.buffer.BufferUtil.wrapString;

import io.camunda.zeebe.dmn.DecisionEngine;
import io.camunda.zeebe.dmn.DecisionEngineFactory;
import io.camunda.zeebe.dmn.ParsedDecision;
import io.camunda.zeebe.dmn.ParsedDecisionRequirementsGraph;
import io.camunda.zeebe.engine.processing.common.Failure;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.deployment.PersistedDecisionRequirements;
import io.camunda.zeebe.engine.state.immutable.DecisionState;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRequirementsMetadataRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRequirementsRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.record.intent.DecisionIntent;
import io.camunda.zeebe.protocol.record.intent.DecisionRequirementsIntent;
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRequirementsMetadataValue;
import io.camunda.zeebe.util.Either;
import java.io.ByteArrayInputStream;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;

public final class DmnResourceTransformer implements DeploymentResourceTransformer {

private static final int INITIAL_VERSION = 1;

private static final Either<Failure, Object> NO_DUPLICATES = Either.right(null);

private final DecisionEngine decisionEngine = DecisionEngineFactory.createDecisionEngine();

private final KeyGenerator keyGenerator;
private final StateWriter stateWriter;
private final Function<DeploymentResource, DirectBuffer> checksumGenerator;
private final DecisionState decisionState;

public DmnResourceTransformer(
final KeyGenerator keyGenerator,
final StateWriter stateWriter,
final Function<DeploymentResource, DirectBuffer> checksumGenerator) {
final Function<DeploymentResource, DirectBuffer> checksumGenerator,
final DecisionState decisionState) {
this.keyGenerator = keyGenerator;
this.stateWriter = stateWriter;
this.checksumGenerator = checksumGenerator;
this.decisionState = decisionState;
}

@Override
Expand All @@ -50,81 +65,207 @@ public Either<Failure, Void> transformResource(
final var parsedDrg = decisionEngine.parse(dmnResource);

if (parsedDrg.isValid()) {
appendMetadataToDeploymentEvent(resource, parsedDrg, deployment);
writeRecords(deployment, resource);
return Either.right(null);
return checkForDuplicateIds(resource, parsedDrg, deployment)
.map(
noDuplicates -> {
appendMetadataToDeploymentEvent(resource, parsedDrg, deployment);
writeRecords(deployment, resource);
return null;
});

} else {
final var failure = new Failure(parsedDrg.getFailureMessage());
return Either.left(failure);
}
}

private Either<Failure, ?> checkForDuplicateIds(
final DeploymentResource resource,
final ParsedDecisionRequirementsGraph parsedDrg,
final DeploymentRecord deploymentEvent) {

return checkDuplicatedDrgIds(resource, parsedDrg, deploymentEvent)
.flatMap(noDuplicates -> checkDuplicatedDecisionIds(resource, parsedDrg, deploymentEvent));
}

private Either<Failure, ?> checkDuplicatedDrgIds(
final DeploymentResource resource,
final ParsedDecisionRequirementsGraph parsedDrg,
final DeploymentRecord deploymentEvent) {

final var decisionRequirementsId = parsedDrg.getId();

return deploymentEvent.getDecisionRequirementsMetadata().stream()
.filter(drg -> drg.getDecisionRequirementsId().equals(decisionRequirementsId))
.findFirst()
.map(
duplicatedDrg -> {
final var failureMessage =
String.format(
"Expected the decision requirements ids to be unique within a deployment"
+ " but found a duplicated id '%s' in the resources '%s' and '%s'.",
decisionRequirementsId,
duplicatedDrg.getResourceName(),
resource.getResourceName());
return Either.left(new Failure(failureMessage));
})
.orElse(NO_DUPLICATES);
}

private Either<Failure, ?> checkDuplicatedDecisionIds(
final DeploymentResource resource,
final ParsedDecisionRequirementsGraph parsedDrg,
final DeploymentRecord deploymentEvent) {

final var decisionIds =
parsedDrg.getDecisions().stream().map(ParsedDecision::getId).collect(Collectors.toList());

return deploymentEvent.getDecisionsMetadata().stream()
.filter(decision -> decisionIds.contains(decision.getDecisionId()))
.findFirst()
.map(
duplicatedDecision -> {
final var failureMessage =
String.format(
"Expected the decision ids to be unique within a deployment"
+ " but found a duplicated id '%s' in the resources '%s' and '%s'.",
duplicatedDecision.getDecisionId(),
findResourceName(
deploymentEvent, duplicatedDecision.getDecisionRequirementsKey()),
resource.getResourceName());
return Either.left(new Failure(failureMessage));
})
.orElse(NO_DUPLICATES);
}

private String findResourceName(
final DeploymentRecord deploymentEvent, final long decisionRequirementsKey) {

return deploymentEvent.getDecisionRequirementsMetadata().stream()
.filter(drg -> drg.getDecisionRequirementsKey() == decisionRequirementsKey)
.map(DecisionRequirementsMetadataValue::getResourceName)
.findFirst()
.orElse("<?>");
}

private void appendMetadataToDeploymentEvent(
final DeploymentResource resource,
final ParsedDecisionRequirementsGraph parsedDrg,
final DeploymentRecord deploymentEvent) {

final var decisionRequirementsKey = keyGenerator.nextKey();
final var checksum = checksumGenerator.apply(resource);
final LongSupplier newDecisionRequirementsKey = keyGenerator::nextKey;
final DirectBuffer checksum = checksumGenerator.apply(resource);
final var drgRecord = deploymentEvent.decisionRequirementsMetadata().add();

deploymentEvent
.decisionRequirementsMetadata()
.add()
.setDecisionRequirementsKey(decisionRequirementsKey)
drgRecord
.setDecisionRequirementsId(parsedDrg.getId())
.setDecisionRequirementsName(parsedDrg.getName())
.setDecisionRequirementsVersion(1)
.setNamespace(parsedDrg.getNamespace())
.setResourceName(resource.getResourceName())
.setChecksum(checksum);

decisionState
.findLatestDecisionRequirementsById(wrapString(parsedDrg.getId()))
.ifPresentOrElse(
latestDrg -> {
final int latestVersion = latestDrg.getDecisionRequirementsVersion();
final boolean isDuplicate = isDuplicate(resource, checksum, latestDrg);
if (isDuplicate) {
drgRecord
.setDecisionRequirementsKey(latestDrg.getDecisionRequirementsKey())
.setDecisionRequirementsVersion(latestVersion)
.markAsDuplicate();
} else {
drgRecord
.setDecisionRequirementsKey(newDecisionRequirementsKey.getAsLong())
.setDecisionRequirementsVersion(latestVersion + 1);
}
},
() ->
drgRecord
.setDecisionRequirementsKey(newDecisionRequirementsKey.getAsLong())
.setDecisionRequirementsVersion(INITIAL_VERSION));

parsedDrg
.getDecisions()
.forEach(
decision -> {
final var decisionKey = keyGenerator.nextKey();
final LongSupplier newDecisionKey = keyGenerator::nextKey;

deploymentEvent
.decisionsMetadata()
.add()
.setDecisionKey(decisionKey)
final var decisionRecord = deploymentEvent.decisionsMetadata().add();
decisionRecord
.setDecisionId(decision.getId())
.setDecisionName(decision.getName())
.setVersion(1)
.setDecisionRequirementsId(parsedDrg.getId())
.setDecisionRequirementsKey(decisionRequirementsKey);
.setDecisionRequirementsKey(drgRecord.getDecisionRequirementsKey());

decisionState
.findLatestDecisionById(wrapString(decision.getId()))
.ifPresentOrElse(
latestDecision -> {
final var latestVersion = latestDecision.getVersion();
final var isDuplicate =
latestDecision.getDecisionRequirementsKey()
== drgRecord.getDecisionRequirementsKey();
if (isDuplicate) {
decisionRecord
.setDecisionKey(latestDecision.getDecisionKey())
.setVersion(latestVersion)
.markAsDuplicate();
} else {
decisionRecord
.setDecisionKey(newDecisionKey.getAsLong())
.setVersion(latestVersion + 1);
}
},
() ->
decisionRecord
.setDecisionKey(newDecisionKey.getAsLong())
.setVersion(INITIAL_VERSION));
});
}

private boolean isDuplicate(
final DeploymentResource resource,
final DirectBuffer checksum,
final PersistedDecisionRequirements drg) {

return drg.getResourceName().equals(resource.getResourceNameBuffer())
&& drg.getChecksum().equals(checksum);
}

private void writeRecords(final DeploymentRecord deployment, final DeploymentResource resource) {

for (DecisionRequirementsMetadataRecord drg : deployment.decisionRequirementsMetadata()) {
stateWriter.appendFollowUpEvent(
drg.getDecisionRequirementsKey(),
DecisionRequirementsIntent.CREATED,
new DecisionRequirementsRecord()
.setDecisionRequirementsKey(drg.getDecisionRequirementsKey())
.setDecisionRequirementsId(drg.getDecisionRequirementsId())
.setDecisionRequirementsName(drg.getDecisionRequirementsName())
.setDecisionRequirementsVersion(drg.getDecisionRequirementsVersion())
.setNamespace(drg.getNamespace())
.setResourceName(drg.getResourceName())
.setChecksum(drg.getChecksumBuffer())
.setResource(resource.getResourceBuffer()));
for (final DecisionRequirementsMetadataRecord drg : deployment.decisionRequirementsMetadata()) {
if (!drg.isDuplicate()) {
stateWriter.appendFollowUpEvent(
drg.getDecisionRequirementsKey(),
DecisionRequirementsIntent.CREATED,
new DecisionRequirementsRecord()
.setDecisionRequirementsKey(drg.getDecisionRequirementsKey())
.setDecisionRequirementsId(drg.getDecisionRequirementsId())
.setDecisionRequirementsName(drg.getDecisionRequirementsName())
.setDecisionRequirementsVersion(drg.getDecisionRequirementsVersion())
.setNamespace(drg.getNamespace())
.setResourceName(drg.getResourceName())
.setChecksum(drg.getChecksumBuffer())
.setResource(resource.getResourceBuffer()));
}
}

for (DecisionRecord decision : deployment.decisionsMetadata()) {
stateWriter.appendFollowUpEvent(
decision.getDecisionKey(),
DecisionIntent.CREATED,
new DecisionRecord()
.setDecisionKey(decision.getDecisionKey())
.setDecisionId(decision.getDecisionId())
.setDecisionName(decision.getDecisionName())
.setVersion(decision.getVersion())
.setDecisionRequirementsId(decision.getDecisionRequirementsId())
.setDecisionRequirementsKey(decision.getDecisionRequirementsKey()));
for (final DecisionRecord decision : deployment.decisionsMetadata()) {
if (!decision.isDuplicate()) {
stateWriter.appendFollowUpEvent(
decision.getDecisionKey(),
DecisionIntent.CREATED,
new DecisionRecord()
.setDecisionKey(decision.getDecisionKey())
.setDecisionId(decision.getDecisionId())
.setDecisionName(decision.getDecisionName())
.setVersion(decision.getVersion())
.setDecisionRequirementsId(decision.getDecisionRequirementsId())
.setDecisionRequirementsKey(decision.getDecisionRequirementsKey()));
}
}
}
}

0 comments on commit 1ab92a6

Please sign in to comment.