Skip to content

Commit

Permalink
Provide TaskPerformanceInformation
Browse files Browse the repository at this point in the history
This is very quick-and-dirty attempt to provide easy access to the
task performance information.
  • Loading branch information
mederly committed Mar 19, 2021
1 parent 20374ee commit 350e587
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 24 deletions.
Expand Up @@ -7,28 +7,68 @@

package com.evolveum.midpoint.schema.util.task;

import java.io.Serializable;
import java.util.Locale;
import javax.xml.datatype.XMLGregorianCalendar;

import com.google.common.annotations.VisibleForTesting;
import org.jetbrains.annotations.NotNull;

import com.evolveum.midpoint.util.DebugDumpable;
import com.evolveum.midpoint.util.DebugUtil;
import com.evolveum.midpoint.util.annotation.Experimental;
import com.evolveum.midpoint.xml.ns._public.common.common_3.IterativeTaskPartItemsProcessingInformationType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.OperationStatsType;
import com.evolveum.midpoint.xml.ns._public.common.common_3.StructuredTaskProgressType;

import javax.xml.datatype.XMLGregorianCalendar;

import static java.util.Collections.emptyList;

/**
* Extract of the most relevant performance information about a task part.
*/
public class TaskPartPerformanceInformation {
@Experimental
public class TaskPartPerformanceInformation implements DebugDumpable, Serializable {

/**
* FIXME Sometimes we put handler URI here. That is not correct.
*/
private final String partUri;

/**
* Items processed. Good indicator for avg wall clock time and throughput.
*/
private final int itemsProcessed;

/**
* Number of errors. Included only because we traditionally displayed it in log messages.
*/
private final int errors;

/**
* Real progress. This is less than items processed. Does not include duplicate processing.
*/
private final int progress;

/**
* The sum of time spent processing individual items. For multithreaded execution each thread counts separately.
* Does not include pre-processing. So this is actually not very helpful indicator of the performance.
* But we include it here for completeness.
*/
private final double processingTime;

/**
* Wall-clock time spent in processing this task part. Relevant for performance determination and ETA computation.
* Includes time from all known executions of this tasks.
*/
private final Long wallClockTime;

/**
* Earliest known time when this part execution started. Just for testing/debugging reasons.
*/
@VisibleForTesting
private final XMLGregorianCalendar earliestStartTime;

private TaskPartPerformanceInformation(int itemsProcessed, int errors, int progress, double processingTime,
private TaskPartPerformanceInformation(String partUri, int itemsProcessed, int errors, int progress, double processingTime,
Long wallClockTime, XMLGregorianCalendar earliestStartTime) {
this.partUri = partUri;
this.itemsProcessed = itemsProcessed;
this.errors = errors;
this.progress = progress;
Expand All @@ -37,27 +77,38 @@ private TaskPartPerformanceInformation(int itemsProcessed, int errors, int progr
this.earliestStartTime = earliestStartTime;
}

public static TaskPartPerformanceInformation forCurrentPart(OperationStatsType operationStats,
@NotNull
public static TaskPartPerformanceInformation forPart(@NotNull IterativeTaskPartItemsProcessingInformationType info,
StructuredTaskProgressType structuredProgress) {

IterativeTaskPartItemsProcessingInformationType info = TaskOperationStatsUtil
.getIterativeInfoForCurrentPart(operationStats, structuredProgress);
String partUri = info.getPartUri();

int itemsProcessed = TaskOperationStatsUtil.getItemsProcessed(info);
int errors = TaskOperationStatsUtil.getErrors(info);
int progress = TaskProgressUtil.getTotalProgressForCurrentPart(structuredProgress);
int progress = TaskProgressUtil.getTotalProgressForPart(structuredProgress, partUri);
double processingTime = TaskOperationStatsUtil.getProcessingTime(info);

WallClockTimeComputer wallClockTimeComputer =
new WallClockTimeComputer(info != null ? info.getExecution() : emptyList());
WallClockTimeComputer wallClockTimeComputer = new WallClockTimeComputer(info.getExecution());

long wallClockTime = wallClockTimeComputer.getSummaryTime();
XMLGregorianCalendar earliestStartTime = wallClockTimeComputer.getEarliestStartTime();

return new TaskPartPerformanceInformation(itemsProcessed, errors, progress, processingTime,
return new TaskPartPerformanceInformation(partUri, itemsProcessed, errors, progress, processingTime,
wallClockTime, earliestStartTime);
}

@NotNull
public static TaskPartPerformanceInformation forCurrentPart(OperationStatsType operationStats,
StructuredTaskProgressType structuredProgress) {
IterativeTaskPartItemsProcessingInformationType info = TaskOperationStatsUtil
.getIterativeInfoForCurrentPart(operationStats, structuredProgress);
return forPart(info, structuredProgress);
}

public String getPartUri() {
return partUri;
}

public int getItemsProcessed() {
return itemsProcessed;
}
Expand Down Expand Up @@ -106,4 +157,41 @@ public Double getThroughput() {
public XMLGregorianCalendar getEarliestStartTime() {
return earliestStartTime;
}

@Override
public String toString() {
return "TaskPartPerformanceInformation{" +
"URI=" + partUri +
", itemsProcessed=" + itemsProcessed +
", errors=" + errors +
", progress=" + progress +
", processingTime=" + processingTime +
", wallClockTime=" + wallClockTime +
", earliestStartTime=" + earliestStartTime +
'}';
}

public String toHumanReadableString() {
Double throughput = getThroughput();
if (throughput != null) {
return String.format(Locale.US, "Throughput: %,.1f per minute (%,d items, %,.1f ms per item)",
throughput, getItemsProcessed(), getAverageWallClockTime());
} else {
return "No information";
}
}

@Override
public String debugDump(int indent) {
StringBuilder sb = new StringBuilder();
DebugUtil.debugDumpWithLabelLn(sb, "URI", partUri, indent);
DebugUtil.debugDumpWithLabelLn(sb, "Items processed", itemsProcessed, indent);
DebugUtil.debugDumpWithLabelLn(sb, "Errors", errors, indent);
DebugUtil.debugDumpWithLabelLn(sb, "Progress", progress, indent);
DebugUtil.debugDumpWithLabelLn(sb, "Processing time", String.valueOf(processingTime), indent);
DebugUtil.debugDumpWithLabelLn(sb, "Wall clock time", wallClockTime, indent);
DebugUtil.debugDumpWithLabelLn(sb, "Earliest start time", String.valueOf(earliestStartTime), indent);
DebugUtil.debugDumpWithLabel(sb, "Summary", toHumanReadableString(), indent);
return sb.toString();
}
}
@@ -0,0 +1,122 @@
/*
* Copyright (C) 2010-2021 Evolveum and contributors
*
* This work is dual-licensed under the Apache License 2.0
* and European Union Public License. See LICENSE file for details.
*/

package com.evolveum.midpoint.schema.util.task;

import static java.util.Objects.requireNonNull;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.util.annotation.Experimental;

import com.evolveum.midpoint.util.logging.Trace;
import com.evolveum.midpoint.util.logging.TraceManager;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;

import org.jetbrains.annotations.NotNull;

import com.evolveum.midpoint.util.DebugDumpable;
import com.evolveum.midpoint.util.DebugUtil;

/**
* Quickly hacked "API" presenting task performance information.
*
* Use with care. Will change in next midPoint release.
*
* TODO deduplicate with {@link TaskProgressInformation}.
*/
@Experimental
public class TaskPerformanceInformation implements DebugDumpable, Serializable {

private static final Trace LOGGER = TraceManager.getTrace(TaskPerformanceInformation.class);

/**
* Information on the progress in individual parts. Indexed by part URI.
*/
private final Map<String, TaskPartPerformanceInformation> parts = new HashMap<>();

private TaskPerformanceInformation() {
}

/**
* Precondition: the task contains fully retrieved and resolved subtasks.
*/
public static TaskPerformanceInformation fromTaskTree(TaskType task) {
if (TaskWorkStateUtil.isPartitionedMaster(task)) {
return fromPartitionedMaster(task);
} else {
return fromOtherTask(task);
}
}

@NotNull
private static TaskPerformanceInformation fromPartitionedMaster(TaskType task) {
Map<String, TaskPartPerformanceInformation> partsByUri = new HashMap<>();
List<ObjectReferenceType> subtasks = task.getSubtaskRef();

for (ObjectReferenceType subtaskRef : subtasks) {
TaskType subtask = (TaskType)
requireNonNull(subtaskRef.asReferenceValue().getObject(),
() -> "Task " + task + " has unresolved subtask: " + subtaskRef)
.asObjectable();

TaskPerformanceInformation subInfo = fromOtherTask(subtask);
if (subInfo.parts.size() > 1) {
LOGGER.warn("Partitioned task has more than one part - ignoring: {}\n{}", subtask, subInfo.parts);
} else {
partsByUri.putAll(subInfo.parts);
}
}

TaskPerformanceInformation info = new TaskPerformanceInformation();
info.parts.putAll(partsByUri);
return info;
}

/**
* Not a partitioned master.
*/
@NotNull
private static TaskPerformanceInformation fromOtherTask(@NotNull TaskType task) {
TaskPerformanceInformation info = new TaskPerformanceInformation();
StructuredTaskProgressType progress = TaskProgressUtil.getStructuredProgressFromTree(task);
OperationStatsType operationStats = TaskOperationStatsUtil.getOperationStatsFromTree(task, PrismContext.get());
if (operationStats != null && operationStats.getIterativeTaskInformation() != null) {
for (IterativeTaskPartItemsProcessingInformationType part : operationStats.getIterativeTaskInformation().getPart()) {
info.addPart(TaskPartPerformanceInformation.forPart(part, progress));
}
}
return info;
}

private void addPart(TaskPartPerformanceInformation part) {
parts.put(part.getPartUri(), part);
}

public Map<String, TaskPartPerformanceInformation> getParts() {
return parts;
}

@Override
public String toString() {
return "TaskPerformanceInformation{" +
"parts=" + parts +
'}';
}

@Override
public String debugDump(int indent) {
StringBuilder sb = new StringBuilder();
DebugUtil.debugDumpLabelLn(sb, getClass().getSimpleName(), indent);
DebugUtil.debugDumpWithLabel(sb, "Parts", parts, indent + 1);
return sb.toString();
}
}
Expand Up @@ -16,9 +16,12 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Stream;

import com.evolveum.midpoint.prism.PrismContext;
import com.evolveum.midpoint.prism.xml.XmlTypeConverter;
import com.evolveum.midpoint.schema.statistics.OutcomeKeyedCounterTypeUtil;
import com.evolveum.midpoint.schema.statistics.*;
import com.evolveum.midpoint.util.annotation.Experimental;
import com.evolveum.midpoint.xml.ns._public.common.common_3.*;

/**
Expand Down Expand Up @@ -146,21 +149,34 @@ public static int getTotalProgressOpen(StructuredTaskProgressType progress) {
return getCounts(progress.getPart(), c -> true, true);
}

public static TaskPartProgressType getCurrentPart(StructuredTaskProgressType progress) {
public static TaskPartProgressType getForCurrentPart(StructuredTaskProgressType progress) {
if (progress == null) {
return null;
} else {
return getForPart(progress, progress.getCurrentPartUri());
}
}

public static TaskPartProgressType getForPart(StructuredTaskProgressType progress, String partUri) {
if (progress == null) {
return null;
} else {
return progress.getPart().stream()
.filter(part -> Objects.equals(part.getPartUri(), progress.getCurrentPartUri()))
.filter(part -> Objects.equals(part.getPartUri(), partUri))
.findAny().orElse(null);
}
}

public static int getTotalProgressForCurrentPart(StructuredTaskProgressType progress) {
TaskPartProgressType currentPart = getCurrentPart(progress);
TaskPartProgressType currentPart = getForCurrentPart(progress);
return currentPart != null ? getTotalProgress(currentPart) : 0;
}

public static int getTotalProgressForPart(StructuredTaskProgressType progress, String partUri) {
TaskPartProgressType forPart = getForPart(progress, partUri);
return forPart != null ? getTotalProgress(forPart) : 0;
}

/**
* Returns a value suitable for storing in task.progress property.
*/
Expand All @@ -171,4 +187,17 @@ public static long getTotalProgress(StructuredTaskProgressType progress) {
public static String getCurrentPartUri(StructuredTaskProgressType structuredTaskProgress) {
return structuredTaskProgress != null ? structuredTaskProgress.getCurrentPartUri() : null;
}

@Experimental
public static StructuredTaskProgressType getStructuredProgressFromTree(TaskType task) {
StructuredTaskProgressType aggregate = new StructuredTaskProgressType(PrismContext.get());
Stream<TaskType> subTasks = TaskTreeUtil.getAllTasksStream(task);
subTasks.forEach(subTask -> {
StructuredTaskProgressType progress = subTask.getStructuredProgress();
if (progress != null) {
StructuredTaskProgress.addTo(aggregate, progress);
}
});
return aggregate;
}
}

0 comments on commit 350e587

Please sign in to comment.