Skip to content

Commit

Permalink
Fix provisioning async update tests
Browse files Browse the repository at this point in the history
And other minor changes.
  • Loading branch information
mederly committed Mar 16, 2019
1 parent 5bef7dd commit 1f2c7a8
Show file tree
Hide file tree
Showing 16 changed files with 53 additions and 146 deletions.
Expand Up @@ -29,7 +29,6 @@
import com.evolveum.midpoint.model.impl.importer.ImportAccountsFromResourceTaskHandler;
import com.evolveum.midpoint.model.impl.importer.ObjectImporter;
import com.evolveum.midpoint.model.impl.lens.*;
import com.evolveum.midpoint.model.impl.messaging.MessageProcessor;
import com.evolveum.midpoint.model.impl.scripting.ExecutionContext;
import com.evolveum.midpoint.model.impl.scripting.ScriptingExpressionEvaluator;
import com.evolveum.midpoint.model.impl.util.ModelImplUtils;
Expand Down Expand Up @@ -157,7 +156,6 @@ public class ModelController implements ModelService, TaskService, WorkflowServi
@Autowired private CacheRegistry cacheRegistry;
@Autowired private ClockworkMedic clockworkMedic;
@Autowired private ChangeNotificationDispatcher dispatcher;
@Autowired private MessageProcessor messageProcessor;
@Autowired
@Qualifier("cacheRepositoryService")
private transient RepositoryService cacheRepositoryService;
Expand Down

This file was deleted.

Expand Up @@ -28,7 +28,7 @@
import java.util.Map;

