Skip to content

Commit

Permalink
Add skeleton of new task execution framework
Browse files Browse the repository at this point in the history
This commit lays the ground for the new approach to task definition
and execution, based on activities.

It provides:

1. Updated XSD for definition of activities within tasks.
2. Basic parts of the execution framework: generic task handler,
parts of ActivityDefinition, WorkDefinition, ActivityHandler,
and ActivityExecution class hierarchies.
3. Sample basic, semi-composite and pure composite activities.

This is a work in progress. The system is not compilable at this moment,
but very basic test (TestActions) passes.
  • Loading branch information
mederly committed May 14, 2021
1 parent 20a2513 commit 295b6ea
Show file tree
Hide file tree
Showing 187 changed files with 5,327 additions and 2,462 deletions.
Expand Up @@ -219,9 +219,9 @@ public static String getDisplayName(TaskErrorHandlingStrategyEntryType taskError
}

//TODO improve
public static String getDisplayName(TaskPartDefinitionType partitionDefinition) {
public static String getDisplayName(ActivityDefinitionType partitionDefinition) {
Integer index = partitionDefinition.getIndex();
TaskWorkDistributionType workManagementType = partitionDefinition.getWorkManagement();
WorkDistributionType workManagementType = partitionDefinition.getWorkManagement();

String string = "";
if (index != null) {
Expand Down
Expand Up @@ -146,7 +146,7 @@ public boolean isVisible() {
@Override
public WebMarkupContainer createPanel(String panelId) {
// TODO FIX THIS after task schema change
return new SingleContainerPanel<TaskWorkDistributionType>(panelId, PrismContainerWrapperModel.fromContainerWrapper(getObjectModel(), TaskType.F_PARTS), TaskWorkDistributionType.COMPLEX_TYPE) {
return new SingleContainerPanel<WorkDistributionType>(panelId, PrismContainerWrapperModel.fromContainerWrapper(getObjectModel(), TaskType.F_PARTS), WorkDistributionType.COMPLEX_TYPE) {

@Override
protected ItemVisibility getVisibility(ItemPath itemPath) {
Expand Down
Expand Up @@ -11,13 +11,11 @@
import java.util.List;

import com.evolveum.midpoint.schema.util.task.TaskPartPerformanceInformation;
import com.evolveum.midpoint.schema.util.task.TaskPartProgressInformation;
import com.evolveum.midpoint.schema.util.task.TaskPerformanceInformation;

import org.apache.wicket.model.StringResourceModel;

import com.evolveum.midpoint.gui.api.util.WebComponentUtil;
import com.evolveum.midpoint.web.page.admin.server.TaskDisplayUtil;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import com.evolveum.wicket.chartjs.*;

Expand Down Expand Up @@ -56,7 +54,7 @@ public TaskIterativeProgressType(IterativeTaskPartItemsProcessingInformationType
}

createChartConfiguration();
performanceInformation = createPerformanceInformation(taskType, processingInfoType.getPartUri());
performanceInformation = createPerformanceInformation(taskType, processingInfoType.getPartIdentifier());
}

private void parseItemForOutcome(ItemProcessingOutcomeType outcome, ProcessedItemSetType processedItem) {
Expand Down Expand Up @@ -126,8 +124,8 @@ private TaskPartPerformanceInformation createPerformanceInformation(TaskType tas
}

public String getTitle() {
if (performanceInformation.getPartUri() != null) {
return getString("TaskIterativeProgress.part." + performanceInformation.getPartUri(), performanceInformation.getItemsProcessed());
if (performanceInformation.getPartIdentifier() != null) {
return getString("TaskIterativeProgress.part." + performanceInformation.getPartIdentifier(), performanceInformation.getItemsProcessed());
}
return getString("TaskOperationStatisticsPanel.processingInfo", performanceInformation.getItemsProcessed());
}
Expand Down
Expand Up @@ -117,7 +117,7 @@ public static String migrateUri(String uri) {

/**
* Describes migration from (potentially) old channel URI to a current channel URI.
* (What is the most important, is the "change" flag.)
* (What is the most important, is the {@link #needed} flag.)
*/
public static class Migration {

Expand Down
Expand Up @@ -18,7 +18,7 @@
public class IterativeOperationStartInfo {

private final IterationItemInformation item;
private final String partUri;
private final String partIdentifier;
private final Long partStartTimestamp;

private final long startTimeMillis;
Expand All @@ -36,13 +36,13 @@ public IterativeOperationStartInfo(IterationItemInformation item) {
this(item, null);
}

public IterativeOperationStartInfo(IterationItemInformation item, String partUri) {
this(item, partUri, null);
public IterativeOperationStartInfo(IterationItemInformation item, String partIdentifier) {
this(item, partIdentifier, null);
}

public IterativeOperationStartInfo(IterationItemInformation item, String partUri, Long partStartTimestamp) {
public IterativeOperationStartInfo(IterationItemInformation item, String partIdentifier, Long partStartTimestamp) {
this.item = item;
this.partUri = partUri;
this.partIdentifier = partIdentifier;
this.partStartTimestamp = partStartTimestamp;

this.startTimeMillis = System.currentTimeMillis();
Expand All @@ -65,8 +65,8 @@ public Long getPartStartTimestamp() {
return partStartTimestamp;
}

public String getPartUri() {
return partUri;
public String getPartIdentifier() {
return partIdentifier;
}

public StructuredProgressCollector getStructuredProgressCollector() {
Expand All @@ -83,7 +83,7 @@ public String toString() {
"item=" + item +
", startTimeMillis=" + startTimeMillis +
", partStartTimestamp=" + partStartTimestamp +
", partUri=" + partUri +
", partUri=" + partIdentifier +
", structuredProgressCollector=" + structuredProgressCollector +
'}';
}
Expand Down
Expand Up @@ -105,7 +105,7 @@ public synchronized Operation recordOperationStart(IterativeOperationStartInfo s
.operationId(getNextOperationId());

IterativeTaskPartItemsProcessingInformationType matchingPart =
findOrCreateMatchingPart(value.getPart(), startInfo.getPartUri());
findOrCreateMatchingPart(value.getPart(), startInfo.getPartIdentifier());
updatePartExecutions(matchingPart, startInfo.getPartStartTimestamp(), System.currentTimeMillis());

List<ProcessedItemType> currentList = matchingPart.getCurrent();
Expand Down Expand Up @@ -151,7 +151,7 @@ private TaskPartExecutionRecordType findOrCreateMatchingExecutionRecord(List<Tas
*/
private synchronized void recordOperationEnd(OperationImpl operation, QualifiedItemProcessingOutcomeType outcome,
Throwable exception) {
String partUri = operation.startInfo.getPartUri();
String partUri = operation.startInfo.getPartIdentifier();

Optional<IterativeTaskPartItemsProcessingInformationType> matchingPartOptional =
findMatchingPart(value.getPart(), partUri);
Expand Down Expand Up @@ -230,7 +230,7 @@ private static void addMatchingParts(List<IterativeTaskPartItemsProcessingInform
List<IterativeTaskPartItemsProcessingInformationType> deltaParts) {
for (IterativeTaskPartItemsProcessingInformationType deltaPart : deltaParts) {
IterativeTaskPartItemsProcessingInformationType matchingPart =
findOrCreateMatchingPart(sumParts, deltaPart.getPartUri());
findOrCreateMatchingPart(sumParts, deltaPart.getPartIdentifier());
addPartInformation(matchingPart, deltaPart);
}
}
Expand All @@ -239,13 +239,13 @@ private static IterativeTaskPartItemsProcessingInformationType findOrCreateMatch
@NotNull List<IterativeTaskPartItemsProcessingInformationType> list, String partUri) {
return findMatchingPart(list, partUri)
.orElseGet(
() -> add(list, new IterativeTaskPartItemsProcessingInformationType().partUri(partUri)));
() -> add(list, new IterativeTaskPartItemsProcessingInformationType().partIdentifier(partUri)));
}

private static Optional<IterativeTaskPartItemsProcessingInformationType> findMatchingPart(
@NotNull List<IterativeTaskPartItemsProcessingInformationType> list, String partUri) {
return list.stream()
.filter(item -> Objects.equals(item.getPartUri(), partUri))
.filter(item -> Objects.equals(item.getPartIdentifier(), partUri))
.findFirst();
}

Expand Down Expand Up @@ -396,7 +396,7 @@ public void done(QualifiedItemProcessingOutcomeType outcome, Throwable exception
recordOperationEnd(this, outcome, exception);
StructuredProgressCollector progressCollector = startInfo.getStructuredProgressCollector();
if (progressCollector != null) {
progressCollector.incrementStructuredProgress(startInfo.getPartUri(), outcome);
progressCollector.incrementStructuredProgress(startInfo.getPartIdentifier(), outcome);
}
}

Expand Down
Expand Up @@ -45,7 +45,7 @@ private void createData(IterativeTaskPartItemsProcessingInformationType componen

for (ProcessedItemSetType set : processed) {
Data.Record record = data.createRecord();
record.add(component.getPartUri());
record.add(component.getPartIdentifier());
record.add(OutcomeKeyedCounterTypeUtil.getOutcome(set));
record.add(OutcomeKeyedCounterTypeUtil.getOutcomeQualifierUri(set));
record.add(set.getCount());
Expand Down
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/

package com.evolveum.midpoint.schema.util.task;

import org.jetbrains.annotations.NotNull;

import com.evolveum.midpoint.prism.PrismContainer;
import com.evolveum.midpoint.prism.PrismContainerValue;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ExtensionType;

public class LegacyWorkDefinitionSource implements WorkDefinitionSource {

@NotNull private final String taskHandlerUri;
private final PrismContainerValue<?> taskExtension;

private LegacyWorkDefinitionSource(@NotNull String taskHandlerUri, PrismContainerValue<?> taskExtension) {
this.taskHandlerUri = taskHandlerUri;
this.taskExtension = taskExtension;
}

@NotNull
public static WorkDefinitionSource create(@NotNull String handlerUri,
PrismContainer<? extends ExtensionType> extensionContainer) {
PrismContainerValue<?> pcv = extensionContainer != null ? extensionContainer.getValue() : null;
return new LegacyWorkDefinitionSource(handlerUri, pcv);
}

public @NotNull String getTaskHandlerUri() {
return taskHandlerUri;
}

public PrismContainerValue<?> getTaskExtension() {
return taskExtension;
}

@Override
public String toString() {
return "LegacyWorkDefinitionSource{" +
"taskHandlerUri='" + taskHandlerUri + '\'' +
", taskExtension size=" + getExtensionSize() +
'}';
}

private int getExtensionSize() {
return taskExtension != null ? taskExtension.size() : 0;
}
}
Expand Up @@ -162,7 +162,7 @@ private static IterativeTaskPartItemsProcessingInformationType getIterativeInfoF
return null;
} else {
return statistics.getIterativeTaskInformation().getPart().stream()
.filter(part -> Objects.equals(part.getPartUri(), partUri))
.filter(part -> Objects.equals(part.getPartIdentifier(), partUri))
.findFirst().orElse(null);
}
}
Expand Down
Expand Up @@ -83,7 +83,7 @@ private TaskPartPerformanceInformation(String partUri, int itemsProcessed, int e
public static TaskPartPerformanceInformation forPart(@NotNull IterativeTaskPartItemsProcessingInformationType info,
StructuredTaskProgressType structuredProgress) {

String partUri = info.getPartUri();
String partUri = info.getPartIdentifier();

int itemsProcessed = TaskOperationStatsUtil.getItemsProcessed(info);
int errors = TaskOperationStatsUtil.getErrors(info);
Expand All @@ -107,7 +107,7 @@ public static TaskPartPerformanceInformation forCurrentPart(OperationStatsType o
return forPart(info, structuredProgress);
}

public String getPartUri() {
public String getPartIdentifier() {
return partUri;
}

Expand Down

This file was deleted.

Expand Up @@ -98,7 +98,7 @@ private static TaskPerformanceInformation fromOtherTask(@NotNull TaskType task)
}

private void addPart(TaskPartPerformanceInformation part) {
parts.put(part.getPartUri(), part);
parts.put(part.getPartIdentifier(), part);
}

public Map<String, TaskPartPerformanceInformation> getParts() {
Expand Down

0 comments on commit 295b6ea

Please sign in to comment.