Skip to content

Commit

Permalink
Stop LiveSync when threshold is reached
Browse files Browse the repository at this point in the history
This is NOT a complete solution: it is only an ugly hack.
See MID-5940.
  • Loading branch information
mederly committed Nov 26, 2019
1 parent 21e5fa2 commit 4aaf38b
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 27 deletions.
Expand Up @@ -13,14 +13,10 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.evolveum.midpoint.common.refinery.RefinedResourceSchema;
import com.evolveum.midpoint.common.refinery.RefinedResourceSchemaImpl;
import com.evolveum.midpoint.model.impl.ModelConstants;
import com.evolveum.midpoint.model.impl.sync.SyncTaskHelper.TargetInfo;
import com.evolveum.midpoint.model.impl.util.ModelImplUtils;
import com.evolveum.midpoint.provisioning.api.ProvisioningService;
import com.evolveum.midpoint.repo.api.PreconditionViolationException;
import com.evolveum.midpoint.schema.ResourceShadowDiscriminator;
import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.schema.result.OperationConstants;
import com.evolveum.midpoint.schema.result.OperationResult;
Expand All @@ -35,9 +31,6 @@
import com.evolveum.midpoint.task.api.TaskRunResult.TaskRunResultStatus;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.CriticalityType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.LayerType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ResourceType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskPartitionDefinitionType;
/**
* The task handler for a live synchronization.
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.evolveum.midpoint.repo.api.SystemConfigurationChangeListener;
import com.evolveum.midpoint.schema.cache.CacheConfigurationManager;
import com.evolveum.midpoint.schema.cache.CacheType;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import org.apache.commons.lang.Validate;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -73,15 +74,6 @@
import com.evolveum.midpoint.schema.util.SchemaDebugUtil;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.util.DebugUtil;
import com.evolveum.midpoint.util.exception.CommunicationException;
import com.evolveum.midpoint.util.exception.ConfigurationException;
import com.evolveum.midpoint.util.exception.ExpressionEvaluationException;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.PolicyViolationException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SecurityViolationException;
import com.evolveum.midpoint.util.exception.SystemException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;

Expand Down Expand Up @@ -319,7 +311,6 @@ public <T extends ObjectType> String addObject(PrismObject<T> object, OperationP
return oid;
}

@SuppressWarnings("rawtypes")
@Override
public int synchronize(ResourceShadowDiscriminator shadowCoordinates, Task task, TaskPartitionDefinitionType taskPartition, OperationResult parentResult) throws ObjectNotFoundException,
CommunicationException, SchemaException, ConfigurationException, SecurityViolationException, ExpressionEvaluationException, PolicyViolationException {
Expand Down Expand Up @@ -370,6 +361,13 @@ public int synchronize(ResourceShadowDiscriminator shadowCoordinates, Task task,
result.recordSuccess();
}
result.cleanupResult();

// This is a brutal hack for thresholds in LiveSync tasks: it propagates ThresholdPolicyViolationException to upper layers.
// FIXME Get rid of this as soon as possible! MID-5940
if (liveSyncResult.getExceptionEncountered() instanceof ThresholdPolicyViolationException) {
throw (ThresholdPolicyViolationException) liveSyncResult.getExceptionEncountered();
}

return liveSyncResult.getChangesProcessed();
}

Expand Down
Expand Up @@ -14,6 +14,7 @@
import com.evolveum.midpoint.provisioning.ucf.api.Change;
import com.evolveum.midpoint.provisioning.ucf.api.ChangeHandler;
import com.evolveum.midpoint.provisioning.ucf.api.GenericFrameworkException;
import com.evolveum.midpoint.repo.common.util.RepoCommonUtils;
import com.evolveum.midpoint.schema.ResourceShadowDiscriminator;
import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.schema.internals.InternalCounters;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void onSuccess() {
public void onError(OperationResult result) {
LOGGER.error("An error occurred during live synchronization in {}, when processing #{}: {}", task,
sequentialNumber, change);
treatError(sequentialNumber);
treatError(sequentialNumber, RepoCommonUtils.getResultExceptionIfExists(result));
}

/**
Expand All @@ -130,7 +131,7 @@ public void onError(OperationResult result) {
public void onError(Throwable t, OperationResult result) {
LoggingUtils.logUnexpectedException(LOGGER, "An exception occurred during live synchronization in {},"
+ " when processing #{}: {}", t, task, sequentialNumber, change);
treatError(sequentialNumber);
treatError(sequentialNumber, t);
}
};
try {
Expand All @@ -156,7 +157,7 @@ public boolean handleError(@Nullable PrismProperty<?> token, @Nullable Change ch
.logUnexpectedException(LOGGER, "An exception occurred during live synchronization in {}, "
+ "as part of pre-processing #{}: {}", exception, task,
sequentialNumber, change != null ? "change " + change : "sync delta with token " + token);
return treatError(sequentialNumber);
return treatError(sequentialNumber, exception);
}

@Override
Expand All @@ -175,12 +176,13 @@ private boolean treatSuccess(int sequentialNumber) {
return ctx.canRun();
}

private boolean treatError(int sequentialNumber) {
private boolean treatError(int sequentialNumber, Throwable t) {
syncResult.incrementErrors();
if (retryLiveSyncErrors) {
// We need to retry the failed change -- so we must not update the token.
// Moreover, we have to stop here, so that the changes will be applied in correct order.
syncResult.setHaltingErrorEncountered(true);
syncResult.setExceptionEncountered(t);
LOGGER.info("LiveSync encountered an error and 'retryLiveSyncErrors' is set to true: so exiting now with "
+ "the hope that the error will be cleared on the next task run. Task: {}; processed changes: {}",
ctx.getTask(), syncResult.getChangesProcessed());
Expand Down
Expand Up @@ -20,6 +20,7 @@ public class SynchronizationOperationResult {
private AtomicInteger errors = new AtomicInteger(0);
private volatile boolean suspendEncountered;
private volatile boolean haltingErrorEncountered;
private Throwable exceptionEncountered; // FIXME this is a workaround for thresholds
private boolean allChangesFetched;
private PrismProperty<?> initialToken;
private PrismProperty<?> taskTokenUpdatedTo;
Expand Down Expand Up @@ -48,6 +49,14 @@ public void setHaltingErrorEncountered(boolean haltingErrorEncountered) {
this.haltingErrorEncountered = haltingErrorEncountered;
}

public Throwable getExceptionEncountered() {
return exceptionEncountered;
}

public void setExceptionEncountered(Throwable exceptionEncountered) {
this.exceptionEncountered = exceptionEncountered;
}

public boolean isAllChangesFetched() {
return allChangesFetched;
}
Expand Down Expand Up @@ -78,6 +87,7 @@ public String toString() {
", errors=" + errors +
", suspendEncountered=" + suspendEncountered +
", haltingErrorEncountered=" + haltingErrorEncountered +
", exceptionEncountered=" + exceptionEncountered +
", allChangesFetched=" + allChangesFetched +
", initialToken=" + initialToken +
", taskTokenUpdatedTo=" + taskTokenUpdatedTo;
Expand Down
Expand Up @@ -100,7 +100,7 @@ public static Throwable getResultException(OperationResult result) {
*
* TODO think about handled errors here: e.g. should we skip them when looking for exceptions?
*/
private static Throwable getResultExceptionIfExists(OperationResult result) {
public static Throwable getResultExceptionIfExists(OperationResult result) {
if (result.getCause() != null) {
return result.getCause();
}
Expand Down
Expand Up @@ -49,7 +49,7 @@ protected int getProcessedUsers() {
protected void assertSynchronizationStatisticsAfterImport(Task taskAfter) throws Exception {
SynchronizationInformationType syncInfo = taskAfter.getStoredOperationStats().getSynchronizationInformation();

assertSyncToken(taskAfter, 8, taskAfter.getResult());
assertSyncToken(taskAfter, 4, taskAfter.getResult());

assertEquals(syncInfo.getCountUnmatched(), 5);
assertEquals(syncInfo.getCountDeleted(), 0);
Expand All @@ -66,7 +66,7 @@ protected void assertSynchronizationStatisticsAfterImport(Task taskAfter) throws
protected void assertSynchronizationStatisticsActivation(Task taskAfter) {
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountUnmatched(), 3);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountDeleted(), 0);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountLinked(), 0);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountLinked(), 8); // this is because LiveSync re-processes changes by default (FIXME)
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountUnlinked(), 0);
}

