Skip to content

Commit

Permalink
Support configuration changes in AsyncUpdateConn.
Browse files Browse the repository at this point in the history
  • Loading branch information
mederly committed Mar 14, 2019
1 parent 351b964 commit ff48aa7
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 120 deletions.
Expand Up @@ -83,4 +83,11 @@ public void prepareMessage(AsyncUpdateMessageType message) {
public void reset() {
messages.clear();
}

@Override
public String toString() {
return "MockAsyncUpdateSource{" +
"messages:" + messages.size() +
'}';
}
}
Expand Up @@ -76,4 +76,11 @@ public void prepareMessage(UcfChangeType changeDescription) {
public void reset() {
messages.clear();
}

@Override
public String toString() {
return "MockAsyncUpdateSource{" +
"messages:" + messages.size() +
'}';
}
}
Expand Up @@ -96,7 +96,7 @@ public void setPrismContext(PrismContext prismContext) {
this.prismContext = prismContext;
}

protected ResourceSchema getResourceSchema() {
public ResourceSchema getResourceSchema() {
return resourceSchema;
}

Expand Down
Expand Up @@ -39,6 +39,7 @@
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Async Update source for AMQP 0.9.1 brokers.
Expand All @@ -51,6 +52,8 @@ public class Amqp091AsyncUpdateSource implements AsyncUpdateSource {
@NotNull private final PrismContext prismContext;
@NotNull private final ConnectionFactory connectionFactory;

private static final long CONNECTION_CLOSE_TIMEOUT = 5000L;

private Amqp091AsyncUpdateSource(@NotNull Amqp091SourceType sourceConfiguration, @NotNull AsyncUpdateConnectorInstance connectorInstance) {
this.sourceConfiguration = sourceConfiguration;
this.prismContext = connectorInstance.getPrismContext();
Expand All @@ -61,20 +64,19 @@ private class ListeningActivityImpl implements ListeningActivity {

private Connection activeConnection;
private Channel activeChannel;
private String activeConsumerTag;

@Override
public void stop() {
silentlyCloseActiveConnection();
}
private final AtomicInteger messagesBeingProcessed = new AtomicInteger(0);

private void startListening(AsyncUpdateMessageListener listener) {
private ListeningActivityImpl(AsyncUpdateMessageListener listener) {
try {
activeConnection = connectionFactory.newConnection();
activeChannel = activeConnection.createChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
byte[] body = message.getBody();
System.out.println("Body: " + new String(body, StandardCharsets.UTF_8));
try {
messagesBeingProcessed.incrementAndGet();
byte[] body = message.getBody();
System.out.println("Body: " + new String(body, StandardCharsets.UTF_8));
boolean successful = listener.onMessage(createAsyncUpdateMessage(message));
if (successful) {
activeChannel.basicAck(message.getEnvelope().getDeliveryTag(), false);
Expand All @@ -84,19 +86,58 @@ private void startListening(AsyncUpdateMessageListener listener) {
} catch (RuntimeException | SchemaException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Got exception while processing message", e);
rejectMessage(message);
} finally {
messagesBeingProcessed.decrementAndGet();
}
};
String activeConsumerTag = activeChannel
activeConsumerTag = activeChannel
.basicConsume(sourceConfiguration.getQueue(), false, deliverCallback, consumerTag -> {});
System.out.println("Opened consumer " + activeConsumerTag);
} catch (Throwable t) {
LoggingUtils.logUnexpectedException(LOGGER, "Exception on AMQP", t);
} catch (RuntimeException | IOException | TimeoutException e) {
if (activeConnection != null) {
silentlyCloseActiveConnection();
}
throw new SystemException("Couldn't start listening on " + listener + ": " + e.getMessage(), e);
}
}

@Override
public void stop() {
if (activeChannel != null && activeConsumerTag != null) {
LOGGER.info("Cancelling consumer {} on {}", activeConsumerTag, activeChannel);
try {
activeChannel.basicCancel(activeConsumerTag);
} catch (IOException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Couldn't cancel consumer {} on channel {}", e, activeConsumerTag, activeChannel);
}
}

// wait until remaining messages are processed
long start = System.currentTimeMillis();
while (messagesBeingProcessed.get() > 0 && System.currentTimeMillis() - start < CONNECTION_CLOSE_TIMEOUT) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.warn("Waiting for connection to be closed was interrupted");
break;
}
}
if (messagesBeingProcessed.get() > 0) {
LOGGER.warn("Closing the connection even if {} messages are being processed; they will be unacknowledged", messagesBeingProcessed.get());
}
if (activeConnection != null) {
silentlyCloseActiveConnection();
}
}

@Override
public String toString() {
return "AMQP091-ListeningActivityImpl{" +
"connection=" + activeConnection +
", consumerTag='" + activeConsumerTag + '\'' +
'}';
}

private void rejectMessage(Delivery message) throws IOException {
// TODO implement a policy to selectively requeue or discard messages
activeChannel.basicReject(message.getEnvelope().getDeliveryTag(), true);
Expand Down Expand Up @@ -130,9 +171,7 @@ public static Amqp091AsyncUpdateSource create(AsyncUpdateSourceType configuratio

@Override
public ListeningActivity startListening(AsyncUpdateMessageListener listener) {
ListeningActivityImpl listeningActivity = new ListeningActivityImpl();
listeningActivity.startListening(listener);
return listeningActivity;
return new ListeningActivityImpl(listener);
}

@NotNull
Expand Down Expand Up @@ -168,5 +207,4 @@ public void test(OperationResult parentResult) {
throw new SystemException("Couldn't connect to AMQP queue: " + e.getMessage(), e);
}
}

}
Expand Up @@ -23,6 +23,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
*
Expand Down Expand Up @@ -57,24 +58,18 @@ public void validate() {
}
}

public AsyncUpdateSourceType getSingleSourceConfiguration() {
List<AsyncUpdateSourceType> allSources = getAllSources();
if (allSources.isEmpty()) {
throw new IllegalStateException("No asynchronous update sources were configured");
} else if (allSources.size() > 1) {
throw new IllegalStateException("Multiple asynchronous update sources were configured. This is not supported yet.");
} else {
return allSources.get(0);
}
}

@NotNull
public List<AsyncUpdateSourceType> getAllSources() {
List<AsyncUpdateSourceType> getAllSources() {
List<AsyncUpdateSourceType> allSources = new ArrayList<>();
if (sources != null) {
allSources.addAll(sources.getAmqp091());
allSources.addAll(sources.getOther());
}
return allSources;
}

boolean hasSourcesChanged(AsyncUpdateConnectorConfiguration other) {
// we can consider weaker comparison here in the future
return other == null || !Objects.equals(other.sources, sources);
}
}
Expand Up @@ -30,28 +30,26 @@
import com.evolveum.midpoint.schema.statistics.ConnectorOperationalStatus;
import com.evolveum.midpoint.task.api.StateReporter;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.util.logging.LoggingUtils;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ExpressionType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ShadowType;
import com.evolveum.midpoint.xml.ns._public.resource.capabilities_3.AsyncUpdateCapabilityType;
import com.evolveum.midpoint.xml.ns._public.resource.capabilities_3.PagedSearchCapabilityType;
import com.evolveum.midpoint.xml.ns._public.resource.capabilities_3.ReadCapabilityType;

import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* Connector that is able to obtain and process asynchronous updates.
* It can be used to receive messages from JMS or AMQP messaging systems; or maybe from REST calls in the future.
*
* Currently we keep no state besides the configuration. It is because calls to this connector should be really infrequent.
* Sources are therefore instantiated on demand, e.g. on test() or startListening() calls.
* Currently we keep no state besides the configuration and open listening activities. It is because calls to this
* connector should be really infrequent. Sources are therefore instantiated on demand, e.g. on test() or startListening() calls.
*
* TODO we should do something reasonable on configuration change: probably we should restart all the listening activities
* (we cannot simply cancel them; but restarting seems appropriate; otherwise all Async Update tasks would continue with
* the old configuration)
*/
@SuppressWarnings("DefaultAnnotationParam")
@ManagedConnector(type="AsyncUpdateConnector", version="1.0.0")
Expand All @@ -72,14 +70,37 @@ public class AsyncUpdateConnectorInstance extends AbstractManagedConnectorInstan
*/
private UcfExpressionEvaluator ucfExpressionEvaluator;

/**
* Open listening activities. Needed mainly to be able to restart them on configuration change.
*/
private final Collection<ConnectorInstanceListeningActivity> openListeningActivities = ConcurrentHashMap.newKeySet();

@ManagedConnectorConfiguration
public AsyncUpdateConnectorConfiguration getConfiguration() {
return configuration;
}

public void setConfiguration(AsyncUpdateConnectorConfiguration configuration) {
configuration.validate();
boolean sourcesChanged = configuration.hasSourcesChanged(this.configuration);
this.configuration = configuration;
if (sourcesChanged) {
HashSet<ConnectorInstanceListeningActivity> openActivitiesClone = new HashSet<>(openListeningActivities);
if (!openActivitiesClone.isEmpty()) {
restartListeningActivities(openActivitiesClone);
}
}
}

@Override
protected void connect(OperationResult result) {
// no-op
}

@Override
protected void disconnect(OperationResult result) {
// we do not want to stop currently running listening activities just because the configuration was changed
// no-op - we act on configuration change in setConfiguration method because
// we need the original configuration to know the difference
}

@Override
Expand Down Expand Up @@ -107,19 +128,45 @@ public void dispose() {

@Override
public ListeningActivity startListeningForChanges(ChangeListener changeListener, OperationResult parentResult) throws SchemaException {
ConnectorInstanceListeningActivity listeningActivity = new ConnectorInstanceListeningActivity(changeListener);
try {
openListeningActivities.add(listeningActivity);
startListeningInternal(listeningActivity);
} catch (Throwable t) {
openListeningActivities.remove(listeningActivity);
throw t;
}
return listeningActivity;
}

private void startListeningInternal(ConnectorInstanceListeningActivity listeningActivity)
throws SchemaException {
TransformationalAsyncUpdateMessageListener messageListener = new TransformationalAsyncUpdateMessageListener(
changeListener, configuration.getTransformExpression(), ucfExpressionEvaluator, getPrismContext(), getResourceSchema());
listeningActivity.changeListener, this);
Collection<AsyncUpdateSource> sources = sourceManager.createSources(configuration.getAllSources());
ConnectorInstanceListeningActivity listeningActivity = new ConnectorInstanceListeningActivity();
try {
for (AsyncUpdateSource source : sources) {
listeningActivity.addActivity(source.startListening(messageListener));
}
} catch (RuntimeException e) {
listeningActivity.stop();
throw e;
} catch (Throwable t) {
listeningActivity.stopInnerActivities();
throw t;
}
}

private void restartListeningActivities(Set<ConnectorInstanceListeningActivity> activities) {
LOGGER.info("Restarting {} open listening activities", activities.size());

for (ConnectorInstanceListeningActivity activity : activities) {
try {
LOGGER.debug("Stopping listening activity {}", activity);
activity.stopInnerActivities();
LOGGER.debug("Starting listening activity {} again", activity);
startListeningInternal(activity);
} catch (RuntimeException | SchemaException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Couldn't restart listening activity {} on {}", e, activity, this);
}
}
return listeningActivity;
}

@Override
Expand All @@ -141,16 +188,6 @@ public Collection<Object> fetchCapabilities(OperationResult parentResult) {
// TODO activation, credentials?
}

@ManagedConnectorConfiguration
public AsyncUpdateConnectorConfiguration getConfiguration() {
return configuration;
}

public void setConfiguration(AsyncUpdateConnectorConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}

@Override
public UcfExpressionEvaluator getUcfExpressionEvaluator() {
return ucfExpressionEvaluator;
Expand All @@ -161,6 +198,10 @@ public void setUcfExpressionEvaluator(UcfExpressionEvaluator evaluator) {
this.ucfExpressionEvaluator = evaluator;
}

ExpressionType getTransformExpression() {
return configuration.getTransformExpression();
}

//region Unsupported operations
@Override
public ResourceSchema fetchResourceSchema(List<QName> generateObjectClasses, OperationResult parentResult) {
Expand Down Expand Up @@ -238,5 +279,45 @@ public List<Change> fetchChanges(ObjectClassComplexTypeDefinition objectClass, P
AttributesToReturn attrsToReturn, StateReporter reporter, OperationResult parentResult) {
return null;
}

//endregion

//region Listening activity
private class ConnectorInstanceListeningActivity implements ListeningActivity {

private final List<ListeningActivity> activities = new ArrayList<>();
private final ChangeListener changeListener;

ConnectorInstanceListeningActivity(ChangeListener changeListener) {
this.changeListener = changeListener;
}

@Override
public void stop() {
openListeningActivities.remove(this);
stopInnerActivities();
}

private void stopInnerActivities() {
for (ListeningActivity activity : activities) {
try {
activity.stop();
} catch (RuntimeException e) {
LoggingUtils.logUnexpectedException(LOGGER, "Couldn't stop listening on {}", e, activity);
}
}
activities.clear();
}

void addActivity(ListeningActivity activity) {
activities.add(activity);
}

@Override
public String toString() {
return "ConnectorInstanceListeningActivity{" + activities + "}";
}
}

//endregion
}

0 comments on commit ff48aa7

Please sign in to comment.