Skip to content

Commit

Permalink
Avoid duplicate modify operations (provisioning)
Browse files Browse the repository at this point in the history
  • Loading branch information
semancik committed Sep 4, 2017
1 parent 182d3f5 commit c7cfdf6
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 71 deletions.
Expand Up @@ -92,6 +92,7 @@ public void setEnabled(Boolean enabled) throws ConnectException, FileNotFoundExc
checkModifyBreak();
delayOperation();
this.enabled = enabled;
recordModify();
}

public Date getValidFrom() {
Expand All @@ -102,6 +103,7 @@ public void setValidFrom(Date validFrom) throws ConnectException, FileNotFoundEx
checkModifyBreak();
delayOperation();
this.validFrom = validFrom;
recordModify();
}

public Date getValidTo() {
Expand All @@ -112,6 +114,7 @@ public void setValidTo(Date validTo) throws ConnectException, FileNotFoundExcept
checkModifyBreak();
delayOperation();
this.validTo = validTo;
recordModify();
}

public BreakMode getModifyBreakMode() {
Expand Down
Expand Up @@ -673,7 +673,10 @@ public <T extends ObjectType> String modifyObject(Class<T> type, String oid,
} else {
cacheRepositoryService.modifyObject(type, oid, modifications, result);
}
result.computeStatus();
if (!result.isInProgress()) {
// This is the case when there is already a conflicting pending operation.
result.computeStatus();
}

} catch (CommunicationException e) {
ProvisioningUtil.recordFatalError(LOGGER, result, "Couldn't modify object: communication problem: " + e.getMessage(), e);
Expand Down
Expand Up @@ -717,6 +717,12 @@ public String modifyShadow(PrismObject<ShadowType> repoShadow, String oid,

ProvisioningContext ctx = ctxFactory.create(repoShadow, additionalAuxiliaryObjectClassQNames, task,
parentResult);

PendingOperationType duplicateOperation = shadowManager.checkAndRecordPendingModifyOperationBeforeExecution(ctx, repoShadow, modifications, task, parentResult);
if (duplicateOperation != null) {
parentResult.recordInProgress();
return repoShadow.getOid();
}

AsynchronousOperationReturnValue<Collection<PropertyDelta<PrismPropertyValue>>> asyncReturnValue;
try {
Expand Down Expand Up @@ -1027,7 +1033,7 @@ private boolean expirePendingOperations(ProvisioningContext ctx, PrismObject<Sha
XMLGregorianCalendar completionTimestamp = pendingOperation.getCompletionTimestamp();

if (isCompleted(statusType) && isOverGrace(now, gracePeriod, completionTimestamp)) {
LOGGER.trace("Deleting pending operation because it is completed '{}' (and over grace): {}", statusType.value(), pendingOperation);
LOGGER.trace("Deleting pending operation because it is completed '{}' (and over grace): {}", statusType==null?null:statusType.value(), pendingOperation);
shadowDelta.addModificationDeleteContainer(new ItemPath(ShadowType.F_PENDING_OPERATION), pendingOperation.clone());
} else {
atLeastOneOperationRemains = true;
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;

import javax.xml.namespace.QName;

Expand All @@ -34,6 +35,7 @@
import com.evolveum.midpoint.common.refinery.RefinedObjectClassDefinition;
import com.evolveum.midpoint.prism.Containerable;
import com.evolveum.midpoint.prism.Item;
import com.evolveum.midpoint.prism.Objectable;
import com.evolveum.midpoint.prism.PrismContainer;
import com.evolveum.midpoint.prism.PrismContainerDefinition;
import com.evolveum.midpoint.prism.PrismContainerValue;
Expand Down Expand Up @@ -66,6 +68,7 @@
import com.evolveum.midpoint.repo.api.ModificationPrecondition;
import com.evolveum.midpoint.repo.api.OptimisticLockingRunner;
import com.evolveum.midpoint.repo.api.PreconditionViolationException;
import com.evolveum.midpoint.repo.api.RepositoryOperation;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.repo.api.VersionPrecondition;
import com.evolveum.midpoint.schema.DeltaConvertor;
Expand Down Expand Up @@ -936,26 +939,95 @@ private PendingOperationType createPendingOperationAdd(PrismObject<ShadowType> r
private void addPendingOperationModify(ProvisioningContext ctx, PrismObject<ShadowType> shadow, Collection<? extends ItemDelta> pendingModifications,
OperationResult resourceOperationResult, OperationResult parentResult) throws ObjectNotFoundException, SchemaException {

ObjectDelta<ShadowType> pendingDelta = shadow.createModifyDelta();
for (ItemDelta pendingModification: pendingModifications) {
pendingDelta.addModification(pendingModification.clone());
}

ObjectDelta<ShadowType> pendingDelta = createModifyDelta(shadow, pendingModifications);
addPendingOperationDelta(ctx, shadow, pendingDelta, OperationResultStatusType.IN_PROGRESS, resourceOperationResult, parentResult);
}

// returns conflicting operation (pending delta) if there is any
public PendingOperationType checkAndRecordPendingDeleteOperationBeforeExecution(ProvisioningContext ctx,
PrismObject<ShadowType> shadow, Task task, OperationResult parentResult) throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException, ExpressionEvaluationException {

ObjectDelta<ShadowType> proposedDelta = shadow.createDeleteDelta();
return checkAndRecordPendingOperationBeforeExecution(ctx, shadow, proposedDelta, task, parentResult);
}

private void recordPendingDeleteOperationAfterExecution(ProvisioningContext ctx, PrismObject<ShadowType> oldRepoShadow,
OperationResult resourceOperationResult, OperationResult parentResult) throws SchemaException, ObjectNotFoundException, CommunicationException, ConfigurationException, ExpressionEvaluationException {

if (ResourceTypeUtil.getRecordPendingOperations(ctx.getResource()) == RecordPendingOperationsType.ALL) {
// We have to look for pending delta for this operation that may already exist. In that case update it
// instead creating a new one.
// We have to re-read the shadow from repository here. We need valid container IDs.
// Also there is some chance that the pending delta might have been created by a different thread.
PrismObject<ShadowType> currentShadow = rereadShadow(oldRepoShadow, parentResult);
ObjectDelta<ShadowType> proposedDelta = oldRepoShadow.createDeleteDelta();
PendingOperationType existingPendingOperation = findExistingPendingOperation(currentShadow, proposedDelta, false);
if (existingPendingOperation != null) {
LOGGER.trace("Found existing pending operation for delete of {}, updating", currentShadow);
updatePendingOperationStatus(ctx, currentShadow, existingPendingOperation, OperationResultStatusType.IN_PROGRESS, resourceOperationResult, parentResult);
return;
}
}

ObjectDelta<ShadowType> pendingDelta = oldRepoShadow.createDeleteDelta();

addPendingOperationDelta(ctx, oldRepoShadow, pendingDelta, OperationResultStatusType.IN_PROGRESS, resourceOperationResult, parentResult);
}

private PendingOperationType findExistingPendingOperation(PrismObject<ShadowType> currentShadow, ObjectDelta<ShadowType> proposedDelta, boolean processInProgress) throws SchemaException {
for (PendingOperationType pendingOperation: currentShadow.asObjectable().getPendingOperation()) {
OperationResultStatusType resultStatus = pendingOperation.getResultStatus();
if (!isInProgressOrRequested(resultStatus, processInProgress)) {
continue;
}
ObjectDeltaType deltaType = pendingOperation.getDelta();
if (deltaType == null) {
continue;
}
ObjectDelta<Objectable> delta = DeltaConvertor.createObjectDelta(deltaType, prismContext);
if (!matchPendingDelta(delta, proposedDelta)) {
continue;
}
return pendingOperation;
}
return null;
}

private boolean isInProgressOrRequested(OperationResultStatusType resultStatus, boolean processInProgress) {
if (resultStatus == null) {
return true;
}
if (processInProgress && resultStatus == OperationResultStatusType.IN_PROGRESS) {
return true;
}
return false;
}

public PendingOperationType checkAndRecordPendingModifyOperationBeforeExecution(ProvisioningContext ctx,
PrismObject<ShadowType> repoShadow, Collection<? extends ItemDelta> modifications, Task task, OperationResult parentResult) throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException, ExpressionEvaluationException {

ObjectDelta<ShadowType> proposedDelta = createModifyDelta(repoShadow, modifications);
return checkAndRecordPendingOperationBeforeExecution(ctx, repoShadow, proposedDelta, task, parentResult);
}

private ObjectDelta<ShadowType> createModifyDelta(PrismObject<ShadowType> repoShadow, Collection<? extends ItemDelta> modifications) {
ObjectDelta<ShadowType> delta = repoShadow.createModifyDelta();
delta.addModifications(ItemDelta.cloneCollection(modifications));
return delta;
}

// returns conflicting operation (pending delta) if there is any
private PendingOperationType checkAndRecordPendingOperationBeforeExecution(ProvisioningContext ctx,
PrismObject<ShadowType> repoShadow, ObjectDelta<ShadowType> proposedDelta,
Task task, OperationResult parentResult) throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException, ExpressionEvaluationException {
ResourceType resource = ctx.getResource();
ResourceConsistencyType consistencyType = resource.getConsistency();
if (consistencyType == null) {
return null;
}


OptimisticLockingRunner<ShadowType,PendingOperationType> runner = new OptimisticLockingRunner.Builder<ShadowType,PendingOperationType>()
.object(shadow)
.object(repoShadow)
.result(parentResult)
.repositoryService(repositoryService)
.maxNumberOfAttempts(10)
Expand All @@ -968,30 +1040,20 @@ public PendingOperationType checkAndRecordPendingDeleteOperationBeforeExecution(
(object) -> {
Boolean avoidDuplicateOperations = consistencyType.isAvoidDuplicateOperations();
if (BooleanUtils.isTrue(avoidDuplicateOperations)) {
for (PendingOperationType pendingOperation: object.asObjectable().getPendingOperation()) {
OperationResultStatusType resultStatus = pendingOperation.getResultStatus();
if (resultStatus != null && resultStatus != OperationResultStatusType.IN_PROGRESS) {
continue;
}
ObjectDeltaType delta = pendingOperation.getDelta();
if (delta == null) {
continue;
}
if (delta.getChangeType() != ChangeTypeType.DELETE) {
continue;
}
LOGGER.debug("Found duplicate operation for delete of {}: {}", object, pendingOperation);
return pendingOperation;
PendingOperationType existingPendingOperation = findExistingPendingOperation(object, proposedDelta, true);
if (existingPendingOperation != null) {
LOGGER.debug("Found duplicate operation for {} of {}: {}", proposedDelta.getChangeType(), object, existingPendingOperation);
return existingPendingOperation;
}
}

if (ResourceTypeUtil.getRecordPendingOperations(resource) != RecordPendingOperationsType.ALL) {
return null;
}

ObjectDelta<ShadowType> pendingDelta = object.createDeleteDelta();

addPendingOperationDelta(ctx, object, pendingDelta, null, null, object.getVersion(), parentResult);
LOGGER.trace("Storing pending operation for {} of {}", proposedDelta.getChangeType(), object);
addPendingOperationDelta(ctx, object, proposedDelta, null, null, object.getVersion(), parentResult);
LOGGER.trace("Stored pending operation for {} of {}", proposedDelta.getChangeType(), object);

// Yes, really return null. We are supposed to return conflicting operation (if found).
// But in this case there is no conflict. This operation does not conflict with itself.
Expand All @@ -1003,46 +1065,13 @@ public PendingOperationType checkAndRecordPendingDeleteOperationBeforeExecution(
// should not happen
throw new SystemException(e);
}

}

private void recordPendingDeleteOperationAfterExecution(ProvisioningContext ctx, PrismObject<ShadowType> oldRepoShadow,
OperationResult resourceOperationResult, OperationResult parentResult) throws SchemaException, ObjectNotFoundException, CommunicationException, ConfigurationException, ExpressionEvaluationException {

if (ResourceTypeUtil.getRecordPendingOperations(ctx.getResource()) == RecordPendingOperationsType.ALL) {
// We have to look for pending delta for this operation that may already exist. In that case update it
// instead creating a new one.
// We have to re-read the shadow from repository here. We need valid container IDs.
// Also there is some chance that the pending delta might have been created by a different thread.
PrismObject<ShadowType> currentShadow = rereadShadow(oldRepoShadow, parentResult);

PendingOperationType existingPendingOperation = null;
for (PendingOperationType pendingOperation: currentShadow.asObjectable().getPendingOperation()) {
OperationResultStatusType resultStatus = pendingOperation.getResultStatus();
if (resultStatus != null) {
continue;
}
ObjectDeltaType delta = pendingOperation.getDelta();
if (delta == null) {
continue;
}
if (delta.getChangeType() != ChangeTypeType.DELETE) {
continue;
}
LOGGER.trace("Found existing pending operation for delete of {}, updating", currentShadow);
existingPendingOperation = pendingOperation;
break;
}

if (existingPendingOperation != null) {
updatePendingOperationStatus(ctx, currentShadow, existingPendingOperation, OperationResultStatusType.IN_PROGRESS, resourceOperationResult, parentResult);
return;
}
}

ObjectDelta<ShadowType> pendingDelta = oldRepoShadow.createDeleteDelta();

addPendingOperationDelta(ctx, oldRepoShadow, pendingDelta, OperationResultStatusType.IN_PROGRESS, resourceOperationResult, parentResult);

private boolean matchPendingDelta(ObjectDelta<Objectable> pendingDelta, ObjectDelta<ShadowType> proposedDelta) {
// TODO: is this entirelly reliable?
return pendingDelta.equals(proposedDelta);
}

private PrismObject<ShadowType> rereadShadow(PrismObject<ShadowType> oldRepoShadow, OperationResult parentResult) throws ObjectNotFoundException, SchemaException {
Expand Down Expand Up @@ -1110,8 +1139,13 @@ private void updatePendingOperationStatus(ProvisioningContext ctx, PrismObject<S

if (resourceOperationResult != null) {
ItemPath asyncRefPath = containerPath.subPath(PendingOperationType.F_ASYNCHRONOUS_OPERATION_REFERENCE);
PropertyDelta<String> asyncRefDelta = PropertyDelta.createModificationReplaceProperty(asyncRefPath, shadow.getDefinition(),
PropertyDelta<String> asyncRefDelta;
if (resourceOperationResult.getAsynchronousOperationReference() == null) {
asyncRefDelta = PropertyDelta.createModificationReplaceProperty(asyncRefPath, shadow.getDefinition() /*, no value */);
} else {
asyncRefDelta = PropertyDelta.createModificationReplaceProperty(asyncRefPath, shadow.getDefinition(),
resourceOperationResult.getAsynchronousOperationReference());
}
repoDeltas.add(asyncRefDelta);
}

Expand Down Expand Up @@ -1227,14 +1261,34 @@ private PrismObject<ShadowType> createRepositoryShadow(ProvisioningContext ctx,
return repoShadow;
}

public void modifyShadow(ProvisioningContext ctx, PrismObject<ShadowType> shadow, Collection<? extends ItemDelta> modifications,
public void modifyShadow(ProvisioningContext ctx, PrismObject<ShadowType> oldRepoShadow, Collection<? extends ItemDelta> modifications,
OperationResult resourceOperationResult, OperationResult parentResult)
throws SchemaException, ObjectNotFoundException, ConfigurationException, CommunicationException, ExpressionEvaluationException {
LOGGER.trace("Updating repository shadow, resourceOperationResult={}, {} modifications", resourceOperationResult.getStatus(), modifications.size());
if (resourceOperationResult.isInProgress()) {
addPendingOperationModify(ctx, shadow, modifications, resourceOperationResult, parentResult);

PendingOperationType existingPendingOperation = null;
if (ResourceTypeUtil.getRecordPendingOperations(ctx.getResource()) == RecordPendingOperationsType.ALL) {
// We have to look for pending delta for this operation that may already exist. In that case update it
// instead creating a new one.
// We have to re-read the shadow from repository here. We need valid container IDs.
// Also there is some chance that the pending delta might have been created by a different thread.
PrismObject<ShadowType> currentShadow = rereadShadow(oldRepoShadow, parentResult);
ObjectDelta<ShadowType> proposedDelta = createModifyDelta(currentShadow, modifications);
existingPendingOperation = findExistingPendingOperation(currentShadow, proposedDelta, false);
}

LOGGER.trace("Updating repository shadow, resourceOperationResult={}, {} modifications, existingPendingOperation={}", resourceOperationResult.getStatus(), modifications.size(), existingPendingOperation);
if (resourceOperationResult != null && resourceOperationResult.isInProgress()) {
if (existingPendingOperation == null) {
addPendingOperationModify(ctx, oldRepoShadow, modifications, resourceOperationResult, parentResult);
} else {
updatePendingOperationStatus(ctx, oldRepoShadow, existingPendingOperation, OperationResultStatusType.IN_PROGRESS, resourceOperationResult, parentResult);
}
} else {
modifyShadowAttributes(ctx, shadow, modifications, parentResult);
if (existingPendingOperation == null) {
modifyShadowAttributes(ctx, oldRepoShadow, modifications, parentResult);
} else {
updatePendingOperationStatus(ctx, oldRepoShadow, existingPendingOperation, resourceOperationResult.getStatus().createStatusType(), resourceOperationResult, parentResult);
}
}
}

Expand Down

0 comments on commit c7cfdf6

Please sign in to comment.