Merge pull request #678 from conveyal/paths-times-freeform
Cleanup of paths, freeform, and OD matrix functionality
abyrd committed Feb 25, 2021
2 parents e2843f7 + bad3aa5 commit c205c44
Showing 31 changed files with 696 additions and 430 deletions.
@@ -151,7 +151,6 @@ public Broker (Config config, FileStorage fileStorage, EventBus eventBus, Worker
/**
* Enqueue a set of tasks for a regional analysis.
* Only a single task is passed in, which the broker will expand into all the individual tasks for a regional job.
* We pass in the group and user only to tag any newly created workers. This should probably be done in the caller.
*/
public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAnalysis) {

@@ -169,9 +168,11 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn

// Register the regional job so results received from multiple workers can be assembled into one file.
// TODO encapsulate MultiOriginAssemblers in a new Component
MultiOriginAssembler assembler =
new MultiOriginAssembler(regionalAnalysis, job, config.resultsBucket(), fileStorage);

// Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
// That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
MultiOriginAssembler assembler = new MultiOriginAssembler(
regionalAnalysis, job, config.resultsBucket(), fileStorage
);
resultAssemblers.put(templateTask.jobId, assembler);

if (config.testTaskRedelivery()) {
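The new comment above flags a failure mode: if the assembler constructor throws, a job may already be enqueued with no assembler to receive its results. Below is a minimal self-contained sketch of one defensive ordering, using simplified stand-in types rather than the actual Broker, Job, and MultiOriginAssembler classes.

import java.util.HashMap;
import java.util.Map;

// Sketch only: construct the assembler first, so a constructor failure
// leaves no stalled job behind. Types are simplified stand-ins.
public class EnqueueSketch {
    static class Job { final String jobId; Job (String id) { this.jobId = id; } }
    static class Assembler { Assembler (Job job) { /* may throw */ } }

    private final Map<String, Job> jobs = new HashMap<>();
    private final Map<String, Assembler> assemblers = new HashMap<>();

    public synchronized void enqueue (Job job) {
        // If this constructor throws, neither map has been touched.
        Assembler assembler = new Assembler(job);
        assemblers.put(job.jobId, assembler);
        jobs.put(job.jobId, job);
    }

    public static void main (String[] args) {
        new EnqueueSketch().enqueue(new Job("job-1"));
        System.out.println("job registered together with its assembler");
    }
}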
@@ -211,6 +212,8 @@ private RegionalTask templateTaskFromRegionalAnalysis (RegionalAnalysis regional
try {
File localScenario = FileUtils.createScratchFile("json");
JsonUtil.objectMapper.writeValue(localScenario, scenario);
// FIXME this is using a network service in a method called from a synchronized broker method.
// Move file into storage before entering the synchronized block.
fileStorage.moveIntoStorage(fileStorageKey, localScenario);
} catch (IOException e) {
LOG.error("Error storing scenario for retrieval by workers.", e);
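The FIXME suggests moving the storage upload out of the synchronized call path. Here is a small sketch of that restructuring, assuming a hypothetical storeRemotely() stand-in for fileStorage.moveIntoStorage().

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Sketch: perform the slow upload before taking the broker lock,
// so the synchronized section stays short.
public class StoreBeforeLockSketch {
    private final Object brokerLock = new Object();

    public void prepareAndEnqueue (byte[] scenarioJson) throws IOException {
        File localScenario = File.createTempFile("scenario", ".json");
        Files.write(localScenario.toPath(), scenarioJson);
        String storageKey = storeRemotely(localScenario); // slow I/O, no lock held
        synchronized (brokerLock) {
            enqueue(storageKey);                          // fast, lock held briefly
        }
    }

    private String storeRemotely (File f) { return "scenarios/" + f.getName(); }
    private void enqueue (String key) { System.out.println("enqueued " + key); }

    public static void main (String[] args) throws IOException {
        new StoreBeforeLockSketch().prepareAndEnqueue("{}".getBytes());
    }
}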
@@ -468,7 +471,7 @@ private void requestExtraWorkersIfAppropriate(Job job) {
// TODO more refined determination of number of workers to start (e.g. using tasks per minute)
int targetWorkerTotal = Math.min(MAX_WORKERS_PER_CATEGORY, job.nTasksTotal / TARGET_TASKS_PER_WORKER);
// Guardrail until freeform pointsets are tested more thoroughly
if (job.originPointSet != null) targetWorkerTotal = Math.min(targetWorkerTotal, 5);
if (job.templateTask.originPointSet != null) targetWorkerTotal = Math.min(targetWorkerTotal, 5);
int nSpot = targetWorkerTotal - categoryWorkersAlreadyRunning;
createWorkersInCategory(job.workerCategory, job.workerTags, 0, nSpot);
}
@@ -491,7 +494,7 @@ public File getPartialRegionalAnalysisResults (String jobId) {
if (resultAssembler == null) {
return null;
} else {
return resultAssembler.getGridBufferFile();
return null; // Was: resultAssembler.getGridBufferFile(); TODO implement fetching partially completed?
}
}
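With getGridBufferFile() no longer called, getPartialRegionalAnalysisResults now returns null in both branches. A sketch of how a caller might handle that contract until partial-result fetching is implemented; the handler mapping here is an assumption, not existing backend code.

import java.io.File;

// Sketch: getPartialResults() stands in for Broker.getPartialRegionalAnalysisResults().
public class PartialResultsCaller {
    static File getPartialResults (String jobId) { return null; } // not yet implemented

    public static void main (String[] args) {
        File partial = getPartialResults("job-1");
        if (partial == null) {
            // A web handler could map this to e.g. HTTP 404 or 202.
            System.out.println("Partial results not available for this job.");
        } else {
            System.out.println("Serving " + partial.getAbsolutePath());
        }
    }
}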

28 changes: 9 additions & 19 deletions src/main/java/com/conveyal/analysis/components/broker/Job.java
@@ -12,6 +12,8 @@
import java.util.BitSet;
import java.util.List;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* A Job is a collection of tasks that represent all the origins in a regional analysis. All the
* tasks in a Job must have the same network ID and be run against the same R5 version on the workers.
@@ -69,15 +71,6 @@ public class Job {
/** Every task in this job will be based on this template task, but have its origin coordinates changed. */
public final RegionalTask templateTask;

/**
* If non-null, this specifies the non-gridded (freeform) origin points of this regional
* analysis. If null, the origin points are specified implicitly by web mercator dimensions of
* the template task. Ideally we'd always have a PointSet here and use polymorphism to get the
* lat and lon coordinates of each point, whether it's a grid or freeform.
* FIXME really we should not have the destinationPointSet in the RegionalTask, but originPointSet here.
*/
public final FreeFormPointSet originPointSet;

/**
* The only thing that changes from one task to the next is the origin coordinates. If this job
* does not include an originPointSet, derive these coordinates from the web mercator grid
@@ -92,17 +85,18 @@ private RegionalTask makeOneTask (int taskNumber) {
private RegionalTask makeOneTask (int taskNumber) {
RegionalTask task = templateTask.clone();
task.taskId = taskNumber;
if (originPointSet == null) {
if (task.originPointSet == null) {
// Origins specified implicitly by web mercator dimensions of task
int x = taskNumber % templateTask.width;
int y = taskNumber / templateTask.width;
task.fromLat = Grid.pixelToCenterLat(task.north + y, task.zoom);
task.fromLon = Grid.pixelToCenterLon(task.west + x, task.zoom);
} else {
// Look up coordinates and originId from job's originPointSet
task.originId = originPointSet.getId(taskNumber);
task.fromLat = originPointSet.getLat(taskNumber);
task.fromLon = originPointSet.getLon(taskNumber);
// saving them in non-transient fields for transmission to the worker.
task.originId = task.originPointSet.getId(taskNumber);
task.fromLat = task.originPointSet.getLat(taskNumber);
task.fromLon = task.originPointSet.getLon(taskNumber);
}
return task;
}
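For grid jobs, makeOneTask derives each origin from the task number in row-major order. The sketch below illustrates that indexing together with standard web mercator pixel-to-coordinate math; it mirrors what Grid.pixelToCenterLat and Grid.pixelToCenterLon presumably compute, but is an illustration rather than r5's implementation.

// Self-contained sketch of the task-number-to-coordinate math above.
public class TaskGridSketch {
    static double pixelToCenterLon (int xPixel, int zoom) {
        double worldWidth = 256.0 * Math.pow(2, zoom);
        return (xPixel + 0.5) / worldWidth * 360.0 - 180.0;
    }

    static double pixelToCenterLat (int yPixel, int zoom) {
        double worldWidth = 256.0 * Math.pow(2, zoom);
        double n = Math.PI * (1 - 2.0 * (yPixel + 0.5) / worldWidth);
        return Math.toDegrees(Math.atan(Math.sinh(n)));
    }

    public static void main (String[] args) {
        int width = 100, west = 40000, north = 60000, zoom = 9; // invented job dimensions
        int taskNumber = 12345;
        // Row-major order: task numbers scan west-to-east, then north-to-south.
        int x = taskNumber % width;
        int y = taskNumber / width;
        System.out.printf("origin %d -> lat %.5f lon %.5f%n",
                taskNumber, pixelToCenterLat(north + y, zoom), pixelToCenterLon(west + x, zoom));
    }
}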
@@ -135,13 +129,9 @@ public Job (RegionalTask templateTask, WorkerTags workerTags) {
this.nextTaskToDeliver = 0;

if (templateTask.originPointSetKey != null) {
// If an originPointSetKey is specified, get it from S3 and set the number of origins
// FIXME we really shouldn't call network services in a constructor, especially when used in a synchronized
// method on the Broker. However this is only triggered by experimental FreeFormPointSet code.
originPointSet = PointSetCache.readFreeFormFromFileStore(templateTask.originPointSetKey);
this.nTasksTotal = originPointSet.featureCount();
checkNotNull(templateTask.originPointSet);
this.nTasksTotal = templateTask.originPointSet.featureCount();
} else {
originPointSet = null;
this.nTasksTotal = templateTask.width * templateTask.height;
}

@@ -88,7 +88,7 @@ private boolean deleteFile(Request req, Response res) {
*/
private String generateDownloadURL(Request req, Response res) {
FileInfo file = fileCollection.findPermittedByRequestParamId(req, res);
res.type("text");
res.type("text/plain");
return fileStorage.getURL(file.getKey());
}

@@ -9,14 +9,15 @@
import com.conveyal.analysis.models.Project;
import com.conveyal.analysis.models.RegionalAnalysis;
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.analysis.results.CsvResultWriter;
import com.conveyal.analysis.results.CsvResultWriter.Result;
import com.conveyal.analysis.results.CsvResultType;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageFormat;
import com.conveyal.file.FileStorageKey;
import com.conveyal.file.FileUtils;
import com.conveyal.r5.analyst.Grid;
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.PointSetCache;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.google.common.primitives.Ints;
import com.mongodb.QueryBuilder;
@@ -331,7 +332,8 @@ private Object getRegionalResults (Request req, Response res) throws IOException

private String getCsvResults (Request req, Response res) {
final String regionalAnalysisId = req.params("_id");
final Result resultType = Result.valueOf(req.params("resultType").toUpperCase());
final CsvResultType resultType = CsvResultType.valueOf(req.params("resultType").toUpperCase());
// If the resultType parameter received on the API is unrecognized, valueOf throws IllegalArgumentException

RegionalAnalysis analysis = Persistence.regionalAnalyses.findPermitted(
QueryBuilder.start("_id").is(regionalAnalysisId).get(),
@@ -343,23 +345,14 @@ private String getCsvResults (Request req, Response res) {
throw AnalysisServerException.notFound("The specified analysis is unknown, incomplete, or deleted.");
}

if (resultType == Result.ACCESS && !analysis.request.recordAccessibility) {
throw AnalysisServerException.notFound("Accessibility results were not recorded for this analysis");
String storageKey = analysis.resultStorage.get(resultType);
if (storageKey == null) {
throw AnalysisServerException.notFound("This regional analysis does not contain CSV results of type " + resultType);
}

if (resultType == Result.TIMES && !analysis.request.recordTimes) {
throw AnalysisServerException.notFound("Travel time results were not recorded for this analysis");
}

if (resultType == Result.PATHS && !analysis.request.includePathResults) {
throw AnalysisServerException.notFound("Path results were not recorded for this analysis");
}
FileStorageKey fileStorageKey = new FileStorageKey(config.resultsBucket(), storageKey);

// TODO result path is stored in model, use that?
String key = regionalAnalysisId + '_' + resultType + ".csv.gz";
FileStorageKey fileStorageKey = new FileStorageKey(config.resultsBucket(), key);

res.type("text");
res.type("text/plain");
return fileStorage.getURL(fileStorageKey);
}
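The three per-type flag checks are replaced by a single lookup in the analysis's resultStorage map. A self-contained sketch of that pattern, with invented keys for illustration:

import java.util.EnumMap;
import java.util.Map;

// Sketch: a missing map entry becomes a not-found error instead of
// three separate per-type flag checks.
public class CsvResultLookupSketch {
    enum CsvResultType { ACCESS, TIMES, PATHS }

    public static void main (String[] args) {
        Map<CsvResultType, String> resultStorage = new EnumMap<>(CsvResultType.class);
        resultStorage.put(CsvResultType.ACCESS, "abc123_ACCESS.csv.gz");

        CsvResultType requested = CsvResultType.TIMES;
        String storageKey = resultStorage.get(requested);
        if (storageKey == null) {
            // In the controller this becomes AnalysisServerException.notFound(...)
            System.out.println("No CSV results of type " + requested);
        } else {
            System.out.println("Download key: " + storageKey);
        }
    }
}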

@@ -438,10 +431,21 @@ private RegionalAnalysis createRegionalAnalysis (Request req, Response res) thro
task.validatePercentiles();
}

// Set the origin pointset if one is specified.
// Set the origin pointset key if an ID is specified. Currently this will always be a freeform pointset.
// Also load this freeform origin pointset instance itself, so broker can see point coordinates, ids etc.
if (analysisRequest.originPointSetId != null) {
task.originPointSetKey = Persistence.opportunityDatasets
.findByIdIfPermitted(analysisRequest.originPointSetId, accessGroup).storageLocation();
task.originPointSet = PointSetCache.readFreeFormFromFileStore(task.originPointSetKey);
}

// If our destinations are freeform, pre-load the destination pointset on the backend.
// This allows MultiOriginAssembler to know the number of points, and in one-to-one mode to look up their IDs.
if (!task.makeTauiSite && task.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)) {
checkArgument(task.destinationPointSetKeys.length == 1);
task.destinationPointSets = new PointSet[] {
PointSetCache.readFreeFormFromFileStore(task.destinationPointSetKeys[0])
};
}

task.oneToOne = analysisRequest.oneToOne;
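A compact sketch of the freeform-destination preload check added above. The "pointset" extension value and the preload message are assumptions for illustration; the real code reads FileStorageFormat.FREEFORM.extension and calls PointSetCache.readFreeFormFromFileStore.

// Sketch: a single destination pointset key ending in the freeform
// extension triggers a backend preload.
public class FreeformDestinationSketch {
    static final String FREEFORM_EXTENSION = "pointset"; // assumed value

    public static void main (String[] args) {
        String[] destinationPointSetKeys = { "destinations/my-sites.pointset" };
        boolean makeTauiSite = false;

        if (!makeTauiSite && destinationPointSetKeys[0].endsWith(FREEFORM_EXTENSION)) {
            if (destinationPointSetKeys.length != 1) {
                throw new IllegalArgumentException("Freeform destinations must be a single pointset.");
            }
            System.out.println("Would preload " + destinationPointSetKeys[0] + " on the backend.");
        }
    }
}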
@@ -509,16 +513,18 @@ private RegionalAnalysis createRegionalAnalysis (Request req, Response res) thro
task.cutoffsMinutes = regionalAnalysis.cutoffsMinutes;
task.percentiles = regionalAnalysis.travelTimePercentiles;

// Persist this newly created RegionalAnalysis to Mongo, which assigns it an id and creation/update time stamps.
// Persist this newly created RegionalAnalysis to Mongo.
// This assigns it creation/update time stamps and an ID, which is needed to name any output CSV files.
regionalAnalysis = Persistence.regionalAnalyses.create(regionalAnalysis);
if (analysisRequest.recordTimes) regionalAnalysis.addCsvStoragePath(Result.TIMES, config.resultsBucket());
if (analysisRequest.recordPaths) regionalAnalysis.addCsvStoragePath(Result.PATHS, config.resultsBucket());
if (analysisRequest.recordAccessibility) regionalAnalysis.addCsvStoragePath(Result.ACCESS, config.resultsBucket());
Persistence.regionalAnalyses.modifiyWithoutUpdatingLock(regionalAnalysis);

// Register the regional job with the broker, which will distribute individual tasks to workers and track progress.
broker.enqueueTasksForRegionalJob(regionalAnalysis);

// Flush to the database any information added to the RegionalAnalysis object when it was enqueued.
// This includes the paths of any CSV files that will be produced by this analysis.
// TODO verify whether there is a reason to use regionalAnalyses.modifyWithoutUpdatingLock() or put().
Persistence.regionalAnalyses.modifiyWithoutUpdatingLock(regionalAnalysis);

return regionalAnalysis;
}

@@ -16,6 +16,7 @@ public String getType() {

public String[] routes;

/** At least one trip from each pattern to modify. Does not select single trips, only whole patterns. */
public String[] trips;

/** array of [from stop, to stop] specifying single hops this should be applied to */
24 changes: 17 additions & 7 deletions src/main/java/com/conveyal/analysis/models/RegionalAnalysis.java
@@ -1,7 +1,7 @@
package com.conveyal.analysis.models;

import com.conveyal.analysis.AnalysisServerException;
import com.conveyal.analysis.results.CsvResultWriter;
import com.conveyal.analysis.results.CsvResultType;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import org.locationtech.jts.geom.Geometry;

@@ -76,12 +76,22 @@ public class RegionalAnalysis extends Model implements Cloneable {
/** Has this analysis been (soft) deleted? */
public boolean deleted;

public Map<CsvResultWriter.Result, String> resultStorage = new HashMap<>();

public void addCsvStoragePath (CsvResultWriter.Result resultType, String outputBucket) {
// TODO less fragile path, consistent gzip behavior
resultStorage.put(resultType, outputBucket + "/" + this._id + "_" + resultType + ".csv.gz");
}
/**
* Storage locations of supplemental regional analysis results intended for export (rather than direct UI display).
* A map from result type (times, paths, or access) to the file name. This could conceivably be a more structured
* Java type rather than a String-keyed map, but for now we want to maintain flexibility for new result types.
*
* These CSV regional results are meant by design to be downloaded as files by end users. We store their filenames
* to facilitate download via the UI without having to replicate any backend logic that generates filenames from
* analysis characteristics.
*
* This stands in opposition to our original regional analysis results: grids of accessibility for different travel
* times and percentiles. These are fetched by the backend and returned to the UI on demand. Those file names are
* derived from the ID of the regional analysis and other details. The backend has all that naming logic in one
* place because these files are not meant for direct download by the end user, so the UI doesn't need to replicate
* any of that logic.
*/
public Map<CsvResultType, String> resultStorage = new HashMap<>();

public RegionalAnalysis clone () {
try {
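A sketch of how the resultStorage map introduced above might be populated when CSV writers are created, so the UI can later download files by stored name. The recordResult helper is illustrative; the file-naming pattern follows the key format seen in the controller changes above.

import java.util.EnumMap;
import java.util.Map;

// Sketch: record each generated CSV file name on the analysis so the UI
// never has to re-derive names from analysis characteristics.
public class ResultStorageSketch {
    enum CsvResultType { ACCESS, TIMES, PATHS }

    static Map<CsvResultType, String> resultStorage = new EnumMap<>(CsvResultType.class);

    static void recordResult (String analysisId, CsvResultType type) {
        resultStorage.put(type, analysisId + "_" + type + ".csv.gz");
    }

    public static void main (String[] args) {
        recordResult("601a2b3c", CsvResultType.ACCESS);
        recordResult("601a2b3c", CsvResultType.PATHS);
        // The map is persisted with the RegionalAnalysis; the UI just reads it back.
        resultStorage.forEach((type, name) -> System.out.println(type + " -> " + name));
    }
}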
@@ -222,7 +222,6 @@ public V put(V value, DBObject optionalQuery) {

/**
* Insert without updating the nonce or updateBy/updatedAt
* @return
*/
public V modifiyWithoutUpdatingLock (V value) {
wrappedCollection.updateById(value._id, value);
@@ -0,0 +1,65 @@
package com.conveyal.analysis.results;

import com.conveyal.file.FileStorage;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class AccessCsvResultWriter extends CsvResultWriter {

public AccessCsvResultWriter (RegionalTask task, String outputBucket, FileStorage fileStorage) throws IOException {
super(task, outputBucket, fileStorage);
}

@Override
public CsvResultType resultType () {
return CsvResultType.ACCESS;
}

@Override
public String[] columnHeaders () {
// We could potentially make the percentile and cutoff columns optional, but it's unnecessary complexity.
return new String[] { "origin", "destinations", "percentile", "cutoff", "access" };
}

/**
* Check that each dimension of the 3D accessibility results array
* matches the expected size for the job being processed.
*/
@Override
protected void checkDimension (RegionalWorkResult workResult) {
checkDimension(workResult, "destination pointsets", workResult.accessibilityValues.length, task.destinationPointSetKeys.length);
for (int[][] percentilesForGrid : workResult.accessibilityValues) {
checkDimension(workResult, "percentiles", percentilesForGrid.length, task.percentiles.length);
for (int[] cutoffsForPercentile : percentilesForGrid) {
checkDimension(workResult, "cutoffs", cutoffsForPercentile.length, task.cutoffsMinutes.length);
}
}
}

@Override
public Iterable<String[]> rowValues (RegionalWorkResult workResult) {
String originId = task.originPointSet.getId(workResult.taskId);
List<String[]> rows = new ArrayList<>();
for (int d = 0; d < task.destinationPointSetKeys.length; d++) {
int[][] percentilesForDestPointset = workResult.accessibilityValues[d];
for (int p = 0; p < task.percentiles.length; p++) {
int[] cutoffsForPercentile = percentilesForDestPointset[p];
for (int c = 0; c < task.cutoffsMinutes.length; c++) {
rows.add(new String[] {
originId,
task.destinationPointSetKeys[d],
Integer.toString(task.percentiles[p]),
Integer.toString(task.cutoffsMinutes[c]),
Integer.toString(workResult.accessibilityValues[d][p][c])
});
}
}
}
return rows;
}

}
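A worked sketch of the rowValues() flattening in AccessCsvResultWriter: a [destinations][percentiles][cutoffs] array becomes one CSV row per cell. All sample values below are invented.

import java.util.ArrayList;
import java.util.List;

// Sketch: 1 destination x 2 percentiles x 2 cutoffs = 4 rows per origin.
public class AccessRowsSketch {
    public static void main (String[] args) {
        String originId = "origin-42";
        String[] destKeys = { "jobs.pointset" };
        int[] percentiles = { 25, 50 };
        int[] cutoffs = { 30, 60 };
        int[][][] access = { { { 1200, 3400 }, { 900, 2800 } } }; // [1][2][2]

        List<String[]> rows = new ArrayList<>();
        for (int d = 0; d < destKeys.length; d++)
            for (int p = 0; p < percentiles.length; p++)
                for (int c = 0; c < cutoffs.length; c++)
                    rows.add(new String[] {
                        originId, destKeys[d],
                        Integer.toString(percentiles[p]),
                        Integer.toString(cutoffs[c]),
                        Integer.toString(access[d][p][c])
                    });

        rows.forEach(r -> System.out.println(String.join(",", r)));
    }
}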