Skip to content

Commit

Permalink
Add narrowing of reconciliation task queries
Browse files Browse the repository at this point in the history
The (forgotten) narrowing of queries in 2nd and 3rd reconciliation
stages according to work buckets was added.
  • Loading branch information
mederly committed Mar 15, 2018
1 parent 84ece60 commit 9304940
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 34 deletions.
Expand Up @@ -90,10 +90,10 @@
@Component
public class ReconciliationTaskHandler implements WorkBucketAwareTaskHandler {

public static final String HANDLER_URI = ModelPublicConstants.RECONCILIATION_TASK_HANDLER_URI;
public static final String FIRST_STAGE_HANDLER_URI = ModelPublicConstants.PARTITIONED_RECONCILIATION_TASK_HANDLER_URI_1;
public static final String SECOND_STAGE_HANDLER_URI = ModelPublicConstants.PARTITIONED_RECONCILIATION_TASK_HANDLER_URI_2;
public static final String THIRD_STAGE_HANDLER_URI = ModelPublicConstants.PARTITIONED_RECONCILIATION_TASK_HANDLER_URI_3;
private static final String HANDLER_URI = ModelPublicConstants.RECONCILIATION_TASK_HANDLER_URI;
private static final String FIRST_STAGE_HANDLER_URI = ModelPublicConstants.PARTITIONED_RECONCILIATION_TASK_HANDLER_URI_1;
private static final String SECOND_STAGE_HANDLER_URI = ModelPublicConstants.PARTITIONED_RECONCILIATION_TASK_HANDLER_URI_2;
private static final String THIRD_STAGE_HANDLER_URI = ModelPublicConstants.PARTITIONED_RECONCILIATION_TASK_HANDLER_URI_3;

/**
* Just for testability. Used in tests. Injected by explicit call to a
Expand Down Expand Up @@ -264,12 +264,14 @@ public TaskWorkBucketProcessingResult run(Task localCoordinatorTask, WorkBucketT
long afterResourceReconTimestamp;
long afterShadowReconTimestamp;
try {
if (isStage(stage, Stage.SECOND) && !performResourceReconciliation(resource, objectclassDef, reconResult, localCoordinatorTask, opResult)) {
if (isStage(stage, Stage.SECOND) && !performResourceReconciliation(resource, objectclassDef, reconResult,
localCoordinatorTask, workBucket, opResult)) {
processInterruption(runResult, resource, localCoordinatorTask, opResult);
return runResult;
}
afterResourceReconTimestamp = clock.currentTimeMillis();
if (isStage(stage, Stage.THIRD) && !performShadowReconciliation(resource, objectclassDef, reconStartTimestamp, afterResourceReconTimestamp, reconResult, localCoordinatorTask, opResult)) {
if (isStage(stage, Stage.THIRD) && !performShadowReconciliation(resource, objectclassDef, reconStartTimestamp,
afterResourceReconTimestamp, reconResult, localCoordinatorTask, workBucket, opResult)) {
processInterruption(runResult, resource, localCoordinatorTask, opResult);
return runResult;
}
Expand Down Expand Up @@ -469,8 +471,10 @@ private void processErrorPartial(TaskRunResult runResult, String errorDesc, Exce
}

// returns false in case of execution interruption
private boolean performResourceReconciliation(PrismObject<ResourceType> resource, ObjectClassComplexTypeDefinition objectclassDef,
ReconciliationTaskResult reconResult, Task coordinatorTask, OperationResult result)
private boolean performResourceReconciliation(PrismObject<ResourceType> resource,
ObjectClassComplexTypeDefinition objectclassDef,
ReconciliationTaskResult reconResult, Task localCoordinatorTask,
WorkBucketType workBucket, OperationResult result)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException,
SecurityViolationException, ExpressionEvaluationException {

Expand All @@ -481,26 +485,27 @@ private boolean performResourceReconciliation(PrismObject<ResourceType> resource
// Instantiate result handler. This will be called with every search
// result in the following iterative search
SynchronizeAccountResultHandler handler = new SynchronizeAccountResultHandler(resource.asObjectable(),
objectclassDef, "reconciliation", coordinatorTask, changeNotificationDispatcher, taskManager);
objectclassDef, "reconciliation", localCoordinatorTask, changeNotificationDispatcher, taskManager);
handler.setSourceChannel(SchemaConstants.CHANGE_CHANNEL_RECON);
handler.setStopOnError(false);
handler.setEnableSynchronizationStatistics(true);
handler.setEnableActionsExecutedStatistics(true);

coordinatorTask.setExpectedTotal(null);
localCoordinatorTask.setExpectedTotal(null);

try {

ObjectQuery query = objectclassDef.createShadowSearchQuery(resource.getOid());
query = narrowQueryForBucket(query, localCoordinatorTask, workBucket, objectclassDef, opResult);

OperationResult searchResult = new OperationResult(OperationConstants.RECONCILIATION+".searchIterative");

handler.createWorkerThreads(coordinatorTask, searchResult);
handler.createWorkerThreads(localCoordinatorTask, searchResult);
// note that progress is incremented within the handler, as it extends AbstractSearchIterativeResultHandler
provisioningService.searchObjectsIterative(ShadowType.class, query, null, handler, coordinatorTask, searchResult);
handler.completeProcessing(coordinatorTask, searchResult);
provisioningService.searchObjectsIterative(ShadowType.class, query, null, handler, localCoordinatorTask, searchResult);
handler.completeProcessing(localCoordinatorTask, searchResult);

interrupted = !coordinatorTask.canRun();
interrupted = !localCoordinatorTask.canRun();

opResult.computeStatus();

Expand Down Expand Up @@ -530,8 +535,10 @@ private boolean performResourceReconciliation(PrismObject<ResourceType> resource
}

// returns false in case of execution interruption
private boolean performShadowReconciliation(final PrismObject<ResourceType> resource, final ObjectClassComplexTypeDefinition objectclassDef,
long startTimestamp, long endTimestamp, ReconciliationTaskResult reconResult, final Task task, OperationResult result) throws SchemaException {
private boolean performShadowReconciliation(final PrismObject<ResourceType> resource,
final ObjectClassComplexTypeDefinition objectclassDef,
long startTimestamp, long endTimestamp, ReconciliationTaskResult reconResult, final Task localCoordinatorTask,
WorkBucketType workBucket, OperationResult result) throws SchemaException, ObjectNotFoundException {
boolean interrupted;

// find accounts
Expand All @@ -547,6 +554,9 @@ private boolean performShadowReconciliation(final PrismObject<ResourceType> reso
.and().item(ShadowType.F_RESOURCE_REF).ref(ObjectTypeUtil.createObjectRef(resource).asReferenceValue())
.and().item(ShadowType.F_OBJECT_CLASS).eq(objectclassDef.getTypeName())
.build();

query = narrowQueryForBucket(query, localCoordinatorTask, workBucket, objectclassDef, opResult);

if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Shadow recon query:\n{}", query.debugDump());
}
Expand All @@ -566,28 +576,28 @@ private boolean performShadowReconciliation(final PrismObject<ResourceType> reso
long started1 = System.currentTimeMillis();
PrismObject<ShadowType> resourceShadow;
try {
task.recordIterativeOperationStart(shadow.asObjectable());
resourceShadow = reconcileShadow(shadow, resource, task);
task.recordIterativeOperationEnd(shadow.asObjectable(), started1, null);
localCoordinatorTask.recordIterativeOperationStart(shadow.asObjectable());
resourceShadow = reconcileShadow(shadow, resource, localCoordinatorTask);
localCoordinatorTask.recordIterativeOperationEnd(shadow.asObjectable(), started1, null);
} catch (Throwable t) {
task.recordIterativeOperationEnd(shadow.asObjectable(), started1, t);
localCoordinatorTask.recordIterativeOperationEnd(shadow.asObjectable(), started1, t);
throw t;
}

if (ShadowUtil.isProtected(resourceShadow)) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Skipping recording counter for {} because it is protected", shadow);
}
return task.canRun();
return localCoordinatorTask.canRun();
}

countHolder.setValue(countHolder.getValue() + 1);
incrementAndRecordProgress(task, new OperationResult("dummy")); // reconcileShadow writes to its own dummy OperationResult, so we do the same here
return task.canRun();
incrementAndRecordProgress(localCoordinatorTask, new OperationResult("dummy")); // reconcileShadow writes to its own dummy OperationResult, so we do the same here
return localCoordinatorTask.canRun();
};

repositoryService.searchObjectsIterative(ShadowType.class, query, handler, null, true, opResult);
interrupted = !task.canRun();
interrupted = !localCoordinatorTask.canRun();

// for each try the operation again

Expand All @@ -606,6 +616,19 @@ private boolean performShadowReconciliation(final PrismObject<ResourceType> reso
return !interrupted;
}

private ObjectQuery narrowQueryForBucket(ObjectQuery query, Task localCoordinatorTask,
WorkBucketType workBucket, ObjectClassComplexTypeDefinition objectclassDef,
OperationResult opResult) throws SchemaException, ObjectNotFoundException {
return taskManager.narrowQueryForWorkBucket(query, ShadowType.class, itemPath -> {
if (itemPath.startsWithName(ShadowType.F_ATTRIBUTES)) {
return objectclassDef.findAttributeDefinition(itemPath.rest().asSingleName());
} else {
return null;
}
}, localCoordinatorTask,
workBucket, opResult);
}

private PrismObject<ShadowType> reconcileShadow(PrismObject<ShadowType> shadow, PrismObject<ResourceType> resource, Task task) {
OperationResult opResult = new OperationResult(OperationConstants.RECONCILIATION+".shadowReconciliation.object");
try {
Expand Down
Expand Up @@ -34,7 +34,7 @@
<syncext:resourceReconciliationWorkStateConfiguration>
<taskKind>coordinator</taskKind>
<stringBuckets>
<discriminator>icfs:name</discriminator>
<discriminator>attributes/icfs:name</discriminator>
<boundaryCharacters>abcdefghijklmnopqrstuvwxyz</boundaryCharacters>
</stringBuckets>
<workers>
Expand Down
Expand Up @@ -34,7 +34,7 @@
<syncext:resourceReconciliationWorkStateConfiguration>
<taskKind>coordinator</taskKind>
<stringBuckets>
<discriminator>icfs:name</discriminator>
<discriminator>attributes/icfs:name</discriminator>
<boundaryCharacters>agmt</boundaryCharacters>
</stringBuckets>
<workers>
Expand Down
Expand Up @@ -33,7 +33,7 @@
<syncext:resourceReconciliationWorkStateConfiguration>
<taskKind>coordinator</taskKind>
<stringBuckets>
<discriminator>icfs:name</discriminator>
<discriminator>attributes/icfs:name</discriminator>
<boundaryCharacters>abcdefghijklmnopqrstuvwxyz</boundaryCharacters>
</stringBuckets>
</syncext:resourceReconciliationWorkStateConfiguration>
Expand Down
Expand Up @@ -382,8 +382,9 @@ private ObjectQuery prepareQuery(H resultHandler,
}

try {
query = taskManager.narrowQueryForWorkBucket(localCoordinatorTask, query, type,
getIdentifierDefinitionProvider(localCoordinatorTask, opResult), workBucket, opResult);
query = taskManager.narrowQueryForWorkBucket(query, type,
getIdentifierDefinitionProvider(localCoordinatorTask, opResult), localCoordinatorTask,
workBucket, opResult);
} catch (SchemaException | ObjectNotFoundException e) {
throw new ExitHandlerException(
logErrorAndSetResult(runResult, resultHandler, "Exception while narrowing a search query", e,
Expand Down
Expand Up @@ -707,8 +707,8 @@ void resumeTaskTree(String coordinatorOid, OperationResult parentResult)
/**
* TODO. EXPERIMENTAL.
*/
ObjectQuery narrowQueryForWorkBucket(Task workerTask, ObjectQuery query, Class<? extends ObjectType> type,
Function<ItemPath, ItemDefinition<?>> itemDefinitionProvider,
ObjectQuery narrowQueryForWorkBucket(ObjectQuery query, Class<? extends ObjectType> type,
Function<ItemPath, ItemDefinition<?>> itemDefinitionProvider, Task workerTask,
WorkBucketType workBucket, OperationResult opResult) throws SchemaException, ObjectNotFoundException;

TaskHandler createAndRegisterPartitioningTaskHandler(String handlerUri, TaskPartitioningStrategy partitioningStrategy);
Expand Down
Expand Up @@ -2116,8 +2116,8 @@ public WorkStateManager getWorkStateManager() {
}

@Override
public ObjectQuery narrowQueryForWorkBucket(Task workerTask, ObjectQuery query,
Class<? extends ObjectType> type, Function<ItemPath, ItemDefinition<?>> itemDefinitionProvider,
public ObjectQuery narrowQueryForWorkBucket(ObjectQuery query, Class<? extends ObjectType> type,
Function<ItemPath, ItemDefinition<?>> itemDefinitionProvider, Task workerTask,
WorkBucketType workBucket, OperationResult opResult)
throws SchemaException, ObjectNotFoundException {
return workStateManager.narrowQueryForWorkBucket(workerTask, query, type, itemDefinitionProvider, workBucket, opResult);
Expand Down
Expand Up @@ -97,7 +97,7 @@ public TaskWorkBucketProcessingResult run(Task task, WorkBucketType workBucket,
if (defaultQuery != null) {
ObjectQuery narrowedQuery;
try {
narrowedQuery = taskManager.narrowQueryForWorkBucket(task, defaultQuery, UserType.class, null, workBucket, opResult);
narrowedQuery = taskManager.narrowQueryForWorkBucket(defaultQuery, UserType.class, null, task, workBucket, opResult);
} catch (SchemaException | ObjectNotFoundException e) {
throw new SystemException("Couldn't narrow query for work bucket", e);
}
Expand Down

0 comments on commit 9304940

Please sign in to comment.