Skip to content

Commit

Permalink
fix for MID-5211
Browse files Browse the repository at this point in the history
  • Loading branch information
katkav committed Apr 10, 2019
1 parent 35968e1 commit 98d20d2
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 117 deletions.
Expand Up @@ -350,8 +350,6 @@ private void processRequest(ProcessingRequest request, RunningTask workerTask, O
// The meat
cont = handleObject(object, workerTask, result);

// LOGGER.info("###Recon result: {}", result.getStatus());
// LOGGER.info("###Recon result dump: {}", result.debugDump());

// We do not want to override the result set by handler. This is just a fallback case
if (result.isUnknown() || result.isInProgress()) {
Expand Down Expand Up @@ -462,7 +460,6 @@ private boolean processError(PrismObject<O> object, Task task, Throwable ex, Ope
result.recordFatalError("Failed to "+getProcessShortName()+": "+ex.getMessage(), ex);
}
result.summarize();
LOGGER.info("stop on error return: {}", !isStopOnError(task, ex, result));
return !isStopOnError(task, ex, result);
// return !isStopOnError();
}
Expand Down
Expand Up @@ -665,99 +665,6 @@ private TaskRunResult executeHandler(TaskHandler handler, OperationResult execut
waitForTransientChildrenAndCloseThem(executionResult);
return runResult;
}
//
// private TaskRunResult executePlainTaskHandler(TaskHandler handler) {
// TaskRunResult runResult;
// try {
// LOGGER.trace("Executing handler {}", handler.getClass().getName());
// runResult = handler.run(task);
// if (runResult == null) { // Obviously an error in task handler
// LOGGER.error("Unable to record run finish: task returned null result");
// runResult = createFailureTaskRunResult("Unable to record run finish: task returned null result", null);
// }
// } catch (Throwable t) {
// LOGGER.error("Task handler threw unexpected exception: {}: {}; task = {}", t.getClass().getName(), t.getMessage(), task, t);
// runResult = createFailureTaskRunResult("Task handler threw unexpected exception: " + t.getMessage(), t);
// }
// return runResult;
// }
//
// private TaskRunResult executeWorkBucketAwareTaskHandler(WorkBucketAwareTaskHandler handler, OperationResult executionResult) {
// WorkStateManager workStateManager = taskManagerImpl.getWorkStateManager();
//
// if (task.getWorkState() != null && Boolean.TRUE.equals(task.getWorkState().isAllWorkComplete())) {
// LOGGER.debug("Work is marked as complete; restarting it in task {}", task);
// try {
// List<ItemDelta<?, ?>> itemDeltas = taskManagerImpl.getPrismContext().deltaFor(TaskType.class)
// .item(TaskType.F_WORK_STATE).replace()
// .asItemDeltas();
// task.applyDeltasImmediate(itemDeltas, executionResult);
// } catch (SchemaException | ObjectAlreadyExistsException | ObjectNotFoundException | RuntimeException e) {
// LoggingUtils.logUnexpectedException(LOGGER, "Couldn't remove work state from (completed) task {} -- continuing", e, task);
// }
// }
//
// boolean initialBucket = true;
// TaskWorkBucketProcessingResult runResult = null;
// for (;;) {
// WorkBucketType bucket;
// try {
// try {
// bucket = workStateManager.getWorkBucket(task.getOid(), FREE_BUCKET_WAIT_TIME, () -> task.canRun(), initialBucket, executionResult);
// } catch (InterruptedException e) {
// LOGGER.trace("InterruptedExecution in getWorkBucket for {}", task);
// if (task.canRun()) {
// throw new IllegalStateException("Unexpected InterruptedException: " + e.getMessage(), e);
// } else {
// return createInterruptedTaskRunResult();
// }
// }
// } catch (Throwable t) {
// LoggingUtils.logUnexpectedException(LOGGER, "Couldn't allocate a work bucket for task {} (coordinator {})", t, task, null);
// return createFailureTaskRunResult("Couldn't allocate a work bucket for task: " + t.getMessage(), t);
// }
// initialBucket = false;
// if (bucket == null) {
// LOGGER.trace("No (next) work bucket within {}, exiting", task);
// runResult = handler.onNoMoreBuckets(task, runResult);
// return runResult != null ? runResult : createSuccessTaskRunResult();
// }
// try {
// LOGGER.trace("Executing handler {} with work bucket of {} for {}", handler.getClass().getName(), bucket, task);
// runResult = handler.run(task, bucket, runResult);
// LOGGER.trace("runResult is {} for {}", runResult, task);
// if (runResult == null) { // Obviously an error in task handler
// LOGGER.error("Unable to record run finish: task returned null result");
// //releaseWorkBucketChecked(bucket, executionResult);
// return createFailureTaskRunResult("Unable to record run finish: task returned null result", null);
// }
// } catch (Throwable t) {
// LOGGER.error("Task handler threw unexpected exception: {}: {}; task = {}", t.getClass().getName(), t.getMessage(), task, t);
// //releaseWorkBucketChecked(bucket, executionResult);
// return createFailureTaskRunResult("Task handler threw unexpected exception: " + t.getMessage(), t);
// }
// if (!runResult.isBucketComplete()) {
// return runResult;
// }
// try {
// taskManagerImpl.getWorkStateManager().completeWorkBucket(task.getOid(), bucket.getSequentialNumber(), executionResult);
// } catch (ObjectAlreadyExistsException | ObjectNotFoundException | SchemaException | RuntimeException e) {
// LoggingUtils.logUnexpectedException(LOGGER, "Couldn't complete work bucket for task {}", e, task);
// return createFailureTaskRunResult("Couldn't complete work bucket: " + e.getMessage(), e);
// }
// if (!task.canRun() || !runResult.isShouldContinue()) {
// return runResult;
// }
// }
// }

// private void releaseWorkBucketChecked(AbstractWorkBucketType bucket, OperationResult executionResult) {
// try {
// taskManagerImpl.getWorkStateManager().releaseWorkBucket(task.getOid(), bucket.getSequentialNumber(), executionResult);
// } catch (com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException | ObjectNotFoundException | SchemaException e) {
// LoggingUtils.logUnexpectedException(LOGGER, "Couldn't release work bucket for task {}", e, task);
// }
// }

private OperationResult createOperationResult(String methodName) {
return new OperationResult(DOT_CLASS + methodName);
Expand Down
Expand Up @@ -26,6 +26,8 @@
import org.springframework.stereotype.Component;

import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.delta.ContainerDelta;
import com.evolveum.midpoint.prism.delta.DeltaFactory;
import com.evolveum.midpoint.repo.api.CounterManager;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.RunningTask;
Expand All @@ -37,12 +39,20 @@
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.api.TaskRunResult;
import com.evolveum.midpoint.task.api.TaskRunResult.TaskRunResultStatus;
import com.evolveum.midpoint.task.quartzimpl.InternalTaskInterface;
import com.evolveum.midpoint.task.quartzimpl.RunningTaskQuartzImpl;
import com.evolveum.midpoint.task.quartzimpl.execution.HandlerExecutor;
import com.evolveum.midpoint.task.quartzimpl.work.WorkStateManager;
import com.evolveum.midpoint.util.MiscUtil;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskPartitionDefinitionType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskPartitionsDefinitionType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskWorkStateType;

/**
* @author katka
Expand All @@ -55,10 +65,10 @@ public class LightweightPartitioningTaskHandler implements TaskHandler {

private static final String HANDLER_URI = TaskConstants.LIGHTWEIGTH_PARTITIONING_TASK_HANDLER_URI;

@Autowired private PrismContext prismContext;
@Autowired private TaskManager taskManager;
@Autowired private HandlerExecutor handlerExecutor;
@Autowired private CounterManager counterManager;
@Autowired private PrismContext prismContext;


@PostConstruct
Expand All @@ -73,6 +83,9 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType taskParti
runResult.setProgress(task.getProgress());
runResult.setOperationResult(opResult);

if (taskPartition != null && taskPartition.getWorkManagement() != null) {
throw new UnsupportedOperationException("Work management not supoorted in partitions for lightweigth partitioning task");
}

TaskPartitionsDefinitionType partitionsDefinition = task.getWorkManagement().getPartitions();
List<TaskPartitionDefinitionType> partitions = partitionsDefinition.getPartition();
Expand Down Expand Up @@ -102,8 +115,8 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType taskParti
partitions.sort(comparator);
for (TaskPartitionDefinitionType partition : partitions) {
TaskHandler handler = taskManager.getHandler(partition.getHandlerUri());
LOGGER.trace("Starting to execute handler {} defined in partition {}", handler, partition);
TaskRunResult subHandlerResult = handlerExecutor.executeHandler((RunningTaskQuartzImpl) task, partition, handler, opResult);
// TaskRunResult subHandlerResult = handler.run(task, partition);
OperationResult subHandlerOpResult = subHandlerResult.getOperationResult();
opResult.addSubresult(subHandlerOpResult);
runResult = subHandlerResult;
Expand All @@ -116,6 +129,14 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType taskParti
if (subHandlerOpResult.isError()) {
break;
}

try {
LOGGER.trace("Cleaning up work state in task {}, workState: {}", task, task.getWorkState());
cleanupWorkState(task, runResult.getOperationResult());
} catch (ObjectNotFoundException | SchemaException | ObjectAlreadyExistsException e) {
LOGGER.error("Unexpected error during cleaning work state: " + e.getMessage(), e);
throw new IllegalStateException(e);
}
}

runResult.setProgress(runResult.getProgress() + 1);
Expand All @@ -125,19 +146,27 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType taskParti
return runResult;
}

public void cleanupWorkState(RunningTask runningTask, OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, ObjectAlreadyExistsException {
ContainerDelta<TaskWorkStateType> containerDelta = (ContainerDelta<TaskWorkStateType>) prismContext
.deltaFor(TaskType.class).item(TaskType.F_WORK_STATE).replace().asItemDelta();
((InternalTaskInterface) runningTask).applyDeltasImmediate(MiscUtil.createCollection(containerDelta), parentResult);

}

private boolean canContinue(RunningTask task, TaskRunResult runResult) {
if (!task.canRun() || runResult.getRunResultStatus() == TaskRunResultStatus.INTERRUPTED) {
// first, if a task was interrupted, we do not want to change its status
LOGGER.trace("Task was interrupted, exiting the execution cycle. Task = {}", task);
return true;
return false;
} else if (runResult.getRunResultStatus() == TaskRunResultStatus.TEMPORARY_ERROR) {
LOGGER.trace("Task encountered temporary error, continuing with the execution cycle. Task = {}", task);
return false;
} else if (runResult.getRunResultStatus() == TaskRunResultStatus.RESTART_REQUESTED) {
// in case of RESTART_REQUESTED we have to get (new) current handler and restart it
// this is implemented by pushHandler and by Quartz
LOGGER.trace("Task returned RESTART_REQUESTED state, exiting the execution cycle. Task = {}", task);
return true;
return false;
} else if (runResult.getRunResultStatus() == TaskRunResultStatus.PERMANENT_ERROR) {
LOGGER.info("Task encountered permanent error, suspending the task. Task = {}", task);
return false;
Expand Down Expand Up @@ -170,10 +199,4 @@ public String getCategoryName(Task task) {
return TaskCategory.UTIL;
}


// private void processErrorCriticality(Task task, TaskPartitionDefinitionType partitionDefinition, Throwable ex, OperationResult result) throws ObjectNotFoundException, CommunicationException, SchemaException, ConfigurationException, SecurityViolationException, PolicyViolationException, ExpressionEvaluationException, ObjectAlreadyExistsException, PreconditionViolationException {
// CriticalityType criticality = ExceptionUtil.getCriticality(partitionDefinition.getErrorCriticality(), ex, CriticalityType.PARTIAL);
// RepoCommonUtils.processErrorCriticality(task.getTaskType(), criticality, ex, result);
//
// }
}
Expand Up @@ -34,6 +34,7 @@
import com.evolveum.midpoint.prism.ItemDefinition;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.delta.ContainerDelta;
import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.prism.delta.ItemDeltaCollectionsUtil;
import com.evolveum.midpoint.prism.path.ItemPath;
Expand All @@ -46,6 +47,7 @@
import com.evolveum.midpoint.repo.api.VersionPrecondition;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.TaskWorkStateTypeUtil;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskExecutionStatus;
import com.evolveum.midpoint.task.api.TaskManager;
Expand All @@ -60,6 +62,7 @@
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategyFactory;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandler;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandlerRegistry;
import com.evolveum.midpoint.util.MiscUtil;
import com.evolveum.midpoint.util.backoff.BackoffComputer;
import com.evolveum.midpoint.util.backoff.ExponentialBackoffComputer;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
Expand Down Expand Up @@ -170,7 +173,7 @@ public WorkBucketType getWorkBucket(@NotNull String workerTaskOid, long freeBuck
return getWorkBucketMultiNode(ctx, freeBucketWaitTime, result);
}
}

private WorkBucketType findSelfAllocatedBucket(Context ctx) {
TaskWorkStateType workState = ctx.workerTask.getWorkState();
if (workState == null || workState.getBucket().isEmpty()) {
Expand Down Expand Up @@ -503,6 +506,7 @@ private void completeWorkBucketStandalone(Context ctx, int sequentialNumber, Ope
Collection<ItemDelta<?, ?>> modifications = bucketStateChangeDeltas(bucket, WorkBucketStateType.COMPLETE);
repositoryService.modifyObject(TaskType.class, ctx.workerTask.getOid(), modifications, null, result);
((InternalTaskInterface) ctx.workerTask).applyModificationsTransient(modifications);
((InternalTaskInterface) ctx.workerTask).applyDeltasImmediate(modifications, result);
compressCompletedBuckets(ctx.workerTask, result);
}

Expand Down
Expand Up @@ -60,7 +60,7 @@ public abstract class TestThresholds extends AbstractStoryTest {
private static final String ROLE_POLICY_RULE_CREATE_OID = "00000000-role-0000-0000-999111111112";

private static final File ROLE_POLICY_RULE_CHANGE_ACTIVATION_FILE = new File(TEST_DIR, "role-policy-rule-change-activation.xml");
private static final String ROLE_POLICY_RULE_CHANGE_ACTIVATION_OID = "00000000-role-0000-0000-999111111223";
protected static final String ROLE_POLICY_RULE_CHANGE_ACTIVATION_OID = "00000000-role-0000-0000-999111111223";

private static final File TASK_IMPORT_BASE_USERS_FILE = new File(TEST_DIR, "task-opendj-import-base-users.xml");
private static final String TASK_IMPORT_BASE_USERS_OID = "fa25e6dc-a858-11e7-8ebc-eb2b71ecce1d";
Expand Down Expand Up @@ -250,15 +250,14 @@ public void test520changeActivationThreeAccounts() throws Exception {

//WHEN
displayWhen(TEST_NAME);
OperationResult reconResult = waitForTaskNextRun(getTaskOid(), false, 20000, true);
OperationResult reconResult = waitForTaskResume(getTaskOid(), false, 20000);
assertFailure(reconResult);

//THEN

Task taskAfter = taskManager.getTaskWithResult(getTaskOid(), result);

assertTaskExecutionStatus(getTaskOid(), TaskExecutionStatus.SUSPENDED);
assertUsers(getNumberOfUsers() + getProcessedUsers()*2);

assertSynchronizationStatisticsActivation(taskAfter);

Expand Down
Expand Up @@ -73,7 +73,7 @@ protected void assertSynchronizationStatisticsAfterImport(Task taskAfter) throws
}

protected void assertSynchronizationStatisticsActivation(Task taskAfter) {
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountUnmatched(), 5);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountUnmatched(), 3);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountDeleted(), 0);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountLinked(), 0);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountUnlinked(), 0);
Expand Down
Expand Up @@ -68,7 +68,7 @@ protected void assertSynchronizationStatisticsAfterImport(Task taskAfter) throws
}

protected void assertSynchronizationStatisticsActivation(Task taskAfter) {
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountUnmatched(), 5);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountUnmatched(), 3);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountDeleted(), 0);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountLinked(), 0);
assertEquals(taskAfter.getStoredOperationStats().getSynchronizationInformation().getCountUnlinked(), 0);
Expand Down

0 comments on commit 98d20d2

Please sign in to comment.