Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1b322b4
fix comment on NETWORK_FORMAT_VERSION
abyrd Nov 24, 2023
e6ffda3
Do not record opportunities 120 minutes away
ansoncfit Dec 4, 2023
ef9638b
Merge pull request #917 from conveyal/opportunity-density-fix
abyrd Dec 15, 2023
6b6c767
Fix freeform guardrails
ansoncfit Dec 19, 2023
7dc2908
only record job when assembler created, fixes #887
abyrd Dec 20, 2023
9ab99be
record errors (stopping job) before other checks
abyrd Dec 20, 2023
6ccae66
report worker errors as much shorter stack trace
abyrd Dec 20, 2023
9df41b6
record only one error per job in broker
abyrd Dec 20, 2023
2481a93
Add final modifier to PathResult constants
ansoncfit Dec 26, 2023
f349656
Merge pull request #921 from conveyal/job-error-limiting
abyrd Dec 28, 2023
06e479f
Merge branch 'dev' into freeform-guardrails
abyrd Dec 28, 2023
9690d9d
Use specific AnalysisServerException
ansoncfit Dec 29, 2023
9abf498
Filter stack traces sent to UI
ansoncfit Dec 29, 2023
6a6ab47
build filtered stack trace directly from throwable
abyrd Dec 29, 2023
5d53072
Update to 2020 Census geometries
ansoncfit Dec 30, 2023
48c537e
Update seamless-census test fixtures
ansoncfit Dec 30, 2023
14cbb20
Merge pull request #918 from conveyal/freeform-guardrails
ansoncfit Dec 31, 2023
dcfe1dd
Merge branch 'dev' into census-loader-update
ansoncfit Dec 31, 2023
b3fa65b
Merge pull request #923 from conveyal/census-loader-update
abyrd Jan 3, 2024
b7d02fd
do not include stacktrace in message
abyrd Jan 4, 2024
584aafa
Merge pull request #925 from conveyal/dev
ansoncfit Jan 5, 2024
50d908a
Implement Timo Jaakkonen’s crossing delays (#4), upload package to DG…
christophfink Mar 2, 2023
2795148
Run on older image, as ubuntu-latest has gradle-8, which fails (#5)
christophfink Mar 2, 2023
ee00d93
Freeze gradle version (#6)
christophfink Mar 2, 2023
d77dbc2
fix build workflow (#7)
christophfink Mar 2, 2023
505ef94
fix build workflow (#8)
christophfink Mar 2, 2023
dc44055
configurable (per-repo) package registry (#9)
christophfink Mar 2, 2023
c3ae9be
fix build system (#10)
christophfink Mar 2, 2023
a0c119d
read custom OSM tags, use them for bike routing (#11)
christophfink Mar 6, 2023
fd6d773
Add default speeds that represent the speed limits on Finnish roads (…
christophfink Mar 15, 2023
c43214b
revert Jaakkola implementation (#16)
christophfink Mar 30, 2023
8ab3b20
disable turning penalties for bicycle and car, do not apply perceived…
christophfink Mar 30, 2023
c285f9b
Use per-edge speeds for cycling (#19)
christophfink Apr 28, 2023
73d8c01
Timo Jakkola’s times (#21)
christophfink May 12, 2023
817d73f
save shapes of GTFS patterns (#22)
christophfink Jun 19, 2023
adb0d6b
23 roll back jaakkola congestion timings (#24)
christophfink Aug 11, 2023
869bc3b
make MAX_PATH_DESTINATIONS *not* final
christophfink Jan 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ name: Java CI
on:
# Should catch merged pull requests, pushes to Conveyal PR branches, and tags.
# The push event's ref reveals the branch name for S3 upload, unlike pull_request which sees the merge target.
push
push:
pull_request:
# Try to catch PRs from outside authors, which don't need to be uploaded to S3.
# pull_request:
# branches:
Expand Down Expand Up @@ -50,5 +51,7 @@ jobs:
- name: Publish to GH Packages
# Supply access token to build.gradle (used in publishing.repositories.maven.credentials)
env:
GITHUB_PACKAGE_REPOSITORY: ${{ github.event.pull_request.head.repo.full_name || github.repository }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
continue-on-error: true
run: gradle publish
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ publishing {
repositories {
maven {
name = "GitHubPackages"
url = uri("https://maven.pkg.github.com/conveyal/r5")
url = uri("https://maven.pkg.github.com/" + System.getenv("GITHUB_PACKAGE_REPOSITORY"))
credentials {
username = System.getenv("GITHUB_ACTOR")
password = System.getenv("GITHUB_TOKEN")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void respondToException(Exception e, Request request, Response response,
// Include a stack trace except when the error is known to be about unauthenticated or unauthorized access,
// in which case we don't want to leak information about the server to people scanning it for weaknesses.
if (type != UNAUTHORIZED && type != FORBIDDEN) {
body.put("stackTrace", errorEvent.stackTrace);
body.put("stackTrace", errorEvent.filteredStackTrace);
}
response.status(code);
response.type("application/json");
Expand Down
38 changes: 25 additions & 13 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,23 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
}
// Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of
// active jobs until we're sure the result assembler was constructed without any errors. Always add and remove
// the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887).
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
Job job = new Job(templateTask, workerTags);
jobs.put(job.workerCategory, job);

// Register the regional job so results received from multiple workers can be assembled into one file.
// If any parameters fail checks here, an exception may cause this method to exit early.
// TODO encapsulate MultiOriginAssemblers in a new Component
// Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
// That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
resultAssemblers.put(templateTask.jobId, assembler);

// A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job.
jobs.put(job.workerCategory, job);

// If this is a fake job for testing, don't confuse the worker startup code below with its null graph ID.
if (config.testTaskRedelivery()) {
// This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
return;
}

Expand Down Expand Up @@ -385,14 +389,20 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
}

/**
* When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* Record an error that happened while a worker was processing a task on the given job. This method is tolerant
* of job being null, because it's called on a code path where any number of things could be wrong or missing.
* This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler.
* Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user
* to see the error message. User will then need to manually delete it, which will remove the result assembler.
* This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler.
*/
private synchronized void recordJobError (Job job, String error) {
if (job != null) {
job.errors.add(error);
// Limit the number of errors recorded to one.
// Still using a Set<String> instead of just String since the set of errors is exposed in a UI-facing API.
if (job.errors.isEmpty()) {
job.errors.add(error);
}
}
}

Expand Down Expand Up @@ -488,21 +498,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
// Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization,
// because that method only uses final fields of the job.
Job job = null;
MultiOriginAssembler assembler;
try {
MultiOriginAssembler assembler;
synchronized (this) {
job = findJob(workResult.jobId);
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
// This will mark the job as errored and not-active, stopping distribution of tasks to workers.
// To ensure that happens, record errors before any other conditional that could exit this method.
if (workResult.error != null) {
recordJobError(job, workResult.error);
return;
}
assembler = resultAssemblers.get(workResult.jobId);
if (job == null || assembler == null || !job.isActive()) {
// This will happen naturally for all delivered tasks after a job is deleted or it errors out.
LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId);
return;
}
if (workResult.error != null) {
// Record any error reported by the worker and don't pass bad results on to regional result assembly.
recordJobError(job, workResult.error);
return;
}
// Mark tasks completed first before passing results to the assembler. On the final result received,
// this will minimize the risk of race conditions by quickly making the job invisible to incoming stray
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ private RegionalTask makeOneTask (int taskNumber) {
public int deliveryPass = 0;

/**
* If any error compromises the usabilty or quality of results from any origin, it is recorded here.
* If any error compromises the usability or quality of results from any origin, it is recorded here.
* This is a Set because identical errors are likely to be reported from many workers or individual tasks.
* The presence of an error here causes the job to be considered "errored" and "inactive" and stop delivering tasks.
* There is some risk here of accumulating unbounded amounts of large error messages (see #919).
* The field type could be changed to a single String instead of Set, but it's exposed on a UI-facing API as a Set.
*/
public final Set<String> errors = new HashSet();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import com.conveyal.r5.util.ExceptionUtils;

import static com.conveyal.r5.util.ExceptionUtils.filterStackTrace;

/**
* This Event is fired each time a Throwable (usually an Exception or Error) occurs on the backend. It can then be
* recorded or tracked in various places - the console logs, Slack, etc. This could eventually be used for errors on
* the workers as well, but we'd have to be careful not to generate hundreds of messages at once.
*/
public class ErrorEvent extends Event {

// We may serialize this object, so we convert the Throwable to two strings to control its representation.
// All Events are intended to be eligible for serialization into a log or database, so we convert the Throwable to
// some Strings to determine its representation in a simple way.
// For flexibility in event handlers, it is tempting to hold on to the original Throwable instead of derived
// Strings. Exceptions are famously slow, but it's the initial creation and filling in the stack trace that are
// slow. Once the instace exists, repeatedly examining its stack trace should not be prohibitively costly. Still,
// we do probably gain some efficiency by converting the stack trace to a String once and reusing that.
// slow. Once the instance exists, repeatedly examining its stack trace should not be prohibitively costly.

public final String summary;

Expand All @@ -23,11 +25,16 @@ public class ErrorEvent extends Event {
*/
public final String httpPath;

/** The full stack trace of the exception that occurred. */
public final String stackTrace;

/** A minimal stack trace showing the immediate cause within Conveyal code. */
public final String filteredStackTrace;

public ErrorEvent (Throwable throwable, String httpPath) {
this.summary = ExceptionUtils.shortCauseString(throwable);
this.stackTrace = ExceptionUtils.stackTraceString(throwable);
this.filteredStackTrace = ExceptionUtils.filterStackTrace(throwable);
this.httpPath = httpPath;
}

Expand All @@ -54,25 +61,9 @@ public String traceWithContext (boolean verbose) {
if (verbose) {
builder.append(stackTrace);
} else {
builder.append(filterStackTrace(stackTrace));
builder.append(filteredStackTrace);
}
return builder.toString();
}

private static String filterStackTrace (String stackTrace) {
if (stackTrace == null) return null;
final String unknownFrame = "Unknown stack frame, probably optimized out by JVM.";
String error = stackTrace.lines().findFirst().get();
String frame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at "))
.findFirst().orElse(unknownFrame);
String conveyalFrame = stackTrace.lines()
.map(String::strip)
.filter(s -> s.startsWith("at com.conveyal."))
.filter(s -> !frame.equals(s))
.findFirst().orElse("");
return String.join("\n", error, frame, conveyalFrame);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageFormat;
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.cluster.PathResult;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
import com.conveyal.r5.util.ExceptionUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -89,21 +91,27 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
this.job = job;
this.nOriginsTotal = job.nTasksTotal;
this.originsReceived = new BitSet(job.nTasksTotal);
// Check that origin and destination sets are not too big for generating CSV files.
if (!job.templateTask.makeTauiSite &&
job.templateTask.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)
) {
// This requires us to have already loaded this destination pointset instance into the transient field.
PointSet destinationPointSet = job.templateTask.destinationPointSets[0];
if ((job.templateTask.recordTimes || job.templateTask.includePathResults) && !job.templateTask.oneToOne) {
if (nOriginsTotal * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS ||
destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS
) {
throw new AnalysisServerException(String.format(
"Freeform requests limited to %d destinations and %d origin-destination pairs.",
MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
// If results have been requested for freeform origins, check that the origin and
// destination pointsets are not too big for generating CSV files.
RegionalTask task = job.templateTask;
if (!task.makeTauiSite && task.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)) {
// This requires us to have already loaded this destination pointset instance into the transient field.
PointSet destinationPointSet = task.destinationPointSets[0];
int nDestinations = destinationPointSet.featureCount();
int nODPairs = task.oneToOne ? nOriginsTotal : nOriginsTotal * nDestinations;
if (task.recordTimes &&
(nDestinations > MAX_FREEFORM_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) {
throw AnalysisServerException.badRequest(String.format(
"Travel time results limited to %d destinations and %d origin-destination pairs.",
MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
if (task.includePathResults &&
(nDestinations > PathResult.MAX_PATH_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) {
throw AnalysisServerException.badRequest(String.format(
"Path results limited to %d destinations and %d origin-destination pairs.",
PathResult.MAX_PATH_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
));
}
}

Expand Down Expand Up @@ -152,8 +160,11 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName);
}
}
} catch (AnalysisServerException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException("Exception while creating multi-origin assembler: " + ExceptionUtils.stackTraceString(e));
// Handle any obscure problems we don't want end users to see without context of MultiOriginAssembler.
throw new RuntimeException("Exception while creating multi-origin assembler: " + e.toString(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void load (ShapeDataStore store) throws Exception {
for (SimpleFeatureIterator it = sfc.features(); it.hasNext();) {
GeobufFeature feat = new GeobufFeature(it.next());
feat.id = null;
feat.numericId = Long.parseLong((String) feat.properties.get("GEOID10"));
feat.numericId = Long.parseLong((String) feat.properties.get("GEOID20"));
feat.properties = new HashMap<>();
store.add(feat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public class TemporalDensityResult {

/**
* The temporal density of opportunities. For each destination set, for each percentile, for each minute of
* travel from 0 to 120, the number of opportunities reached in travel times from i (inclusive) to i+1 (exclusive).
* travel m from 0 to 119, the number of opportunities reached in travel times from m (inclusive) to m+1
* (exclusive).
*/
public final double[][][] opportunitiesPerMinute;

Expand All @@ -57,7 +58,7 @@ public void recordOneTarget (int target, int[] travelTimePercentilesSeconds) {
break; // If any percentile is unreached, all higher ones are also unreached.
}
int m = travelTimePercentilesSeconds[p] / 60;
if (m <= 120) {
if (m < 120) {
opportunitiesPerMinute[d][p][m] += dps.getOpportunityCount(target);
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/conveyal/r5/analyst/cluster/PathResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PathResult {
* These results are returned to the backend over an HTTP API so we don't want to risk making them too huge.
* This could be set to a higher number in cases where you know the result return channel can handle the size.
*/
public static int maxDestinations = 5000;
public static int MAX_PATH_DESTINATIONS = 5_000;

private final int nDestinations;
/**
Expand All @@ -49,7 +49,7 @@ public class PathResult {
public final Multimap<RouteSequence, Iteration>[] iterationsForPathTemplates;
private final TransitLayer transitLayer;

public static String[] DATA_COLUMNS = new String[]{
public static final String[] DATA_COLUMNS = new String[]{
"routes",
"boardStops",
"alightStops",
Expand All @@ -70,8 +70,8 @@ public PathResult(AnalysisWorkerTask task, TransitLayer transitLayer) {
// In regional analyses, return paths to all destinations
nDestinations = task.nTargetsPerOrigin();
// This limitation reflects the initial design, for use with freeform pointset destinations
if (nDestinations > maxDestinations) {
throw new UnsupportedOperationException("Number of detailed path destinations exceeds limit of " + maxDestinations);
if (nDestinations > MAX_PATH_DESTINATIONS) {
throw new UnsupportedOperationException("Number of detailed path destinations exceeds limit of " + MAX_PATH_DESTINATIONS);
}
}
iterationsForPathTemplates = new Multimap[nDestinations];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,17 @@ public RegionalWorkResult(OneOriginResult result, RegionalTask task) {
// TODO checkTravelTimeInvariants, checkAccessibilityInvariants to verify that values are monotonically increasing
}

/** Constructor used when results for this origin are considered unusable due to an unhandled error. */
/**
* Constructor used when results for this origin are considered unusable due to an unhandled error. Besides the
* short-form exception, most result fields are left null. There is no information to communicate, and because
* errors are often produced faster than valid results, we don't want to flood the backend with unnecessarily
* voluminous error reports. The short-form exception message is used for a similar reason, to limit the total size
* of error messages.
*/
public RegionalWorkResult(Throwable t, RegionalTask task) {
this.jobId = task.jobId;
this.taskId = task.taskId;
this.error = ExceptionUtils.shortAndLongString(t);
this.error = ExceptionUtils.filterStackTrace(t);
}

}
Loading