Skip to content

Commit

Permalink
multithreaded tasks support for thresholds (recon, livesync)
Browse files Browse the repository at this point in the history
  • Loading branch information
katkav committed Apr 1, 2020
1 parent a221ea2 commit 57aa870
Show file tree
Hide file tree
Showing 22 changed files with 587 additions and 75 deletions.
Expand Up @@ -11,6 +11,15 @@
import java.util.Collection;
import java.util.List;

import com.evolveum.midpoint.gui.api.util.WebComponentUtil;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.Task;

import com.evolveum.midpoint.util.exception.*;

import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;

import org.apache.wicket.ajax.AjaxRequestTarget;
import org.apache.wicket.ajax.markup.html.AjaxLink;
import org.apache.wicket.markup.html.basic.Label;
Expand Down Expand Up @@ -38,6 +47,9 @@ public class InternalsCountersPanel extends BasePanel<ListView<InternalCounters>
private static final String ID_COUNTER_COUNT_LABEL = "counterCount";
private static final String ID_RESET_THRESHOLD_COUNTER = "resetThresholdCounter";

private static final String DOT_CLASS = InternalsCountersPanel.class.getName() + ".";
private static final String OPERATION_LOAD_TASK = DOT_CLASS + "loadTaskByIdentifier";


public InternalsCountersPanel(String id) {
super(id);
Expand All @@ -58,7 +70,7 @@ protected void onInitialize() {
@Override
protected void populateItem(ListItem<CounterSpecification> item) {
CounterSpecification counter = item.getModelObject();
Label task = new Label(ID_COUNTER_TASK_LABEL, counter.getTaskName());
Label task = new Label(ID_COUNTER_TASK_LABEL, createLabelModel(counter));
item.add(task);

Label policyRule = new Label(ID_COUNTER_POLICY_RULE_LABEL, counter.getPolicyRuleName());
Expand All @@ -73,7 +85,7 @@ protected void populateItem(ListItem<CounterSpecification> item) {

@Override
public void onClick(AjaxRequestTarget target) {
ConfirmationPanel confirmPanel = new ConfirmationPanel(getPageBase().getMainPopupBodyId(), createStringResource("InternalsCountersPanel.reset.confirm.message", counter.getTaskName(), counter.getPolicyRuleName())) {
ConfirmationPanel confirmPanel = new ConfirmationPanel(getPageBase().getMainPopupBodyId(), createStringResource("InternalsCountersPanel.reset.confirm.message", counter.getOid(), counter.getPolicyRuleName())) {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -118,6 +130,27 @@ public String getObject() {
add(countersTable);
}

private IModel<String> createLabelModel(CounterSpecification counter) {
return () -> {
Task operationTask = getPageBase().createSimpleTask(OPERATION_LOAD_TASK);
OperationResult parentResult = operationTask.getResult();
PrismObject<TaskType> taskPrism = null;
try {
taskPrism = getPageBase().getTaskService().getTaskByIdentifier(counter.getOid(), null, operationTask, parentResult);
} catch (SchemaException | CommunicationException | ExpressionEvaluationException | ObjectNotFoundException | ConfigurationException | SecurityViolationException e) {
parentResult.recordPartialError("Failed to load task with identifier: " + counter.getOid() + ". Reason: " + e.getMessage());
}

parentResult.computeStatusIfUnknown();
getPageBase().showResult(parentResult, false);

if (taskPrism == null) {
return counter.getOid();
}
return WebComponentUtil.getName(taskPrism) + "(" + counter.getOid() + ")";
};
}

private IModel<List<CounterSpecification>> createThresholdCounterModel() {
return new IModel<List<CounterSpecification>>() {
private static final long serialVersionUID = 1L;
Expand Down
Expand Up @@ -7,6 +7,7 @@
package com.evolveum.midpoint.model.impl.lens.projector.focus;

import java.util.Collection;
import java.util.stream.Collectors;

import javax.xml.datatype.XMLGregorianCalendar;

Expand Down Expand Up @@ -197,20 +198,22 @@ public DeltaSetTriple<EvaluatedAssignmentImpl<AH>> processAllAssignments() throw
prismContext, task, result);

LOGGER.trace("Task for process: {}", task.debugDumpLazily());
AssignmentType taskAssignment;
if (task.hasAssignments()) {
taskAssignment = new AssignmentType(prismContext);
ObjectReferenceType targetRef = new ObjectReferenceType();
targetRef.asReferenceValue().setObject(task.getUpdatedOrClonedTaskObject());
taskAssignment.setTargetRef(targetRef);
} else {
taskAssignment = null;
}

LOGGER.trace("Task assignment: {}", taskAssignment);
Collection<Task> allTasksToRoot = task.getPathToRootTask(result);
Collection<AssignmentType> taskAssignments = allTasksToRoot.stream()
.filter(taskPath -> taskPath.hasAssignments())
.map(taskPath -> createTaskAssignment(taskPath))
.collect(Collectors.toList());

LOGGER.trace("Task assignment: {}", taskAssignments);

if (taskAssignments.isEmpty()) {
assignmentCollection.collect(focusContext.getObjectCurrent(), focusContext.getObjectOld(), assignmentDelta, forcedAssignments, null);
}

assignmentCollection.collect(focusContext.getObjectCurrent(), focusContext.getObjectOld(), assignmentDelta,
forcedAssignments, taskAssignment);
for (AssignmentType taskAssignment : taskAssignments) {
assignmentCollection.collect(focusContext.getObjectCurrent(), focusContext.getObjectOld(), assignmentDelta, forcedAssignments, taskAssignment);
}

LOGGER.trace("Assignment collection:\n{}", assignmentCollection.debugDumpLazily(1));

Expand All @@ -229,6 +232,14 @@ public DeltaSetTriple<EvaluatedAssignmentImpl<AH>> processAllAssignments() throw
return evaluatedAssignmentTriple;
}

private AssignmentType createTaskAssignment(Task fromTask) {
AssignmentType taskAssignment = new AssignmentType(prismContext);
ObjectReferenceType targetRef = new ObjectReferenceType();
targetRef.asReferenceValue().setObject(fromTask.getUpdatedOrClonedTaskObject());
taskAssignment.setTargetRef(targetRef);
return taskAssignment;
}

private String getNewObjectLifecycleState(LensFocusContext<AH> focusContext) {
PrismObject<AH> focusNew = focusContext.getObjectNew();
AH focusTypeNew = focusNew.asObjectable();
Expand Down
Expand Up @@ -43,9 +43,10 @@ public <O extends ObjectType> void execute(@NotNull ModelContext<O> context, Tas
return;
}

TaskType taskType = task.getUpdatedOrClonedTaskObject().asObjectable();
String id = task.getTaskTreeId(result);

for (EvaluatedPolicyRule policyRule : focusCtx.getPolicyRules()) {
CounterSpecification counterSpec = counterManager.getCounterSpec(taskType, policyRule.getPolicyRuleIdentifier(), policyRule.getPolicyRule());
CounterSpecification counterSpec = counterManager.getCounterSpec(id, policyRule.getPolicyRuleIdentifier(), policyRule.getPolicyRule());
LOGGER.trace("Found counter specification {} for {}", counterSpec, DebugUtil.debugDumpLazily(policyRule));

int counter = 1;
Expand Down
Expand Up @@ -14,8 +14,11 @@

import com.evolveum.midpoint.model.impl.util.AuditHelper;
import com.evolveum.midpoint.prism.query.AndFilter;
import com.evolveum.midpoint.repo.api.PreconditionViolationException;
import com.evolveum.midpoint.repo.common.util.RepoCommonUtils;
import com.evolveum.midpoint.schema.cache.CacheConfigurationManager;
import com.evolveum.midpoint.task.api.*;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import com.evolveum.prism.xml.ns._public.query_3.QueryType;
import org.apache.commons.lang.BooleanUtils;
Expand Down Expand Up @@ -61,14 +64,6 @@
import com.evolveum.midpoint.task.api.TaskRunResult.TaskRunResultStatus;
import com.evolveum.midpoint.util.Holder;
import com.evolveum.midpoint.util.QNameUtil;
import com.evolveum.midpoint.util.exception.CommunicationException;
import com.evolveum.midpoint.util.exception.ConfigurationException;
import com.evolveum.midpoint.util.exception.ExpressionEvaluationException;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.exception.SecurityViolationException;
import com.evolveum.midpoint.util.exception.SystemException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.prism.xml.ns._public.types_3.PolyStringType;
Expand Down Expand Up @@ -334,6 +329,15 @@ public TaskWorkBucketProcessingResult run(RunningTask localCoordinatorTask, Work
} catch (ExpressionEvaluationException ex) {
processErrorFinal(runResult, "Expression error", ex, TaskRunResultStatus.PERMANENT_ERROR, resource, localCoordinatorTask, opResult);
return runResult;
} catch (ObjectAlreadyExistsException ex) {
processErrorFinal(runResult, "Object already existis error", ex, TaskRunResultStatus.PERMANENT_ERROR, resource, localCoordinatorTask, opResult);
return runResult;
} catch (PolicyViolationException ex) {
processErrorFinal(runResult, "Policy violation error", ex, TaskRunResultStatus.PERMANENT_ERROR, resource, localCoordinatorTask, opResult);
return runResult;
} catch (PreconditionViolationException ex) {
processErrorFinal(runResult, "Precondition violation error", ex, TaskRunResultStatus.PERMANENT_ERROR, resource, localCoordinatorTask, opResult);
return runResult;
}

AuditEventRecord executionRecord = new AuditEventRecord(AuditEventType.RECONCILIATION, AuditEventStage.EXECUTION);
Expand Down Expand Up @@ -487,7 +491,7 @@ private boolean performResourceReconciliation(PrismObject<ResourceType> resource
ReconciliationTaskResult reconResult, RunningTask localCoordinatorTask,
TaskPartitionDefinitionType partitionDefinition, WorkBucketType workBucket, OperationResult result, boolean intentIsNull)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException,
SecurityViolationException, ExpressionEvaluationException {
SecurityViolationException, ExpressionEvaluationException, PreconditionViolationException, PolicyViolationException, ObjectAlreadyExistsException {

boolean interrupted;

Expand Down Expand Up @@ -549,6 +553,11 @@ private boolean performResourceReconciliation(PrismObject<ResourceType> resource
opResult.recordFatalError(e);
throw e;
}

if (handler.getExceptionEncountered() != null) {
RepoCommonUtils.throwException(handler.getExceptionEncountered(), opResult);
}

return !interrupted;
}

Expand Down
Expand Up @@ -17,9 +17,9 @@
*/
public interface CounterManager {

CounterSpecification getCounterSpec(TaskType task, String policyRuleId, PolicyRuleType policyRule);
void cleanupCounters(String taskOid);
CounterSpecification getCounterSpec(String taskId, String policyRuleId, PolicyRuleType policyRule);
void cleanupCounters(String oid);
Collection<CounterSpecification> listCounters();
void removeCounter(CounterSpecification counterSpecification);
void resetCounters(String taskOid);
void resetCounters(String oid);
}
Expand Up @@ -10,7 +10,6 @@
import com.evolveum.midpoint.util.DebugDumpable;
import com.evolveum.midpoint.xml.ns._public.common.common_3.PolicyRuleType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.PolicyThresholdType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;

/**
* @author katka
Expand All @@ -21,12 +20,12 @@ public class CounterSpecification implements DebugDumpable {
private int count = 0;
private long counterStart;

private TaskType task;
private String oid;
private PolicyRuleType policyRule;
private String policyRuleId;

public CounterSpecification(TaskType task, String policyRuleId, PolicyRuleType policyRule) {
this.task = task;
public CounterSpecification(String oid, String policyRuleId, PolicyRuleType policyRule) {
this.oid = oid;
this.policyRuleId = policyRuleId;
this.policyRule = policyRule;
}
Expand All @@ -48,16 +47,12 @@ public PolicyThresholdType getPolicyThreshold() {
return policyRule.getPolicyThreshold();
}

public String getTaskName() {
return task.getName().getOrig();
}

public String getPolicyRuleName() {
return policyRule.getName();
}

public String getTaskOid() {
return task.getOid();
public String getOid() {
return oid;
}

public String getPolicyRuleId() {
Expand All @@ -73,7 +68,7 @@ public void reset(long currentTimeMillis) {
@Override
public String debugDump(int indent) {
StringBuilder sb = new StringBuilder();
sb.append("Counter for: ").append(task).append(", policy rule: ").append(policyRule).append("\n");
sb.append("Counter for: ").append(oid).append(", policy rule: ").append(policyRule).append("\n");
sb.append("Current count: ").append(count).append("\n");
sb.append("Counter start: ").append(XmlTypeConverter.createXMLGregorianCalendar(counterStart)).append("\n");

Expand Down
Expand Up @@ -25,7 +25,6 @@
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.PolicyRuleType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.PolicyThresholdType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TimeIntervalType;

/**
Expand All @@ -41,17 +40,17 @@ public class CacheCounterManager implements CounterManager {

private Map<CounterKey, CounterSpecification> countersMap = new ConcurrentHashMap<>();

public synchronized CounterSpecification registerCounter(TaskType task, String policyRuleId, PolicyRuleType policyRule) {
public synchronized CounterSpecification registerCounter(String taskId, String policyRuleId, PolicyRuleType policyRule) {

if (task.getOid() == null) {
if (taskId == null) {
LOGGER.trace("Not persistent task, skipping registering counter.");
return null;
}

CounterKey key = new CounterKey(task.getOid(), policyRuleId);
CounterKey key = new CounterKey(taskId, policyRuleId);
CounterSpecification counterSpec = countersMap.get(key);
if (counterSpec == null) {
return initCleanCounter(key, task, policyRule);
return initCleanCounter(key, taskId, policyRule);
}

if (isResetCounter(counterSpec, false)) {
Expand Down Expand Up @@ -84,7 +83,7 @@ private boolean isResetCounter(CounterSpecification counterSpec, boolean removeI
}

@Override
public void cleanupCounters(String taskOid) {
public synchronized void cleanupCounters(String taskOid) {
Set<CounterKey> keys = countersMap.keySet();

Set<CounterKey> counersToRemove = new HashSet<>();
Expand All @@ -103,7 +102,7 @@ public void cleanupCounters(String taskOid) {
}

@Override
public void resetCounters(String taskOid) {
public synchronized void resetCounters(String taskOid) {
Set<CounterKey> keys = countersMap.keySet();

Set<CounterKey> counersToReset = new HashSet<>();
Expand All @@ -119,8 +118,8 @@ public void resetCounters(String taskOid) {
}
}

private CounterSpecification initCleanCounter(CounterKey key, TaskType task, PolicyRuleType policyRule) {
CounterSpecification counterSpec = new CounterSpecification(task, key.policyRuleId, policyRule);
private CounterSpecification initCleanCounter(CounterKey key, String taskId, PolicyRuleType policyRule) {
CounterSpecification counterSpec = new CounterSpecification(taskId, key.policyRuleId, policyRule);
counterSpec.setCounterStart(clock.currentTimeMillis());
countersMap.put(key, counterSpec);
return counterSpec;
Expand All @@ -133,18 +132,18 @@ private CounterSpecification refreshCounter(CounterKey key, CounterSpecification
}

@Override
public CounterSpecification getCounterSpec(TaskType task, String policyRuleId, PolicyRuleType policyRule) {
if (task.getOid() == null) {
LOGGER.trace("Cannot get counter spec for task without oid");
public CounterSpecification getCounterSpec(String taskId, String policyRuleId, PolicyRuleType policyRule) {
if (taskId == null) {
LOGGER.trace("Cannot get counter spec for task without identifier");
return null;
}

LOGGER.trace("Getting counter spec for {} and {}", task, policyRule);
CounterKey key = new CounterKey(task.getOid(), policyRuleId);
LOGGER.trace("Getting counter spec for {} and {}", taskId, policyRule);
CounterKey key = new CounterKey(taskId, policyRuleId);
CounterSpecification counterSpec = countersMap.get(key);

if (counterSpec == null) {
return registerCounter(task, policyRuleId, policyRule);
return registerCounter(taskId, policyRuleId, policyRule);
}

if (isResetCounter(counterSpec, false)) {
Expand All @@ -162,11 +161,11 @@ public Collection<CounterSpecification> listCounters() {

@Override
public void removeCounter(CounterSpecification counterSpecification) {
CounterKey key = new CounterKey(counterSpecification.getTaskOid(), counterSpecification.getPolicyRuleId());
CounterKey key = new CounterKey(counterSpecification.getOid(), counterSpecification.getPolicyRuleId());
countersMap.remove(key);
}

class CounterKey {
static class CounterKey {

private String oid;
private String policyRuleId;
Expand Down

0 comments on commit 57aa870

Please sign in to comment.