From f53df35369d2595110a5822d4f8dc3bb14934429 Mon Sep 17 00:00:00 2001 From: Trevor Gerhardt Date: Sun, 29 Jan 2023 17:24:54 +0100 Subject: [PATCH] Refactor the results assemblers Refactor the multi-origin assembler and the individual result writer classes so that they can be created while the regional analysis itself is created. This leaves the Broker and the MultiOriginAssembler simpler and dependent on fewer components. --- .../components/LocalBackendComponents.java | 2 +- .../analysis/components/broker/Broker.java | 83 ++---------- .../analysis/components/broker/Job.java | 43 +++--- .../components/broker/RedeliveryTest.java | 6 +- .../RegionalAnalysisController.java | 63 ++++++++- .../results/AccessCsvResultWriter.java | 6 +- .../analysis/results/BaseResultWriter.java | 86 ------------ .../analysis/results/CsvResultWriter.java | 44 ++++-- .../analysis/results/GridResultWriter.java | 73 ++++++++-- .../results/MultiGridResultWriter.java | 88 ------------ .../results/MultiOriginAssembler.java | 127 ++++-------------- .../analysis/results/PathCsvResultWriter.java | 3 +- .../results/RegionalResultWriter.java | 4 +- .../analysis/results/TimeCsvResultWriter.java | 4 +- .../java/com/conveyal/file/FileUtils.java | 15 +++ .../r5/analyst/cluster/RegionalTask.java | 10 ++ 16 files changed, 251 insertions(+), 406 deletions(-) delete mode 100644 src/main/java/com/conveyal/analysis/results/BaseResultWriter.java delete mode 100644 src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java diff --git a/src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java b/src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java index 4de8e3098..ec59b15b8 100644 --- a/src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java +++ b/src/main/java/com/conveyal/analysis/components/LocalBackendComponents.java @@ -34,7 +34,7 @@ public LocalBackendComponents () { authentication = new LocalAuthentication(); // TODO add nested LocalWorkerComponents here, to reuse some components, and pass it into the LocalWorkerLauncher? workerLauncher = new LocalWorkerLauncher(config, fileStorage, gtfsCache, osmCache); - broker = new Broker(config, fileStorage, eventBus, workerLauncher); + broker = new Broker(config, eventBus, workerLauncher); censusExtractor = new SeamlessCensusGridExtractor(config); // Instantiate the HttpControllers last, when all the components except the HttpApi are already created.
List<HttpController> httpControllers = standardHttpControllers(); diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java index d8895e6e3..45f0b0535 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java @@ -6,17 +6,11 @@ import com.conveyal.analysis.components.eventbus.EventBus; import com.conveyal.analysis.components.eventbus.RegionalAnalysisEvent; import com.conveyal.analysis.components.eventbus.WorkerEvent; -import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.results.MultiOriginAssembler; -import com.conveyal.analysis.util.JsonUtil; -import com.conveyal.file.FileStorage; -import com.conveyal.file.FileStorageKey; -import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.WorkerCategory; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import com.conveyal.r5.analyst.cluster.WorkerStatus; -import com.conveyal.r5.analyst.scenario.Scenario; import com.conveyal.r5.util.ExceptionUtils; import com.google.common.collect.ListMultimap; import com.google.common.collect.MultimapBuilder; @@ -27,8 +21,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -42,7 +34,6 @@ import static com.conveyal.analysis.components.eventbus.WorkerEvent.Action.REQUESTED; import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.REGIONAL; import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.SINGLE_POINT; -import static com.conveyal.file.FileCategory.BUNDLES; import static com.google.common.base.Preconditions.checkNotNull; /** @@ -93,7 +84,6 @@ public interface Config { private Config config; // Component Dependencies - private final FileStorage fileStorage; private final EventBus eventBus; private final WorkerLauncher workerLauncher; @@ -143,9 +133,8 @@ public interface Config { public TObjectLongMap<WorkerCategory> recentlyRequestedWorkers = TCollections.synchronizedMap(new TObjectLongHashMap<>()); - public Broker (Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) { + public Broker (Config config, EventBus eventBus, WorkerLauncher workerLauncher) { this.config = config; - this.fileStorage = fileStorage; this.eventBus = eventBus; this.workerLauncher = workerLauncher; } @@ -154,26 +143,16 @@ public Broker (Config config, FileStorage fileStorage, EventBus eventBus, Worker * Enqueue a set of tasks for a regional analysis. * Only a single task is passed in, which the broker will expand into all the individual tasks for a regional job. */ - public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAnalysis) { - - // Make a copy of the regional task inside the RegionalAnalysis, replacing the scenario with a scenario ID.
- RegionalTask templateTask = templateTaskFromRegionalAnalysis(regionalAnalysis); - - LOG.info("Enqueuing tasks for job {} using template task.", templateTask.jobId); - if (findJob(templateTask.jobId) != null) { - LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId); - throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId); + public synchronized void enqueueTasksForRegionalJob (Job job, MultiOriginAssembler assembler) { + LOG.info("Enqueuing tasks for job {} using template task.", job.jobId); + if (findJob(job.jobId) != null) { + LOG.error("Someone tried to enqueue job {} but it already exists.", job.jobId); + throw new RuntimeException("Enqueued duplicate job " + job.jobId); } - WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis); - Job job = new Job(templateTask, workerTags); jobs.put(job.workerCategory, job); // Register the regional job so results received from multiple workers can be assembled into one file. - // TODO encapsulate MultiOriginAssemblers in a new Component - // Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler. - // That is not catastrophic, but the user may need to recognize and delete the stalled regional job. - MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage); - resultAssemblers.put(templateTask.jobId, assembler); + resultAssemblers.put(job.jobId, assembler); if (config.testTaskRedelivery()) { // This is a fake job for testing, don't confuse the worker startup code below with null graph ID. @@ -181,56 +160,12 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn } if (workerCatalog.noWorkersAvailable(job.workerCategory, config.offline())) { - createOnDemandWorkerInCategory(job.workerCategory, workerTags); + createOnDemandWorkerInCategory(job.workerCategory, job.workerTags); } else { // Workers exist in this category, clear out any record that we're waiting for one to start up. recentlyRequestedWorkers.remove(job.workerCategory); } - eventBus.send(new RegionalAnalysisEvent(templateTask.jobId, STARTED).forUser(workerTags.user, workerTags.group)); - } - - /** - * The single RegionalTask object represents a lot of individual accessibility tasks at many different origin - * points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to - * workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID - * to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task, - * as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on - * its ID only. We protectively clone this task because we're going to null out its scenario field, and don't - * want to affect the original object which contains all the scenario details. - * TODO Why is all this detail added after the Persistence call? - * We don't want to store all the details added below in Mongo? - */ - private RegionalTask templateTaskFromRegionalAnalysis (RegionalAnalysis regionalAnalysis) { - RegionalTask templateTask = regionalAnalysis.request.clone(); - // First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers. 
- Scenario scenario = templateTask.scenario; - templateTask.scenarioId = scenario.id; - // Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON. - templateTask.scenario = null; - String fileName = String.format("%s_%s.json", regionalAnalysis.bundleId, scenario.id); - FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName); - try { - File localScenario = FileUtils.createScratchFile("json"); - JsonUtil.objectMapper.writeValue(localScenario, scenario); - // FIXME this is using a network service in a method called from a synchronized broker method. - // Move file into storage before entering the synchronized block. - fileStorage.moveIntoStorage(fileStorageKey, localScenario); - } catch (IOException e) { - LOG.error("Error storing scenario for retrieval by workers.", e); - } - // Fill in all the fields in the template task that will remain the same across all tasks in a job. - // I am not sure why we are re-setting all these fields, it seems like they are already set when the task is - // initialized by AnalysisRequest.populateTask. But we'd want to thoroughly check that assumption before - // eliminating or moving these lines. - templateTask.jobId = regionalAnalysis._id; - templateTask.graphId = regionalAnalysis.bundleId; - templateTask.workerVersion = regionalAnalysis.workerVersion; - templateTask.height = regionalAnalysis.height; - templateTask.width = regionalAnalysis.width; - templateTask.north = regionalAnalysis.north; - templateTask.west = regionalAnalysis.west; - templateTask.zoom = regionalAnalysis.zoom; - return templateTask; + eventBus.send(new RegionalAnalysisEvent(job.jobId, STARTED).forUser(job.workerTags.user, job.workerTags.group)); } /** diff --git a/src/main/java/com/conveyal/analysis/components/broker/Job.java b/src/main/java/com/conveyal/analysis/components/broker/Job.java index fa2c2ca66..b3be77c70 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Job.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Job.java @@ -13,7 +13,6 @@ import java.util.Set; import static com.conveyal.r5.common.Util.notNullOrEmpty; -import static com.google.common.base.Preconditions.checkNotNull; /** * A Job is a collection of tasks that represent all the origins in a regional analysis. All the @@ -61,13 +60,13 @@ public class Job { * The number of remaining tasks can be derived from the deliveredTasks BitSet, but as an * optimization we keep a separate counter to avoid constantly scanning over that whole bitset. */ - protected int nTasksCompleted; + protected int nTasksCompleted = 0; /** * The total number of task deliveries that have occurred. A task may be counted more than * once if it is redelivered. */ - protected int nTasksDelivered; + protected int nTasksDelivered = 0; /** Every task in this job will be based on this template task, but have its origin coordinates changed. 
*/ public final RegionalTask templateTask; @@ -128,23 +127,31 @@ private RegionalTask makeOneTask (int taskNumber) { */ public final Set<String> errors = new HashSet<>(); - public Job (RegionalTask templateTask, WorkerTags workerTags) { - this.jobId = templateTask.jobId; - this.templateTask = templateTask; - this.workerCategory = new WorkerCategory(templateTask.graphId, templateTask.workerVersion); - this.nTasksCompleted = 0; - this.nextTaskToDeliver = 0; - - if (templateTask.originPointSetKey != null) { - checkNotNull(templateTask.originPointSet); - this.nTasksTotal = templateTask.originPointSet.featureCount(); - } else { - this.nTasksTotal = templateTask.width * templateTask.height; - } - - this.completedTasks = new BitSet(nTasksTotal); + public Job (RegionalTask task, WorkerTags workerTags) { + templateTask = templateTaskFromRegionalTask(task); + jobId = task.jobId; + workerCategory = new WorkerCategory(task.graphId, task.workerVersion); + nTasksTotal = task.getTasksTotal(); + completedTasks = new BitSet(nTasksTotal); this.workerTags = workerTags; + } + /** + * The single RegionalTask object represents a lot of individual accessibility tasks at many different origin + * points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to + * workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID + * to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task, + * as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on + * its ID only. We protectively clone this task because we're going to null out its scenario field, and don't + * want to affect the original object which contains all the scenario details. + */ + private static RegionalTask templateTaskFromRegionalTask(RegionalTask task) { + RegionalTask templateTask = task.clone(); + // Replace the inline scenario with its ID; the full scenario is stored separately for retrieval by workers. + templateTask.scenarioId = templateTask.scenario.id; + // Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON.
+ templateTask.scenario = null; + return templateTask; } public boolean markTaskCompleted(int taskId) { diff --git a/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java b/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java index 3e6b53326..ddbd4760a 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java +++ b/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java @@ -3,11 +3,13 @@ import com.conveyal.analysis.components.BackendComponents; import com.conveyal.analysis.components.LocalBackendComponents; import com.conveyal.analysis.models.RegionalAnalysis; +import com.conveyal.analysis.results.MultiOriginAssembler; import com.conveyal.r5.analyst.cluster.RegionalTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.UUID; /** @@ -66,7 +68,9 @@ private static void sendFakeJob(Broker broker) { templateTask.scenarioId = "FAKE"; RegionalAnalysis regionalAnalysis = new RegionalAnalysis(); regionalAnalysis.request = templateTask; - broker.enqueueTasksForRegionalJob(regionalAnalysis); + var job = new Job(templateTask, WorkerTags.fromRegionalAnalysis(regionalAnalysis)); + var assembler = new MultiOriginAssembler(job, new ArrayList<>()); + broker.enqueueTasksForRegionalJob(job, assembler); } public static String compactUUID() { diff --git a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java index 32336fd7c..fde0cc225 100644 --- a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java +++ b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java @@ -4,12 +4,20 @@ import com.conveyal.analysis.SelectingGridReducer; import com.conveyal.analysis.UserPermissions; import com.conveyal.analysis.components.broker.Broker; +import com.conveyal.analysis.components.broker.Job; import com.conveyal.analysis.components.broker.JobStatus; +import com.conveyal.analysis.components.broker.WorkerTags; import com.conveyal.analysis.models.AnalysisRequest; import com.conveyal.analysis.models.OpportunityDataset; import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.persistence.Persistence; +import com.conveyal.analysis.results.AccessCsvResultWriter; import com.conveyal.analysis.results.CsvResultType; +import com.conveyal.analysis.results.GridResultWriter; +import com.conveyal.analysis.results.MultiOriginAssembler; +import com.conveyal.analysis.results.PathCsvResultWriter; +import com.conveyal.analysis.results.RegionalResultWriter; +import com.conveyal.analysis.results.TimeCsvResultWriter; import com.conveyal.analysis.util.JsonUtil; import com.conveyal.file.FileStorage; import com.conveyal.file.FileStorageFormat; @@ -20,6 +28,7 @@ import com.conveyal.r5.analyst.PointSet; import com.conveyal.r5.analyst.PointSetCache; import com.conveyal.r5.analyst.cluster.RegionalTask; +import com.conveyal.r5.analyst.scenario.Scenario; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.primitives.Ints; import com.mongodb.QueryBuilder; @@ -44,6 +53,7 @@ import static com.conveyal.analysis.util.JsonUtil.toJson; import static com.conveyal.file.FileCategory.BUNDLES; import static com.conveyal.file.FileCategory.RESULTS; +import static com.conveyal.r5.common.Util.notNullOrEmpty; import static com.conveyal.r5.transit.TransportNetworkCache.getScenarioFilename; import static 
com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -506,17 +516,64 @@ private RegionalAnalysis createRegionalAnalysis (Request req, Response res) thro // This assigns it creation/update time stamps and an ID, which is needed to name any output CSV files. regionalAnalysis = Persistence.regionalAnalyses.create(regionalAnalysis); + // Create the regional job + var regionalJob = new Job(task, WorkerTags.fromRegionalAnalysis(regionalAnalysis)); + + // Create the result writers. Store their result file paths in the database. + var resultWriters = new ArrayList<RegionalResultWriter>(); + if (!task.makeTauiSite) { + if (task.recordAccessibility) { + if (task.originPointSet != null) { + var accessWriter = new AccessCsvResultWriter(task, fileStorage); + resultWriters.add(accessWriter); + regionalAnalysis.resultStorage.put(accessWriter.resultType(), accessWriter.getFileName()); + } else { + resultWriters.addAll(GridResultWriter.createWritersFromTask(regionalAnalysis, task, fileStorage)); + } + } + + if (task.recordTimes) { + var timesWriter = new TimeCsvResultWriter(task, fileStorage); + resultWriters.add(timesWriter); + regionalAnalysis.resultStorage.put(timesWriter.resultType(), timesWriter.getFileName()); + } + + if (task.includePathResults) { + var pathsWriter = new PathCsvResultWriter(task, fileStorage); + resultWriters.add(pathsWriter); + regionalAnalysis.resultStorage.put(pathsWriter.resultType(), pathsWriter.getFileName()); + } + checkArgument(notNullOrEmpty(resultWriters), "A regional analysis should always create at least one grid or CSV file."); + } + + // Create the multi-origin assembler with the writers. + var assembler = new MultiOriginAssembler(regionalJob, resultWriters); + + // The workers will need the stored scenario, so it must be in place before the job is enqueued. + storeScenarioJson(task.graphId, task.scenario); + // Register the regional job with the broker, which will distribute individual tasks to workers and track progress. - broker.enqueueTasksForRegionalJob(regionalAnalysis); + broker.enqueueTasksForRegionalJob(regionalJob, assembler); // Flush to the database any information added to the RegionalAnalysis object when it was enqueued. - // This includes the paths of any CSV files that will be produced by this analysis. - // TODO verify whether there is a reason to use regionalAnalyses.modifyWithoutUpdatingLock() or put(). + // This includes the paths of any CSV files that will be produced by this analysis. The regional analysis was + // created in this method and therefore we can bypass the nonce / permission checking. Persistence.regionalAnalyses.modifiyWithoutUpdatingLock(regionalAnalysis); return regionalAnalysis; } + /** + * Store the regional analysis scenario as JSON for retrieval by the workers.
+ */ + private void storeScenarioJson(String graphId, Scenario scenario) throws IOException { + String fileName = getScenarioFilename(graphId, scenario.id); + FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName); + File localScenario = FileUtils.createScratchFile("json"); + JsonUtil.objectMapper.writeValue(localScenario, scenario); + fileStorage.moveIntoStorage(fileStorageKey, localScenario); + } + private RegionalAnalysis updateRegionalAnalysis (Request request, Response response) throws IOException { RegionalAnalysis regionalAnalysis = JsonUtil.objectMapper.readValue(request.body(), RegionalAnalysis.class); return Persistence.regionalAnalyses.updateByUserIfPermitted(regionalAnalysis, UserPermissions.from(request)); diff --git a/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java index e208ac827..118350a44 100644 --- a/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java @@ -4,13 +4,12 @@ import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; import java.util.ArrayList; import java.util.List; public class AccessCsvResultWriter extends CsvResultWriter { - public AccessCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { + public AccessCsvResultWriter (RegionalTask task, FileStorage fileStorage) { super(task, fileStorage); } @@ -49,13 +48,14 @@ public Iterable rowValues (RegionalWorkResult workResult) { for (int p = 0; p < task.percentiles.length; p++) { int[] cutoffsForPercentile = percentilesForDestPointset[p]; for (int c = 0; c < task.cutoffsMinutes.length; c++) { + int accessibilityValue = cutoffsForPercentile[c]; // Ideally we'd output the pointset IDs (rather than keys) which we have in the RegionalAnalysis rows.add(new String[] { originId, task.destinationPointSetKeys[d], Integer.toString(task.percentiles[p]), Integer.toString(task.cutoffsMinutes[c]), - Integer.toString(workResult.accessibilityValues[d][p][c]) + Integer.toString(accessibilityValue) }); } } diff --git a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java b/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java deleted file mode 100644 index df289c9fe..000000000 --- a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.conveyal.analysis.results; - -import com.conveyal.file.FileCategory; -import com.conveyal.file.FileStorage; -import com.conveyal.file.FileStorageKey; -import com.conveyal.file.FileUtils; -import com.google.common.io.ByteStreams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.zip.GZIPOutputStream; - -import static com.conveyal.file.FileCategory.RESULTS; -import static com.conveyal.r5.common.Util.human; - -/** - * This is an abstract base class for writing regional analysis results into a file for long term - * storage. It provides reuseable logic for creating local buffer files and uploading them to long - * term cloud storage once the regional analysis is complete. 
Concrete subclasses handle writing CSV - * or proprietary binary grid files, depending on the type of regional analysis. - */ -public abstract class BaseResultWriter { - - private static final Logger LOG = LoggerFactory.getLogger(BaseResultWriter.class); - - private final FileStorage fileStorage; - - protected File bufferFile; - - public BaseResultWriter (FileStorage fileStorage) { - this.fileStorage = fileStorage; - } - - // Can this be merged into the constructor? - protected void prepare (String jobId) { - try { - bufferFile = File.createTempFile(jobId + "_", ".results"); - // On unexpected server shutdown, these files should be deleted. - // We could attempt to recover from shutdowns but that will take a lot of changes and persisted data. - bufferFile.deleteOnExit(); - } catch (IOException e) { - LOG.error("Exception while creating buffer file for multi-origin assembler: " + e.toString()); - } - } - - /** - * Gzip the access grid and store it. - */ - protected synchronized void finish (String fileName) throws IOException { - LOG.info("Compressing {} and moving into file storage.", fileName); - FileStorageKey fileStorageKey = new FileStorageKey(RESULTS, fileName); - File gzippedResultFile = FileUtils.createScratchFile(); - - // There's probably a more elegant way to do this with NIO and without closing the buffer. - // That would be Files.copy(File.toPath(),X) or ByteStreams.copy. - InputStream is = new BufferedInputStream(new FileInputStream(bufferFile)); - OutputStream os = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(gzippedResultFile))); - ByteStreams.copy(is, os); - is.close(); - os.close(); - - LOG.info("GZIP compression reduced analysis results {} from {} to {} ({}x compression)", - fileName, - human(bufferFile.length(), "B"), - human(gzippedResultFile.length(), "B"), - (double) bufferFile.length() / gzippedResultFile.length() - ); - - fileStorage.moveIntoStorage(fileStorageKey, gzippedResultFile); - bufferFile.delete(); - } - - /** - * Close all buffers and temporary files. - */ - abstract void terminate () throws Exception; - -} diff --git a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java index e07abc2af..501ac5306 100644 --- a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java @@ -1,6 +1,9 @@ package com.conveyal.analysis.results; +import com.conveyal.file.FileCategory; import com.conveyal.file.FileStorage; +import com.conveyal.file.FileStorageKey; +import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import com.csvreader.CsvWriter; @@ -9,6 +12,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedWriter; +import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -19,12 +23,12 @@ * Subclasses are used to record origin/destination "skim" matrices, accessibility indicators for non-gridded * ("freeform") origin point sets, and cataloging paths between pairs of origins and destinations. 
*/ -public abstract class CsvResultWriter extends BaseResultWriter implements RegionalResultWriter { +public abstract class CsvResultWriter implements RegionalResultWriter { private static final Logger LOG = LoggerFactory.getLogger(CsvResultWriter.class); - - public final String fileName; + private final File bufferFile = FileUtils.createScratchFile("csv"); private final CsvWriter csvWriter; + private final FileStorage fileStorage; private int nDataColumns; /** @@ -51,19 +55,31 @@ public abstract class CsvResultWriter extends BaseResultWriter implements Region /** * Construct a writer to record incoming results in a CSV file, with header row consisting of * "origin", "destination", and the supplied indicator. - * FIXME it's strange we're manually passing injectable components into objects not wired up at application construction. */ - CsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { - super(fileStorage); - super.prepare(task.jobId); - this.fileName = task.jobId + "_" + resultType() +".csv"; - BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile)); - csvWriter = new CsvWriter(bufferedWriter, ','); - setDataColumns(columnHeaders()); + CsvResultWriter (RegionalTask task, FileStorage fileStorage) { + this.fileStorage = fileStorage; + String[] columns = columnHeaders(); + csvWriter = getBufferedCsvWriter(columns); + this.nDataColumns = columns.length; this.task = task; LOG.info("Created CSV file to hold {} results for regional job {}", resultType(), task.jobId); } + public String getFileName() { + return task.jobId + "_" + resultType() + ".csv"; + } + + private CsvWriter getBufferedCsvWriter(String[] columnHeaders) { + try { + BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile)); + var writer = new CsvWriter(bufferedWriter, ','); + writer.writeRecord(columnHeaders); + return writer; + } catch (IOException ioException) { + throw new RuntimeException(ioException); + } + } + /** * Writes a header row containing the supplied data columns. */ @@ -74,14 +90,16 @@ protected void setDataColumns(String... columns) throws IOException { /** * Gzip the csv file and move it into permanent file storage such as AWS S3. - * Note: stored file will undergo gzip compression in super.finish(), but be stored with a .csv extension. + * Note: stored file will undergo gzip compression but be stored with a .csv extension. * When this file is downloaded from the UI, the browser will decompress, yielding a logically named .csv file. * Downloads through another channel (e.g. aws s3 cp), will need to be decompressed manually. 
*/ @Override public synchronized void finish () throws IOException { csvWriter.close(); - super.finish(this.fileName); + var gzippedFile = FileUtils.gzipFile(bufferFile); + fileStorage.moveIntoStorage(new FileStorageKey(FileCategory.RESULTS, getFileName()), gzippedFile); + bufferFile.delete(); } /** diff --git a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java index 88b1a8c08..72719c8e3 100644 --- a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java @@ -1,16 +1,24 @@ package com.conveyal.analysis.results; +import com.conveyal.analysis.models.RegionalAnalysis; +import com.conveyal.file.FileCategory; import com.conveyal.file.FileStorage; +import com.conveyal.file.FileStorageKey; +import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.LittleEndianIntOutputStream; import com.conveyal.r5.analyst.cluster.RegionalTask; +import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; import static com.conveyal.r5.common.Util.human; @@ -37,11 +45,13 @@ *
* (repeated 4-byte int) values of each pixel in row-major order: axis order (row, column, channel). */
-public class GridResultWriter extends BaseResultWriter { +public class GridResultWriter implements RegionalResultWriter { private static final Logger LOG = LoggerFactory.getLogger(GridResultWriter.class); - private RandomAccessFile randomAccessFile; + private final File bufferFile = FileUtils.createScratchFile("grid"); + private final FileStorage fileStorage; + private final RandomAccessFile randomAccessFile; /** The version of the access grids we produce */ private static final int ACCESS_GRID_VERSION = 0; @@ -59,13 +69,44 @@ public class GridResultWriter extends BaseResultWriter { */ private final int channels; + private final int percentileIndex; + private final int destinationIndex; + private final String gridFileName; + /** - * Construct an writer for a single regional analysis result grid, using the proprietary + * We create one GridResultWriter for each destination pointset and percentile. + * Each of those output files contains data for all specified travel time cutoffs at each origin. + */ + public static List<GridResultWriter> createWritersFromTask(RegionalAnalysis regionalAnalysis, RegionalTask task, FileStorage fileStorage) { + int nPercentiles = task.percentiles.length; + int nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length; + // Create one grid writer per percentile and destination pointset. + var gridWriters = new ArrayList<GridResultWriter>(); + for (int destinationIndex = 0; destinationIndex < nDestinationPointSets; destinationIndex++) { + for (int percentileIndex = 0; percentileIndex < nPercentiles; percentileIndex++) { + String destinationPointSetId = regionalAnalysis.destinationPointSetIds[destinationIndex]; + gridWriters.add(new GridResultWriter( + task, + fileStorage, + percentileIndex, + destinationIndex, + destinationPointSetId + )); + } + } + return gridWriters; + } + + /** + * Construct a writer for a single regional analysis result grid, using the proprietary * Conveyal grid format. This also creates the on-disk scratch buffer into which the results * from the workers will be accumulated. */ - GridResultWriter (RegionalTask task, FileStorage fileStorage) { - super(fileStorage); + GridResultWriter (RegionalTask task, FileStorage fileStorage, int percentileIndex, int destinationIndex, String destinationPointSetId) { + this.fileStorage = fileStorage; + this.gridFileName = String.format("%s_%s_P%d.access", task.jobId, destinationPointSetId, task.percentiles[percentileIndex]); + this.percentileIndex = percentileIndex; + this.destinationIndex = destinationIndex; int width = task.width; int height = task.height; this.channels = task.cutoffsMinutes.length; @@ -75,7 +116,6 @@ public class GridResultWriter extends BaseResultWriter { height, channels ); - super.prepare(task.jobId); try { // Write the access grid file header to the temporary file. @@ -113,9 +153,11 @@ public class GridResultWriter extends BaseResultWriter { /** Gzip the access grid and upload it to file storage (such as AWS S3).
*/ @Override - protected synchronized void finish (String fileName) throws IOException { - super.finish(fileName); + public synchronized void finish () throws IOException { randomAccessFile.close(); + var gzippedFile = FileUtils.gzipFile(bufferFile); + fileStorage.moveIntoStorage(new FileStorageKey(FileCategory.RESULTS, gridFileName), gzippedFile); + bufferFile.delete(); } /** @@ -130,18 +172,25 @@ private static byte[] intToLittleEndianByteArray (int i) { return byteBuffer.array(); } + @Override + public void writeOneWorkResult(RegionalWorkResult workResult) throws Exception { + // Drop work results for this particular origin into a little-endian output file. + int[][] percentilesForGrid = workResult.accessibilityValues[destinationIndex]; + int[] cutoffsForPercentile = percentilesForGrid[percentileIndex]; + writeOneOrigin(workResult.taskId, cutoffsForPercentile); + } + /** * Write all channels at once to the proper subregion of the buffer for this origin. The origins we receive have 2d * coordinates. Flatten them to compute file offsets and for the origin checklist. */ - synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOException { + private void writeOneOrigin (int taskNumber, int[] values) throws IOException { if (values.length != channels) { throw new IllegalArgumentException("Number of channels to be written does not match this writer."); } long offset = HEADER_LENGTH_BYTES + (taskNumber * channels * Integer.BYTES); // RandomAccessFile is not threadsafe and multiple threads may call this, so synchronize. - // TODO why is the method also synchronized then? - synchronized (this) { + synchronized (randomAccessFile) { randomAccessFile.seek(offset); // FIXME should this be delta-coded? The Selecting grid reducer seems to expect it to be. int lastValue = 0; @@ -154,7 +203,7 @@ synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOExcepti } @Override - synchronized void terminate () throws IOException { + public synchronized void terminate () throws IOException { randomAccessFile.close(); bufferFile.delete(); } diff --git a/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java b/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java deleted file mode 100644 index 5f4d90f8a..000000000 --- a/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.conveyal.analysis.results; - -import com.conveyal.analysis.models.RegionalAnalysis; -import com.conveyal.file.FileStorage; -import com.conveyal.r5.analyst.cluster.RegionalTask; -import com.conveyal.r5.analyst.cluster.RegionalWorkResult; - -/** - * Adapts our collection of grid writers (one for each destination pointset and percentile) to give them the - * same interface as our CSV writers, so CSV and Grids can be processed similarly in MultiOriginAssembler. - */ -public class MultiGridResultWriter implements RegionalResultWriter { - - private final RegionalAnalysis regionalAnalysis; - - private final RegionalTask task; - - /** - * We create one GridResultWriter for each destination pointset and percentile. - * Each of those output files contains data for all travel time cutoffs at each origin. - */ - private final GridResultWriter[][] accessibilityGridWriters; - - /** The number of different percentiles for which we're calculating accessibility on the workers. 
*/ - private final int nPercentiles; - - /** The number of destination pointsets to which we're calculating accessibility */ - private final int nDestinationPointSets; - - /** Constructor */ - public MultiGridResultWriter ( - RegionalAnalysis regionalAnalysis, RegionalTask task, FileStorage fileStorage - ) { - // We are storing the regional analysis just to get its pointset IDs (not keys) and its own ID. - this.regionalAnalysis = regionalAnalysis; - this.task = task; - this.nPercentiles = task.percentiles.length; - this.nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length; - // Create one grid writer per percentile and destination pointset. - accessibilityGridWriters = new GridResultWriter[nDestinationPointSets][nPercentiles]; - for (int d = 0; d < nDestinationPointSets; d++) { - for (int p = 0; p < nPercentiles; p++) { - accessibilityGridWriters[d][p] = new GridResultWriter(task, fileStorage); - } - } - } - - @Override - public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception { - // Drop work results for this particular origin into a little-endian output file. - // TODO more efficient way to write little-endian integers - // TODO check monotonic increasing invariants here rather than in worker. - // Infer x and y cell indexes based on the template task - int taskNumber = workResult.taskId; - for (int d = 0; d < workResult.accessibilityValues.length; d++) { - int[][] percentilesForGrid = workResult.accessibilityValues[d]; - for (int p = 0; p < nPercentiles; p++) { - int[] cutoffsForPercentile = percentilesForGrid[p]; - GridResultWriter gridWriter = accessibilityGridWriters[d][p]; - gridWriter.writeOneOrigin(taskNumber, cutoffsForPercentile); - } - } - } - - @Override - public void terminate () throws Exception { - for (GridResultWriter[] writers : accessibilityGridWriters) { - for (GridResultWriter writer : writers) { - writer.terminate(); - } - } - } - - @Override - public void finish () throws Exception { - for (int d = 0; d < nDestinationPointSets; d++) { - for (int p = 0; p < nPercentiles; p++) { - int percentile = task.percentiles[p]; - String destinationPointSetId = regionalAnalysis.destinationPointSetIds[d]; - // TODO verify that regionalAnalysis._id is the same as job.jobId - String gridFileName = - String.format("%s_%s_P%d.access", regionalAnalysis._id, destinationPointSetId, percentile); - accessibilityGridWriters[d][p].finish(gridFileName); - } - } - } - -} diff --git a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java index 3bbd5915f..bdf2b07b2 100644 --- a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java +++ b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java @@ -2,23 +2,15 @@ import com.conveyal.analysis.AnalysisServerException; import com.conveyal.analysis.components.broker.Job; -import com.conveyal.analysis.models.RegionalAnalysis; -import com.conveyal.analysis.persistence.Persistence; -import com.conveyal.file.FileStorage; -import com.conveyal.file.FileStorageFormat; import com.conveyal.r5.analyst.PointSet; +import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import com.conveyal.r5.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.BitSet; import java.util.List; -import static com.conveyal.r5.common.Util.notNullOrEmpty; -import static 
com.google.common.base.Preconditions.checkArgument; - /** * This assembles regional results arriving from workers into one or more files per regional analysis on * the backend. This is not a singleton component: one MultiOriginAssembler instance is created per currently active @@ -32,13 +24,6 @@ public class MultiOriginAssembler { private static final int MAX_FREEFORM_DESTINATIONS = 4_000_000; - /** - * The regional analysis for which this object is assembling results. - * We retain the whole object rather than just its ID so we'll have the full details, e.g. destination point set - * IDs and scenario, things that are stripped out of the template task sent to the workers. - */ - private final RegionalAnalysis regionalAnalysis; - /** * The object representing the progress of the regional analysis as tracked by the broker. * It may appear job.templateTask has all the information needed, making the regionalAnalysis field @@ -48,7 +33,7 @@ public class MultiOriginAssembler { public final Job job; // One writer per CSV/Grids we're outputting - private List<RegionalResultWriter> resultWriters = new ArrayList<>(); + private final List<RegionalResultWriter> resultWriters; /** * The number of distinct origin points for which we've received at least one result. If for @@ -68,95 +53,30 @@ public class MultiOriginAssembler { */ private final BitSet originsReceived; - /** - * Total number of origin points for which we're expecting results. Note that the total - * number of results received could be higher in the event of an overzealous task redelivery. - */ - public final int nOriginsTotal; - /** * Constructor. This sets up one or more ResultWriters depending on whether we're writing gridded or non-gridded * cumulative opportunities accessibility, or origin-destination travel times. - * TODO do not pass the FileStorage component down into this non-component and the ResultWriter non-component, - * clarify design concepts on this point (e.g. only components should know other components exist). - * Rather than pushing the component all the way down to the leaf function call, we return the finished - * file up to an umbrella location where a single reference to the file storage can be used to - * store all of them. */ - public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileStorage fileStorage) { - try { - this.regionalAnalysis = regionalAnalysis; - this.job = job; - this.nOriginsTotal = job.nTasksTotal; - this.originsReceived = new BitSet(job.nTasksTotal); - // Check that origin and destination sets are not too big for generating CSV files. - if (!job.templateTask.makeTauiSite && - job.templateTask.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension) - ) { - // This requires us to have already loaded this destination pointset instance into the transient field.
- PointSet destinationPointSet = job.templateTask.destinationPointSets[0]; - if ((job.templateTask.recordTimes || job.templateTask.includePathResults) && !job.templateTask.oneToOne) { - if (nOriginsTotal * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS || - destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS - ) { - throw new AnalysisServerException(String.format( - "Freeform requests limited to %d destinations and %d origin-destination pairs.", - MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS - )); - } - } - } - - if (job.templateTask.recordAccessibility) { - if (job.templateTask.originPointSet != null) { - resultWriters.add(new AccessCsvResultWriter(job.templateTask, fileStorage)); - } else { - resultWriters.add(new MultiGridResultWriter(regionalAnalysis, job.templateTask, fileStorage)); - } - } - - if (job.templateTask.recordTimes) { - resultWriters.add(new TimeCsvResultWriter(job.templateTask, fileStorage)); - } - - if (job.templateTask.includePathResults) { - resultWriters.add(new PathCsvResultWriter(job.templateTask, fileStorage)); - } - - checkArgument(job.templateTask.makeTauiSite || notNullOrEmpty(resultWriters), - "A non-Taui regional analysis should always create at least one grid or CSV file."); - - // Record the paths of any CSV files that will be produced by this analysis. - // The caller must flush the RegionalAnalysis back out to the database to retain this information. - // We avoid database access here in constructors, especially when called in synchronized methods. - for (RegionalResultWriter writer : resultWriters) { - // FIXME instanceof+cast is ugly, do this some other way or even record the Grids - if (writer instanceof CsvResultWriter) { - CsvResultWriter csvWriter = (CsvResultWriter) writer; - regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName); - } - } - } catch (Exception e) { - throw new RuntimeException("Exception while creating multi-origin assembler: " + ExceptionUtils.stackTraceString(e)); - } + public MultiOriginAssembler (Job job, List<RegionalResultWriter> resultWriters) { + this.job = job; + this.resultWriters = resultWriters; + this.originsReceived = new BitSet(job.nTasksTotal); } /** - * Gzip the output files and persist them to cloud storage. + * Check that origin and destination sets are not too big for generating CSV files. */ - private synchronized void finish() { - LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId); - try { - for (RegionalResultWriter writer : resultWriters) { - writer.finish(); + public static void ensureOdPairsUnderLimit(RegionalTask task, PointSet destinationPointSet) { + // This requires us to have already loaded this destination pointset instance into the transient field. + if ((task.recordTimes || task.includePathResults) && !task.oneToOne) { + if (task.getTasksTotal() * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS || + destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS + ) { + throw new AnalysisServerException(String.format( + "Freeform requests limited to %d destinations and %d origin-destination pairs.", + MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS + )); } - regionalAnalysis.complete = true; - // Write updated regionalAnalysis object back out to database, to mark it complete and record locations - // of any CSV files generated. Use method that updates lock/timestamp, otherwise updates are not seen in UI. - // TODO verify whether there is a reason to use regionalAnalyses.modifyWithoutUpdatingLock().
- Persistence.regionalAnalyses.put(regionalAnalysis); - } catch (Exception e) { - LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e); } } @@ -178,8 +98,17 @@ public synchronized void handleMessage (RegionalWorkResult workResult) throws Ex originsReceived.set(workResult.taskId); nComplete += 1; } - if (nComplete == nOriginsTotal) { - finish(); + + // If finished, run finish on all the result writers. + if (nComplete == job.nTasksTotal) { + LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId); + try { + for (RegionalResultWriter writer : resultWriters) { + writer.finish(); + } + } catch (Exception e) { + LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e); + } } } diff --git a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java index 0dadb4337..3022befe3 100644 --- a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java @@ -6,7 +6,6 @@ import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import org.apache.commons.lang3.ArrayUtils; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -14,7 +13,7 @@ public class PathCsvResultWriter extends CsvResultWriter { - public PathCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { + public PathCsvResultWriter (RegionalTask task, FileStorage fileStorage) { super(task, fileStorage); } diff --git a/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java b/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java index 8380a5bea..a765d951e 100644 --- a/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java @@ -2,12 +2,10 @@ import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; - /** * Common interface for classes that write regional results out to CSV or Grids on the backend. 
*/ -interface RegionalResultWriter { +public interface RegionalResultWriter { void writeOneWorkResult (RegionalWorkResult workResult) throws Exception; diff --git a/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java index 144da7713..90d4f8f5e 100644 --- a/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java @@ -5,16 +5,14 @@ import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; public class TimeCsvResultWriter extends CsvResultWriter { - public TimeCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { + public TimeCsvResultWriter (RegionalTask task, FileStorage fileStorage) { super(task, fileStorage); } diff --git a/src/main/java/com/conveyal/file/FileUtils.java b/src/main/java/com/conveyal/file/FileUtils.java index f2eb94842..8b24570f5 100644 --- a/src/main/java/com/conveyal/file/FileUtils.java +++ b/src/main/java/com/conveyal/file/FileUtils.java @@ -12,6 +12,7 @@ import java.io.RandomAccessFile; import java.nio.file.Files; import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; public abstract class FileUtils { /** @@ -86,6 +87,20 @@ public static void transferFromFileTo(File file, OutputStream os) { } + /** + * Gzip a file and return a File referencing the new gzipped copy. + */ + public static File gzipFile(File file) { + try { + var gzippedFile = createScratchFile(); + var gzippedOs = new GZIPOutputStream(getOutputStream(gzippedFile)); + transferFromFileTo(file, gzippedOs); + return gzippedFile; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * Get an BufferedInputStream for a file. Read bytes from the underlying file stream without causing a system call * for each byte read. diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java index c972057f8..11010ffbd 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java @@ -2,6 +2,7 @@ import com.conveyal.r5.analyst.PointSet; import com.conveyal.r5.analyst.WebMercatorExtents; +import com.google.common.base.Preconditions; /** * Represents a task to be performed as part of a regional analysis. @@ -105,4 +106,13 @@ public int nTargetsPerOrigin () { } } + public int getTasksTotal() { + if (originPointSetKey != null) { + Preconditions.checkNotNull(originPointSet); + return originPointSet.featureCount(); + } else { + return width * height; + } + } + }
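
The sketches below are editorial illustrations, not part of the commit. Taken together, the changes above move all result-writer construction out of the Broker and into RegionalAnalysisController. Condensed, the new wiring in createRegionalAnalysis looks like this (persistence, validation, and error handling omitted; task, regionalAnalysis, fileStorage, and broker are in scope as in the controller):

    // Condensed wiring sketch: how createRegionalAnalysis now assembles a regional job.
    var regionalJob = new Job(task, WorkerTags.fromRegionalAnalysis(regionalAnalysis));
    var resultWriters = new ArrayList<RegionalResultWriter>();
    if (!task.makeTauiSite) {
        if (task.recordAccessibility) {
            if (task.originPointSet != null) {
                // Freeform origins: accessibility results go to a single CSV file.
                resultWriters.add(new AccessCsvResultWriter(task, fileStorage));
            } else {
                // Gridded origins: one binary grid per destination pointset and percentile.
                resultWriters.addAll(GridResultWriter.createWritersFromTask(regionalAnalysis, task, fileStorage));
            }
        }
        if (task.recordTimes) resultWriters.add(new TimeCsvResultWriter(task, fileStorage));
        if (task.includePathResults) resultWriters.add(new PathCsvResultWriter(task, fileStorage));
    }
    // The assembler no longer constructs writers; it only fans incoming results out to them.
    var assembler = new MultiOriginAssembler(regionalJob, resultWriters);
    // Workers fetch the scenario by ID, so it must be stored before any task can be delivered.
    storeScenarioJson(task.graphId, task.scenario);
    broker.enqueueTasksForRegionalJob(regionalJob, assembler);

Because the broker now receives a fully built Job and assembler pair, it never touches scenarios or result files, which is what allowed its FileStorage dependency and scenario-storage code to be deleted.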
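The new RegionalTask.getTasksTotal gives Job and MultiOriginAssembler one shared definition of the origin count: featureCount() of a freeform origin pointset when one is set, otherwise width * height of the web-mercator grid. A tiny illustration, assuming the public fields and default construction visible in this diff:

    // A 300 x 200 pixel gridded analysis yields 60,000 origin tasks. Job.completedTasks
    // and the assembler's originsReceived BitSet are both sized from this value.
    RegionalTask gridTask = new RegionalTask();
    gridTask.width = 300;
    gridTask.height = 200;
    assert gridTask.getTasksTotal() == 60_000;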
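With the writers injected, the assembler's whole life cycle is driven by handleMessage. The delivery path that invokes it is not shown in this diff; judging from the resultAssemblers map the Broker retains, it presumably looks roughly like the following (the method name and null handling here are guesses, not code from the repository):

    // Hypothetical broker-side handling of a result arriving from a worker.
    public void handleRegionalWorkResult (RegionalWorkResult workResult) {
        MultiOriginAssembler assembler = resultAssemblers.get(workResult.jobId);
        if (assembler != null) {
            try {
                // Writes through every RegionalResultWriter; once the count of distinct
                // origins received reaches job.nTasksTotal, finish() uploads each file.
                assembler.handleMessage(workResult);
            } catch (Exception e) {
                LOG.error("Error assembling results for job {}", workResult.jobId, e);
            }
        }
    }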
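Both writer families consume the same RegionalWorkResult payload, whose accessibilityValues array is indexed [destinationPointSet][percentile][cutoff]. Each GridResultWriter is pinned to one (destinationIndex, percentileIndex) pair and writes the whole cutoff vector (its channels) for an origin, while AccessCsvResultWriter flattens all three dimensions into rows. For orientation (emitRow is a made-up stand-in for the row assembly in the patch):

    // GridResultWriter.writeOneWorkResult: one (destination, percentile) slice per file.
    int[] cutoffsForPercentile = workResult.accessibilityValues[destinationIndex][percentileIndex];
    writeOneOrigin(workResult.taskId, cutoffsForPercentile);

    // AccessCsvResultWriter.rowValues: one CSV row per (destination, percentile, cutoff).
    for (int d = 0; d < task.destinationPointSetKeys.length; d++) {
        for (int p = 0; p < task.percentiles.length; p++) {
            for (int c = 0; c < task.cutoffsMinutes.length; c++) {
                emitRow(originId, task.destinationPointSetKeys[d], task.percentiles[p],
                        task.cutoffsMinutes[c], workResult.accessibilityValues[d][p][c]);
            }
        }
    }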
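The GridResultWriter class javadoc arrived here partially garbled; the one list item that survives describes the payload as repeated 4-byte ints in row-major (row, column, channel) order. Combining that with the code, namely LittleEndianIntOutputStream, ACCESS_GRID_VERSION = 0, the lastValue delta-coding in writeOneOrigin, and the gzip step in finish, a reader would look roughly like the sketch below. The 8-byte ASCII tag "ACCESSGR" and the exact order of the six header ints are assumptions this diff does not confirm, so verify them against the header-writing code before relying on this:

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;

    /** Hypothetical access-grid reader; header field order is assumed, not confirmed by this patch. */
    public class AccessGridReaderSketch {

        /** The file is little-endian; DataInputStream reads big-endian, so swap bytes. */
        private static int readIntLE (DataInputStream in) throws IOException {
            return Integer.reverseBytes(in.readInt());
        }

        /** Decode the value vector (one int per cutoff channel) for a single origin. */
        public static int[] readOrigin (String path, int nChannels, int originIndex) throws IOException {
            // Stored grids are gzipped by finish() before upload, hence the GZIPInputStream.
            try (DataInputStream in = new DataInputStream(new GZIPInputStream(new FileInputStream(path)))) {
                byte[] magic = new byte[8];
                in.readFully(magic);          // assumed: the ASCII tag "ACCESSGR"
                int version = readIntLE(in);  // ACCESS_GRID_VERSION (0 in this patch)
                int zoom = readIntLE(in);     // assumed header fields: zoom, west, north,
                int west = readIntLE(in);     // then grid width and height in pixels
                int north = readIntLE(in);
                int width = readIntLE(in);
                int height = readIntLE(in);
                // Values are delta-coded per origin (see lastValue in writeOneOrigin):
                // each stored int is the difference from the previous channel's value.
                in.skipNBytes((long) originIndex * nChannels * Integer.BYTES);
                int[] values = new int[nChannels];
                int running = 0;
                for (int c = 0; c < nChannels; c++) {
                    running += readIntLE(in);
                    values[c] = running;
                }
                return values;
            }
        }
    }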
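One caveat worth verifying in the new FileUtils.gzipFile: the GZIPOutputStream is never explicitly closed in the code shown, so producing a complete gzip trailer depends on transferFromFileTo (whose body is outside this diff) closing or finishing the stream handed to it. If it does not, the stored file would be truncated. A defensive variant using only standard-library calls:

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.zip.GZIPOutputStream;

    /** Equivalent of FileUtils.gzipFile with explicit resource management (sketch, not the patch version). */
    static File gzipFileExplicit (File file) throws IOException {
        File gzippedFile = File.createTempFile("gzip", ".tmp"); // stand-in for createScratchFile()
        try (InputStream is = new BufferedInputStream(new FileInputStream(file));
             OutputStream os = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(gzippedFile)))) {
            is.transferTo(os); // closing the GZIPOutputStream writes the gzip trailer
        }
        return gzippedFile;
    }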