Skip to content

Commit

Permalink
Fix a couple of task thread-safety issues
Browse files Browse the repository at this point in the history
1) OperationResult is not a thread-safe structure, yet it was
exposed to multiple threads - mainly in the getUpdatedOrClonedTaskObject
method.

2) Even backing task prism objects were exposed to multiple threads
in a special case: when a task with running LATs (lightweight
asynchronous subtasks) was fetched with the option to also retrieve
subtasks, live instances of prism objects were returned. If they were
later processed - e.g. serialized in the GUI - concurrency-related
exceptions could occur.

All of this should now be fixed, along with better naming for the methods
that give access to the backing task prism object.

The price to pay is that the operation result in the returned objects
is not updated in these cases, but that should be acceptable.

This should resolve MID-6910.
  • Loading branch information
mederly committed Mar 31, 2021
1 parent ca31cb8 commit 74806ee
Show file tree
Hide file tree
Showing 15 changed files with 96 additions and 68 deletions.
Expand Up @@ -69,7 +69,6 @@
import com.evolveum.midpoint.web.component.MultifunctionalButton;
import com.evolveum.midpoint.web.component.data.ISelectableDataProvider;
import com.evolveum.midpoint.web.component.data.SelectableBeanObjectDataProvider;
import com.evolveum.midpoint.web.component.data.column.ColumnUtils;
import com.evolveum.midpoint.web.component.dialog.ChooseFocusTypeAndRelationDialogPanel;
import com.evolveum.midpoint.web.component.dialog.ConfigureTaskConfirmationPanel;
import com.evolveum.midpoint.web.component.form.MidpointForm;
Expand Down Expand Up @@ -1497,7 +1496,7 @@ protected PrismObject<TaskType> getTask(AjaxRequestTarget target) {
if (task == null) {
return null;
}
PrismObject<TaskType> recomputeTask = task.getClonedTaskObject();
PrismObject<TaskType> recomputeTask = task.getRawTaskObjectClone();
TaskType recomputeTaskType = recomputeTask.asObjectable();
recomputeTaskType.getAssignment().add(ObjectTypeUtil.createAssignmentTo(SystemObjectsType.ARCHETYPE_RECOMPUTATION_TASK.value(), ObjectTypes.ARCHETYPE, getPrismContext()));
return recomputeTask;
Expand Down
Expand Up @@ -48,6 +48,8 @@

import org.jetbrains.annotations.NotNull;

import static com.evolveum.midpoint.util.DebugUtil.lazy;

/**
* Evaluates all assignments and sorts them to triple: added, removed and "kept" assignments.
*
Expand Down Expand Up @@ -140,9 +142,8 @@ private Collection<AssignmentType> getVirtualAssignments() throws SchemaExceptio
beans.prismContext, task, result);
LOGGER.trace("Forced assignments: {}", forcedAssignments);

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Task for process: {}", task.getUpdatedOrClonedTaskObject().debugDump());
}
LOGGER.trace("Task for process (operation result is not updated): {}",
lazy(() -> task.getRawTaskObjectClonedIfNecessary().debugDump()));
Collection<Task> allTasksToRoot = task.getPathToRootTask(result);
Collection<AssignmentType> taskAssignments = allTasksToRoot.stream()
.filter(Task::hasAssignments)
Expand All @@ -158,7 +159,7 @@ private Collection<AssignmentType> getVirtualAssignments() throws SchemaExceptio
private AssignmentType createTaskAssignment(Task fromTask) {
AssignmentType taskAssignment = new AssignmentType(beans.prismContext);
ObjectReferenceType targetRef = new ObjectReferenceType();
targetRef.asReferenceValue().setObject(fromTask.getUpdatedOrClonedTaskObject());
targetRef.asReferenceValue().setObject(fromTask.getRawTaskObjectClonedIfNecessary());
taskAssignment.setTargetRef(targetRef);
return taskAssignment;
}
Expand Down
Expand Up @@ -77,7 +77,8 @@ static VariablesMap initialPreparation(VariablesMap initialVariables,
}

private static void addProvidedVariables(VariablesMap resultingVariables, VariablesMap initialVariables, Task task) {
TypedValue<TaskType> taskValAndDef = new TypedValue<>(task.getUpdatedOrClonedTaskObject().asObjectable(), task.getUpdatedOrClonedTaskObject().getDefinition());
PrismObject<TaskType> taskObject = task.getRawTaskObjectClonedIfNecessary();
TypedValue<TaskType> taskValAndDef = new TypedValue<>(taskObject.asObjectable(), taskObject.getDefinition());
putImmutableValue(resultingVariables, ExpressionConstants.VAR_TASK, taskValAndDef);
if (initialVariables != null) {
initialVariables.forEach((key, value) -> putImmutableValue(resultingVariables, key, value));
Expand Down
Expand Up @@ -3279,7 +3279,7 @@ protected void displayTaskWithOperationStats(String message, PrismObject<TaskTyp
protected void displayTaskWithOperationStats(String message, Task task) throws SchemaException {
display(message, task);
String stats = prismContext.xmlSerializer()
.serializeRealValue(task.getUpdatedOrClonedTaskObject().asObjectable().getOperationStats(), TaskType.F_OPERATION_STATS);
.serializeRealValue(task.getStoredOperationStatsOrClone(), TaskType.F_OPERATION_STATS);
displayValue(message + ": Operational stats", stats);
}

Expand Down Expand Up @@ -3521,14 +3521,14 @@ public void timeout() {
protected OperationResult waitForTaskTreeNextFinishedRun(String rootTaskOid, int timeout) throws Exception {
final OperationResult waitResult = new OperationResult(AbstractIntegrationTest.class + ".waitForTaskTreeNextFinishedRun");
Task origRootTask = taskManager.getTaskWithResult(rootTaskOid, waitResult);
return waitForTaskTreeNextFinishedRun(origRootTask.getUpdatedOrClonedTaskObject().asObjectable(), timeout, waitResult);
return waitForTaskTreeNextFinishedRun(origRootTask.getUpdatedTaskObject().asObjectable(), timeout, waitResult);
}

protected OperationResult runTaskTreeAndWaitForFinish(String rootTaskOid, int timeout) throws Exception {
final OperationResult waitResult = new OperationResult(AbstractIntegrationTest.class + ".runTaskTreeAndWaitForFinish");
Task origRootTask = taskManager.getTaskWithResult(rootTaskOid, waitResult);
restartTask(rootTaskOid, waitResult);
return waitForTaskTreeNextFinishedRun(origRootTask.getUpdatedOrClonedTaskObject().asObjectable(), timeout, waitResult);
return waitForTaskTreeNextFinishedRun(origRootTask.getUpdatedTaskObject().asObjectable(), timeout, waitResult);
}

// a bit experimental
Expand Down
Expand Up @@ -591,7 +591,7 @@ void processPostReportScript(ReportType parentReport, String reportOutputFilePat

VariablesMap variables = new VariablesMap();
variables.put(ExpressionConstants.VAR_OBJECT, parentReport, parentReport.asPrismObject().getDefinition());
PrismObject<TaskType> taskObject = task.getUpdatedOrClonedTaskObject();
PrismObject<TaskType> taskObject = task.getRawTaskObjectClonedIfNecessary();
variables.put(ExpressionConstants.VAR_TASK, taskObject.asObjectable(), taskObject.getDefinition());
variables.put(ExpressionConstants.VAR_FILE, commandLineScriptExecutor.getOsSpecificFilePath(reportOutputFilePath), String.class);

Expand Down
Expand Up @@ -265,7 +265,7 @@ private void processPostReportScript(ReportType parentReport, String reportOutpu

VariablesMap variables = new VariablesMap();
variables.put(ExpressionConstants.VAR_OBJECT, parentReport, parentReport.asPrismObject().getDefinition());
PrismObject<TaskType> taskObject = task.getUpdatedOrClonedTaskObject();
PrismObject<TaskType> taskObject = task.getRawTaskObjectClonedIfNecessary();
variables.put(ExpressionConstants.VAR_TASK, taskObject.asObjectable(), taskObject.getDefinition());
variables.put(ExpressionConstants.VAR_FILE, reportService.getCommandLineScriptExecutor().getOsSpecificFilePath(reportOutputFilePath), String.class);

Expand Down
Expand Up @@ -717,30 +717,25 @@ default List<? extends Task> listSubtasks(OperationResult parentResult) throws S
//region Task object as a whole

/**
* Returns backing task prism object.
*
* *AVOID* use of this method if possible:
*
* - for regular tasks it has to update operation result in the prism object (might be costly)
* - for running tasks it provides a clone of the actual prism object (even more costly and leads to lost changes
* if the returned value is changed)
* Returns the backing task prism object without updating it with the current operation result.
* If the task is running, a clone is returned.
*/
@NotNull
PrismObject<TaskType> getUpdatedOrClonedTaskObject();
PrismObject<TaskType> getRawTaskObjectClonedIfNecessary();

/**
* Returns backing task prism object, provided that task is not running.
* Beware that the task operation result is updated (might be costly).
* @throws IllegalStateException if task is running
* Returns a CLONE of the backing task prism object without updating it with the current operation result.
*/
@NotNull
PrismObject<TaskType> getUpdatedTaskObject();
PrismObject<TaskType> getRawTaskObjectClone();

/**
* Returns cloned task object.
* Returns the backing task prism object UPDATED with the current operation result.
*
* Assumes that the task is not running. (Otherwise IllegalStateException is thrown.)
*/
@NotNull
PrismObject<TaskType> getClonedTaskObject();
PrismObject<TaskType> getUpdatedTaskObject();

/**
* Returns a reference to the task prism.
Expand Down
Expand Up @@ -317,21 +317,14 @@ public void setProgressImmediate(Long progress, OperationResult parentResult) {
throw new UnsupportedOperationException();
}

@NotNull
@Override
public PrismObject<TaskType> getUpdatedOrClonedTaskObject() {
throw new UnsupportedOperationException();
}

// Not supported by this implementation: the call always fails with UnsupportedOperationException.
@NotNull
@Override
public PrismObject<TaskType> getUpdatedTaskObject() {
throw new UnsupportedOperationException();
}

@NotNull
@Override
public PrismObject<TaskType> getClonedTaskObject() {
public @NotNull PrismObject<TaskType> getRawTaskObjectClone() {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -492,6 +485,11 @@ public List<Task> listPrerequisiteTasks(OperationResult parentResult) {
throw new UnsupportedOperationException();
}

// Not supported by this implementation: the call always fails with UnsupportedOperationException.
@Override
public @NotNull PrismObject<TaskType> getRawTaskObjectClonedIfNecessary() {
throw new UnsupportedOperationException();
}

@Override
public List<String> getDependents() {
throw new UnsupportedOperationException();
Expand Down
Expand Up @@ -48,7 +48,7 @@ public Collection<TaskQuartzImpl> getTransientSubtasks(String identifier) {
if (runningInstance != null) {
List<TaskQuartzImpl> subtasks = new ArrayList<>();
for (RunningTaskQuartzImpl subtask : runningInstance.getLightweightAsynchronousSubtasks()) {
subtasks.add(subtask.cloneAsStaticTask());
subtasks.add(subtask.cloneAsStaticTask()); // Beware, does not update operation result in task prism
}
return subtasks;
} else {
Expand Down
Expand Up @@ -272,30 +272,32 @@ private boolean isLiveRunningInstance() {
return this instanceof RunningTask;
}

/**
* TODO TODO TODO (think out better name)
* Use with care. Never provide to outside world (beyond task manager).
*/
public PrismObject<TaskType> getLiveTaskObjectForNotRunningTasks() {
@NotNull
@Override
public PrismObject<TaskType> getRawTaskObjectClonedIfNecessary() {
if (isLiveRunningInstance()) {
throw new UnsupportedOperationException("It is not possible to get live task prism object from the running task instance: " + this);
return getRawTaskObjectClone();
} else {
return taskPrism;
}
}

// Use with utmost care! Never provide to outside world (beyond task manager)
public PrismObject<TaskType> getLiveTaskObject() {
return taskPrism;
}

@NotNull
@Override
public PrismObject<TaskType> getUpdatedOrClonedTaskObject() {
public PrismObject<TaskType> getRawTaskObjectClone() {
synchronized (prismAccess) {
return taskPrism.clone();
}
}


/**
* Returns the backing task prism object. Not supported for running task instances.
*/
public PrismObject<TaskType> getRawTaskObject() {
if (isLiveRunningInstance()) {
return getClonedTaskObject();
throw new IllegalStateException("Cannot get task object from live running task instance");
} else {
updateTaskPrismResult(taskPrism);
return taskPrism;
}
}
Expand All @@ -312,17 +314,7 @@ public PrismObject<TaskType> getUpdatedTaskObject() {
}

TaskQuartzImpl cloneAsStaticTask() {
return TaskQuartzImpl.createFromPrismObject(taskManager, getClonedTaskObject());
}

@NotNull
@Override
public PrismObject<TaskType> getClonedTaskObject() {
synchronized (prismAccess) {
PrismObject<TaskType> rv = taskPrism.clone();
updateTaskPrismResult(rv);
return rv;
}
return TaskQuartzImpl.createFromPrismObject(taskManager, getRawTaskObjectClone());
}

public boolean isRecreateQuartzTrigger() {
Expand Down
Expand Up @@ -96,7 +96,7 @@ private void persist(TaskQuartzImpl task, OperationResult result) {
}

try {
CryptoUtil.encryptValues(protector, task.getLiveTaskObjectForNotRunningTasks());
CryptoUtil.encryptValues(protector, task.getRawTaskObject());
addTaskToRepositoryAndQuartz(task, null, result);
} catch (ObjectAlreadyExistsException ex) {
// This should not happen. If it does, it is a bug. It is OK to convert to a runtime exception
Expand Down
Expand Up @@ -223,7 +223,7 @@ private List<?> getSubtasks(Object task, OperationResult result) throws SchemaEx
private void addSubtask(Object task, Object subtask) {
TaskType subtaskBean;
if (subtask instanceof TaskQuartzImpl) {
subtaskBean = ((TaskQuartzImpl) subtask).getLiveTaskObject().asObjectable();
subtaskBean = ((TaskQuartzImpl) subtask).getRawTaskObjectClonedIfNecessary().asObjectable();
} else if (subtask instanceof PrismObject<?>) {
//noinspection unchecked
subtaskBean = ((PrismObject<TaskType>) subtask).asObjectable();
Expand Down Expand Up @@ -336,7 +336,7 @@ private void addTransientTaskInformation(Object task, ClusterStatusInformation c
}
TaskType taskBean;
if (task instanceof TaskQuartzImpl) {
taskBean = ((TaskQuartzImpl) task).getLiveTaskObjectForNotRunningTasks().asObjectable();
taskBean = ((TaskQuartzImpl) task).getRawTaskObject().asObjectable();
} else if (task instanceof PrismObject<?>) {
//noinspection unchecked
taskBean = ((PrismObject<TaskType>) task).asObjectable();
Expand Down
Expand Up @@ -86,7 +86,7 @@ private TracingEnvironmentType createTracingEnvironmentDescription(Task task, Tr
selectedNodeInformation.setClustered(localNode.isClustered());
environment.setNodeRef(ObjectTypeUtil.createObjectRefWithFullObject(selectedNodeInformation, prismContext));
}
TaskType taskClone = task.getClonedTaskObject().asObjectable();
TaskType taskClone = task.getRawTaskObjectClone().asObjectable(); // is it OK that we use not updated op. result?
if (taskClone.getResult() != null) {
taskClone.getResult().getPartialResults().clear();
}
Expand Down
Expand Up @@ -54,7 +54,7 @@ public static class MyLightweightTaskHandler implements LightweightTaskHandler {
private boolean hasRun = false;
private boolean hasExited = false;
private final long duration;
private static final long STEP = 100;
private static final long STEP = 10;

MyLightweightTaskHandler(Integer duration) {
this.duration = duration != null ? duration : 86400L * 1000L * 365000L; // 1000 years
Expand All @@ -72,8 +72,9 @@ public void run(RunningLightweightTask task) {
//assertTrue("Subtask is not in Running LAT list of parent", isAmongRunningChildren(task, parentTask));

while (System.currentTimeMillis() < end && task.canRun()) {
// hoping to get ConcurrentModificationException when setting operation result here (MID-5113)
task.getLightweightTaskParent().getUpdatedOrClonedTaskObject();
// Check for some concurrency issues - although not related to the operation result, because
// it is inherently not thread-safe.
task.getLightweightTaskParent().getRawTaskObjectClonedIfNecessary();
IterationItemInformation info = new IterationItemInformation("o1", null, UserType.COMPLEX_TYPE, "oid1");
Operation op = task.recordIterativeOperationStart(info);
try {
Expand All @@ -88,6 +89,9 @@ public void run(RunningLightweightTask task) {
op.failed(e);
break;
}
OperationResult result = task.getResult();
result.createSubresult("test");
result.summarize();
}
hasExited = true;
}
Expand Down
Expand Up @@ -27,6 +27,8 @@
import com.evolveum.midpoint.schema.util.task.TaskTreeUtil;
import com.evolveum.midpoint.task.api.RunningLightweightTask;

import com.evolveum.midpoint.util.exception.CommonException;

import org.jetbrains.annotations.NotNull;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
Expand All @@ -52,7 +54,6 @@
import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.result.OperationResultStatus;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskConstants;
import com.evolveum.midpoint.task.quartzimpl.cluster.ClusterManager;
Expand Down Expand Up @@ -769,6 +770,9 @@ public void test250TaskWithThreads() throws Exception {
when();

add(TASK_WITH_THREADS, result);

checkTreadSafety(TASK_WITH_THREADS.oid, 1000L, result);

waitUntilDone(TASK_WITH_THREADS.oid, result, 15000, 100);
waitForTaskClose(TASK_WITH_THREADS.oid, result, 15000, 100);

Expand All @@ -787,6 +791,40 @@ public void test250TaskWithThreads() throws Exception {
}
}

/**
 * A simple test for MID-6910.
 *
 * Works in two phases:
 *
 * 1. Polls the task manager for the given task (asking for subtask references to be retrieved)
 *    until the returned object contains at least one subtask reference.
 * 2. Repeatedly clones the retrieved prism object for the requested time. The clones are thrown
 *    away; the cloning only serves to traverse the returned object again and again, so that a
 *    concurrency-related exception would surface if the returned object were still being
 *    modified by other threads.
 *
 * @param oid OID of the task to fetch
 * @param duration how long (in milliseconds) the cloning phase should last
 * @param result operation result passed to the task manager
 * @throws AssertionError if no subtask reference appears within 3 seconds
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
@SuppressWarnings("SameParameterValue")
private void checkTreadSafety(String oid, long duration, OperationResult result)
        throws CommonException, InterruptedException {

    final long subtasksWaitTimeout = 3000; // ms to wait for the first subtask reference
    final long pollInterval = 200; // ms between two consecutive fetches

    // The retrieval options never change, so build them once instead of on every poll.
    Collection<SelectorOptions<GetOperationOptions>> options = schemaService.getOperationOptionsBuilder()
            .item(TaskType.F_SUBTASK_REF).retrieve()
            .build();

    PrismObject<TaskType> retrievedTask;
    long start = System.currentTimeMillis();
    for (;;) {
        retrievedTask = taskManager.getObject(TaskType.class, oid, options, result);
        if (!retrievedTask.asObjectable().getSubtaskRef().isEmpty()) {
            break;
        }
        if (System.currentTimeMillis() - start > subtasksWaitTimeout) {
            throw new AssertionError("Timed out waiting for the task to create subtasks");
        }
        //noinspection BusyWait
        Thread.sleep(pollInterval);
    }

    display("Subtasks: " + retrievedTask.asObjectable().getSubtaskRef().size());
    long startCloning = System.currentTimeMillis();
    int cloneIterations = 0;
    while (System.currentTimeMillis() - startCloning < duration) {
        // The result is intentionally ignored: only the act of cloning matters here.
        retrievedTask.clone();
        cloneIterations++;
    }
    display("Clone iterations done: " + cloneIterations);
}

@SuppressWarnings("SameParameterValue")
private void waitUntilDone(String taskOid, OperationResult result, int duration, int checkInterval)
throws SchemaException, ObjectNotFoundException, InterruptedException {
Expand Down

0 comments on commit 74806ee

Please sign in to comment.