Skip to content

Commit

Permalink
Refactor work-related java code
Browse files Browse the repository at this point in the history
This is an adaptation of class names to recent changes in XSD.
  • Loading branch information
mederly committed Mar 19, 2018
1 parent 878d15e commit b9ddfeb
Show file tree
Hide file tree
Showing 26 changed files with 163 additions and 168 deletions.
Expand Up @@ -17,10 +17,10 @@

import com.evolveum.midpoint.model.api.ModelPublicConstants;
import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.task.api.StaticTaskPartitioningDefinition;
import com.evolveum.midpoint.task.api.StaticTaskPartitionsDefinition;
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.api.TaskPartitioningDefinition;
import com.evolveum.midpoint.task.api.TaskPartitionsDefinition;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskPartitionsDefinitionType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.TaskType;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -53,17 +53,17 @@ public class PartitionedReconciliationTaskHandlerCreator {

@PostConstruct
private void initialize() {
taskManager.createAndRegisterPartitioningTaskHandler(HANDLER_URI, this::createPartitioningDefinition);
taskManager.createAndRegisterPartitioningTaskHandler(HANDLER_URI, this::createPartitionsDefinition);
}

private TaskPartitioningDefinition createPartitioningDefinition(Task masterTask) {
private TaskPartitionsDefinition createPartitionsDefinition(Task masterTask) {
TaskPartitionsDefinitionType definitionInTask = masterTask.getWorkManagement() != null ?
masterTask.getWorkManagement().getPartitions() : null;
TaskPartitionsDefinitionType partitioningDefinition = definitionInTask != null ?
TaskPartitionsDefinitionType partitionsDefinition = definitionInTask != null ?
definitionInTask.clone() : new TaskPartitionsDefinitionType();
partitioningDefinition.setCount(3);
partitioningDefinition.setCopyMasterExtension(true);
return new StaticTaskPartitioningDefinition(partitioningDefinition,
partitionsDefinition.setCount(3);
partitionsDefinition.setCopyMasterExtension(true);
return new StaticTaskPartitionsDefinition(partitionsDefinition,
prismContext.getSchemaRegistry().findObjectDefinitionByCompileTimeClass(TaskType.class));
}
}
Expand Up @@ -32,27 +32,27 @@
import java.util.List;

/**
* Task partitioning strategy based on statically configured values. More restricted but easier to use.
* Task partitions definition based on statically configured values. More restricted but easier to use.
* However, it is extensible by subclassing - in that way, some values can be provided statically while others on-demand.
*
* @author mederly
*/
public class StaticTaskPartitioningDefinition implements TaskPartitioningDefinition {
public class StaticTaskPartitionsDefinition implements TaskPartitionsDefinition {

@NotNull private final TaskPartitionsDefinitionType data;
@NotNull private final List<TaskPartitionDefinition> partitions;
@NotNull private final PrismObjectDefinition<TaskType> taskDefinition;

public StaticTaskPartitioningDefinition(@NotNull TaskPartitionsDefinitionType data,
public StaticTaskPartitionsDefinition(@NotNull TaskPartitionsDefinitionType data,
@Nullable List<TaskPartitionDefinition> partitionsOverride, @NotNull PrismObjectDefinition<TaskType> taskDefinition) {
this.data = data;
this.partitions = partitionsOverride != null ? partitionsOverride : createPartitionDefinitions(data);
this.taskDefinition = taskDefinition;
}

public StaticTaskPartitioningDefinition(@NotNull TaskPartitionsDefinitionType partitioningDefinition,
public StaticTaskPartitionsDefinition(@NotNull TaskPartitionsDefinitionType definition,
@NotNull PrismObjectDefinition<TaskType> taskDefinition) {
this(partitioningDefinition, null, taskDefinition);
this(definition, null, taskDefinition);
}

private List<TaskPartitionDefinition> createPartitionDefinitions(TaskPartitionsDefinitionType data) {
Expand Down
Expand Up @@ -706,7 +706,7 @@ ObjectQuery narrowQueryForWorkBucket(ObjectQuery query, Class<? extends ObjectTy
Function<ItemPath, ItemDefinition<?>> itemDefinitionProvider, Task workerTask,
WorkBucketType workBucket, OperationResult opResult) throws SchemaException, ObjectNotFoundException;

TaskHandler createAndRegisterPartitioningTaskHandler(String handlerUri, Function<Task, TaskPartitioningDefinition> partitioningStrategy);
TaskHandler createAndRegisterPartitioningTaskHandler(String handlerUri, Function<Task, TaskPartitionsDefinition> partitioningStrategy);

void setFreeBucketWaitInterval(long value);
}
Expand Up @@ -33,7 +33,7 @@
*
* @author mederly
*/
public interface TaskPartitioningDefinition {
public interface TaskPartitionsDefinition {

/**
* Number of partitions.
Expand Down
Expand Up @@ -2124,7 +2124,7 @@ public ObjectQuery narrowQueryForWorkBucket(ObjectQuery query, Class<? extends O
}

@Override
public TaskHandler createAndRegisterPartitioningTaskHandler(String handlerUri, Function<Task, TaskPartitioningDefinition> partitioningStrategy) {
public TaskHandler createAndRegisterPartitioningTaskHandler(String handlerUri, Function<Task, TaskPartitionsDefinition> partitioningStrategy) {
PartitioningTaskHandler handler = new PartitioningTaskHandler(this, partitioningStrategy);
registerHandler(handlerUri, handler);
return handler;
Expand Down
Expand Up @@ -23,7 +23,7 @@
import com.evolveum.midpoint.prism.util.CloneUtil;
import com.evolveum.midpoint.schema.result.OperationResult;
import com.evolveum.midpoint.task.api.*;
import com.evolveum.midpoint.task.api.TaskPartitioningDefinition.TaskPartitionDefinition;
import com.evolveum.midpoint.task.api.TaskPartitionsDefinition.TaskPartitionDefinition;
import com.evolveum.midpoint.task.api.TaskRunResult.TaskRunResultStatus;
import com.evolveum.midpoint.task.quartzimpl.TaskManagerQuartzImpl;
import com.evolveum.midpoint.util.TemplateUtil;
Expand All @@ -42,7 +42,7 @@

/**
* Task that partitions the work into subtasks.
* Partitioning is driven by a TaskPartitioningStrategy.
* Partitioning is driven by a TaskPartitionsDefinition.
*
* @author mederly
*/
Expand All @@ -53,11 +53,11 @@ public class PartitioningTaskHandler implements TaskHandler {
private static final String DEFAULT_HANDLER_URI = "{masterTaskHandlerUri}#{index}";

private TaskManager taskManager;
private Function<Task, TaskPartitioningDefinition> partitioningDefinitionSupplier;
private Function<Task, TaskPartitionsDefinition> partitionsDefinitionSupplier;

public PartitioningTaskHandler(TaskManager taskManager, Function<Task, TaskPartitioningDefinition> partitioningDefinitionSupplier) {
public PartitioningTaskHandler(TaskManager taskManager, Function<Task, TaskPartitionsDefinition> partitionsDefinitionSupplier) {
this.taskManager = taskManager;
this.partitioningDefinitionSupplier = partitioningDefinitionSupplier;
this.partitionsDefinitionSupplier = partitionsDefinitionSupplier;
}

@Override
Expand Down Expand Up @@ -116,20 +116,20 @@ public TaskRunResult run(Task masterTask) {
*/
private List<Task> createSubtasks(Task masterTask, OperationResult opResult)
throws SchemaException, ObjectAlreadyExistsException, ObjectNotFoundException {
TaskPartitioningDefinition partitioningDefinition = partitioningDefinitionSupplier.apply(masterTask);
TaskPartitionsDefinition partitionsDefinition = partitionsDefinitionSupplier.apply(masterTask);
List<String> subtaskOids = new ArrayList<>();
int count = partitioningDefinition.getCount(masterTask);
int count = partitionsDefinition.getCount(masterTask);
for (int i = 1; i <= count; i++) {
subtaskOids.add(createSubtask(i, partitioningDefinition, masterTask, opResult));
subtaskOids.add(createSubtask(i, partitionsDefinition, masterTask, opResult));
}
List<Task> subtasks = new ArrayList<>(subtaskOids.size());
for (String subtaskOid : subtaskOids) {
subtasks.add(taskManager.getTask(subtaskOid, opResult));
}
boolean sequential = partitioningDefinition.isSequentialExecution(masterTask);
boolean sequential = partitionsDefinition.isSequentialExecution(masterTask);
for (int i = 1; i <= count; i++) {
Task subtask = subtasks.get(i - 1);
TaskPartitionDefinition partition = partitioningDefinition.getPartition(masterTask, i);
TaskPartitionDefinition partition = partitionsDefinition.getPartition(masterTask, i);
Collection<Integer> dependents = new HashSet<>(partition.getDependents());
if (sequential && i < count) {
dependents.add(i + 1);
Expand All @@ -147,36 +147,36 @@ private List<Task> createSubtasks(Task masterTask, OperationResult opResult)
return subtasks;
}

private String createSubtask(int index, TaskPartitioningDefinition partitioningDefinition,
private String createSubtask(int index, TaskPartitionsDefinition partitionsDefinition,
Task masterTask, OperationResult opResult) throws SchemaException, ObjectAlreadyExistsException {
Map<String, String> replacements = new HashMap<>();
replacements.put("index", String.valueOf(index));
replacements.put("masterTaskName", String.valueOf(masterTask.getName().getOrig()));
replacements.put("masterTaskHandlerUri", masterTask.getHandlerUri());

TaskPartitionDefinition partition = partitioningDefinition.getPartition(masterTask, index);
TaskPartitionDefinition partition = partitionsDefinition.getPartition(masterTask, index);

TaskType masterTaskBean = masterTask.getTaskType();
TaskType subtask = new TaskType(getPrismContext());

String nameTemplate = applyDefaults(
p -> p.getName(masterTask),
ps -> ps.getName(masterTask),
"{masterTaskName} ({index})", partition, partitioningDefinition);
"{masterTaskName} ({index})", partition, partitionsDefinition);
String name = TemplateUtil.replace(nameTemplate, replacements);
subtask.setName(PolyStringType.fromOrig(name));

TaskWorkManagementType workManagement = applyDefaults(
p -> p.getWorkManagement(masterTask),
ps -> ps.getWorkManagement(masterTask),
null, partition, partitioningDefinition);
null, partition, partitionsDefinition);
// work management is updated and stored into subtask later

String handlerUriTemplate = applyDefaults(
p -> p.getHandlerUri(masterTask),
ps -> ps.getHandlerUri(masterTask),
null,
partition, partitioningDefinition);
partition, partitionsDefinition);
String handlerUri = TemplateUtil.replace(handlerUriTemplate, replacements);
if (handlerUri == null) {
// The default for coordinator-based partitions is to put default handler into workers configuration
Expand All @@ -203,7 +203,7 @@ private String createSubtask(int index, TaskPartitioningDefinition partitioningD
boolean copyMasterExtension = applyDefaults(
p -> p.isCopyMasterExtension(masterTask),
ps -> ps.isCopyMasterExtension(masterTask),
false, partition, partitioningDefinition);
false, partition, partitionsDefinition);
if (copyMasterExtension) {
PrismContainer<Containerable> masterExtension = masterTaskBean.asPrismObject().findContainer(TaskType.F_EXTENSION);
if (masterTaskBean.getExtension() != null) {
Expand All @@ -212,7 +212,7 @@ private String createSubtask(int index, TaskPartitioningDefinition partitioningD
}

applyDeltas(subtask, partition.getOtherDeltas(masterTask));
applyDeltas(subtask, partitioningDefinition.getOtherDeltas(masterTask));
applyDeltas(subtask, partitionsDefinition.getOtherDeltas(masterTask));
LOGGER.debug("Partitioned subtask to be created:\n{}", subtask.asPrismObject().debugDumpLazily());

return taskManager.addTask(subtask.asPrismObject(), opResult);
Expand All @@ -222,8 +222,8 @@ private boolean isCoordinator(TaskWorkManagementType workManagement) {
return workManagement != null && workManagement.getTaskKind() == TaskKindType.COORDINATOR;
}

private <T> T applyDefaults(Function<TaskPartitionDefinition, T> localGetter, Function<TaskPartitioningDefinition, T> globalGetter,
T defaultValue, TaskPartitionDefinition localDef, TaskPartitioningDefinition globalDef) {
private <T> T applyDefaults(Function<TaskPartitionDefinition, T> localGetter, Function<TaskPartitionsDefinition, T> globalGetter,
T defaultValue, TaskPartitionDefinition localDef, TaskPartitionsDefinition globalDef) {
T localValue = localGetter.apply(localDef);
if (localValue != null) {
return localValue;
Expand Down
Expand Up @@ -18,8 +18,8 @@

import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.schema.util.TaskTypeUtil;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.WorkBucketPartitioningStrategy;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.WorkBucketPartitioningStrategy.GetBucketResult.NothingFound;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategy;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategy.GetBucketResult.NothingFound;
import com.evolveum.midpoint.util.exception.SchemaException;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import org.jetbrains.annotations.NotNull;
Expand All @@ -33,11 +33,11 @@
* @author mederly
*/
// <CNT extends AbstractWorkBucketContentType, CFG extends AbstractTaskWorkBucketsConfigurationType>
public abstract class BaseWorkBucketPartitioningStrategy implements WorkBucketPartitioningStrategy {
public abstract class BaseWorkSegmentationStrategy implements WorkSegmentationStrategy {

protected final PrismContext prismContext;

protected BaseWorkBucketPartitioningStrategy(PrismContext prismContext) {
protected BaseWorkSegmentationStrategy(PrismContext prismContext) {
this.prismContext = prismContext;
}

Expand Down
Expand Up @@ -51,7 +51,7 @@ public static Task findWorkerByBucketNumber(List<Task> workers, int sequentialNu
return null;
}

public static AbstractWorkSegmentationType getWorkBucketsConfiguration(TaskWorkManagementType cfg) {
public static AbstractWorkSegmentationType getWorkSegmentationConfiguration(TaskWorkManagementType cfg) {
if (cfg != null && cfg.getBuckets() != null) {
WorkBucketsManagementType buckets = cfg.getBuckets();
return MiscUtil.getFirstNonNull(
Expand Down
Expand Up @@ -33,14 +33,14 @@
import com.evolveum.midpoint.task.api.Task;
import com.evolveum.midpoint.task.api.TaskExecutionStatus;
import com.evolveum.midpoint.task.api.TaskManager;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.content.WorkBucketContentHandler;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.content.WorkBucketContentHandlerRegistry;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.WorkBucketPartitioningStrategy;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.WorkBucketPartitioningStrategy.GetBucketResult;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.WorkBucketPartitioningStrategy.GetBucketResult.FoundExisting;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.WorkBucketPartitioningStrategy.GetBucketResult.NewBuckets;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.WorkBucketPartitioningStrategy.GetBucketResult.NothingFound;
import com.evolveum.midpoint.task.quartzimpl.work.partitioning.WorkStateManagementStrategyFactory;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandler;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.content.WorkBucketContentHandlerRegistry;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategy;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategy.GetBucketResult;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategy.GetBucketResult.FoundExisting;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategy.GetBucketResult.NewBuckets;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategy.GetBucketResult.NothingFound;
import com.evolveum.midpoint.task.quartzimpl.work.segmentation.WorkSegmentationStrategyFactory;
import com.evolveum.midpoint.util.exception.ObjectAlreadyExistsException;
import com.evolveum.midpoint.util.exception.ObjectNotFoundException;
import com.evolveum.midpoint.util.exception.SchemaException;
Expand Down Expand Up @@ -77,7 +77,7 @@ public class WorkStateManager {
@Autowired private TaskManager taskManager;
@Autowired private RepositoryService repositoryService;
@Autowired private PrismContext prismContext;
@Autowired private WorkStateManagementStrategyFactory strategyFactory;
@Autowired private WorkSegmentationStrategyFactory strategyFactory;
@Autowired private WorkBucketContentHandlerRegistry handlerFactory;

private static final int MAX_ATTEMPTS = 40; // temporary
Expand Down Expand Up @@ -166,7 +166,7 @@ private WorkBucketType findSelfAllocatedBucket(Context ctx) {
private WorkBucketType getWorkBucketMultiNode(Context ctx, long freeBucketWaitTime, OperationResult result)
throws SchemaException, ObjectAlreadyExistsException, ObjectNotFoundException, InterruptedException {
long start = System.currentTimeMillis();
WorkBucketPartitioningStrategy workStateStrategy = strategyFactory.createStrategy(ctx.coordinatorTask.getWorkManagement());
WorkSegmentationStrategy workStateStrategy = strategyFactory.createStrategy(ctx.coordinatorTask.getWorkManagement());

waitForAvailableBucket: // this cycle exits when something is found OR when a definite 'no more buckets' answer is received
for (;;) {
Expand Down Expand Up @@ -284,7 +284,7 @@ private boolean reclaimWronglyAllocatedBuckets(Task coordinatorTask, OperationRe

private WorkBucketType getWorkBucketStandalone(Context ctx, OperationResult result)
throws SchemaException, ObjectAlreadyExistsException, ObjectNotFoundException {
WorkBucketPartitioningStrategy workStateStrategy = strategyFactory.createStrategy(ctx.workerTask.getWorkManagement());
WorkSegmentationStrategy workStateStrategy = strategyFactory.createStrategy(ctx.workerTask.getWorkManagement());
TaskWorkStateType workState = getWorkStateOrNew(ctx.workerTask.getTaskPrismObject());
GetBucketResult response = workStateStrategy.getBucket(workState);
LOGGER.trace("getWorkBucketStandalone: workStateStrategy returned {} for standalone task {}", response, ctx.workerTask);
Expand Down Expand Up @@ -542,7 +542,7 @@ public ObjectQuery narrowQueryForWorkBucket(Task workerTask, ObjectQuery query,
Context ctx = createContext(workerTask.getOid(), () -> true, result);

TaskWorkManagementType config = ctx.getWorkStateConfiguration();
AbstractWorkSegmentationType bucketsConfig = WorkBucketUtil.getWorkBucketsConfiguration(config);
AbstractWorkSegmentationType bucketsConfig = WorkBucketUtil.getWorkSegmentationConfiguration(config);
WorkBucketContentHandler handler = handlerFactory.getHandler(workBucket.getContent());
List<ObjectFilter> conjunctionMembers = new ArrayList<>(
handler.createSpecificFilters(workBucket, bucketsConfig, type, itemDefinitionProvider));
Expand Down
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

package com.evolveum.midpoint.task.quartzimpl.work.partitioning;
package com.evolveum.midpoint.task.quartzimpl.work.segmentation;

import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.schema.util.TaskTypeUtil;
import com.evolveum.midpoint.task.quartzimpl.work.BaseWorkBucketPartitioningStrategy;
import com.evolveum.midpoint.task.quartzimpl.work.BaseWorkSegmentationStrategy;
import com.evolveum.midpoint.task.quartzimpl.work.WorkBucketUtil;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;
import org.jetbrains.annotations.NotNull;
Expand All @@ -33,17 +33,17 @@
*
* @author mederly
*/
public class EnumeratedWorkBucketPartitioningStrategy extends BaseWorkBucketPartitioningStrategy {
public class ExplicitWorkSegmentationStrategy extends BaseWorkSegmentationStrategy {

@NotNull private final TaskWorkManagementType configuration;
@NotNull private final ExplicitWorkSegmentationType bucketsConfiguration;

public EnumeratedWorkBucketPartitioningStrategy(@NotNull TaskWorkManagementType configuration,
public ExplicitWorkSegmentationStrategy(@NotNull TaskWorkManagementType configuration,
PrismContext prismContext) {
super(prismContext);
this.configuration = configuration;
this.bucketsConfiguration = (ExplicitWorkSegmentationType)
WorkBucketUtil.getWorkBucketsConfiguration(configuration);
WorkBucketUtil.getWorkSegmentationConfiguration(configuration);
}

@NotNull
Expand Down

0 comments on commit b9ddfeb

Please sign in to comment.