Skip to content

Commit

Permalink
Add executionEnvironment param for multinode tasks
Browse files Browse the repository at this point in the history
This parameter is inherited from the master to partitions and from
the coordinator to workers; it can be also set up explicitly in
partition definitions.
  • Loading branch information
mederly committed Jun 13, 2019
1 parent 80f5979 commit 05d44fe
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 50 deletions.
Expand Up @@ -3757,6 +3757,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="executionEnvironment" type="tns:TaskExecutionEnvironmentType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Execution environment configuration to use for partition task.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="copyMasterExtension" type="xsd:boolean" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down Expand Up @@ -3823,6 +3830,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="executionEnvironment" type="tns:TaskExecutionEnvironmentType" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Execution environment configuration to use for partition task.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="copyMasterExtension" type="xsd:boolean" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Expand Down
Expand Up @@ -853,4 +853,9 @@ public ObjectReferenceType getOwnerRef() {
public Collection<String> getCachingProfiles() {
return emptySet();
}

@Override
public TaskExecutionEnvironmentType getExecutionEnvironment() {
return null;
}
}
Expand Up @@ -126,6 +126,11 @@ public TaskWorkManagementType getWorkManagement(Task masterTask) {
return data.getWorkManagement();
}

@Override
public TaskExecutionEnvironmentType getExecutionEnvironment(Task masterTask) {
return data.getExecutionEnvironment();
}

