Skip to content

Commit

Permalink
Add preliminary support for activity identifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
mederly committed May 21, 2021
1 parent 663759a commit 7f4d16a
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 34 deletions.
Expand Up @@ -221,7 +221,7 @@ public static List<WorkBucketType> getBuckets(TaskWorkStateType workState) {
@Nullable
public static TaskPartWorkStateType getCurrentPartWorkState(TaskWorkStateType workState) {
if (workState != null) {
return getPartWorkState(workState, getCurrentPartId(workState));
return getPartWorkState(workState, getCurrentActivityId(workState));
} else {
return null;
}
Expand Down Expand Up @@ -289,7 +289,7 @@ public static WorkDistributionType getWorkDistribution(ActivityDefinitionType wo
return partDef != null ? partDef.getDistribution() : null;
}

public static String getCurrentPartId(TaskWorkStateType workState) {
public static String getCurrentActivityId(TaskWorkStateType workState) {
return workState != null ? workState.getCurrentPartId() : null;
}

Expand Down
Expand Up @@ -519,7 +519,7 @@ public void setActivityNumber(int activityNumber) {
this.activityNumber = activityNumber;
}

public ActivityDefinition<WD> getActivityDefinition() {
public @NotNull ActivityDefinition<WD> getActivityDefinition() {
return activityDefinition;
}

Expand Down
Expand Up @@ -217,4 +217,11 @@ public String debugDump(int indent) {
return sb.toString();
}

public String getIdentifier() {
if (definitionBean != null) {
return definitionBean.getIdentifier();
} else {
return null;
}
}
}
Expand Up @@ -16,6 +16,8 @@

import org.jetbrains.annotations.NotNull;

import java.util.function.Supplier;

/**
* Base class for activity executions.
*
Expand Down Expand Up @@ -44,6 +46,11 @@ public abstract class AbstractActivityExecution<WD extends WorkDefinition,
*/
@NotNull protected final AH activityHandler;

/**
* Activity identifier, either provided by user or automatically generated. It is unique among siblings.
*/
private String identifier;

protected AbstractActivityExecution(@NotNull ActivityInstantiationContext<WD> context,
@NotNull AH activityHandler) {
this.taskExecution = context.getTaskExecution();
Expand All @@ -62,22 +69,46 @@ public CompositeActivityExecution getParent() {
return parent;
}

public ActivityDefinition<WD> getActivityDefinition() {
public @NotNull ActivityDefinition<WD> getActivityDefinition() {
return activityDefinition;
}

@NotNull public AH getActivityHandler() {
return activityHandler;
}

@NotNull
@Override
public String getIdentifier() {
return identifier;
}

@Override
public void setupIdentifier(Supplier<String> defaultIdentifierSupplier) {
String defined = activityDefinition.getIdentifier();
if (defined != null) {
identifier = defined;
return;
}

String generated = defaultIdentifierSupplier.get();
if (generated != null) {
identifier = generated;
return;
}

throw new IllegalStateException("Activity identifier was not defined nor generated");
}

public CommonTaskBeans getBeans() {
return taskExecution.getBeans();
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" +
"activityDefinition=" + activityDefinition +
"id=" + identifier +
", def=" + activityDefinition +
'}';
}

Expand Down
Expand Up @@ -22,6 +22,8 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Abstract superclass for both pure- and semi-composite activities.
Expand Down Expand Up @@ -53,6 +55,8 @@ protected AbstractCompositeActivityExecution(ActivityInstantiationContext<WD> co

List<ActivityExecution> children = createChildren(result);
this.children.addAll(children);
setupChildIdentifiers();

tailorChildren(result);

LOGGER.trace("After children creation, before execution:\n{}", debugDumpLazily());
Expand All @@ -64,6 +68,26 @@ protected AbstractCompositeActivityExecution(ActivityInstantiationContext<WD> co
return executionResult;
}

private void setupChildIdentifiers() {
for (ActivityExecution child : this.children) {
child.setupIdentifier(this::generateNextIdentifier);
}
}

// TODO implement seriously
private String generateNextIdentifier() {
Set<String> existing = children.stream()
.map(ActivityExecution::getIdentifier)
.collect(Collectors.toSet());

for (int i = 1; ; i++) {
String candidate = String.valueOf(i);
if (!existing.contains(candidate)) {
return candidate;
}
}
}

/**
* Executes tailoring instructions, i.e. inserts new activities before/after specified ones,
* or changes the configuration of specified activities.
Expand All @@ -75,6 +99,7 @@ private void tailorChildren(OperationResult result) {
/**
* Creates child activity executions: either explicitly specified by the configuration
* (for pure composite activities) or implicitly defined by this activity (for semi-composite ones).
* @return
*/
@NotNull
protected abstract List<ActivityExecution> createChildren(OperationResult result) throws SchemaException;
Expand Down Expand Up @@ -110,7 +135,8 @@ public void addChild(@NotNull ActivityExecution child) {
@Override
public String toString() {
return getClass().getSimpleName() + "{" +
"activityDefinition=" + activityDefinition +
"id=" + getIdentifier() +
", def=" + activityDefinition +
", children=" + children +
'}';
}
Expand Down
Expand Up @@ -16,6 +16,8 @@

import org.jetbrains.annotations.NotNull;

import java.util.function.Supplier;

/**
* Implements and represents an execution of an activity.
*/
Expand All @@ -37,4 +39,13 @@ public interface ActivityExecution extends DebugDumpable {
* Returns task execution that contains this activity execution.
*/
@NotNull TaskExecution getTaskExecution();

/** TODO */
void setupIdentifier(Supplier<String> defaultIdentifierSupplier);

/**
* Returns activity identifier (unique among siblings).
*/
@NotNull String getIdentifier();

}
Expand Up @@ -34,7 +34,6 @@ public PureCompositeActivityExecution(ActivityInstantiationContext<CompositeWork
super(context, activityHandler);
}

@NotNull
@Override
protected List<ActivityExecution> createChildren(OperationResult result) throws SchemaException {
return activityDefinition.getWorkDefinition().createChildDefinitions().stream()
Expand Down
Expand Up @@ -34,7 +34,7 @@

import static com.evolveum.midpoint.schema.util.task.TaskWorkStateUtil.findBucketByNumber;

import static com.evolveum.midpoint.schema.util.task.TaskWorkStateUtil.getCurrentPartId;
import static com.evolveum.midpoint.schema.util.task.TaskWorkStateUtil.getCurrentActivityId;
import static com.evolveum.midpoint.util.MiscUtil.stateCheck;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -158,7 +158,7 @@ ActivityDefinitionType getCoordinatorTaskPartDefinition() {
private WorkDistributionType getCoordinatorWorkManagement() {
return requireNonNull(
TaskWorkStateUtil.getWorkDistribution(coordinatorTask.getActivityDefinitionOrClone(),
getCurrentPartId(coordinatorTask.getWorkState())),
getCurrentActivityId(coordinatorTask.getWorkState())),
() -> "No work management for the current part in coordinator task " + coordinatorTask);
}

Expand Down Expand Up @@ -283,12 +283,12 @@ private void findOrCreatePartWorkStateInWorkerTask(OperationResult result)
}

private void createPartWorkStateInWorkerTask(OperationResult result) throws SchemaException, ObjectNotFoundException {
String currentPartId = getCurrentPartId(workerTask.getWorkState());
LOGGER.trace("No part work state found for part {}, creating new one", currentPartId);
String currentActivityId = getCurrentActivityId(workerTask.getWorkState());
LOGGER.trace("No activity work state found for activity id {}, creating new one", currentActivityId);

TaskPartWorkStateType newPartWorkState =
new TaskPartWorkStateType(prismContext)
.partId(currentPartId)
.partId(currentActivityId)
.bucketsProcessingRole(BucketsProcessingRoleType.STANDALONE);
List<ItemDelta<?, ?>> itemDeltas = prismContext.deltaFor(TaskType.class)
.item(TaskType.F_WORK_STATE, TaskWorkStateType.F_PART).add(newPartWorkState)
Expand All @@ -303,7 +303,7 @@ private void createPartWorkStateInWorkerTask(OperationResult result) throws Sche
@Nullable
private Long findPartWorkStatePcvIdInWorkerTask() {
TaskWorkStateType workState = workerTask.getWorkState();
String currentPartId = getCurrentPartId(workState);
String currentPartId = getCurrentActivityId(workState);
TaskPartWorkStateType partWorkState = TaskWorkStateUtil.getPartWorkState(workState, currentPartId);
if (partWorkState != null) {
stateCheck(partWorkState.getId() != null, "Null part work state id in %s: %s", workerTask, partWorkState);
Expand Down
Expand Up @@ -68,6 +68,7 @@ public class TestActivities extends AbstractRepoCommonTest {
private static final TestResource<TaskType> TASK_MOCK_ITERATIVE = new TestResource<>(TEST_DIR, "task-mock-iterative.xml", "c21785e9-1c67-492f-bc79-0c51f74561a1");
private static final TestResource<TaskType> TASK_MOCK_SEARCH_ITERATIVE = new TestResource<>(TEST_DIR, "task-mock-search-iterative.xml", "9d8384b3-a007-44e2-a9f7-084a64bdc285");
private static final TestResource<TaskType> TASK_MOCK_BUCKETED = new TestResource<>(TEST_DIR, "task-mock-bucketed.xml", "04e257d1-bb25-4675-8e00-f248f164fbc3");
private static final TestResource<TaskType> TASK_BUCKETED_TREE = new TestResource<>(TEST_DIR, "task-bucketed-tree.xml", "ac3220c5-6ded-4b94-894e-9ed39c05db66");

// private static final TestResource<TaskType> TASK_200_WORKER = new TestResource<>(TEST_DIR, "task-200-w.xml", "44444444-2222-2222-2222-200w00000000");
// private static final TestResource<TaskType> TASK_210_COORDINATOR = new TestResource<>(TEST_DIR, "task-210-c.xml", "44444444-2222-2222-2222-210c00000000");
Expand Down Expand Up @@ -355,6 +356,46 @@ public void test170RunBucketedTask() throws Exception {
// TODO assert the bucketing
}

@Test
public void test180RunBucketedTree() throws Exception {
given();

Task task = getTestTask();
OperationResult result = task.getResult();

recorder.reset();

Task task1 = taskAdd(TASK_BUCKETED_TREE, result);

when();

waitForTaskClose(task1.getOid(), result, 10000, 200);

then();

task1.refresh(result);
display("task after", task1);
assertSuccess(task1.getResult());

OperationStatsType stats = task1.getStoredOperationStatsOrClone();
displayValue("statistics", TaskOperationStatsUtil.format(stats));
// assertThat(stats.getIterativeTaskInformation().getPart().get(0).getProcessed().get(0).getCount())
// .as("count of processed items in first activity")
// .isEqualTo(100);

displayDumpable("recorder", recorder);
// Set<String> messages = IntStream.range(0, 100)
// .mapToObj(i -> String.format("Role: " + ROLE_NAME_PATTERN, i))
// .collect(Collectors.toSet());
// assertThat(recorder.getExecutions()).as("recorder")
// .containsExactlyInAnyOrderElementsOf(messages);

task1.setResult(null);
displayValue("task after (XML)", prismContext.xmlSerializer().serialize(task1.getRawTaskObjectClone()));

// TODO assert the bucketing
}

// @Test
// public void test200OneWorkerTask() throws Exception {
// given();
Expand Down
Expand Up @@ -22,11 +22,4 @@ class MockClosingActivityExecution extends MockComponentActivityExecution {
String getSubActivity() {
return "closing";
}

@Override
public String toString() {
return "CompositeMockClosingActivityExecution{" +
"activityDefinition=" + activityDefinition +
'}';
}
}
Expand Up @@ -93,13 +93,6 @@ private void sleep(RunningTask task, long delay) {

abstract String getSubActivity();

@Override
public String toString() {
return "CompositeMockSubActivityExecution{" +
"activityDefinition=" + activityDefinition +
'}';
}

@Override
public String debugDump(int indent) {
StringBuilder sb = new StringBuilder(super.debugDump(indent));
Expand Down
Expand Up @@ -22,11 +22,4 @@ class MockOpeningActivityExecution extends MockComponentActivityExecution {
String getSubActivity() {
return "opening";
}

@Override
public String toString() {
return "CompositeMockOpeningActivityExecution{" +
"activityDefinition=" + activityDefinition +
'}';
}
}

0 comments on commit 7f4d16a

Please sign in to comment.