Skip to content

Commit

Permalink
Make running tasks thread-safe by synchronization
Browse files Browse the repository at this point in the history
This is to resolve ConcurrentModificationExceptions (e.g. MID-5111,
MID-5113, MID-4088, MID-3954, MID-5131, MID-5135).

Features:
1) Methods of Task implementation are synchronized on various
objects to ensure thread safety
2) RunningTask as a subtype of Task provides even more safety
by cloning returned objects where appropriate

Although the implementation is not 100% safe yet, it should be well
enough for current state of midPoint code base.
  • Loading branch information
mederly committed Feb 14, 2019
1 parent fcff849 commit e28ce2a
Show file tree
Hide file tree
Showing 106 changed files with 2,531 additions and 2,427 deletions.
Expand Up @@ -84,7 +84,7 @@ protected String deleteObjectsAsync(QName type, ObjectQuery objectQuery, boolean
task.setExtensionProperty(rawProp);

task.setName(taskName);
task.savePendingModifications(result);
task.flushPendingModifications(result);

TaskManager taskManager = getTaskManager();
taskManager.switchToBackground(task, result);
Expand Down
Expand Up @@ -117,7 +117,10 @@ private static boolean isMatchingType(Class<?> expectedClass, Class<?> actualCla
return false;
}

public static XMLGregorianCalendar createXMLGregorianCalendar(long timeInMillis) {
public static XMLGregorianCalendar createXMLGregorianCalendar(Long timeInMillis) {
if (timeInMillis == null) {
return null;
}
GregorianCalendar gregorianCalendar = new GregorianCalendar();
gregorianCalendar.setTimeInMillis(timeInMillis);
return createXMLGregorianCalendar(gregorianCalendar);
Expand Down
Expand Up @@ -42,12 +42,6 @@
*/
public interface StatisticsCollector {

/**
* Gets information from the current task and its transient subtasks (aka worker threads).
*/

OperationStatsType getAggregatedLiveOperationStats();

/**
* Records various kinds of operational information.
*/
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.evolveum.midpoint.schema.util.CertCampaignTypeUtil;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.schema.util.WfContextUtil;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.util.logging.Trace;
Expand Down Expand Up @@ -61,7 +62,7 @@ private void initialize() {
}

@Override
public <O extends ObjectType> void handle(PrismObject<O> object, TriggerType trigger, Task triggerScannerTask, OperationResult parentResult) {
public <O extends ObjectType> void handle(PrismObject<O> object, TriggerType trigger, RunningTask triggerScannerTask, OperationResult parentResult) {
if (!(object.asObjectable() instanceof AccessCertificationCampaignType)) {
throw new IllegalArgumentException("Unexpected object type: should be AccessCertificationCampaignType: " + object);
}
Expand Down
Expand Up @@ -61,7 +61,7 @@ public StatisticsCollectionStrategy getStatisticsCollectionStrategy() {
}

@Override
public TaskRunResult run(Task task) {
public TaskRunResult run(RunningTask task) {
LOGGER.trace("Task run starting");

OperationResult opResult = new OperationResult(CLASS_DOT+"run");
Expand Down
Expand Up @@ -22,7 +22,7 @@
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.CertCampaignTypeUtil;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.util.exception.CommonException;
import com.evolveum.midpoint.util.exception.SystemException;
import com.evolveum.midpoint.util.logging.LoggingUtils;
Expand Down Expand Up @@ -56,7 +56,7 @@ private void initialize() {
}

@Override
public <O extends ObjectType> void handle(PrismObject<O> prismObject, TriggerType trigger, Task task, OperationResult result) {
public <O extends ObjectType> void handle(PrismObject<O> prismObject, TriggerType trigger, RunningTask task, OperationResult result) {
try {
ObjectType object = prismObject.asObjectable();
if (!(object instanceof AccessCertificationCampaignType)) {
Expand Down
Expand Up @@ -23,7 +23,7 @@
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.CertCampaignTypeUtil;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.util.logging.Trace;
Expand Down Expand Up @@ -60,7 +60,7 @@ private void initialize() {
}

@Override
public <O extends ObjectType> void handle(PrismObject<O> prismObject, TriggerType trigger, Task task, OperationResult result) {
public <O extends ObjectType> void handle(PrismObject<O> prismObject, TriggerType trigger, RunningTask task, OperationResult result) {
try {
ObjectType object = prismObject.asObjectable();
if (!(object instanceof AccessCertificationCampaignType)) {
Expand Down
Expand Up @@ -22,7 +22,7 @@
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.CertCampaignTypeUtil;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.util.logging.Trace;
Expand Down Expand Up @@ -57,7 +57,7 @@ private void initialize() {
}

@Override
public <O extends ObjectType> void handle(PrismObject<O> prismObject, TriggerType trigger, Task task, OperationResult result) {
public <O extends ObjectType> void handle(PrismObject<O> prismObject, TriggerType trigger, RunningTask task, OperationResult result) {
try {
ObjectType object = prismObject.asObjectable();
if (!(object instanceof AccessCertificationCampaignType)) {
Expand Down
Expand Up @@ -90,7 +90,7 @@ public StatisticsCollectionStrategy getStatisticsCollectionStrategy() {
}

@Override
public TaskRunResult run(Task task) {
public TaskRunResult run(RunningTask task) {
LOGGER.info("Task run starting");

OperationResult opResult = new OperationResult(CLASS_DOT+"run");
Expand Down Expand Up @@ -285,11 +285,11 @@ private static class ObjectContext {
}

private static class RunContext {
final Task task;
final RunningTask task;
final Map<String, ObjectContext> objectContextMap = new HashMap<>();
final PerformerCommentsFormatter commentsFormatter;

RunContext(Task task, PerformerCommentsFormatter commentsFormatter) {
RunContext(RunningTask task, PerformerCommentsFormatter commentsFormatter) {
this.task = task;
this.commentsFormatter = commentsFormatter;
}
Expand Down
Expand Up @@ -83,7 +83,7 @@ public StatisticsCollectionStrategy getStatisticsCollectionStrategy() {
}

@Override
public TaskRunResult run(Task task) {
public TaskRunResult run(RunningTask task) {
LOGGER.trace("Task run starting");

OperationResult opResult = new OperationResult(CLASS_DOT+"run");
Expand Down
Expand Up @@ -18,6 +18,7 @@
import javax.xml.namespace.QName;

import com.evolveum.midpoint.model.api.ModelPublicConstants;
import com.evolveum.midpoint.prism.path.ItemName;
import com.evolveum.midpoint.schema.constants.SchemaConstants;

/**
Expand All @@ -35,13 +36,13 @@ public class ModelConstants {

public static final String NS_EXTENSION = SchemaConstants.NS_MODEL + "/extension-3";

public static final QName OBJECTCLASS_PROPERTY_NAME = new QName(ModelConstants.NS_EXTENSION, "objectclass");
public static final ItemName OBJECTCLASS_PROPERTY_NAME = new ItemName(ModelConstants.NS_EXTENSION, "objectclass");

public static final QName KIND_PROPERTY_NAME = new QName(ModelConstants.NS_EXTENSION, "kind");
public static final ItemName KIND_PROPERTY_NAME = new ItemName(ModelConstants.NS_EXTENSION, "kind");

public static final QName INTENT_PROPERTY_NAME = new QName(ModelConstants.NS_EXTENSION, "intent");
public static final ItemName INTENT_PROPERTY_NAME = new ItemName(ModelConstants.NS_EXTENSION, "intent");

public static final QName FILENAME_PROPERTY_NAME = new QName(ModelConstants.NS_EXTENSION, "filename");
public static final ItemName FILENAME_PROPERTY_NAME = new ItemName(ModelConstants.NS_EXTENSION, "filename");


}
Expand Up @@ -72,7 +72,7 @@ public StatisticsCollectionStrategy getStatisticsCollectionStrategy() {
}

@Override
public TaskRunResult run(Task task) {
public TaskRunResult run(RunningTask task) {
LOGGER.trace("CleanUpTaskHandler.run starting");

OperationResult opResult = new OperationResult(OperationConstants.CLEANUP);
Expand Down
Expand Up @@ -17,6 +17,7 @@

import javax.annotation.PostConstruct;

import com.evolveum.midpoint.task.api.RunningTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -89,19 +90,19 @@ protected ObjectQuery createQuery(AbstractScannerResultHandler<ShadowType> handl
}

@Override
protected void finish(AbstractScannerResultHandler<ShadowType> handler, TaskRunResult runResult, Task task, OperationResult opResult)
protected void finish(AbstractScannerResultHandler<ShadowType> handler, TaskRunResult runResult, RunningTask task, OperationResult opResult)
throws SchemaException {
super.finish(handler, runResult, task, opResult);
}

@Override
protected AbstractScannerResultHandler<ShadowType> createHandler(TaskRunResult runResult, final Task coordinatorTask,
protected AbstractScannerResultHandler<ShadowType> createHandler(TaskRunResult runResult, final RunningTask coordinatorTask,
OperationResult opResult) {

AbstractScannerResultHandler<ShadowType> handler = new AbstractScannerResultHandler<ShadowType>(
coordinatorTask, ShadowRefreshTaskHandler.class.getName(), "shadowRefresh", "shadow refresh task", taskManager) {
@Override
protected boolean handleObject(PrismObject<ShadowType> object, Task workerTask, OperationResult result) throws CommonException {
protected boolean handleObject(PrismObject<ShadowType> object, RunningTask workerTask, OperationResult result) throws CommonException {
LOGGER.trace("Refreshing {}", object);

provisioningService.refreshShadow(object, null, workerTask, result);
Expand Down
Expand Up @@ -57,7 +57,7 @@ public class ExecuteDeltasTaskHandler implements TaskHandler {
@Autowired private ModelService modelService;

@Override
public TaskRunResult run(Task task) {
public TaskRunResult run(RunningTask task) {

OperationResult result = task.getResult().createSubresult(DOT_CLASS + "run");
TaskRunResult runResult = new TaskRunResult();
Expand Down
Expand Up @@ -25,11 +25,7 @@
import com.evolveum.midpoint.provisioning.api.ProvisioningService;
import com.evolveum.midpoint.repo.api.PreconditionViolationException;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskCategory;
import com.evolveum.midpoint.task.api.TaskHandler;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.api.TaskRunResult;
import com.evolveum.midpoint.task.api.*;
import com.evolveum.midpoint.util.exception.*;
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.util.logging.Trace;
Expand Down Expand Up @@ -67,7 +63,7 @@ public class ModelOperationTaskHandler implements TaskHandler {
@Autowired private Clockwork clockwork;

@Override
public TaskRunResult run(Task task) {
public TaskRunResult run(RunningTask task) {

OperationResult result = task.getResult().createSubresult(DOT_CLASS + "run");
TaskRunResult runResult = new TaskRunResult();
Expand Down Expand Up @@ -116,7 +112,7 @@ public TaskRunResult run(Task task) {
clockwork.run(context, task, result);

task.setModelOperationContext(context.toLensContextType(context.getState() == ModelState.FINAL));
task.savePendingModifications(result);
task.flushPendingModifications(result);

if (result.isUnknown()) {
result.computeStatus();
Expand Down
Expand Up @@ -23,7 +23,9 @@
import com.evolveum.midpoint.common.refinery.RefinedResourceSchemaImpl;
import com.evolveum.midpoint.prism.*;
import com.evolveum.midpoint.prism.path.ItemPath;
import com.evolveum.midpoint.task.api.*;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -37,19 +39,13 @@
import com.evolveum.midpoint.provisioning.api.ChangeNotificationDispatcher;
import com.evolveum.midpoint.provisioning.api.ProvisioningService;
import com.evolveum.midpoint.provisioning.api.ResourceObjectChangeListener;
import com.evolveum.midpoint.repo.common.CounterManager;
import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.schema.processor.ObjectClassComplexTypeDefinition;
import com.evolveum.midpoint.schema.result.OperationConstants;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.ObjectQueryUtil;
import com.evolveum.midpoint.schema.util.ObjectTypeUtil;
import com.evolveum.midpoint.schema.util.ShadowUtil;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskCategory;
import com.evolveum.midpoint.task.api.TaskHandler;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.api.TaskRunResult;
import com.evolveum.midpoint.task.api.TaskRunResult.TaskRunResultStatus;
import com.evolveum.midpoint.util.DOMUtil;
import com.evolveum.midpoint.util.exception.CommunicationException;
Expand Down Expand Up @@ -147,7 +143,7 @@ public void launch(ResourceType resource, QName objectclass, Task task, Operatio
PrismProperty<QName> objectclassProp = objectclassPropertyDefinition.instantiate();
objectclassProp.setRealValue(objectclass);
task.setExtensionProperty(objectclassProp);
task.savePendingModifications(result); // just to be sure (if the task was already persistent)
task.flushPendingModifications(result); // just to be sure (if the task was already persistent)
// task.modify(modifications, result);
} catch (ObjectNotFoundException e) {
LOGGER.error("Task object not found, expecting it to exist (task {})", task, e);
Expand All @@ -174,26 +170,33 @@ public void launch(ResourceType resource, QName objectclass, Task task, Operatio
}

@Override
protected SynchronizeAccountResultHandler createHandler(TaskRunResult runResult, Task coordinatorTask,
protected SynchronizeAccountResultHandler createHandler(TaskRunResult runResult, RunningTask coordinatorTask,
OperationResult opResult) {

ResourceType resource = resolveObjectRef(ResourceType.class, runResult, coordinatorTask, opResult);
if (resource == null) {
return null;
}

return createHandler(resource, null, runResult, coordinatorTask, opResult);
ObjectClassComplexTypeDefinition objectClass = determineObjectClassDefinition(resource, null, runResult, coordinatorTask, opResult);
if (objectClass == null) {
return null;
}
return createHandler(resource, objectClass, coordinatorTask);
}

// shadowToImport - it is used to derive objectClass/intent/kind when importing a single shadow
private SynchronizeAccountResultHandler createHandler(ResourceType resource, PrismObject<ShadowType> shadowToImport,
TaskRunResult runResult, Task coordinatorTask, OperationResult opResult) {

ObjectClassComplexTypeDefinition objectClass = determineObjectClassDefinition(resource, shadowToImport, runResult, coordinatorTask, opResult);
private SynchronizeAccountResultHandler createHandlerForSingleShadow(@NotNull ResourceType resource, @NotNull PrismObject<ShadowType> shadowToImport,
TaskRunResult runResult, RunningTask task, OperationResult opResult) {
ObjectClassComplexTypeDefinition objectClass = determineObjectClassDefinition(resource, shadowToImport, runResult, task, opResult);
if (objectClass == null) {
return null;
}
return createHandler(resource, objectClass, task);
}

private SynchronizeAccountResultHandler createHandler(@NotNull ResourceType resource, @NotNull ObjectClassComplexTypeDefinition objectClass,
RunningTask coordinatorTask) {
LOGGER.info("Start executing import from resource {}, importing object class {}", resource, objectClass.getTypeName());

SynchronizeAccountResultHandler handler = new SynchronizeAccountResultHandler(resource, objectClass, "import",
Expand Down Expand Up @@ -299,7 +302,8 @@ public boolean importSingleShadow(String shadowOid, Task task, OperationResult p

// Create a result handler just for one object. Invoke the handle() method manually.
TaskRunResult runResult = new TaskRunResult();
SynchronizeAccountResultHandler resultHandler = createHandler(resource.asObjectable(), shadow, runResult, task, parentResult);
RunningTask fakeRunningTask = taskManager.createFakeRunningTask(task);
SynchronizeAccountResultHandler resultHandler = createHandlerForSingleShadow(resource.asObjectable(), shadow, runResult, fakeRunningTask, parentResult);
if (resultHandler == null) {
return false;
}
Expand All @@ -316,7 +320,7 @@ public boolean importSingleShadow(String shadowOid, Task task, OperationResult p
return false;
}

finish(resultHandler, runResult, task, parentResult);
finish(resultHandler, runResult, fakeRunningTask, parentResult);

return true;
}
Expand Down
Expand Up @@ -40,7 +40,6 @@
import javax.annotation.PostConstruct;

import java.io.File;
import java.util.List;

/**
* Task handler for "Import objects from file" task.
Expand Down Expand Up @@ -121,7 +120,7 @@ public void launch(File input, Task task, OperationResult parentResult) {
PrismProperty filenameProp = filenamePropertyDefinition.instantiate();
filenameProp.setRealValue(input.getAbsolutePath());
task.setExtensionProperty(filenameProp);
task.savePendingModifications(result);
task.flushPendingModifications(result);
// task.modify(modifications, result);
} catch (ObjectNotFoundException e) {
LOGGER.error("Task object not found, expecting it to exist (task {})", task, e);
Expand Down Expand Up @@ -150,7 +149,7 @@ public void launch(File input, Task task, OperationResult parentResult) {
* The body of the task. This will start the import "loop".
*/
@Override
public TaskRunResult run(Task task) {
public TaskRunResult run(RunningTask task) {

LOGGER.debug("Import objects from file run (task {})", task);

Expand Down

0 comments on commit e28ce2a

Please sign in to comment.