@Override
public Boolean isCopyMasterExtension(Task masterTask) {
return data.isCopyMasterExtension();
Expand Down Expand Up @@ -169,6 +174,11 @@ public TaskWorkManagementType getWorkManagement(Task masterTask) {
return data.getWorkManagement();
}

@Override
public TaskExecutionEnvironmentType getExecutionEnvironment(Task masterTask) {
return data.getExecutionEnvironment();
}

@Override
public Boolean isCopyMasterExtension(Task masterTask) {
return data.isCopyMasterExtension();
Expand Down
Expand Up @@ -1047,4 +1047,6 @@ void flushPendingModifications(OperationResult parentResult) throws ObjectNotFou

@NotNull
Collection<String> getCachingProfiles();

TaskExecutionEnvironmentType getExecutionEnvironment();
}
Expand Up @@ -17,6 +17,7 @@
package com.evolveum.midpoint.task.api;

import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskExecutionEnvironmentType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskWorkManagementType;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -77,6 +78,13 @@ default TaskWorkManagementType getWorkManagement(Task masterTask) {
return null;
}

/**
* Execution environment to be used in subtask.
*/
default TaskExecutionEnvironmentType getExecutionEnvironment(Task masterTask) {
return null;
}

/**
* Whether to copy extension from master task into subtask.
*/
Expand Down Expand Up @@ -124,6 +132,13 @@ default TaskWorkManagementType getWorkManagement(Task masterTask) {
return null;
}

/**
* Execution environment to be used in subtask. Overrides strategy.executionEnvironment.
*/
default TaskExecutionEnvironmentType getExecutionEnvironment(Task masterTask) {
return null;
}

/**
* Whether to copy extension from master task into subtask. Overrides strategy.copyMasterExtension.
*/
Expand Down
Expand Up @@ -2603,7 +2603,12 @@ public void addSubtask(TaskType subtaskBean) {
@NotNull
@Override
public Collection<String> getCachingProfiles() {
TaskExecutionEnvironmentType executionEnvironment = getTaskType().getExecutionEnvironment();
TaskExecutionEnvironmentType executionEnvironment = getExecutionEnvironment();
return executionEnvironment != null ? executionEnvironment.getCachingProfile() : emptySet();
}

@Override
public TaskExecutionEnvironmentType getExecutionEnvironment() {
return getTaskType().getExecutionEnvironment();
}
}
Expand Up @@ -16,7 +16,6 @@

package com.evolveum.midpoint.task.quartzimpl.handlers;

import com.evolveum.midpoint.prism.Containerable;
import com.evolveum.midpoint.prism.PrismContainer;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.delta.ItemDelta;
Expand Down Expand Up @@ -261,6 +260,12 @@ private String createSubtask(int index, TaskPartitionsDefinition partitionsDefin
null, partition, partitionsDefinition);
// work management is updated and stored into subtask later

TaskExecutionEnvironmentType executionEnvironment = applyDefaults(
p -> p.getExecutionEnvironment(masterTask),
ps -> ps.getExecutionEnvironment(masterTask),
masterTask.getExecutionEnvironment(), partition, partitionsDefinition);
subtask.setExecutionEnvironment(CloneUtil.clone(executionEnvironment));

String handlerUriTemplate = applyDefaults(
p -> p.getHandlerUri(masterTask),
ps -> ps.getHandlerUri(masterTask),
Expand Down
Expand Up @@ -16,27 +16,9 @@

package com.evolveum.midpoint.task.quartzimpl.work;

import static com.evolveum.midpoint.schema.util.TaskWorkStateTypeUtil.findBucketByNumber;
import static java.util.Collections.singletonList;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang.BooleanUtils;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.evolveum.midpoint.prism.ItemDefinition;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.PrismObject;
import com.evolveum.midpoint.prism.delta.ContainerDelta;
import com.evolveum.midpoint.prism.delta.ItemDelta;
import com.evolveum.midpoint.prism.delta.ItemDeltaCollectionsUtil;
import com.evolveum.midpoint.prism.path.ItemPath;
import com.evolveum.midpoint.prism.query.ObjectFilter;
import com.evolveum.midpoint.prism.query.ObjectQuery;
Expand All @@ -47,7 +29,6 @@
import com.evolveum.midpoint.repo.api.VersionPrecondition;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.schema.util.TaskWorkStateTypeUtil;
import com.evolveum.midpoint.task.api.RunningTask;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskExecutionStatus;
import com.evolveum.midpoint.task.api.TaskManager;
Expand All @@ -62,7 +43,6 @@
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategyFactory;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandler;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandlerRegistry;
import com.evolveum.midpoint.util.MiscUtil;
import com.evolveum.midpoint.util.backoff.BackoffComputer;
import com.evolveum.midpoint.util.backoff.ExponentialBackoffComputer;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
Expand All @@ -71,15 +51,21 @@
import com.evolveum.midpoint.util.exception.SystemException;
import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.AbstractWorkSegmentationType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.ObjectType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskKindType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskWorkManagementType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskWorkStateType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.WorkAllocationConfigurationType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.WorkBucketStateType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.WorkBucketType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import org.apache.commons.lang.BooleanUtils;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static com.evolveum.midpoint.schema.util.TaskWorkStateTypeUtil.findBucketByNumber;
import static java.util.Collections.singletonList;

/**
* Responsible for managing task work state.
Expand Down Expand Up @@ -109,7 +95,7 @@ private class Context {
Task coordinatorTask; // null for standalone worker tasks
final Supplier<Boolean> canRunSupplier;

public Context(Supplier<Boolean> canRunSupplier) {
Context(Supplier<Boolean> canRunSupplier) {
this.canRunSupplier = canRunSupplier;
}

Expand All @@ -121,15 +107,15 @@ public boolean isStandalone() {
return kind == null || kind == TaskKindType.STANDALONE;
}

public void reloadCoordinatorTask(OperationResult result) throws SchemaException, ObjectNotFoundException {
void reloadCoordinatorTask(OperationResult result) throws SchemaException, ObjectNotFoundException {
coordinatorTask = taskManager.getTask(coordinatorTask.getOid(), null, result);
}

public void reloadWorkerTask(OperationResult result) throws SchemaException, ObjectNotFoundException {
void reloadWorkerTask(OperationResult result) throws SchemaException, ObjectNotFoundException {
workerTask = taskManager.getTask(workerTask.getOid(), null, result);
}

public TaskWorkManagementType getWorkStateConfiguration() {
TaskWorkManagementType getWorkStateConfiguration() {
return isStandalone() ? workerTask.getWorkManagement() : coordinatorTask.getWorkManagement();
}
}
Expand Down
Expand Up @@ -250,9 +250,10 @@ private int createWorkers(Task coordinatorTask, MultiValuedMap<String, WorkerKey
}
switch (coordinatorTask.getExecutionStatus()) {
case WAITING: workerExecutionStatus = TaskExecutionStatusType.RUNNABLE; break;
case SUSPENDED: workerExecutionStatus = TaskExecutionStatusType.SUSPENDED; break;
case SUSPENDED:
case RUNNABLE:
workerExecutionStatus = TaskExecutionStatusType.SUSPENDED; break;
case CLOSED: workerExecutionStatus = TaskExecutionStatusType.CLOSED; break; // not very useful
case RUNNABLE: workerExecutionStatus = TaskExecutionStatusType.SUSPENDED; break;
default: throw new IllegalStateException("Unsupported executionStatus of " + coordinatorTask + ": " + coordinatorTask.getExecutionStatus());
}

Expand All @@ -277,6 +278,7 @@ private int createWorkers(Task coordinatorTask, MultiValuedMap<String, WorkerKey
worker.setObjectRef(CloneUtil.clone(coordinatorTask.getObjectRef()));
worker.setRecurrence(TaskRecurrenceType.SINGLE);
worker.setParent(coordinatorTask.getTaskIdentifier());
worker.setExecutionEnvironment(CloneUtil.clone(coordinatorTask.getExecutionEnvironment()));
worker.beginWorkManagement().taskKind(TaskKindType.WORKER);
PrismContainer<?> coordinatorExtension = coordinatorTask.getExtensionClone();
if (coordinatorExtension != null) {
Expand Down
Expand Up @@ -43,13 +43,10 @@
import org.testng.annotations.BeforeSuite;
import org.xml.sax.SAXException;

import javax.xml.bind.JAXBException;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.*;

import static com.evolveum.midpoint.test.IntegrationTestTools.display;
import static com.evolveum.midpoint.test.IntegrationTestTools.waitFor;
Expand Down Expand Up @@ -77,7 +74,9 @@ public class AbstractTaskManagerTest extends AbstractTestNGSpringContextTests {
protected static final String PARALLEL_TASK_HANDLER_URI = "http://midpoint.evolveum.com/test/parallel-task-handler";
protected static final String LONG_TASK_HANDLER_URI = "http://midpoint.evolveum.com/test/long-task-handler";

private static final String USER_ADMINISTRATOR_FILE = "src/test/resources/common/user-administrator.xml";
public static final String COMMON_DIR = "src/test/resources/common";
private static final File USER_ADMINISTRATOR_FILE = new File(COMMON_DIR, "user-administrator.xml");
static final File SYSTEM_CONFIGURATION_FILE = new File(COMMON_DIR, "system-configuration.xml");

// TODO make configurable. Due to a race condition there can be a small number of unoptimized complete buckets
// (it should not exceed the number of workers ... at least not by much amount :)
Expand Down Expand Up @@ -149,10 +148,10 @@ public void setup() throws SchemaException, SAXException, IOException {

public void initialize() throws Exception {
initHandlers();
addObjectFromFile(USER_ADMINISTRATOR_FILE);
addObjectFromFile(USER_ADMINISTRATOR_FILE.getPath());
}

protected <T extends ObjectType> PrismObject<T> unmarshallJaxbFromFile(String filePath) throws IOException, JAXBException, SchemaException {
protected <T extends ObjectType> PrismObject<T> unmarshallJaxbFromFile(String filePath) throws IOException, SchemaException {
File file = new File(filePath);
return PrismTestUtil.parseObject(file);
}
Expand Down Expand Up @@ -347,4 +346,13 @@ protected Collection<SelectorOptions<GetOperationOptions>> retrieveItemsNamed(Ob
.build();
}

void assertCachingProfiles(Task task, String... expectedProfiles) {
Set<String> realProfiles = getCachingProfiles(task);
assertEquals("Wrong caching profiles in " + task, new HashSet<>(Arrays.asList(expectedProfiles)), realProfiles);
}

private Set<String> getCachingProfiles(Task task) {
TaskExecutionEnvironmentType env = task.getExecutionEnvironment();
return env != null ? new HashSet<>(env.getCachingProfile()) : Collections.emptySet();
}
}

0 comments on commit 05d44fe

Please sign in to comment.