Expand All @@ -77,16 +77,16 @@ protected void assertSynchronizationStatisticsActivation(Task taskAfter) {
protected void assertSynchronizationStatisticsAfterSecondImport(Task taskAfter) throws Exception {
SynchronizationInformationType syncInfo = taskAfter.getStoredOperationStats().getSynchronizationInformation();

assertSyncToken(taskAfter, 12, taskAfter.getResult());
assertSyncToken(taskAfter, 4, taskAfter.getResult());

assertEquals(syncInfo.getCountUnmatched(), 5);
assertEquals(syncInfo.getCountDeleted(), 0);
assertEquals(syncInfo.getCountLinked(), 0);
assertEquals(syncInfo.getCountLinked(), 4); // this is because LiveSync re-processes changes by default (FIXME)
assertEquals(syncInfo.getCountUnlinked(), 0);

assertEquals(syncInfo.getCountUnmatchedAfter(), 0);
assertEquals(syncInfo.getCountDeletedAfter(), 0);
assertEquals(syncInfo.getCountLinkedAfter(), getProcessedUsers());
assertEquals(syncInfo.getCountLinkedAfter(), 8); // this is because LiveSync re-processes changes by default (FIXME)
assertEquals(syncInfo.getCountUnlinkedAfter(), 0);
}
}

0 comments on commit 4aaf38b

Please sign in to comment.