Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Evolveum/midpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
KaterynaHonchar committed Nov 18, 2019
2 parents 83f745b + 774d794 commit 327532d
Show file tree
Hide file tree
Showing 37 changed files with 934 additions and 130 deletions.
Expand Up @@ -150,6 +150,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="connectionHandlingThreads" type="xsd:int" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Number of connection handling threads. The default is 10.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:extension>
</xsd:complexContent>
Expand Down
12 changes: 12 additions & 0 deletions infra/test-util/pom.xml
Expand Up @@ -115,5 +115,17 @@
<artifactId>test-ng</artifactId>
<version>4.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations-java5</artifactId>
</dependency>
</dependencies>
</project>
@@ -1,13 +1,14 @@
/*
* Copyright (c) 2010-2019 Evolveum and contributors
* Copyright (c) 2019 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/

package com.evolveum.midpoint.provisioning.impl.async;
package com.evolveum.midpoint.test.amqp;

import com.evolveum.midpoint.provisioning.ucf.impl.builtin.async.sources.Amqp091AsyncUpdateSource;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
Expand All @@ -23,39 +24,45 @@

public class EmbeddedBroker {

protected static final Trace LOGGER = TraceManager.getTrace(EmbeddedBroker.class);

private static final String DEFAULT_CONFIG_RESOURCE_PATH = "amqp/default-qpid-config.json";

private final SystemLauncher broker = new SystemLauncher();

void start() throws Exception {
public void start() throws Exception {
start(DEFAULT_CONFIG_RESOURCE_PATH);
}

public void start(String configResourcePath) throws Exception {
System.out.println("Starting the broker");
Map<String, Object> attributes = new HashMap<>();
attributes.put("type", "Memory");
attributes.put("initialConfigurationLocation", findResourcePath("async/qpid-config.json"));
attributes.put("initialConfigurationLocation", findResourcePath(configResourcePath));
broker.startup(attributes);
}

private String findResourcePath(String fileName) {
return EmbeddedBroker.class.getClassLoader().getResource(fileName).toExternalForm();
}

void stop() {
public void stop() {
System.out.println("Stopping the broker");
broker.shutdown();
}

void send(String queueName, String message) throws IOException, TimeoutException {
public void send(String queueName, String message, Map<String, Object> headers) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.basicPublish("", queueName, createProperties(), message.getBytes(StandardCharsets.UTF_8));
System.out.println("Sent '" + message + "'");
channel.basicPublish("", queueName, createProperties(headers), message.getBytes(StandardCharsets.UTF_8));
LOGGER.trace("Sent '{}'", message);
}
}

@NotNull
private AMQP.BasicProperties createProperties() {
Map<String, Object> headers = new HashMap<>();
headers.put(Amqp091AsyncUpdateSource.HEADER_LAST_MESSAGE, true);
private AMQP.BasicProperties createProperties(Map<String, Object> headers) {
return new AMQP.BasicProperties()
.builder()
.headers(headers)
Expand Down
Expand Up @@ -1851,14 +1851,19 @@ public <O extends ObjectType> String getArchetypeOid(O object) throws SchemaExce

// temporary
public MessageWrapper wrap(AsyncUpdateMessageType message) {
return new MessageWrapper(message);
return new MessageWrapper(message, prismContext);
}

// temporary
public Map<String, Object> getMessageBodyAsMap(AsyncUpdateMessageType message) throws IOException {
return wrap(message).getBodyAsMap();
}

// temporary
public Item<?, ?> getMessageBodyAsPrismItem(AsyncUpdateMessageType message) throws SchemaException {
return wrap(message).getBodyAsPrismItem(PrismContext.LANG_XML);
}

@Override
public <O extends ObjectType> void addRecomputeTrigger(O object, Long timestamp) throws ObjectAlreadyExistsException,
SchemaException, ObjectNotFoundException {
Expand Down
Expand Up @@ -1117,4 +1117,31 @@ public static LensContext.ExportType getExportTypeTraceOrReduced(TraceType trace
public static <AH extends AssignmentHolderType> ItemDelta getAprioriItemDelta(ObjectDelta<AH> focusDelta, ItemPath itemPath) {
return focusDelta != null ? focusDelta.findItemDelta(itemPath) : null;
}

public static <O extends ObjectType> String determineExplicitArchetypeOid(PrismObject<O> object) {
String explicitArchetypeOid = null;
// Used in cases where archetype assignment haven't had the change to be processed yet.
// E.g. in case that we are creating a new object with archetype assignment
if (object.canRepresent(AssignmentHolderType.class)) {
AssignmentHolderType assignmentHolderType = (AssignmentHolderType)object.asObjectable();
List<ObjectReferenceType> archetypeRefs = assignmentHolderType.getArchetypeRef();
if (archetypeRefs.isEmpty()) {
explicitArchetypeOid = determineExplicitArchetypeOidFromAssignments(object);
}
}
return explicitArchetypeOid;
}

public static <O extends ObjectType> String determineExplicitArchetypeOidFromAssignments(PrismObject<O> object) {
String explicitArchetypeOid = null;
if (object.canRepresent(AssignmentHolderType.class)) {
for (AssignmentType assignment : ((AssignmentHolderType)object.asObjectable()).getAssignment()) {
ObjectReferenceType targetRef = assignment.getTargetRef();
if (targetRef != null && QNameUtil.match(ArchetypeType.COMPLEX_TYPE, targetRef.getType())) {
explicitArchetypeOid = targetRef.getOid();
}
}
}
return explicitArchetypeOid;
}
}
Expand Up @@ -504,7 +504,7 @@ private <F extends ObjectType> ArchetypePolicyType determineArchetypePolicy(Lens
return null;
}
PrismObject<F> object = context.getFocusContext().getObjectAny();
String explicitArchetypeOid = determineExplicitArchetypeOid(context);
String explicitArchetypeOid = LensUtil.determineExplicitArchetypeOid(context.getFocusContext().getObjectAny());
return archetypeManager.determineArchetypePolicy(object, explicitArchetypeOid, result);
}

Expand All @@ -519,7 +519,7 @@ public <F extends AssignmentHolderType> ArchetypeType updateArchetype(LensContex

PrismObject<F> object = context.getFocusContext().getObjectAny();

String explicitArchetypeOid = determineExplicitArchetypeOid(context);
String explicitArchetypeOid = LensUtil.determineExplicitArchetypeOid(context.getFocusContext().getObjectAny());
PrismObject<ArchetypeType> archetype = archetypeManager.determineArchetype(object, explicitArchetypeOid, result);
ArchetypeType archetypeType = null;
if (archetype != null) {
Expand All @@ -531,26 +531,6 @@ public <F extends AssignmentHolderType> ArchetypeType updateArchetype(LensContex
return archetypeType;
}

private <O extends ObjectType> String determineExplicitArchetypeOid(LensContext<O> context) {
PrismObject<O> object = context.getFocusContext().getObjectAny();
String explicitArchetypeOid = null;
// Used in cases where archetype assignment haven't had the change to be processed yet.
// E.g. in case that we are creating a new object with archetype assignment
if (object.canRepresent(AssignmentHolderType.class)) {
AssignmentHolderType assignmentHolderType = (AssignmentHolderType)object.asObjectable();
List<ObjectReferenceType> archetypeRefs = assignmentHolderType.getArchetypeRef();
if (archetypeRefs.isEmpty()) {
for (AssignmentType assignment : assignmentHolderType.getAssignment()) {
ObjectReferenceType targetRef = assignment.getTargetRef();
if (targetRef != null && QNameUtil.match(ArchetypeType.COMPLEX_TYPE, targetRef.getType())) {
explicitArchetypeOid = targetRef.getOid();
}
}
}
}
return explicitArchetypeOid;
}

public <F extends ObjectType> void updateArchetypePolicy(LensContext<F> context, Task task, OperationResult result) throws SchemaException, ConfigurationException {
if (context.getFocusContext() == null) {
return;
Expand Down
Expand Up @@ -119,6 +119,8 @@ private <AH extends AssignmentHolderType> void processFocusFocus(LensContext<AH>
LensFocusContext<AH> focusContext = context.getFocusContext();
PartialProcessingOptionsType partialProcessingOptions = context.getPartialProcessingOptions();

checkArchetypeRefDelta(context);

boolean resetOnRename = true; // This is fixed now. TODO: make it configurable

boolean wasResetOnIterationSpecificationChange = false;
Expand Down Expand Up @@ -808,6 +810,36 @@ private <AH extends AssignmentHolderType> void addIterationTokenDeltas(LensFocus

}

private <F extends ObjectType> void checkArchetypeRefDelta(LensContext<F> context) throws PolicyViolationException {
ObjectDelta<F> focusPrimaryDelta = context.getFocusContext().getPrimaryDelta();
if (focusPrimaryDelta != null) {
ReferenceDelta archetypeRefDelta = focusPrimaryDelta.findReferenceModification(AssignmentHolderType.F_ARCHETYPE_REF);
if (archetypeRefDelta != null) {
// We want to allow this under special circumstances. E.g. we want be able to import user with archetypeRef.
// Otherwise we won't be able to export a user and re-import it again.
if (focusPrimaryDelta.isAdd()) {
String archetypeOidFromAssignments = LensUtil.determineExplicitArchetypeOidFromAssignments(focusPrimaryDelta.getObjectToAdd());
if (archetypeOidFromAssignments == null) {
throw new PolicyViolationException("Attempt add archetypeRef without a matching assignment");
} else {
boolean match = true;
for (PrismReferenceValue archetypeRefDeltaVal : archetypeRefDelta.getValuesToAdd()) {
if (!archetypeOidFromAssignments.equals(archetypeRefDeltaVal.getOid())) {
match = false;
}
}
if (match) {
return;
} else {
throw new PolicyViolationException("Attempt add archetypeRef that does not match assignment");
}
}
}
throw new PolicyViolationException("Attempt to modify archetypeRef directly");
}
}
}

// private <F extends FocusType> void processAssignmentActivation(LensContext<F> context, XMLGregorianCalendar now,
// OperationResult result) throws SchemaException {
// DeltaSetTriple<EvaluatedAssignmentImpl<?>> evaluatedAssignmentTriple = context.getEvaluatedAssignmentTriple();
Expand Down
Expand Up @@ -1097,14 +1097,6 @@ public <F extends ObjectType> void processMembershipAndDelegatedRefs(LensContext
return;
}

ObjectDelta<F> focusPrimaryDelta = focusContext.getPrimaryDelta();
if (focusPrimaryDelta != null) {
ReferenceDelta archetypeRefDelta = focusPrimaryDelta.findReferenceModification(AssignmentHolderType.F_ARCHETYPE_REF);
if (archetypeRefDelta != null) {
throw new PolicyViolationException("Attempt to modify archetypeRef directly");
}
}

Collection<PrismReferenceValue> shouldBeRoleRefs = new ArrayList<>();
Collection<PrismReferenceValue> shouldBeDelegatedRefs = new ArrayList<>();
Collection<PrismReferenceValue> shouldBeArchetypeRefs = new ArrayList<>();
Expand Down
Expand Up @@ -7,6 +7,9 @@

package com.evolveum.midpoint.model.impl.messaging;

import com.evolveum.midpoint.prism.Item;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.xml.ns._public.common.common_3.Amqp091MessageAttributesType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.Amqp091MessageType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.AsyncUpdateMessageType;
Expand All @@ -24,15 +27,17 @@
public class MessageWrapper {

@NotNull private final AsyncUpdateMessageType message;
@NotNull private final PrismContext prismContext;

private static final TypeReference<Map<String, Object>> MAP_TYPE = new MapTypeReference();
private static final ObjectMapper MAPPER = new ObjectMapper();

private static class MapTypeReference extends TypeReference<Map<String, Object>> {
}

public MessageWrapper(@NotNull AsyncUpdateMessageType message) {
public MessageWrapper(@NotNull AsyncUpdateMessageType message, @NotNull PrismContext prismContext) {
this.message = message;
this.prismContext = prismContext;
}

public Amqp091MessageAttributesType getAttributes() {
Expand All @@ -55,4 +60,9 @@ public Map<String, Object> getBodyAsMap() throws IOException {
String json = getText();
return MAPPER.readValue(json, MAP_TYPE);
}

public Item<?,?> getBodyAsPrismItem(String language) throws SchemaException {
String text = getText();
return prismContext.parserFor(text).language(language).parseItem();
}
}
Expand Up @@ -1170,10 +1170,12 @@ public SynchronizationEventInformation(PrismObject<? extends ShadowType> current
objectOid = currentShadow.getOid();
}
task.recordSynchronizationOperationStart(objectName, objectDisplayName, ShadowType.COMPLEX_TYPE, objectOid);
if (SchemaConstants.CHANGE_CHANNEL_LIVE_SYNC_URI.equals(channel)) {
// livesync processing is not controlled via model -> so we cannot do this in upper layers
task.recordIterativeOperationStart(objectName, objectDisplayName, ShadowType.COMPLEX_TYPE, objectOid);
}
// if (isLiveSyncOrAsyncUpdate(channel)) {
// // livesync/async-update processing is not controlled via model -> so we cannot do this in upper layers
// // Note that this is not quite correct: Using this method we capture only operations that reach the model
// // (not the ones that end up updating the shadows)
// task.recordIterativeOperationStart(objectName, objectDisplayName, ShadowType.COMPLEX_TYPE, objectOid);
// }
}

public void setProtected() {
Expand Down Expand Up @@ -1232,12 +1234,17 @@ public void setException(Exception ex) {
public void record(Task task) {
task.recordSynchronizationOperationEnd(objectName, objectDisplayName, ShadowType.COMPLEX_TYPE,
objectOid, started, exception, originalStateIncrement, newStateIncrement);
if (SchemaConstants.CHANGE_CHANNEL_LIVE_SYNC_URI.equals(channel)) {
// livesync processing is not controlled via model -> so we cannot do this in upper layers
task.recordIterativeOperationEnd(objectName, objectDisplayName, ShadowType.COMPLEX_TYPE,
objectOid, started, exception);
}
// if (isLiveSyncOrAsyncUpdate(channel)) {
// // livesync/async-update processing is not controlled via model -> so we cannot do this in upper layers
// task.recordIterativeOperationEnd(objectName, objectDisplayName, ShadowType.COMPLEX_TYPE,
// objectOid, started, exception);
// }
}
}

private static boolean isLiveSyncOrAsyncUpdate(String channel) {
return SchemaConstants.CHANGE_CHANNEL_LIVE_SYNC_URI.equals(channel)
|| SchemaConstants.CHANGE_CHANNEL_ASYNC_UPDATE_URI.equals(channel);
}

}

0 comments on commit 327532d

Please sign in to comment.