Skip to content

Commit

Permalink
feat(Task): flesh out task management concept / prototype for reference
Browse files Browse the repository at this point in the history
  • Loading branch information
abyrd committed Aug 15, 2019
1 parent 7c9450b commit 10e7943
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 2 deletions.
203 changes: 201 additions & 2 deletions src/main/java/com/conveyal/r5/analyst/progress/Task.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,208 @@
package com.conveyal.r5.analyst.progress;

import com.fasterxml.jackson.annotation.JsonIgnore;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class Task {
/**
* This is a draft for a more advanced task progress system. It is not yet complete or functional.
*
* A Task (or some interface that it implements) could be used by the AsyncLoader to track progress. Together with some
* AsyncLoader functionality it will be a bit like a Future with progress reporting. Use of AsyncLoader could then be
* generalized to building TranportNetworks, EgressCostTables, etc. We don't want to run too far with lazy Async loading
* though, since the end goal is for users to trigger builds manually.
*
* Each task should be able to have subtasks, each of which represents an expected percentage of the parent task.
* Or an expected number of "relative work units" if new sub-tasks will be added on the fly, so absolutes are not known.
* Tasks should implement ProgressListener functionality, and bubble up progress to parent tasks.
*
* This class serves simultaneously as an internal domain object for tracking task execution, and an external API model
* object for communicating relevant information to the web UI (when serialized to JSON).
*/
public class Task implements Runnable, ProgressListener {

// User and group are only relevant on the backend. On workers, we want to show network or cost table build progress
// to everyone in the organization, even if someone else's action kicked off the process.

private String user;

private String group;

// Timestamps to track execution time and wait time.

private Instant enqueued;

private Instant began;

private Instant completed;

private int bubbleUpdateFrequency = 0;

private int logFrequency = 0;

public String description;

public int totalWorkUnits;

public int currentWorkUnit;

/** To be on the safe side all tasks are considered heavyweight unless we explicitly set them to be lightweight. */
private boolean isHeavy = true;

/** Like a runnable, encapsulating the actual work that this task will perform. */
private TaskAction action;

// For operations that may perform a lot of fast copies?
// Maybe instead we should just always pre-calculate how many fast copies will happen, log that before even starting,
// and only track progress on the slow ones.
public int nWorkUnitsSkipped;

public boolean errored;

private Throwable throwable;

/** How often (every how many work units) this task will log progress on the backend. */
private int loggingFrequency;

@JsonIgnore
private Task parentTask;

// Maybe TObjectIntMap<Task> to store subtask weights.
public List<Task> subTasks;

// For non-hierarchical chaining? This may be needed even if our executor is backed by a queue,
// to prevent simultaneous execution of sequential tasks.
public Task nextTask;

public double getPercentComplete() {
return (currentWorkUnit * 100D) / totalWorkUnits;
}

List<Task> subtasks = new ArrayList<>();

/**
* Private constructor to encourage use of fluent methods.
*/
private Task () {}

public void addSubtask (Task subtask) {

}

// Because of the subtask / next task mechanism, we don't let the actions mark tasks complete.
// This is another reason to pass the actions only a limited progress reporting interface.
private void markComplete () {
this.completed = Instant.now();
}

public boolean isHeavy () {
return this.isHeavy;
}

/**
* Abort the current task and cancel any subtasks or following tasks.
* @param throwable the reason for aborting this task.
*/
public void abort (Throwable throwable) {
this.throwable = throwable;
// LOG?
this.markComplete();
}

protected void bubbleUpProgress() {
// weight progress of each subtask
double totalWeight = 0;
double totalProgress = 0;
for (Task task : subtasks) {
// Task weight could also be a property of the task itself.
double weight = 1; // fetch weight
totalWeight += weight;
double weightedProgress = (task.currentWorkUnit * weight) / task.totalWorkUnits;
totalProgress += weightedProgress;
}
totalProgress /= totalWeight;
// do something with total progress...
parentTask.bubbleUpProgress();
}

/**
* Check that all necesary fields have been set before enqueueing for execution, and check any invariants.
*/
public void validate () {
if (this.user == null) {
throw new AssertionError("Task must have a defined user.");
}
// etc.
}

@Override
public void run () {
// The main action is run before the subtasks. It may not make sense progress reporting-wise for tasks to have
// both their own actions and subtasks with their own actions. Perhaps series of tasks are a special kind of
// action, which should encapsulate the bubble-up progress computation.
this.action.action(this);
for (Task subtask : subtasks) {
subtask.run();
}
this.markComplete();
this.nextTask.run();
}

@Override
public void beginTask(String description, int totalElements) {
// Just using an existing interface that may eventually be modified to not include this method.
throw new UnsupportedOperationException();
}

@Override
public void increment () {
this.currentWorkUnit += 1;
// Occasionally bubble up progress to parent tasks, log to console, etc.
if (this.bubbleUpdateFrequency > 0 && (currentWorkUnit % bubbleUpdateFrequency == 0)) {
parentTask.bubbleUpProgress();
}
if (this.logFrequency > 0 && (currentWorkUnit % logFrequency == 0)) {
// LOG.info...
}
}

// FLUENT METHODS FOR CONFIGURING

// Really we should make a User object that combines user and group fields.
public Task forUser (String user, String group) {
this.user = user;
this.group = group;
return this;
}

public Task withDescription (String description) {
this.description = description;
return this;
}

/**
* We may actually want the TaskAction to set the total work units via its restricted ProgressReporter interface.
*/
public Task withTotalWorkUnits (int totalWorkUnits) {
this.totalWorkUnits = totalWorkUnits;
this.bubbleUpdateFrequency = totalWorkUnits / 100;
return this;
}

public Task withAction (TaskAction action) {
this.action = action;
return this;
}

public Task setHeavy (boolean heavy) {
this.isHeavy = heavy;
return this;
}

List<Task> subtasks;
public static Task newTask () {
return new Task();
}

}
18 changes: 18 additions & 0 deletions src/main/java/com/conveyal/r5/analyst/progress/TaskAction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.conveyal.r5.analyst.progress;

/**
* This represents the work actually carried out by a Task.
* It's a single-method interface so it can be defined with lambda functions, or other objects can implement it.
* When the action is run, it will receive an object implementing an interface through which it can report progress
* and errors.
*/
public interface TaskAction {

/**
* This method will define an asynchronous action to take.
* The parameter is a simpler interface of Task that only allows progress reporting, to encapsulate actions and
* prevent them from seeing or modifying the task hierarchy that triggers and manages them.
*/
public void action (ProgressListener progressListener);

}
66 changes: 66 additions & 0 deletions src/main/java/com/conveyal/r5/analyst/progress/TaskExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.conveyal.r5.analyst.progress;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* This could replace our current bare ExecutorServices class. This can remain all-static for simplicity at first.
* Like many other things in Analysis it should probably become a singleton instance (though this is a pure question of
* style for the moment, as we never have more than one instance running in the same JVM).
*
* This executor is in R5 rather than analysis-backend, so does not now have access to AnalysisServerConfig.lightThreads
* and AnalysisServerConfig.heavyThreads config options. Pool sizes would need to be initialized manually at startup.
* There might not be any reason for it to be in R5 if it's entirely managing backend tasks. But we do expect to at
* least manage and report on progress building networks and distance tables, which needs to happen in a specific
* versioned worker instance.
*
* This is moving in the direction of having a single unified task management and reporting system across the backend
* and workers. It could be interesting to gather task status from the whole cluster of workers and merge them together
* into one view. This could conceivably even include regional analyses and chunks of work for regional analyses on
* workers. But even if such merging doesn't occur, it will be convenient to always report progress from backend and
* workers to the UI in the same data structures.
*/
public abstract class TaskExecutor {

private static final ExecutorService light = Executors.newFixedThreadPool(10);
private static final ExecutorService heavy = Executors.newFixedThreadPool(2);

public static void enqueue (Task task) {
task.validate();
if (task.isHeavy()) {
heavy.submit(task);
} else {
light.submit(task);
}
}

/**
* Just demonstrating how this would be used.
*/
public static void example () {
TaskExecutor.enqueue(Task.newTask()
.forUser("abyrd@conveyal.com", "conveyal")
.withDescription("Process some complicated things")
.withTotalWorkUnits(1024)
.withAction((progressListener -> {
double sum = 0;
for (int i = 0; i < 1024; i++) {
sum += Math.sqrt(i);
progressListener.increment();
}
}))
);
}

/**
* @return a hierarchical structure of all currently executing tasks, for serialization and transmission to the UI.
*/
public static List<Task> getAllTasksForUI () {
// Hmm, we need to get all the tasks back out of the Executor once they're seen as Runnables...
return Lists.newArrayList();
}

}

0 comments on commit 10e7943

Please sign in to comment.