Skip to content

Commit

Permalink
Clean-up async update listening interface
Browse files Browse the repository at this point in the history
Introduced listening activity handle to identify particular activity
to be stopped via stopListeningForAsyncUpdates() method.

Also now supporting multiple async update sources for a given resource.
  • Loading branch information
mederly committed Mar 14, 2019
1 parent 8e69471 commit 351b964
Show file tree
Hide file tree
Showing 22 changed files with 489 additions and 345 deletions.
Expand Up @@ -83,6 +83,10 @@ public ResourceShadowDiscriminator(ShadowDiscriminatorType accRefType, String de
setKind(kind);
}

public ResourceShadowDiscriminator(String resourceOid) {
this.resourceOid = resourceOid;
}

public ResourceShadowDiscriminator(String resourceOid, QName objectClass) {
this.resourceOid = resourceOid;
this.objectClass = objectClass;
Expand Down
Expand Up @@ -186,7 +186,13 @@
</xsd:appinfo>
</xsd:annotation>
<xsd:sequence>
<!-- currently there are no common attributes now -->
<xsd:element name="sourceName" type="xsd:string" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Name of the async update source through which the message came.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
<xsd:element name="asyncUpdateMessage" type="tns:AsyncUpdateMessageType"/>
Expand Down
Expand Up @@ -18,7 +18,9 @@

import com.evolveum.midpoint.provisioning.ucf.api.AsyncUpdateMessageListener;
import com.evolveum.midpoint.provisioning.ucf.api.AsyncUpdateSource;
import com.evolveum.midpoint.provisioning.ucf.api.ListeningActivity;
import com.evolveum.midpoint.provisioning.ucf.impl.builtin.async.AsyncUpdateConnectorInstance;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
Expand All @@ -42,26 +44,29 @@ public class MockAsyncUpdateSource implements AsyncUpdateSource {

public static final MockAsyncUpdateSource INSTANCE = new MockAsyncUpdateSource();

private static class DummyListeningActivityImpl implements ListeningActivity {
@Override
public void stop() {
// no-op
}
}

public static MockAsyncUpdateSource create(AsyncUpdateSourceType configuration, AsyncUpdateConnectorInstance connectorInstance) {
LOGGER.info("create() method called");
return INSTANCE;
}

@Override
public void startListening(AsyncUpdateMessageListener listener) throws SchemaException {
public ListeningActivity startListening(AsyncUpdateMessageListener listener) throws SchemaException {
LOGGER.info("startListening() method called");
for (AsyncUpdateMessageType message : messages) {
listener.process(message);
listener.onMessage(message);
}
return new DummyListeningActivityImpl();
}

@Override
public void stopListening() {
LOGGER.info("stopListening() method called");
}

@Override
public void test() {
public void test(OperationResult parentResult) {
LOGGER.info("test() method called");
}

Expand All @@ -78,9 +83,4 @@ public void prepareMessage(AsyncUpdateMessageType message) {
public void reset() {
messages.clear();
}

@Override
public void dispose() {
reset();
}
}
Expand Up @@ -111,9 +111,9 @@ public void test100AddAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -160,9 +160,9 @@ public void test110AddAlumniAndStaff() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_GROUP_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -207,9 +207,9 @@ public void test200AddAlumniForAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -248,9 +248,9 @@ public void test210AddStaffForAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -289,9 +289,9 @@ public void test220AddAlumniForLewis() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -330,9 +330,9 @@ public void test230AddLewis() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -371,9 +371,9 @@ public void test240AddStaffForAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -412,9 +412,9 @@ public void test250DeleteAlumniForAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -455,9 +455,9 @@ public void test310DeleteStaff() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_GROUP_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down
Expand Up @@ -111,9 +111,9 @@ public void test100AddAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -153,9 +153,9 @@ public void test110AddAlumniAndStaff() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_GROUP_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -200,9 +200,9 @@ public void test120AddWhite() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -241,9 +241,9 @@ public void test200AddAlumniForAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -282,9 +282,9 @@ public void test210AddStaffForAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -323,9 +323,9 @@ public void test220AddAlumniForLewis() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -364,9 +364,9 @@ public void test230MentionLewis() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -405,9 +405,9 @@ public void test300DeleteAnderson() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_ACCOUNT_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down Expand Up @@ -440,9 +440,9 @@ public void test310DeleteStaff() throws Exception {

// WHEN

ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID, SchemaConstants.RI_GROUP_OBJECT_CLASS);
provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(coords, task, result);
ResourceShadowDiscriminator coords = new ResourceShadowDiscriminator(RESOURCE_GROUPER_OID);
String handle = provisioningService.startListeningForAsyncUpdates(coords, task, result);
provisioningService.stopListeningForAsyncUpdates(handle, task, result);

// THEN

Expand Down
Expand Up @@ -202,11 +202,21 @@ <T extends ObjectType> String addObject(PrismObject<T> object, OperationProvisio
int synchronize(ResourceShadowDiscriminator shadowCoordinates, Task task, TaskPartitionDefinitionType taskPartition, OperationResult parentResult) throws ObjectNotFoundException,
CommunicationException, SchemaException, ConfigurationException, SecurityViolationException, ExpressionEvaluationException, PolicyViolationException, PreconditionViolationException;

void startListeningForAsyncUpdates(ResourceShadowDiscriminator shadowCoordinates, Task task, OperationResult parentResult)
/**
* Starts listening for asynchronous updates for a given resource.
* Returns "listening activity handle" that will be used to stop the listening activity.
*
* Note that although it is possible to specify other parameters in addition to resource OID (e.g. objectClass), these
* settings are not supported now.
*/
String startListeningForAsyncUpdates(ResourceShadowDiscriminator shadowCoordinates, Task task, OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException,
ExpressionEvaluationException;

void stopListeningForAsyncUpdates(ResourceShadowDiscriminator shadowCoordinates, Task task, OperationResult parentResult)
/**
* Stops the given listening activity.
*/
void stopListeningForAsyncUpdates(String listeningActivityHandle, Task task, OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException,
ExpressionEvaluationException;

Expand Down
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2010-2019 Evolveum
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.evolveum.midpoint.provisioning.impl;

import com.evolveum.midpoint.provisioning.ucf.api.ListeningActivity;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Keeps track of active async update listening activities.
* Externally visible methods are synchronized to ensure thread safety.
*/
@Component
public class AsyncUpdateListeningRegistry {

private AtomicLong counter = new AtomicLong(0);

private Map<String, ListeningActivity> listeningActivities = new HashMap<>();

@NotNull
synchronized String registerListeningActivity(ListeningActivity activity) {
String handle = String.valueOf(counter.incrementAndGet());
listeningActivities.put(handle, activity);
return handle;
}

synchronized ListeningActivity getListeningActivity(@NotNull String handle) {
ListeningActivity listeningActivity = listeningActivities.get(handle);
if (listeningActivity == null) {
throw new IllegalArgumentException("Listening activity handle " + handle + " is unknown at this moment");
}
listeningActivities.remove(handle);
return listeningActivity;
}
}

0 comments on commit 351b964

Please sign in to comment.