schedule creation of large zip files in background
also added some more debug level logging to observe speed
abyrd committed Apr 11, 2024
1 parent e30d765 commit 91aa0e9
Showing 3 changed files with 71 additions and 34 deletions.
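
The substantive change is in RegionalAnalysisController.getAllRegionalResults: instead of assembling the potentially very large geotiff zip synchronously inside the HTTP handler, the handler now enqueues a background task and answers 202 Accepted until the finished zip exists in FileStorage, at which point it returns the download URL as JSON. A caller is therefore expected to poll. A minimal sketch of such a client, assuming only the status-code contract visible in the handler below; the URL and the treatment of the JSON body are illustrative assumptions, not part of this commit:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class PollForZip {
        public static void main(String[] args) throws Exception {
            // Hypothetical URL; the real path is defined by the API's route registration, not shown here.
            URI uri = URI.create("https://analysis.example.com/api/regional/1234/all");
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
            while (true) {
                HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() == 202) {
                    // Zip is still being assembled in the background; wait and retry.
                    Thread.sleep(5_000);
                    continue;
                }
                // On 200 the body is JSON containing a URL for the finished zip.
                System.out.println(response.body());
                break;
            }
        }
    }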
2 changes: 1 addition & 1 deletion src/main/java/com/conveyal/analysis/components/BackendComponents.java
@@ -86,7 +86,7 @@ public List<HttpController> standardHttpControllers () {
             new GtfsController(gtfsCache),
             new BundleController(this),
             new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database),
-            new RegionalAnalysisController(broker, fileStorage),
+            new RegionalAnalysisController(broker, fileStorage, taskScheduler),
             new AggregationAreaController(fileStorage, database, taskScheduler),
             // This broker controller registers at least one handler at URL paths beginning with /internal, which
             // is exempted from authentication and authorization, but should be hidden from the world
101 changes: 69 additions & 32 deletions src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java
@@ -3,6 +3,7 @@
 import com.conveyal.analysis.AnalysisServerException;
 import com.conveyal.analysis.SelectingGridReducer;
 import com.conveyal.analysis.UserPermissions;
+import com.conveyal.analysis.components.TaskScheduler;
 import com.conveyal.analysis.components.broker.Broker;
 import com.conveyal.analysis.components.broker.JobStatus;
 import com.conveyal.analysis.models.AnalysisRequest;
@@ -11,6 +12,7 @@
 import com.conveyal.analysis.models.RegionalAnalysis;
 import com.conveyal.analysis.persistence.Persistence;
 import com.conveyal.analysis.results.CsvResultType;
+import com.conveyal.analysis.util.HttpStatus;
 import com.conveyal.analysis.util.JsonUtil;
 import com.conveyal.file.FileStorage;
 import com.conveyal.file.FileStorageFormat;
@@ -22,6 +24,7 @@
 import com.conveyal.r5.analyst.PointSet;
 import com.conveyal.r5.analyst.PointSetCache;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
+import com.conveyal.r5.analyst.progress.Task;
 import com.google.common.primitives.Ints;
 import com.mongodb.QueryBuilder;
 import gnu.trove.list.array.TIntArrayList;
@@ -36,6 +39,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.URI;
 import java.nio.file.FileSystem;
 import java.nio.file.FileSystems;
@@ -45,9 +49,12 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.zip.GZIPOutputStream;

 import static com.conveyal.analysis.util.JsonUtil.toJson;
@@ -60,6 +67,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;
 import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_HTML;
+import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_PLAIN;

 /**
  * Spark HTTP handler methods that allow launching new regional analyses, as well as deleting them and fetching
@@ -80,10 +88,12 @@ public class RegionalAnalysisController implements HttpController {

     private final Broker broker;
     private final FileStorage fileStorage;
+    private final TaskScheduler taskScheduler;

-    public RegionalAnalysisController (Broker broker, FileStorage fileStorage) {
+    public RegionalAnalysisController (Broker broker, FileStorage fileStorage, TaskScheduler taskScheduler) {
         this.broker = broker;
         this.fileStorage = fileStorage;
+        this.taskScheduler = taskScheduler;
     }

     private Collection<RegionalAnalysis> getRegionalAnalysesForRegion(String regionId, UserPermissions userPermissions) {
@@ -254,8 +264,9 @@ private HumanKey getSingleCutoffGrid (
                 grid.writeGeotiff(fos);
                 break;
             }
-
+        LOG.debug("Finished deriving single-cutoff grid {}. Transferring to storage.", singleCutoffKey);
         fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile);
+        LOG.debug("Finished transferring single-cutoff grid {} to storage.", singleCutoffKey);
     }
     String analysisHumanName = humanNameForEntity(analysis);
     String destinationHumanName = humanNameForEntity(destinations);
@@ -266,6 +277,10 @@ private HumanKey getSingleCutoffGrid (
         return new HumanKey(singleCutoffFileStorageKey, resultHumanFilename);
     }

+    // Prevent multiple requests from creating the same files in parallel.
+    // This could potentially be integrated into FileStorage with enum return values or an additional boolean method.
+    private Set<String> filesBeingPrepared = Collections.synchronizedSet(new HashSet<>());
+
     private Object getAllRegionalResults (Request req, Response res) throws IOException {
         final String regionalAnalysisId = req.params("_id");
         final UserPermissions userPermissions = UserPermissions.from(req);
Expand All @@ -277,39 +292,61 @@ private Object getAllRegionalResults (Request req, Response res) throws IOExcept
throw AnalysisServerException.badRequest("Batch result download only available for gridded origins.");
}
FileStorageKey zippedResultsKey = new FileStorageKey(RESULTS, analysis._id + "_ALL.zip");
if (!fileStorage.exists(zippedResultsKey)) {
// Iterate over all dest, cutoff, percentile combinations and generate one geotiff grid output for each one.
List<HumanKey> humanKeys = new ArrayList<>();
for (String destinationPointSetId : analysis.destinationPointSetIds) {
OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
for (int cutoffMinutes : analysis.cutoffsMinutes) {
for (int percentile : analysis.travelTimePercentiles) {
HumanKey gridKey = getSingleCutoffGrid(
analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
);
humanKeys.add(gridKey);
if (fileStorage.exists(zippedResultsKey)) {
res.type(APPLICATION_JSON.asString());
String analysisHumanName = humanNameForEntity(analysis);
return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
}
if (filesBeingPrepared.contains(zippedResultsKey.path)) {
res.type(TEXT_PLAIN.asString());
res.status(HttpStatus.ACCEPTED_202);
return "Geotiff zip is already being prepared in the background.";
}
// File did not exist. Create it in the background and ask caller to request it later.
filesBeingPrepared.add(zippedResultsKey.path);
Task task = Task.create("Zip all geotiffs for regional analysis " + analysis.name)
.forUser(userPermissions)
.withAction(progressListener -> {
int nSteps = analysis.destinationPointSetIds.length * analysis.cutoffsMinutes.length *
analysis.travelTimePercentiles.length * 2 + 1;
progressListener.beginTask("Creating and archiving geotiffs...", nSteps);
// Iterate over all dest, cutoff, percentile combinations and generate one geotiff for each combination.
List<HumanKey> humanKeys = new ArrayList<>();
for (String destinationPointSetId : analysis.destinationPointSetIds) {
OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
for (int cutoffMinutes : analysis.cutoffsMinutes) {
for (int percentile : analysis.travelTimePercentiles) {
HumanKey gridKey = getSingleCutoffGrid(
analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
);
humanKeys.add(gridKey);
progressListener.increment();
}
}
}
}
File tempZipFile = File.createTempFile("regional", ".zip");
// Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
// Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual entries.
// May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
tempZipFile.delete();
Map<String, String> env = Map.of("create", "true");
URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
for (HumanKey key : humanKeys) {
Path storagePath = fileStorage.getFile(key.storageKey).toPath();
Path zipPath = zipFilesystem.getPath(key.humanName);
Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
File tempZipFile = File.createTempFile("regional", ".zip");
// Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
// Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual
// entries. May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
tempZipFile.delete();
Map<String, String> env = Map.of("create", "true");
URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
for (HumanKey key : humanKeys) {
Path storagePath = fileStorage.getFile(key.storageKey).toPath();
Path zipPath = zipFilesystem.getPath(key.humanName);
Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
progressListener.increment();
}
}
}
fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
}
res.type(APPLICATION_JSON.asString());
String analysisHumanName = humanNameForEntity(analysis);
return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
progressListener.increment();
filesBeingPrepared.remove(zippedResultsKey.path);
});
taskScheduler.enqueue(task);
res.type(TEXT_PLAIN.asString());
res.status(HttpStatus.ACCEPTED_202);
return "Building geotiff zip in background.";
}

/**
Expand Down
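
The nSteps arithmetic matches the increment() calls one for one: each geotiff derived is a step (destinationPointSetIds × cutoffsMinutes × travelTimePercentiles), each copy into the zip is another step over the same combinations (hence the × 2), and the final moveIntoStorage is the + 1. For example, 2 destination layers, 3 cutoffs and 5 percentiles give 2 × 3 × 5 × 2 + 1 = 61 steps.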
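The new filesBeingPrepared set prevents two clients requesting the same _ALL.zip from each spawning a zip-building task. As the added comment concedes, contains() followed by add() is a check-then-act sequence rather than one atomic step. A sketch of a tighter variant that leans on the boolean return value of Set.add; this helper is hypothetical and not part of the commit:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class ZipPreparationGuard {
        // Same structure as the controller's filesBeingPrepared field.
        private final Set<String> filesBeingPrepared = Collections.synchronizedSet(new HashSet<>());

        // Set.add returns false when the element was already present, so two
        // concurrent callers cannot both win the right to prepare the same file.
        public boolean tryStartPreparing (String path) {
            return filesBeingPrepared.add(path);
        }

        // Best called from a finally block so a failure does not wedge the guard.
        public void finishPreparing (String path) {
            filesBeingPrepared.remove(path);
        }
    }

Note that in the diff the path is removed only at the very end of the action, so whether a failed task ever clears the guard depends on how Task and TaskScheduler handle exceptions thrown from withAction.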
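The FIXME kept in both versions of the zip-writing block points at an alternative the author mentions but does not adopt: writing entries with java.util.zip.ZipOutputStream instead of mounting a zip FileSystem, which sidesteps the temp file having to not exist. A minimal sketch of that approach under the same assumptions; the method and parameter names are illustrative:

    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.util.List;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipOutputStream;

    public class ZipWithStream {
        // Stream each source file into the zip under the corresponding entry name.
        // Unlike the ZipFileSystemProvider approach, this happily writes into a
        // freshly created empty output file.
        static void zipFiles (List<File> files, List<String> entryNames, File outZip) throws IOException {
            try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(outZip)))) {
                for (int i = 0; i < files.size(); i++) {
                    zos.putNextEntry(new ZipEntry(entryNames.get(i)));
                    Files.copy(files.get(i).toPath(), zos);
                    zos.closeEntry();
                }
            }
        }
    }

The "gzip - zip CSV conversion" remark presumably refers to wrapping stored gzipped CSVs in a GZIPInputStream and streaming the decompressed bytes into a ZipEntry in the same loop.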
2 changes: 1 addition & 1 deletion src/main/java/com/conveyal/r5/analyst/progress/Task.java
@@ -162,7 +162,7 @@ protected void bubbleUpProgress() {
     }

     /**
-     * Check that all necesary fields have been set before enqueueing for execution, and check any invariants.
+     * Check that all necessary fields have been set before enqueueing for execution, and check any invariants.
      */
     public void validate () {
         if (this.user == null) {
