Skip to content

Commit

Permalink
Fix task importing process
Browse files Browse the repository at this point in the history
The hooks onTaskCreate/onTaskDelete were replaced by more serious
approach: task manager is called directly from the raw execution
in the model controller.
  • Loading branch information
mederly committed Feb 23, 2021
1 parent 4d4b9b6 commit 675b09a
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 78 deletions.
Expand Up @@ -306,6 +306,15 @@ public static boolean isClassManagedByProvisioning(Class<? extends ObjectType> c
return false;
}

public static boolean isClassManagedByTaskManager(@NotNull Class<? extends ObjectType> clazz) {
for (ObjectTypes type : ObjectTypes.values()) {
if (type.getClassDefinition().isAssignableFrom(clazz)) {
return type.isManagedByTaskManager();
}
}
return false;
}

public static boolean isObjectTypeManagedByProvisioning(Class<? extends ObjectType> objectType) {
Validate.notNull(objectType, "Object type must not be null.");

Expand Down
Expand Up @@ -514,7 +514,12 @@ private Collection<ObjectDeltaOperation<? extends ObjectType>> executeChangesRaw
}
String oid;
try {
oid = cacheRepositoryService.addObject(objectToAdd, repoOptions, result1);
if (objectToAdd.canRepresent(TaskType.class)) {
//noinspection unchecked
oid = taskManager.addTask((PrismObject<TaskType>) objectToAdd, result1);
} else {
oid = cacheRepositoryService.addObject(objectToAdd, repoOptions, result1);
}
task.recordObjectActionExecuted(objectToAdd, null, oid, ChangeType.ADD, task.getChannel(), null);
} catch (Throwable t) {
task.recordObjectActionExecuted(objectToAdd, null, null, ChangeType.ADD, task.getChannel(), t);
Expand Down Expand Up @@ -550,9 +555,12 @@ private Collection<ObjectDeltaOperation<? extends ObjectType>> executeChangesRaw
ModelImplUtils.clearRequestee(task);
provisioning.deleteObject(delta.getObjectTypeClass(), delta.getOid(),
ProvisioningOperationOptions.createRaw(), null, task, result1);
} else if (TaskType.class.isAssignableFrom(delta.getObjectTypeClass())) {
// Maybe we should check if the task is not running. However, this is raw processing.
// (But, actually, this is better than simply deleting the task from repository.)
taskManager.deleteTask(delta.getOid(), result1);
} else {
cacheRepositoryService.deleteObject(delta.getObjectTypeClass(), delta.getOid(),
result1);
cacheRepositoryService.deleteObject(delta.getObjectTypeClass(), delta.getOid(), result1);
}
task.recordObjectActionExecuted(objectToDetermineDetailsForAudit, delta.getObjectTypeClass(), delta.getOid(), ChangeType.DELETE, task.getChannel(), null);
} catch (Throwable t) {
Expand All @@ -573,8 +581,12 @@ private Collection<ObjectDeltaOperation<? extends ObjectType>> executeChangesRaw
securityEnforcer.authorize(ModelAuthorizationAction.MODIFY.getUrl(), null, autzParams, null, task, result1);
}
try {
cacheRepositoryService.modifyObject(delta.getObjectTypeClass(), delta.getOid(),
delta.getModifications(), result1);
if (TaskType.class.isAssignableFrom(delta.getObjectTypeClass())) {
taskManager.modifyTask(delta.getOid(), delta.getModifications(), result1);
} else {
cacheRepositoryService.modifyObject(delta.getObjectTypeClass(), delta.getOid(),
delta.getModifications(), result1);
}
task.recordObjectActionExecuted(existingObject, ChangeType.MODIFY, null);
} catch (Throwable t) {
task.recordObjectActionExecuted(existingObject, ChangeType.MODIFY, t);
Expand Down Expand Up @@ -1479,9 +1491,7 @@ public void importObjectsFromStream(InputStream input, String language, ImportOp
result.addParam(OperationResult.PARAM_LANGUAGE, language);
try {
objectImporter.importObjects(input, language, options, task, result);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Import result:\n{}", result.debugDump());
}
LOGGER.trace("Import result:\n{}", result.debugDumpLazily());
// No need to compute status. The validator inside will do it.
// result.computeStatus("Couldn't import object from input stream.");
} catch (RuntimeException e) {
Expand Down
Expand Up @@ -204,7 +204,7 @@ private <T extends Objectable> EventResult importParsedObject(PrismObject<T> pri
object = migrator.migrate(object);

ModelImplUtils.resolveReferences(object, repository,
(options == null || options.isReferentialIntegrity() == null) ? false : options.isReferentialIntegrity(),
options != null && options.isReferentialIntegrity() != null && options.isReferentialIntegrity(),
false, EvaluationTimeType.IMPORT, false, prismContext, objectResult);

objectResult.computeStatus();
Expand Down Expand Up @@ -368,9 +368,6 @@ private <T extends ObjectType> void importObjectToRepository(PrismObject<T> obje

String deletedOid = deleteObject(foundObject, repository, result);
if (deletedOid != null) {
if (object.canRepresent(TaskType.class)) {
taskManager.onTaskDelete(deletedOid, result);
}
if (isTrue(options.isKeepOid())) {
object.setOid(deletedOid);
}
Expand Down Expand Up @@ -423,9 +420,6 @@ private <T extends ObjectType> void addObject(PrismObject<T> object, boolean ove
if (oidOfAddedObject == null) {
LOGGER.warn("No OID of added object. Executed deltas:\n{}", DebugUtil.debugDump(executedDeltas));
} else {
if (object.canRepresent(TaskType.class)) {
taskManager.onTaskCreate(oidOfAddedObject, parentResult);
}
if (object.canRepresent(ResourceType.COMPLEX_TYPE) && isTrue(importOptions.isFetchResourceSchema())) {
modelService.testResource(oidOfAddedObject, task);
}
Expand Down
Expand Up @@ -33,7 +33,7 @@
<recurrence>recurring</recurrence>
<binding>tight</binding>
<schedule>
<interval>10</interval>
<interval>2</interval>
</schedule>

</task>
Expand Up @@ -3269,7 +3269,7 @@ protected void waitForTaskStart(final String taskOid, final boolean checkSubresu
public boolean check() throws CommonException {
Task freshTask = taskManager.getTaskWithResult(taskOid, waitResult);
OperationResult result = freshTask.getResult();
if (verbose) { display("Check result", result); }
if (verbose) { display("Task checked (result=" + result + ")", freshTask); }
assert !isError(result, checkSubresult) : "Error in " + freshTask + ": " + TestUtil.getErrorMessage(result);
if (isUnknown(result, checkSubresult)) {
return false;
Expand All @@ -3282,7 +3282,7 @@ public void timeout() {
try {
Task freshTask = taskManager.getTaskWithResult(taskOid, waitResult);
OperationResult result = freshTask.getResult();
logger.debug("Result of timed-out task:\n{}", result.debugDump());
logger.debug("Result of timed-out task:\n{}", DebugUtil.debugDump(result));
assert false : "Timeout (" + timeout + ") while waiting for " + freshTask + " to start. Last result " + result;
} catch (ObjectNotFoundException | SchemaException e) {
logger.error("Exception during task refresh: {}", e, e);
Expand Down
Expand Up @@ -317,18 +317,6 @@ default Task createTaskInstance() {
* Deletes dead nodes, i.e. ones that were not checked-in for a given time period.
*/
void cleanupNodes(DeadNodeCleanupPolicyType deadNodesPolicy, RunningTask task, OperationResult opResult) throws SchemaException;

/**
* This is a signal to task manager that a new task was created in the repository.
* Task manager can react to it e.g. by creating shadow quartz job and trigger.
*/
void onTaskCreate(String oid, OperationResult parentResult);

/**
* This is a signal to task manager that a task was removed from the repository.
* Task manager can react to it e.g. by removing shadow quartz job and trigger.
*/
void onTaskDelete(String oid, OperationResult parentResult);
//endregion

//region Remotely invokable methods
Expand Down
Expand Up @@ -696,49 +696,6 @@ public Collection<String> getHandlerUrisForArchetype(String archetypeOid, boolea
}
//endregion

//region Task creation/removal event handling
@Override
public void onTaskCreate(String oid, OperationResult parentResult) {

OperationResult result = parentResult.createMinorSubresult(DOT_INTERFACE + "onTaskCreate");
result.addParam("oid", oid);

LOGGER.trace("onTaskCreate called for oid = " + oid);

TaskQuartzImpl task;
try {
task = getTaskPlain(oid, result);
} catch (ObjectNotFoundException e) {
LoggingUtils.logException(LOGGER, "Quartz shadow job cannot be created, because task in repository was not found; oid = {}", e, oid);
result.computeStatus();
return;
} catch (SchemaException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Quartz shadow job cannot be created, because task from repository could not be retrieved; oid = {}", e, oid);
result.computeStatus();
return;
}

task.synchronizeWithQuartz(result);
result.computeStatus();
}

@Override
public void onTaskDelete(String oid, OperationResult parentResult) {

OperationResult result = parentResult.createMinorSubresult(DOT_INTERFACE + "onTaskDelete");
result.addParam("oid", oid);
try {
LOGGER.trace("onTaskDelete called for oid = {}", oid);
localScheduler.deleteTaskFromQuartz(oid, true, result);
} catch (Throwable t) {
result.recordFatalError(t);
throw t;
} finally {
result.computeStatusIfUnknown();
}
}
//endregion

//region Notifications
@Override
public void registerTaskListener(TaskListener taskListener) {
Expand Down
Expand Up @@ -1973,16 +1973,16 @@ public void addSubtask(TaskType subtaskBean) {
public String debugDump(int indent) {
StringBuilder sb = new StringBuilder();
DebugUtil.indentDebugDump(sb, indent);
sb.append("Task(");
sb.append(TaskQuartzImpl.class.getName());
sb.append(")\n");
sb.append("Task(").append(getClass().getSimpleName()).append(")\n");
DebugUtil.debugDumpLabelLn(sb, "prism", indent + 1);
synchronized (prismAccess) {
sb.append(taskPrism.debugDump(indent + 2));
}
sb.append("\n");
DebugUtil.debugDumpWithLabelToStringLn(sb, "persistenceStatus", getPersistenceStatus(), indent);
DebugUtil.debugDumpWithLabelLn(sb, "taskResult", taskResult, indent);
DebugUtil.debugDumpWithLabelToStringLn(sb, "persistenceStatus", getPersistenceStatus(), indent + 1);
DebugUtil.debugDumpWithLabelLn(sb, "taskResult", taskResult, indent + 1);
DebugUtil.debugDumpWithLabelLn(sb, "pendingModifications", new ArrayList<>(pendingModifications), indent + 1);
DebugUtil.debugDumpWithLabelLn(sb, "recreateQuartzTrigger", recreateQuartzTrigger, indent + 1);
return sb.toString();
}

Expand Down

0 comments on commit 675b09a

Please sign in to comment.