/**
*
* TODO move somewhere else or rewrite
*/
public class MessageWrapper {

Expand Down
Expand Up @@ -17,10 +17,10 @@
package com.evolveum.midpoint.model.impl.sync;

import com.evolveum.midpoint.model.impl.ModelConstants;
import com.evolveum.midpoint.model.impl.sync.SyncTaskHelper.TargetInfo;
import com.evolveum.midpoint.model.impl.util.ModelImplUtils;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.provisioning.api.ProvisioningService;
import com.evolveum.midpoint.schema.ResourceShadowDiscriminator;
import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.schema.result.OperationConstants;
import com.evolveum.midpoint.schema.result.OperationResult;
Expand Down Expand Up @@ -64,7 +64,6 @@ public class AsyncUpdateTaskHandler implements TaskHandler {
@PostConstruct
private void initialize() {
taskManager.registerHandler(HANDLER_URI, this);
System.out.println("URI = " + HANDLER_URI);
}

@NotNull
Expand All @@ -90,16 +89,16 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType partition

final String CTX = "Async Update";

ResourceShadowDiscriminator coords = helper.getCoords(LOGGER, task, opResult, runResult, CTX);
if (coords == null) {
TargetInfo targetInfo = helper.getTargetInfo(LOGGER, task, opResult, runResult, CTX);
if (targetInfo == null) {
return runResult;
}

String listeningActivityHandle = null;
try {
ModelImplUtils.clearRequestee(task); // todo is this needed?
listeningActivityHandle = provisioningService.startListeningForAsyncUpdates(coords, task, opResult);
System.out.println("Handle = " + listeningActivityHandle);
listeningActivityHandle = provisioningService.startListeningForAsyncUpdates(targetInfo.coords, task, opResult);
LOGGER.info("Started listening for async updates on {} with handle {}", targetInfo.resource, listeningActivityHandle);

long lastCheck = 0;

Expand All @@ -108,7 +107,7 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType partition
lastCheck = System.currentTimeMillis();
AsyncUpdateListeningActivityInformationType info = provisioningService
.getAsyncUpdatesListeningActivityInformation(listeningActivityHandle, task, opResult);
LOGGER.info("Listening activity {} state:\n{}", listeningActivityHandle,
LOGGER.info("Listening activity {} state:\n{}", listeningActivityHandle, // todo trace
prismContext.xmlSerializer().root(new QName("info")).serializeRealValue(info));
if (isAllDown(info)) {
throw new SystemException("All listening activities are down, suspending the task: " + task);
Expand All @@ -121,7 +120,7 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType partition
Thread.sleep(TASK_CHECK_INTERVAL);
}
} catch (RuntimeException | ObjectNotFoundException | SchemaException | CommunicationException | ConfigurationException |
ExpressionEvaluationException | InterruptedException | SecurityViolationException e) {
ExpressionEvaluationException | InterruptedException e) {
helper.processException(LOGGER, e, opResult, runResult, partition, CTX);
return runResult;
} finally {
Expand Down
Expand Up @@ -16,9 +16,9 @@
package com.evolveum.midpoint.model.impl.sync;

import com.evolveum.midpoint.model.impl.ModelConstants;
import com.evolveum.midpoint.model.impl.sync.SyncTaskHelper.TargetInfo;
import com.evolveum.midpoint.model.impl.util.ModelImplUtils;
import com.evolveum.midpoint.provisioning.api.ProvisioningService;
import com.evolveum.midpoint.schema.ResourceShadowDiscriminator;
import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.schema.result.OperationConstants;
import com.evolveum.midpoint.schema.result.OperationResult;
Expand Down Expand Up @@ -86,8 +86,8 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType partition

final String CTX = "Live Sync";

ResourceShadowDiscriminator coords = helper.getCoords(LOGGER, task, opResult, runResult, CTX);
if (coords == null) {
TargetInfo targetInfo = helper.getTargetInfo(LOGGER, task, opResult, runResult, CTX);
if (targetInfo == null) {
return runResult;
}

Expand All @@ -98,7 +98,7 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType partition
// This will detect the changes and notify model about them.
// It will use extension of task to store synchronization state
ModelImplUtils.clearRequestee(task);
changesProcessed = provisioningService.synchronize(coords, task, partition, opResult);
changesProcessed = provisioningService.synchronize(targetInfo.coords, task, partition, opResult);
} catch (Throwable t) {
helper.processException(LOGGER, t, opResult, runResult, partition, CTX);
return runResult;
Expand All @@ -109,7 +109,7 @@ public TaskRunResult run(RunningTask task, TaskPartitionDefinitionType partition

// This "run" is finished. But the task goes on ...
runResult.setRunResultStatus(TaskRunResultStatus.FINISHED);
LOGGER.trace("LiveSyncTaskHandler.run stopping (resource {})", coords.getResourceOid());
LOGGER.trace("LiveSyncTaskHandler.run stopping (resource {})", targetInfo.resource);
return runResult;
}

Expand Down
Expand Up @@ -46,7 +46,22 @@ public class SyncTaskHelper {
@Autowired private ProvisioningService provisioningService;
@Autowired private PrismContext prismContext;

ResourceShadowDiscriminator getCoords(Trace LOGGER, Task task, OperationResult opResult, TaskRunResult runResult, String ctx) {
public class TargetInfo {
final public ResourceShadowDiscriminator coords;
final public ResourceType resource;
final public RefinedResourceSchema refinedResourceSchema;
final public ObjectClassComplexTypeDefinition objectClassDefinition;

public TargetInfo(ResourceShadowDiscriminator coords, ResourceType resource,
RefinedResourceSchema refinedResourceSchema, ObjectClassComplexTypeDefinition objectClassDefinition) {
this.coords = coords;
this.resource = resource;
this.refinedResourceSchema = refinedResourceSchema;
this.objectClassDefinition = objectClassDefinition;
}
}

TargetInfo getTargetInfo(Trace LOGGER, Task task, OperationResult opResult, TaskRunResult runResult, String ctx) {
String resourceOid = getResourceOid(LOGGER, task, opResult, runResult, ctx);
if (resourceOid == null) {
return null;
Expand All @@ -73,7 +88,9 @@ ResourceShadowDiscriminator getCoords(Trace LOGGER, Task task, OperationResult o
LOGGER.debug("{}: Processing all object classes", ctx);
}

return new ResourceShadowDiscriminator(resourceOid, objectClass==null?null:objectClass.getTypeName());
return new TargetInfo(
new ResourceShadowDiscriminator(resourceOid, objectClass==null?null:objectClass.getTypeName()),
resource, refinedSchema, objectClass);
}

public String getResourceOid(Trace logger, Task task, OperationResult opResult, TaskRunResult runResult, String ctx) {
Expand Down
Expand Up @@ -89,7 +89,6 @@
EXPORTED_GROUP_PREFIX = 'ref:'
RELEVANT_SOURCE_ID = 'ldap'

this.binding.variables.each {k,v -> println "$k = $v"}
esbEvent = midpoint.getMessageBodyAsMap(message)['esbEvent'][0]
log.info('esbEvent = {}', esbEvent)
eventType = esbEvent['eventType']
Expand Down
7 changes: 7 additions & 0 deletions model/model-intest/testng-integration.xml
Expand Up @@ -94,6 +94,13 @@
<class name="com.evolveum.midpoint.model.intest.sync.TestObjTemplateLiveSyncTask"/>
</classes>
</test>
<test name="Async updates" preserve-order="true" parallel="false" verbose="10">
<classes>
<class name="com.evolveum.midpoint.model.intest.async.TestNotifyChange"/>
<class name="com.evolveum.midpoint.model.intest.async.TestAsyncUpdateUcf"/>
<class name="com.evolveum.midpoint.model.intest.async.TestAsyncUpdateGrouperJson"/>
</classes>
</test>
<test name="Importer" preserve-order="true" parallel="false" verbose="10">
<classes>
<class name="com.evolveum.midpoint.model.intest.importer.XmlImportTest"/>
Expand Down
Expand Up @@ -204,7 +204,7 @@ int synchronize(ResourceShadowDiscriminator shadowCoordinates, Task task, TaskPa
*/
String startListeningForAsyncUpdates(ResourceShadowDiscriminator shadowCoordinates, Task task, OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException,
ExpressionEvaluationException, SecurityViolationException;
ExpressionEvaluationException;

/**
* Stops the given listening activity.
Expand Down
Expand Up @@ -382,7 +382,7 @@ public int synchronize(ResourceShadowDiscriminator shadowCoordinates, Task task,
public String startListeningForAsyncUpdates(@NotNull ResourceShadowDiscriminator shadowCoordinates, @NotNull Task task,
@NotNull OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException,
ExpressionEvaluationException, SecurityViolationException {
ExpressionEvaluationException {
String resourceOid = shadowCoordinates.getResourceOid();
Validate.notNull(resourceOid, "Resource oid must not be null.");

Expand All @@ -394,7 +394,7 @@ public String startListeningForAsyncUpdates(@NotNull ResourceShadowDiscriminator
try {
LOGGER.trace("Starting listening for async updates for {}", shadowCoordinates);
listeningActivityHandle = shadowCache.startListeningForAsyncUpdates(shadowCoordinates, task, result);
} catch (ObjectNotFoundException | CommunicationException | SchemaException | ConfigurationException | ExpressionEvaluationException | RuntimeException | Error | SecurityViolationException e) {
} catch (ObjectNotFoundException | CommunicationException | SchemaException | ConfigurationException | ExpressionEvaluationException | RuntimeException | Error e) {
ProvisioningUtil.recordFatalError(LOGGER, result, null, e);
result.summarize(true);
throw e;
Expand Down
Expand Up @@ -1859,8 +1859,7 @@ public List<Change> fetchChanges(ProvisioningContext ctx, PrismProperty<?> lastT

String startListeningForAsyncUpdates(@NotNull ProvisioningContext ctx,
@NotNull ChangeListener outerListener, @NotNull OperationResult parentResult) throws SchemaException,
CommunicationException, ConfigurationException, ObjectNotFoundException, ExpressionEvaluationException,
SecurityViolationException {
CommunicationException, ConfigurationException, ObjectNotFoundException, ExpressionEvaluationException {

LOGGER.trace("START start listening for async updates, objectClass: {}", ctx.getObjectClassDefinition());
ConnectorInstance connector = ctx.getConnector(AsyncUpdateCapabilityType.class, parentResult);
Expand Down
Expand Up @@ -2391,7 +2391,7 @@ public int synchronize(ResourceShadowDiscriminator shadowCoordinates, PrismPrope

String startListeningForAsyncUpdates(ResourceShadowDiscriminator shadowCoordinates, Task task, OperationResult parentResult)
throws ObjectNotFoundException, CommunicationException, SchemaException, ConfigurationException,
ExpressionEvaluationException, SecurityViolationException {
ExpressionEvaluationException {
InternalMonitor.recordCount(InternalCounters.PROVISIONING_ALL_EXT_OPERATION_COUNT);

ProvisioningContext ctx = ctxFactory.create(shadowCoordinates, task, parentResult);
Expand Down
1 change: 1 addition & 0 deletions provisioning/provisioning-impl/testng-integration.xml
Expand Up @@ -77,6 +77,7 @@
<classes>
<class name="com.evolveum.midpoint.provisioning.impl.async.TestAsyncUpdateCaching" />
<class name="com.evolveum.midpoint.provisioning.impl.async.TestAsyncUpdateNoCaching" />
<class name="com.evolveum.midpoint.provisioning.impl.async.TestAsyncUpdateCachingAmqp" />
</classes>
</test>
</suite>
Expand Up @@ -336,8 +336,7 @@ List<Change> fetchChanges(ObjectClassComplexTypeDefinition objectClass, PrismPro
*/
void dispose();

default ListeningActivity startListeningForChanges(ChangeListener changeListener, OperationResult parentResult)
throws SchemaException, SecurityViolationException {
default ListeningActivity startListeningForChanges(ChangeListener changeListener, OperationResult parentResult) throws SchemaException {
throw new UnsupportedOperationException();
}
}

0 comments on commit 1f2c7a8

Please sign in to comment.