Skip to content

Commit

Permalink
Fix channel handling in synchronization tasks
Browse files Browse the repository at this point in the history
The channel URI was not consistently set in synchronization activities
(import, recon, live sync, async update). It was hacked in a complex
way, and still not reliable enough.

This commit sets the task.channel property for such synchronization
activities, hopefully providing channel to all places where it is
needed. Some of the hacks could be thus removed.

This resolves MID-7454.
  • Loading branch information
mederly committed Mar 24, 2022
1 parent a7c3cd3 commit 9526b38
Show file tree
Hide file tree
Showing 21 changed files with 107 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,7 @@ private <F extends FocusType> void saveSyncMetadata(SynchronizationContext<F> sy
private void savePendingDeltas(SynchronizationContext<?> syncCtx, OperationResult result) throws SchemaException {
Task task = syncCtx.getTask();
PrismObject<ShadowType> shadow = syncCtx.getShadowedResourceObject();
String channel = syncCtx.getChannel();

try {
beans.cacheRepositoryService.modifyObject(
Expand All @@ -917,9 +918,9 @@ private void savePendingDeltas(SynchronizationContext<?> syncCtx, OperationResul
syncCtx.getPendingShadowDeltas(),
result);
syncCtx.clearPendingShadowDeltas();
task.recordObjectActionExecuted(shadow, ChangeType.MODIFY, null);
task.recordObjectActionExecuted(shadow, null, null, ChangeType.MODIFY, channel, null);
} catch (ObjectNotFoundException ex) {
task.recordObjectActionExecuted(shadow, ChangeType.MODIFY, ex);
task.recordObjectActionExecuted(shadow, null, null, ChangeType.MODIFY, channel, ex);
// This may happen e.g. during some recon-livesync interactions.
// If the shadow is gone then it is gone. No point in recording the
// situation any more.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.evolveum.midpoint.model.impl.sync.tasks.ResourceObjectClass;
import com.evolveum.midpoint.repo.common.activity.run.*;
import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;

import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -120,4 +121,9 @@ public boolean isExcludedFromStalenessChecking() {
// This task does not have regularly updated progress. It cannot be watched for staleness (for now).
return true;
}

@Override
protected String getChannelOverride() {
return SchemaConstants.CHANNEL_ASYNC_UPDATE_URI;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,9 @@ public boolean processItem(@NotNull ShadowType object,
private @NotNull ModelBeans getModelBeans() {
return getActivityHandler().getModelBeans();
}

@Override
protected String getChannelOverride() {
return SchemaConstants.CHANNEL_IMPORT_URI;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,9 @@ private PrismObject<ShadowType> reloadShadow(ShadowType originalShadow, Task tas
public long getShadowReconCount() {
return transientRunStatistics.getItemsProcessed();
}

@Override
protected String getChannelOverride() {
return SchemaConstants.CHANNEL_RECON_URI;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,9 @@ public long getResourceReconCount() {
public long getResourceReconErrors() {
return transientRunStatistics.getErrors();
}

@Override
protected String getChannelOverride() {
return SchemaConstants.CHANNEL_RECON_URI;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,9 @@ public boolean processItem(@NotNull ItemProcessingRequest<LiveSyncEvent> request
getRunningTask().getExtensionPropertyRealValue(SchemaConstants.MODEL_EXTENSION_RETRY_LIVE_SYNC_ERRORS));
return retryErrors ? STOP : CONTINUE;
}

@Override
protected String getChannelOverride() {
return SchemaConstants.CHANNEL_LIVE_SYNC_URI;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,6 @@ public class ProvisioningContext {
*/
private Collection<ResourceObjectPattern> protectedObjectPatterns;

/**
* Override for values returned by {@link #getChannel()} method.
*
* TODO Remove this ugly hack. The channel should be read solely from the task.
*/
private String channelOverride;

/** Creating context from scratch. */
ProvisioningContext(
@NotNull Task task,
Expand All @@ -138,7 +131,6 @@ public class ProvisioningContext {
this.contextFactory = originalCtx.contextFactory;
this.connectorMap.putAll(originalCtx.connectorMap);
this.resourceSchema = originalCtx.resourceSchema;
this.channelOverride = originalCtx.channelOverride;
this.getOperationOptions = originalCtx.getOperationOptions; // OK?
this.propagation = originalCtx.propagation;
// Not copying protected account patters because these are object type specific.
Expand Down Expand Up @@ -293,12 +285,7 @@ public Collection<ResourceObjectPattern> getProtectedAccountPatterns(
}

public String getChannel() {
Task task = getTask();
if (channelOverride == null) {
return task.getChannel();
} else {
return channelOverride;
}
return task.getChannel();
}

public <T extends CapabilityType> ConnectorInstance getConnector(Class<T> operationCapabilityClass, OperationResult result)
Expand Down Expand Up @@ -469,10 +456,6 @@ public CachingStategyType getCachingStrategy()
return ProvisioningUtil.getCachingStrategy(this);
}

public void setChannelOverride(String channelOverride) {
this.channelOverride = channelOverride;
}

public String toHumanReadableDescription() {
if (resourceObjectDefinition != null) {
return resourceObjectDefinition.getHumanReadableName() + " @" + resource;
Expand All @@ -481,15 +464,6 @@ public String toHumanReadableDescription() {
}
}

public Object toHumanReadableDescriptionLazy() {
return new Object() {
@Override
public String toString() {
return toHumanReadableDescription();
}
};
}

public boolean isInMaintenance() {
return ResourceTypeUtil.isInMaintenance(resource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,10 @@ void notifyAfterAdd(
PrismObject<ShadowType> addedShadow,
ProvisioningOperationState<AsynchronousOperationReturnValue<PrismObject<ShadowType>>> opState,
Task task,
OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException, ExpressionEvaluationException {
OperationResult parentResult) {
ObjectDelta<ShadowType> delta = DeltaFactory.Object.createAddDelta(addedShadow);
ResourceOperationDescription operationDescription = Util.createSuccessOperationDescription(ctx, addedShadow,
delta, parentResult);
ResourceOperationDescription operationDescription = Util.createSuccessOperationDescription(
ctx, addedShadow, delta, parentResult);

if (opState.isExecuting()) {
eventDispatcher.notifyInProgress(operationDescription, task, parentResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,7 @@ void notifyAfterDelete(
PrismObject<ShadowType> shadow,
ProvisioningOperationState<AsynchronousOperationResult> opState,
Task task,
OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException, ExpressionEvaluationException {
OperationResult parentResult) {
ObjectDelta<ShadowType> delta = prismContext.deltaFactory().object().createDeleteDelta(shadow.getCompileTimeClass(),
shadow.getOid());
ResourceOperationDescription operationDescription = createSuccessOperationDescription(ctx, shadow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,12 @@ void notifyAfterModify(
Collection<? extends ItemDelta> modifications,
ProvisioningOperationState<AsynchronousOperationReturnValue<Collection<PropertyDelta<PrismPropertyValue>>>> opState,
Task task,
OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, CommunicationException, ConfigurationException, ExpressionEvaluationException {
OperationResult parentResult) {

ObjectDelta<ShadowType> delta = prismContext.deltaFactory().object().createModifyDelta(repoShadow.getOid(), modifications,
repoShadow.getCompileTimeClass());
ResourceOperationDescription operationDescription = createSuccessOperationDescription(ctx, repoShadow,
delta, parentResult);
ResourceOperationDescription operationDescription =
createSuccessOperationDescription(ctx, repoShadow, delta, parentResult);

if (opState.isExecuting()) {
eventDispatcher.notifyInProgress(operationDescription, task, parentResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ private void refreshShadowAsyncStatus(ProvisioningContext ctx, PrismObject<Shado
}

for (ObjectDelta<ShadowType> notificationDelta: notificationDeltas) {
ResourceOperationDescription operationDescription = createSuccessOperationDescription(ctx, repoShadow,
notificationDelta, parentResult);
ResourceOperationDescription operationDescription =
createSuccessOperationDescription(ctx, repoShadow, notificationDelta, parentResult);
operationListener.notifySuccess(operationDescription, task, parentResult);
}

Expand Down Expand Up @@ -536,8 +536,8 @@ private PrismObject<ShadowType> deleteDeadShadowIfPossible(ProvisioningContext c
LOGGER.debug("Deleting dead {} because it is expired", repoShadow);
shadowManager.deleteShadow(repoShadow, task, parentResult);
definitionsHelper.applyDefinition(repoShadow, task, parentResult);
ResourceOperationDescription operationDescription = createSuccessOperationDescription(ctx, repoShadow,
repoShadow.createDeleteDelta(), parentResult);
ResourceOperationDescription operationDescription =
createSuccessOperationDescription(ctx, repoShadow, repoShadow.createDeleteDelta(), parentResult);
operationListener.notifySuccess(operationDescription, task, parentResult);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.evolveum.midpoint.schema.AcknowledgementSink;

import com.evolveum.midpoint.schema.constants.SchemaConstants;
import com.evolveum.midpoint.schema.result.OperationResult;

import org.jetbrains.annotations.NotNull;
Expand All @@ -32,4 +33,9 @@ public ShadowedAsyncChange(@NotNull ResourceObjectAsyncChange resourceObjectChan
public void acknowledge(boolean release, OperationResult result) {
resourceObjectChange.acknowledge(release, result);
}

@Override
protected String getDefaultChannel() {
return SchemaConstants.CHANNEL_ASYNC_UPDATE_URI;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* This means that it is connected to repository shadow, and this shadow is updated
* with the appropriate information.
*/
public class ShadowedChange<ROC extends ResourceObjectChange> implements InitializableMixin {
public abstract class ShadowedChange<ROC extends ResourceObjectChange> implements InitializableMixin {

private static final Trace LOGGER = TraceManager.getTrace(ShadowedChange.class);

Expand Down Expand Up @@ -385,9 +385,15 @@ private void markIndexOnlyItemsAsIncomplete(PrismObject<ShadowType> resourceObje
}

private String getChannel() {
return ObjectUtils.defaultIfNull(context.getChannel(), SchemaConstants.CHANNEL_LIVE_SYNC_URI);
return ObjectUtils.defaultIfNull(context.getChannel(), getDefaultChannel());
}

/**
* Default channel for given change. The usefulness of this method is questionable,
* as the context should have the correct channel already set.
*/
protected abstract String getDefaultChannel();

public Collection<ResourceAttribute<?>> getIdentifiers() {
return resourceObjectChange.getIdentifiers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ public class ShadowedExternalChange extends ShadowedChange<ExternalResourceObjec
public ShadowedExternalChange(@NotNull ExternalResourceObjectChange resourceObjectChange, ChangeProcessingBeans beans) {
super(resourceObjectChange, beans);
}

@Override
protected String getDefaultChannel() {
return null; // TODO ?
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.evolveum.midpoint.provisioning.api.LiveSyncToken;
import com.evolveum.midpoint.schema.AcknowledgementSink;

import com.evolveum.midpoint.schema.constants.SchemaConstants;

import org.jetbrains.annotations.NotNull;

import com.evolveum.midpoint.provisioning.impl.resourceobjects.ResourceObjectLiveSyncChange;
Expand All @@ -27,4 +29,9 @@ public ShadowedLiveSyncChange(@NotNull ResourceObjectLiveSyncChange resourceObje
public LiveSyncToken getToken() {
return resourceObjectChange.getToken();
}

@Override
protected String getDefaultChannel() {
return SchemaConstants.CHANNEL_LIVE_SYNC_URI;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@ static boolean shouldExecuteResourceOperationDirectly(ProvisioningContext ctx) t
}

static ResourceOperationDescription createSuccessOperationDescription(ProvisioningContext ctx,
PrismObject<ShadowType> shadowType, ObjectDelta<? extends ShadowType> delta, OperationResult parentResult)
throws ObjectNotFoundException, SchemaException, CommunicationException,
ConfigurationException, ExpressionEvaluationException {
PrismObject<ShadowType> shadowType, ObjectDelta<? extends ShadowType> delta, OperationResult parentResult) {
ResourceOperationDescription operationDescription = new ResourceOperationDescription();
operationDescription.setCurrentShadow(shadowType);
operationDescription.setResource(ctx.getResource().asPrismObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ public void processAsynchronousUpdates(ResourceShadowCoordinates shadowCoordinat

ProvisioningContext globalContext = ctxFactory.createForCoordinates(shadowCoordinates, callerTask, callerResult);

// This is a bit of hack to propagate information about async update channel to upper layers
// e.g. to implement MID-5853. TODO fix this hack
globalContext.setChannelOverride(SchemaConstants.CHANNEL_ASYNC_UPDATE_URI);

IndividualEventsAcknowledgeGate<AsyncUpdateEvent> acknowledgeGate = new IndividualEventsAcknowledgeGate<>();

ResourceObjectAsyncChangeListener listener = (resourceObjectChange, lTask, lResult) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
import com.evolveum.midpoint.schema.AcknowledgementSink;
import com.evolveum.midpoint.xml.ns._public.common.common_3.AsyncUpdateMessageType;

import com.google.common.annotations.VisibleForTesting;

/**
* Listener that receives "raw" async update messages from asynchronous update source (e.g. AMQP, JMS, REST, ...).
*/
@VisibleForTesting // just to provide mock implementations
public interface AsyncUpdateMessageListener {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import java.util.Objects;

import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.xml.XmlTypeConverter;

import com.evolveum.midpoint.repo.common.activity.run.processing.ItemProcessingRequest;
Expand Down Expand Up @@ -175,8 +174,10 @@ public IterativeActivityRun(@NotNull ActivityRunInstantiationContext<WD, AH> con

LOGGER.trace("{}: Starting with local coordinator task {}", shortName, getRunningTask());

String originalChannel = getRunningTask().getChannel();
try {
enableGlobalConnIdOperationsListener();
overrideTaskChannelIfNeeded();

transientRunStatistics.recordRunStart(getStartTimestampRequired());

Expand All @@ -196,10 +197,27 @@ public IterativeActivityRun(@NotNull ActivityRunInstantiationContext<WD, AH> con

} finally {
disableGlobalConnIdOperationsListener();
cancelTaskChannelOverride(originalChannel);
getActivityState().getConnIdOperationsReport().flush(getRunningTask(), result);
}
}

private void cancelTaskChannelOverride(String originalChannel) {
getRunningTask().setChannel(originalChannel);
}

private void overrideTaskChannelIfNeeded() {
String channelOverride = getChannelOverride();
if (channelOverride != null) {
getRunningTask().setChannel(channelOverride);
}
}

/** Channel URI that should be set into the task during this activity run. (If not null.) */
protected @Nullable String getChannelOverride() {
return null;
}

/**
* Bucketed version of the run.
*/
Expand Down Expand Up @@ -744,7 +762,7 @@ public final ConnIdOperationsReport getConnIdOperationsReport() {
private void reportBucketCompleted(BucketProcessingRecord processingRecord, OperationResult result) {
if (shouldReportBuckets()) {
activityState.getBucketsReport().recordBucketCompleted(
new BucketProcessingRecordType(PrismContext.get())
new BucketProcessingRecordType()
.sequentialNumber(bucket.getSequentialNumber())
.content(bucket.getContent())
.size(processingRecord.getTotalSize())
Expand All @@ -761,7 +779,7 @@ private void reportBucketCompleted(BucketProcessingRecord processingRecord, Oper
private void reportBucketAnalyzed(Integer size, OperationResult result) {
if (shouldReportBuckets()) {
activityState.getBucketsReport().recordBucketCompleted(
new BucketProcessingRecordType(PrismContext.get())
new BucketProcessingRecordType()
.sequentialNumber(bucket.getSequentialNumber())
.content(bucket.getContent())
.size(size),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,14 @@ public ActionsExecutedPartInfoAsserter<RA> assertEmpty() {
return this;
}

public ActionsExecutedPartInfoAsserter<RA> assertChannels(String... uris) {
assertThat(getAllChannels()).as("all channels").containsExactlyInAnyOrder(uris);
return this;
}

public Collection<String> getAllChannels() {
return getEntries().stream()
.map(ObjectActionsExecutedEntryType::getChannel)
.collect(Collectors.toSet());
}
}

0 comments on commit 9526b38

Please sign in to comment.