Skip to content

Commit

Permalink
Add partitioned focus validity scanner task
Browse files Browse the repository at this point in the history
Repository queries issued by traditional focus validity scanner task
cannot be optimized by (some) DBMSes. So this commit implements
partitioned validity scanner that has a separate subtasks for
focus and assignment validity checking.

Introduced durable partitions i.e. subtasks that keep their
own state across executions of the master task. Also changed
handler URI for traditional focus validity scanner (original one
is kept as deprecated).
  • Loading branch information
mederly committed Mar 27, 2018
1 parent 6564c67 commit 2725029
Show file tree
Hide file tree
Showing 34 changed files with 754 additions and 178 deletions.
Expand Up @@ -3258,10 +3258,19 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="sequentialExecution" type="xsd:boolean" minOccurs="0">
<xsd:element name="sequentialExecution" type="xsd:boolean" minOccurs="0" default="true">
<xsd:annotation>
<xsd:documentation>
Whether the subtasks should be executed sequentially. The default is true.
Whether the subtasks should be executed sequentially.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="durablePartitions" type="xsd:boolean" minOccurs="0" default="false">
<xsd:annotation>
<xsd:documentation>
Whether the partitions should be durable i.e. whether they should persist through master task restarts.
This is useful e.g. for partitioned validity scanner because each partition keeps its own last
scan timestamp. (EXPERIMENTAL)
</xsd:documentation>
</xsd:annotation>
</xsd:element>
Expand Down Expand Up @@ -5126,7 +5135,7 @@
of historical data, audit records, etc. It may also be used to "blocks" the user or account
identifier to avoid their reuse.
</p><p>
Usualy used for retired employees and similar cases.
Usually used for retired employees and similar cases.
</p>
</xsd:documentation>
<xsd:appinfo>
Expand Down
Expand Up @@ -34,7 +34,11 @@ public class ModelPublicConstants {
public static final String CLEANUP_TASK_HANDLER_URI = SchemaConstants.NS_MODEL + "/cleanup/handler-3";
public static final String SHADOW_INTEGRITY_CHECK_TASK_HANDLER_URI = SchemaConstants.NS_MODEL + "/shadow-integrity-check/handler-3";
public static final String OBJECT_INTEGRITY_CHECK_TASK_HANDLER_URI = SchemaConstants.NS_MODEL + "/object-integrity-check/handler-3";
public static final String FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI = NS_SYNCHRONIZATION_TASK_PREFIX + "/focus-validation-scanner/handler-3"; // TODO why synchronization?
public static final String DEPRECATED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI = NS_SYNCHRONIZATION_TASK_PREFIX + "/focus-validation-scanner/handler-3";
public static final String FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI = SchemaConstants.NS_MODEL + "/focus-validity-scanner/handler-3";
public static final String PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI = SchemaConstants.NS_MODEL + "/partitioned-focus-validity-scanner/handler-3";
public static final String PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI_1 = PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI + "#1";
public static final String PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI_2 = PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI + "#2";
public static final String TRIGGER_SCANNER_TASK_HANDLER_URI = SchemaConstants.NS_MODEL + "/trigger/scanner/handler-3";
public static final String SHADOW_REFRESH_TASK_HANDLER_URI = SchemaConstants.NS_MODEL + "/shadowRefresh/handler-3";
public static final String RECONCILIATION_TASK_HANDLER_URI = NS_SYNCHRONIZATION_TASK_PREFIX + "/reconciliation/handler-3";
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/
package com.evolveum.midpoint.model.impl.importer;

import java.util.List;
import java.util.function.Function;

import javax.annotation.PostConstruct;
Expand Down
Expand Up @@ -36,7 +36,6 @@

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.List;

/**
* Task handler for "Object integrity check" task.
Expand Down Expand Up @@ -92,7 +91,8 @@ protected ObjectQuery createQuery(ObjectIntegrityCheckResultHandler handler, Tas
}

@Override
protected Collection<SelectorOptions<GetOperationOptions>> createSearchOptions(ObjectIntegrityCheckResultHandler resultHandler,
protected Collection<SelectorOptions<GetOperationOptions>> createSearchOptions(
ObjectIntegrityCheckResultHandler resultHandler,
TaskRunResult runResult, Task coordinatorTask, OperationResult opResult) {
return SelectorOptions.createCollection(GetOperationOptions.createAttachDiagData());
}
Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
* Task handler for "Shadow integrity check" task.
Expand Down
Expand Up @@ -16,7 +16,6 @@
package com.evolveum.midpoint.model.impl.sync;

import java.util.Collections;
import java.util.List;

import javax.annotation.PostConstruct;

Expand Down
Expand Up @@ -25,11 +25,13 @@
import com.evolveum.midpoint.model.impl.lens.LensContext;
import com.evolveum.midpoint.model.impl.util.AbstractScannerResultHandler;
import com.evolveum.midpoint.model.impl.util.AbstractScannerTaskHandler;
import com.evolveum.midpoint.prism.Containerable;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.path.ItemPath;
import com.evolveum.midpoint.prism.query.ObjectFilter;
import com.evolveum.midpoint.prism.query.ObjectQuery;
import com.evolveum.midpoint.prism.query.builder.QueryBuilder;
import com.evolveum.midpoint.prism.query.builder.S_AtomicFilterExit;
import com.evolveum.midpoint.repo.api.PreconditionViolationException;
import com.evolveum.midpoint.repo.common.expression.ExpressionFactory;
import com.evolveum.midpoint.schema.result.OperationConstants;
Expand Down Expand Up @@ -81,8 +83,6 @@ public class FocusValidityScannerTaskHandler extends AbstractScannerTaskHandler<
// Therefore it must not have task-specific fields. It can only contain fields specific to
// all tasks of a specified type

public static final String HANDLER_URI = ModelPublicConstants.FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI;

@Autowired private ExpressionFactory expressionFactory;
@Autowired private ContextFactory contextFactory;
@Autowired private Clockwork clockwork;
Expand Down Expand Up @@ -118,14 +118,36 @@ public FocusValidityScannerTaskHandler() {

@PostConstruct
private void initialize() {
taskManager.registerHandler(HANDLER_URI, this);
taskManager.registerHandler(ModelPublicConstants.FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI, this);
taskManager.registerHandler(ModelPublicConstants.PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI_1, this);
taskManager.registerHandler(ModelPublicConstants.PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI_2, this);
taskManager.registerHandler(ModelPublicConstants.DEPRECATED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI, this);
}

@Override
protected Class<? extends ObjectType> getType(Task task) {
return getTypeFromTask(task, UserType.class);
}

private Integer getPartition(Task task) {
String handlerUri = task.getHandlerUri();
if (ModelPublicConstants.PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI_1.equals(handlerUri)) {
return 1;
} else if (ModelPublicConstants.PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI_2.equals(handlerUri)) {
return 2;
} else {
return null;
}
}

private boolean checkFocusValidity(Integer partition) {
return partition == null || partition == 1;
}

private boolean checkAssignmentValidity(Integer partition) {
return partition == null || partition == 2;
}

@Override
protected ObjectQuery createQuery(AbstractScannerResultHandler<FocusType> handler, TaskRunResult runResult, Task coordinatorTask, OperationResult opResult) throws SchemaException {
initProcessedOids(coordinatorTask);
Expand All @@ -134,9 +156,10 @@ protected ObjectQuery createQuery(AbstractScannerResultHandler<FocusType> handle

Duration activateOn = getActivateOn(validityConstraintType);

Integer partition = getPartition(coordinatorTask);

ObjectQuery query = new ObjectQuery();
ObjectFilter filter;
// PrismObjectDefinition<FocusType> focusObjectDef = prismContext.getSchemaRegistry().findObjectDefinitionByCompileTimeClass(FocusType.class);

XMLGregorianCalendar lastScanTimestamp = handler.getLastScanTimestamp();
XMLGregorianCalendar thisScanTimestamp = handler.getThisScanTimestamp();
Expand All @@ -151,10 +174,8 @@ protected ObjectQuery createQuery(AbstractScannerResultHandler<FocusType> handle
lastScanTimestamp.add(activateOn.negate());
}
filter = createFilterFor(getType(coordinatorTask), path, lastScanTimestamp, thisScanTimestamp);

} else {

filter = createBasicFilter(lastScanTimestamp, thisScanTimestamp);
filter = createBasicFilter(lastScanTimestamp, thisScanTimestamp, partition);
}

query.setFilter(filter);
Expand All @@ -170,47 +191,54 @@ private Duration getActivateOn(TimeValidityPolicyConstraintType validityConstrai
}
}

private ObjectFilter createBasicFilter(XMLGregorianCalendar lastScanTimestamp, XMLGregorianCalendar thisScanTimestamp){
private ObjectFilter createBasicFilter(XMLGregorianCalendar lastScanTimestamp, XMLGregorianCalendar thisScanTimestamp,
Integer partition) {
S_AtomicFilterExit i = QueryBuilder.queryFor(FocusType.class, prismContext).none();
if (lastScanTimestamp == null) {
return QueryBuilder.queryFor(FocusType.class, prismContext)
.item(F_ACTIVATION, F_VALID_FROM).le(thisScanTimestamp)
.or().item(F_ACTIVATION, F_VALID_TO).le(thisScanTimestamp)
.or().exists(F_ASSIGNMENT)
if (checkFocusValidity(partition)) {
i = i.or().item(F_ACTIVATION, F_VALID_FROM).le(thisScanTimestamp)
.or().item(F_ACTIVATION, F_VALID_TO).le(thisScanTimestamp);
}
if (checkAssignmentValidity(partition)) {
i = i.or().exists(F_ASSIGNMENT)
.block()
.item(AssignmentType.F_ACTIVATION, F_VALID_FROM).le(thisScanTimestamp)
.or().item(AssignmentType.F_ACTIVATION, F_VALID_TO).le(thisScanTimestamp)
.endBlock()
.buildFilter();
}
.endBlock();
}
} else {
if (checkFocusValidity(partition)) {
i = i.or().item(F_ACTIVATION, F_VALID_FROM).gt(lastScanTimestamp)
.and().item(F_ACTIVATION, F_VALID_FROM).le(thisScanTimestamp)
.or().item(F_ACTIVATION, F_VALID_TO).gt(lastScanTimestamp)
.and().item(F_ACTIVATION, F_VALID_TO).le(thisScanTimestamp);

return QueryBuilder.queryFor(FocusType.class, prismContext)
.item(F_ACTIVATION, F_VALID_FROM).gt(lastScanTimestamp)
.and().item(F_ACTIVATION, F_VALID_FROM).le(thisScanTimestamp)
.or().item(F_ACTIVATION, F_VALID_TO).gt(lastScanTimestamp)
.and().item(F_ACTIVATION, F_VALID_TO).le(thisScanTimestamp)
.or().exists(F_ASSIGNMENT)
}
if (checkAssignmentValidity(partition)) {
i = i.or().exists(F_ASSIGNMENT)
.block()
.item(AssignmentType.F_ACTIVATION, F_VALID_FROM).gt(lastScanTimestamp)
.and().item(AssignmentType.F_ACTIVATION, F_VALID_FROM).le(thisScanTimestamp)
.or().item(AssignmentType.F_ACTIVATION, F_VALID_TO).gt(lastScanTimestamp)
.and().item(AssignmentType.F_ACTIVATION, F_VALID_TO).le(thisScanTimestamp)
.endBlock()
.buildFilter();

.endBlock();
}
}
return i.buildFilter();
}

private ObjectFilter createFilterFor(Class type, ItemPath path, XMLGregorianCalendar lastScanTimestamp, XMLGregorianCalendar thisScanTimestamp){
private ObjectFilter createFilterFor(Class<? extends Containerable> type, ItemPath path, XMLGregorianCalendar lastScanTimestamp,
XMLGregorianCalendar thisScanTimestamp) {
if (lastScanTimestamp == null) {
return QueryBuilder.queryFor(type, prismContext)
.item(path).le(thisScanTimestamp)
.buildFilter();
}

return QueryBuilder.queryFor(type, prismContext)
} else {
return QueryBuilder.queryFor(type, prismContext)
.item(path).gt(lastScanTimestamp)
.and().item(path).le(thisScanTimestamp)
.buildFilter();

}
}

@Override
Expand Down Expand Up @@ -255,10 +283,10 @@ private void reconcileFocus(PrismObject<FocusType> focus, Task workerTask, Opera
// We want reconcile option here. There may be accounts that are in wrong activation state.
// We will not notice that unless we go with reconcile.
LensContext<FocusType> lensContext = contextFactory.createRecomputeContext(focus, ModelExecuteOptions.createReconcile(), workerTask, result);
if (hasNotifyAction(workerTask)) {
TimeValidityPolicyConstraintType constraint = getValidityPolicyConstraint(workerTask);
if (hasNotifyAction(workerTask) && constraint != null) {
EvaluatedPolicyRuleImpl policyRule = new EvaluatedPolicyRuleImpl(workerTask.getPolicyRule(), null, prismContext);
policyRule.computeEnabledActions(null, focus, expressionFactory, prismContext, workerTask, result);
TimeValidityPolicyConstraintType constraint = getValidityPolicyConstraint(workerTask);
EvaluatedPolicyRuleTrigger<TimeValidityPolicyConstraintType> evaluatedTrigger = new EvaluatedTimeValidityTrigger(
Boolean.TRUE.equals(constraint.isAssignment()) ? PolicyConstraintKindType.ASSIGNMENT_TIME_VALIDITY : PolicyConstraintKindType.OBJECT_TIME_VALIDITY,
constraint,
Expand All @@ -283,13 +311,11 @@ private TimeValidityPolicyConstraintType getValidityPolicyConstraint(Task coordi
return null;
}

List<TimeValidityPolicyConstraintType> timeValidityContstraints = policyRule.getPolicyConstraints().getObjectTimeValidity();
if (CollectionUtils.isEmpty(timeValidityContstraints)){
List<TimeValidityPolicyConstraintType> timeValidityConstraints = policyRule.getPolicyConstraints().getObjectTimeValidity();
if (CollectionUtils.isEmpty(timeValidityConstraints)) {
return null;
}

return timeValidityContstraints.iterator().next();

return timeValidityConstraints.iterator().next();
}

private List<NotificationPolicyActionType> getNotificationActions(Task coordinatorTask){
Expand All @@ -310,7 +336,8 @@ private boolean hasNotifyAction(Task coordinatorTask) {
return !getNotificationActions(coordinatorTask).isEmpty();
}

private boolean isTimeValidityConstraint(Task coordinatorTask){
@SuppressWarnings("unused")
private boolean isTimeValidityConstraint(Task coordinatorTask) {
return getValidityPolicyConstraint(coordinatorTask) != null;
}

Expand Down
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2010-2018 Evolveum
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.evolveum.midpoint.model.impl.sync;

import com.evolveum.midpoint.model.api.ModelPublicConstants;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.task.api.StaticTaskPartitionsDefinition;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.api.TaskPartitionsDefinition;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskPartitionsDefinitionType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
* @author mederly
*/
@Component
public class PartitionedFocusValidityScannerTaskHandlerCreator {

@Autowired private TaskManager taskManager;
@Autowired private PrismContext prismContext;

@PostConstruct
private void initialize() {
taskManager.createAndRegisterPartitioningTaskHandler(ModelPublicConstants.PARTITIONED_FOCUS_VALIDITY_SCANNER_TASK_HANDLER_URI,
this::createPartitionsDefinition);
}

private TaskPartitionsDefinition createPartitionsDefinition(Task masterTask) {
TaskPartitionsDefinitionType definitionInTask = masterTask.getWorkManagement() != null ?
masterTask.getWorkManagement().getPartitions() : null;
TaskPartitionsDefinitionType partitionsDefinition = definitionInTask != null ?
definitionInTask.clone() : new TaskPartitionsDefinitionType();
partitionsDefinition.setCount(2);
partitionsDefinition.setDurablePartitions(true);
partitionsDefinition.setCopyMasterExtension(true);
return new StaticTaskPartitionsDefinition(partitionsDefinition,
prismContext.getSchemaRegistry().findObjectDefinitionByCompileTimeClass(TaskType.class));
}
}
Expand Up @@ -15,8 +15,6 @@
*/
package com.evolveum.midpoint.model.impl.sync;

import java.util.List;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
Expand Down
Expand Up @@ -17,7 +17,6 @@
package com.evolveum.midpoint.model.impl.util;

import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.repo.api.RepoModifyOptions;
import com.evolveum.midpoint.repo.api.RepositoryService;
import com.evolveum.midpoint.repo.common.task.AbstractSearchIterativeResultHandler;
Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
* Task handler for "reindex" task.
Expand Down
Expand Up @@ -461,6 +461,7 @@ public class AbstractConfiguredModelIntegrationTest extends AbstractModelIntegra
protected static final String TASK_LIVE_SYNC_DUMMY_GREEN_OID = "10000000-0000-0000-5555-555500000404";

protected static final String TASK_VALIDITY_SCANNER_FILENAME = COMMON_DIR + "/task-validity-scanner.xml";
protected static final String TASK_PARTITIONED_VALIDITY_SCANNER_FILENAME = COMMON_DIR + "/task-partitioned-validity-scanner.xml";
protected static final String TASK_VALIDITY_SCANNER_OID = "10000000-0000-0000-5555-555505060400";

protected static final File TASK_TRIGGER_SCANNER_FILE = new File(COMMON_DIR, "task-trigger-scanner.xml");
Expand Down

0 comments on commit 2725029

Please sign in to comment.