Skip to content

Commit

Permalink
Implement multi-note thresholds
Browse files Browse the repository at this point in the history
1) Generalized the place where rule-related counters are kept.
For reconciliation (as a compound activity) it is the recon activity
itself.
2) Implemented "skip" processing option for activities.
3) Added default action of STOP for policy threshold exceptions.
4) Added simulation-related reconciliation activity children
(skipped by default).
5) Fixed cloning of activity definitions.
6) Improved internal interfaces related to activity counters.
7) Cleaned-up internal structures for LiveSynchronizer.
... plus a lot of minor changes.

(Still a work in progress.)
  • Loading branch information
mederly committed Jul 5, 2021
1 parent bcf7a05 commit da378ec
Show file tree
Hide file tree
Showing 79 changed files with 1,408 additions and 740 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public static CriticalityType getCriticality(ErrorSelectorType selector, Throwab
return defaultIfNull(selector.getNetwork(), defaultValue);
case SECURITY:
return defaultIfNull(selector.getSecurity(), defaultValue);
case POLICY_THRESHOLD:
return defaultIfNull(selector.getPolicyThreshold(), defaultValue);
case POLICY:
return defaultIfNull(selector.getPolicy(), defaultValue);
case SCHEMA:
Expand All @@ -77,6 +79,8 @@ public static ErrorCategoryType getErrorCategory(Throwable exception) {
return ErrorCategoryType.NETWORK;
} else if (exception instanceof SecurityViolationException) {
return ErrorCategoryType.SECURITY;
} else if (exception instanceof ThresholdPolicyViolationException) {
return ErrorCategoryType.POLICY_THRESHOLD;
} else if (exception instanceof PolicyViolationException) {
return ErrorCategoryType.POLICY;
} else if (exception instanceof SchemaException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public static boolean hasLimitations(WorkBucketType bucket) {
} else if (bucket.getContent() instanceof StringPrefixWorkBucketContentType) {
StringPrefixWorkBucketContentType stringPrefix = (StringPrefixWorkBucketContentType) bucket.getContent();
return !stringPrefix.getPrefix().isEmpty();
} else if (bucket.getContent() instanceof StringValueWorkBucketContentType) {
StringValueWorkBucketContentType stringValue = (StringValueWorkBucketContentType) bucket.getContent();
return !stringValue.getValue().isEmpty();
} else if (bucket.getContent() instanceof FilterWorkBucketContentType) {
FilterWorkBucketContentType filtered = (FilterWorkBucketContentType) bucket.getContent();
return !filtered.getFilter().isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public class ModelPublicConstants {
public static final String CLUSTER_REPORT_FILE_FILENAME_PARAMETER = "filename";

public static final String RECONCILIATION_OPERATION_COMPLETION_ID = "operationCompletion";
public static final String RECONCILIATION_RESOURCE_OBJECTS_SIMULATION_ID = "resourceObjectsSimulation";
public static final String RECONCILIATION_RESOURCE_OBJECTS_ID = "resourceObjects";
public static final String RECONCILIATION_REMAINING_SHADOWS_SIMULATION_ID = "remainingShadowsSimulation";
public static final String RECONCILIATION_REMAINING_SHADOWS_ID = "remainingShadows";

public static final ActivityPath RECONCILIATION_OPERATION_COMPLETION_PATH = ActivityPath.fromId(RECONCILIATION_OPERATION_COMPLETION_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.function.Predicate;

import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.repo.api.Countable;
import com.evolveum.midpoint.util.DebugDumpable;
import com.evolveum.midpoint.util.LocalizableMessage;
import com.evolveum.midpoint.util.TreeNode;
Expand All @@ -24,7 +23,7 @@
* @author semancik
*
*/
public interface EvaluatedPolicyRule extends DebugDumpable, Serializable, Cloneable, Countable {
public interface EvaluatedPolicyRule extends DebugDumpable, Serializable, Cloneable {

@NotNull
Collection<EvaluatedPolicyRuleTrigger<?>> getTriggers();
Expand Down Expand Up @@ -104,4 +103,6 @@ default boolean hasThreshold() {
}

int getCount();

void setCount(int value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -514,16 +514,12 @@ public String getPolicyRuleIdentifier() {
}

@Override
public String getIdentifier() {
return getPolicyRuleIdentifier();
public int getCount() {
return count;
}

@Override
public void setCount(int value) {
count = value;
}

public int getCount() {
return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
import com.evolveum.midpoint.model.impl.lens.projector.ProjectorProcessor;
import com.evolveum.midpoint.model.impl.lens.projector.util.ProcessorExecution;
import com.evolveum.midpoint.model.impl.lens.projector.util.ProcessorMethod;
import com.evolveum.midpoint.repo.common.activity.execution.ActivityExecution;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.ExecutionContext;
import com.evolveum.midpoint.task.api.ExecutionSupport;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
Expand All @@ -27,8 +26,10 @@

import javax.xml.datatype.XMLGregorianCalendar;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.evolveum.midpoint.repo.common.activity.execution.ActivityCountersGroup.POLICY_RULES;
import static com.evolveum.midpoint.task.api.ExecutionSupport.CountersGroup.POLICY_RULES;

/**
* Updates counters for policy rules, with the goal of determining if rules' thresholds have been reached.
Expand All @@ -40,15 +41,15 @@
public class PolicyRuleCounterUpdater implements ProjectorProcessor {

@ProcessorMethod
public <AH extends AssignmentHolderType> void updateCounters(LensContext<AH> context, XMLGregorianCalendar now,
Task task, OperationResult result) throws SchemaException, ObjectNotFoundException, ObjectAlreadyExistsException {
public <AH extends AssignmentHolderType> void updateCounters(LensContext<AH> context,
@SuppressWarnings("unused") XMLGregorianCalendar now,
Task task, OperationResult result)
throws SchemaException, ObjectNotFoundException, ObjectAlreadyExistsException {

ExecutionContext executionContext = task.getExecutionContext();
if (!(executionContext instanceof ActivityExecution)) {
result.recordNotApplicable();
ExecutionSupport executionSupport = task.getExecutionSupport();
if (executionSupport == null) {
return;
}
ActivityExecution activityExecution = (ActivityExecution) executionContext;

/*
* We update the counters in rules with thresholds in the following ways:
Expand All @@ -71,19 +72,31 @@ public <AH extends AssignmentHolderType> void updateCounters(LensContext<AH> con
if (!rule.hasThreshold()) {
continue;
}
Integer knownCounterValue = focusContext.getPolicyRuleCounter(rule.getPolicyRuleIdentifier());
if (knownCounterValue != null) {
rule.setCount(knownCounterValue);
} else {
if (rule.isTriggered()) {
rulesToIncrement.add(rule);
}
Integer alreadyIncrementedValue = focusContext.getPolicyRuleCounter(rule.getPolicyRuleIdentifier());
if (alreadyIncrementedValue != null) {
rule.setCount(alreadyIncrementedValue);
continue;
}

if (!rule.isTriggered()) {
continue;
}
rulesToIncrement.add(rule);
}
if (!rulesToIncrement.isEmpty()) {
activityExecution.incrementCounters(POLICY_RULES, rulesToIncrement, result);
rulesToIncrement.forEach(
rule -> focusContext.setPolicyRuleCounter(rule.getPolicyRuleIdentifier(), rule.getCount()));

if (rulesToIncrement.isEmpty()) {
return;
}

Map<String, EvaluatedPolicyRule> rulesByIdentifier = rulesToIncrement.stream()
.collect(Collectors.toMap(EvaluatedPolicyRule::getPolicyRuleIdentifier, Function.identity()));

Map<String, Integer> currentValues =
executionSupport.incrementCounters(POLICY_RULES, rulesByIdentifier.keySet(), result);

currentValues.forEach((id, value) -> {
rulesByIdentifier.get(id).setCount(value);
focusContext.setPolicyRuleCounter(id, value);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@

package com.evolveum.midpoint.model.impl.sync.tasks.recon;

import com.evolveum.midpoint.repo.common.activity.state.ActivityState;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;

import com.evolveum.midpoint.xml.ns._public.common.common_3.ReconciliationWorkStateType;

import org.jetbrains.annotations.NotNull;

import com.evolveum.midpoint.model.impl.sync.tasks.SynchronizationObjectsFilterImpl;
Expand Down Expand Up @@ -41,7 +47,7 @@ public class PartialReconciliationActivityExecution<AE extends PartialReconcilia
protected ResourceObjectClassSpecification objectClassSpec;
protected SynchronizationObjectsFilterImpl objectsFilter;

public PartialReconciliationActivityExecution(
PartialReconciliationActivityExecution(
@NotNull ExecutionInstantiationContext<ReconciliationWorkDefinition, ReconciliationActivityHandler> context,
@NotNull String shortNameCapitalized) {
super(context, shortNameCapitalized);
Expand All @@ -64,4 +70,18 @@ protected void initializeExecution(OperationResult opResult) throws CommonExcept
protected @NotNull ResourceObjectSetType getResourceObjectSet() {
return activity.getWorkDefinition().getResourceObjectSetSpecification();
}

@Override
protected ActivityState determineActivityStateForCounters(@NotNull OperationResult result) throws SchemaException, ObjectNotFoundException {
return activityState.getParentActivityState(ReconciliationWorkStateType.COMPLEX_TYPE, result);

/*
return ActivityState.getActivityStateDownwards(
getActivityPath(),
getTaskExecution().getRootTask(),
AbstractActivityWorkStateType.COMPLEX_TYPE,
getBeans(),
result);
*/
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ private void sendReconciliationResult(@NotNull ActivityExecutionResult execution
}
}

@Nullable ResourceReconciliationActivityExecution getResourceReconciliationExecution() {
@Nullable ResourceObjectsReconciliationActivityExecution getResourceReconciliationExecution() {
try {
return (ResourceReconciliationActivityExecution) activity.getChild(RECONCILIATION_RESOURCE_OBJECTS_ID)
return (ResourceObjectsReconciliationActivityExecution) activity.getChild(RECONCILIATION_RESOURCE_OBJECTS_ID)
.getExecution();
} catch (SchemaException e) {
throw new IllegalStateException(e); // Occurs only during children map initialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.evolveum.midpoint.prism.xml.XmlTypeConverter;
import com.evolveum.midpoint.repo.common.activity.ActivityStateDefinition;
import com.evolveum.midpoint.repo.common.activity.definition.ActivityDefinition;
import com.evolveum.midpoint.repo.common.activity.state.ActivityState;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.util.exception.CommonException;
Expand Down Expand Up @@ -79,9 +80,23 @@ public void unregister() {
(i) -> ModelPublicConstants.RECONCILIATION_OPERATION_COMPLETION_ID,
ActivityStateDefinition.normal(),
parentActivity));
children.add(EmbeddedActivity.create(
createSimulationDefinition(parentActivity.getDefinition()),
(context, result) -> new ResourceObjectsReconciliationActivityExecution(context),
this::beforeResourceObjectsReconciliation, // this is needed even for simulation
(i) -> ModelPublicConstants.RECONCILIATION_RESOURCE_OBJECTS_SIMULATION_ID,
ActivityStateDefinition.normal(),
parentActivity));
children.add(EmbeddedActivity.create(
createSimulationDefinition(parentActivity.getDefinition()),
(context, result) -> new RemainingShadowsActivityExecution(context),
null,
(i) -> ModelPublicConstants.RECONCILIATION_REMAINING_SHADOWS_SIMULATION_ID,
ActivityStateDefinition.normal(),
parentActivity));
children.add(EmbeddedActivity.create(
parentActivity.getDefinition().clone(),
(context, result) -> new ResourceReconciliationActivityExecution(context),
(context, result) -> new ResourceObjectsReconciliationActivityExecution(context),
this::beforeResourceObjectsReconciliation,
(i) -> ModelPublicConstants.RECONCILIATION_RESOURCE_OBJECTS_ID,
ActivityStateDefinition.normal(),
Expand All @@ -96,11 +111,19 @@ public void unregister() {
return children;
}

private ActivityDefinition<ReconciliationWorkDefinition> createSimulationDefinition(
@NotNull ActivityDefinition<ReconciliationWorkDefinition> original) {
ActivityDefinition<ReconciliationWorkDefinition> clone = original.clone();
clone.getWorkDefinition().setExecutionMode(ExecutionModeType.SIMULATE);
clone.getControlFlowDefinition().setSkip();
return clone;
}

private void beforeResourceObjectsReconciliation(
EmbeddedActivity<ReconciliationWorkDefinition, ReconciliationActivityHandler> activity,
RunningTask runningTask, OperationResult result) throws CommonException {
ActivityState<?> reconState =
ActivityState.getActivityState(
ActivityState reconState =
ActivityState.getActivityStateUpwards(
activity.getPath().allExceptLast(),
runningTask,
ReconciliationWorkStateType.COMPLEX_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static ReconciliationResult fromActivityExecution(@NotNull ReconciliationActivit
if (operationCompletionExecution != null) {
result.unOpsCount = operationCompletionExecution.getUnOpsCount();
}
ResourceReconciliationActivityExecution resourceReconciliationExecution = execution.getResourceReconciliationExecution();
ResourceObjectsReconciliationActivityExecution resourceReconciliationExecution = execution.getResourceReconciliationExecution();
if (resourceReconciliationExecution != null) {
result.resourceReconCount = resourceReconciliationExecution.getResourceReconCount();
result.resourceReconErrors = resourceReconciliationExecution.getResourceReconErrors();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskUtil;
import com.evolveum.midpoint.util.QNameUtil;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.util.logging.Trace;
Expand Down Expand Up @@ -95,7 +94,7 @@ protected ObjectQuery customizeQuery(ObjectQuery configuredQuery, OperationResul

private @NotNull XMLGregorianCalendar getReconciliationStartTimestamp(OperationResult opResult)
throws SchemaException, ObjectNotFoundException {
ActivityState<?> reconState = getActivityState().
ActivityState reconState = getActivityState().
getParentActivityState(ReconciliationWorkStateType.COMPLEX_TYPE, opResult);
XMLGregorianCalendar started =
reconState.getWorkStatePropertyRealValue(F_RESOURCE_OBJECTS_RECONCILIATION_START_TIMESTAMP, XMLGregorianCalendar.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.Collection;
import java.util.function.Function;

import com.evolveum.midpoint.xml.ns._public.common.common_3.ExecutionModeType;

import com.google.common.annotations.VisibleForTesting;
import org.jetbrains.annotations.NotNull;

Expand All @@ -33,14 +35,21 @@
/**
* Execution of resource objects reconciliation (the main part of reconciliation).
*/
public class ResourceReconciliationActivityExecution
extends PartialReconciliationActivityExecution<ResourceReconciliationActivityExecution> {
public class ResourceObjectsReconciliationActivityExecution
extends PartialReconciliationActivityExecution<ResourceObjectsReconciliationActivityExecution> {

private Synchronizer synchronizer;

ResourceReconciliationActivityExecution(
ResourceObjectsReconciliationActivityExecution(
@NotNull ExecutionInstantiationContext<ReconciliationWorkDefinition, ReconciliationActivityHandler> context) {
super(context, "Reconciliation (on resource)");
super(context, "Reconciliation (on resource)" + modeSuffix(context));
}

// TODO generalize
private static String modeSuffix(
ExecutionInstantiationContext<ReconciliationWorkDefinition, ReconciliationActivityHandler> context) {
return context.getActivity().getWorkDefinition().getExecutionMode() == ExecutionModeType.SIMULATE ?
" (simulated)" : "";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class LiveSyncActivityExecution
public static final ThreadLocal<Integer> CHANGE_BEING_PROCESSED = new ThreadLocal<>();

private ResourceObjectClassSpecification objectClassSpecification;
private SynchronizationResult syncResult;

LiveSyncActivityExecution(
@NotNull ExecutionInstantiationContext<LiveSyncWorkDefinition, LiveSyncActivityHandler> context) {
Expand Down Expand Up @@ -83,10 +82,11 @@ protected void initializeExecution(OperationResult opResult) throws ActivityExec

@Override
protected void finishExecution(OperationResult opResult) throws SchemaException {
int itemsProcessed = executionStatistics.getItemsProcessed();
LOGGER.trace("LiveSyncTaskHandler.run stopping (resource {}); changes processed: {}",
objectClassSpecification.resource, syncResult);
objectClassSpecification.resource, itemsProcessed);
opResult.createSubresult(OperationConstants.LIVE_SYNC_STATISTICS)
.recordStatus(OperationResultStatus.SUCCESS, "Changes processed: " + syncResult);
.recordStatus(OperationResultStatus.SUCCESS, "Changes processed: " + itemsProcessed);
}

@Override
Expand All @@ -112,7 +112,7 @@ public void allEventsSubmitted(OperationResult result) {
ActivityTokenStorageImpl tokenStorage = new ActivityTokenStorageImpl(this);

ModelImplUtils.clearRequestee(getRunningTask());
syncResult = getModelBeans().provisioningService
getModelBeans().provisioningService
.synchronize(objectClassSpecification.getCoords(), options, tokenStorage, handler, getRunningTask(), opResult);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,6 @@ protected boolean isLegacy() {
}

private Consumer<PrismObject<TaskType>> workerThreadsCustomizer(int threads) {
return workerThreadsCustomizer(threads, isLegacy());
return rootActivityWorkerThreadsCustomizer(threads, isLegacy());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void initSystem(Task initTask, OperationResult initResult) throws Excepti

interruptedSyncResource = DummyInterruptedSyncResource.create(dummyResourceCollection, initTask, initResult);

addObject(getReconciliationTask().file, initTask, initResult, workerThreadsCustomizerNew(getWorkerThreads()));
addObject(getReconciliationTask().file, initTask, initResult, tailoringWorkerThreadsCustomizer(getWorkerThreads()));

assertUsers(getNumberOfUsers());
interruptedSyncResource.createAccounts(USERS, this::getUserName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ public void test210PartitionedMultinodeReconciliationWithAllFailuresEnabled() th

when();
var taskBefore = addTask(TASK_RECONCILIATION_PARTITIONED_MULTINODE, result);
waitForTaskTreeNextFinishedRun(taskBefore.asObjectable(), 60000, result);
waitForTaskTreeNextFinishedRun(taskBefore.asObjectable(), 60000, result, true);

then();
assertTaskTree(TASK_RECONCILIATION_PARTITIONED_MULTINODE.oid, "reconciliation task after")
Expand Down

0 comments on commit da378ec

Please sign in to comment.