feat: ETL steps for terminals and terminal distances with hourly scheduler #220
Merged
Pull request overview
Adds two new Catalog ETL steps to ingest UEX “terminals” and “terminal distances” data and schedules them to run hourly, with a 12-hour skip guard to respect UEX caching.
Changes:
- Introduces TerminalsSyncStep and TerminalDistancesSyncStep with upsert logic and warning emission.
- Adds hourly CatalogEtlScheduler cron jobs to trigger the new steps via CatalogEtlService.runStep(...).
- Registers new steps in the ETL pipeline and module wiring; adds unit tests for both steps.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| backend/src/modules/catalog-etl/steps/terminals-sync.step.ts | New ETL step to fetch /terminals, resolve parent FKs, and upsert into station_terminal with a 12h skip guard. |
| backend/src/modules/catalog-etl/steps/terminals-sync.step.spec.ts | Unit tests for terminals skip guard, upsert behavior, and warning scenarios. |
| backend/src/modules/catalog-etl/steps/terminal-distances-sync.step.ts | New ETL step to fetch /terminal_distances, resolve terminal FKs, and upsert into station_terminal_distance with a 12h skip guard. |
| backend/src/modules/catalog-etl/steps/terminal-distances-sync.step.spec.ts | Unit tests for distance skip guard, upsert behavior, and warning/skip scenarios. |
| backend/src/modules/catalog-etl/schedulers/catalog-etl.scheduler.ts | Adds hourly cron triggers for the two new ETL steps. |
| backend/src/modules/catalog-etl/catalog-etl.service.ts | Wires new steps into the ETL pipeline and introduces runStep(stepName). |
| backend/src/modules/catalog-etl/catalog-etl.service.spec.ts | Updates service test module providers to include the new steps. |
| backend/src/modules/catalog-etl/catalog-etl.module.ts | Registers the new steps and the new scheduler provider. |
- Add TerminalsSyncStep: 12-hour skip guard, location FK resolution (space station → outpost → city priority), 43-column upsert with ON CONFLICT (uex_id) DO UPDATE
- Add TerminalDistancesSyncStep: 12-hour skip guard, terminal FK resolution by uex_id, upsert by (terminal_origin_id, terminal_destination_id)
- Add CatalogEtlScheduler: @Cron('0 * * * *') for both terminal steps via runStep()
- Add runStep(name) to CatalogEtlService for targeted single-step execution
- Register both steps and scheduler in CatalogEtlModule; extend ETL_STEPS pipeline
- Unit tests: skip guard, upsert, FK resolution, warning emission (32 tests across both steps)
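The location FK priority described above (space station → outpost → city) could be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the field names `id_space_station`, `id_outpost`, and `id_city`, the `LookupMaps` shape, and the choice to fall through to the next candidate when an ID is present but unresolvable are all assumptions.

```typescript
// Hypothetical record shape; field names are assumptions modelled on `id_poi` from the PR text.
interface TerminalRecord {
  id: number;
  id_space_station: number | null;
  id_outpost: number | null;
  id_city: number | null;
}

type LocationKind = 'space_station' | 'outpost' | 'city';

interface ResolvedLocation {
  kind: LocationKind;
  localId: number;
}

// Each map goes from a UEX id to the local primary key (hypothetical structure).
interface LookupMaps {
  spaceStations: Map<number, number>;
  outposts: Map<number, number>;
  cities: Map<number, number>;
}

// Tries each location type in priority order and returns the first one that resolves.
// Returns null when nothing resolves, so the caller can emit a warning and null the FK.
function resolveLocation(record: TerminalRecord, maps: LookupMaps): ResolvedLocation | null {
  const candidates: Array<[LocationKind, number | null, Map<number, number>]> = [
    ['space_station', record.id_space_station, maps.spaceStations],
    ['outpost', record.id_outpost, maps.outposts],
    ['city', record.id_city, maps.cities],
  ];
  for (const [kind, uexId, lookup] of candidates) {
    if (uexId === null) continue;
    const localId = lookup.get(uexId);
    if (localId !== undefined) return { kind, localId };
  }
  return null;
}
```

Keeping the priority order in a single array makes it easy to see, and to change, which location type wins when a record carries more than one ID.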
…ix scheduler race
- Resolve star_system_id, planet_id, orbit_id, moon_id, faction_id, company_id via lookup maps (9 parallel queries) instead of passing raw UEX IDs into FK columns
- Add TERMINAL_TYPE_MAP to translate UEX type strings to the schema CHECK-constraint enum; skip + warn on unknown types rather than violating the CHECK constraint
- Offset terminal-distances-sync cron to '5 * * * *' so it fires 5 min after terminals-sync, preventing a race where the distances job snapshots station_terminal before it is populated
- Expand spec: 45 tests (was 32) covering type mapping, secondary FK resolution, and null passthrough
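A rough sketch of the skip-and-warn behaviour around TERMINAL_TYPE_MAP might look like the following. The map entries are placeholders, since neither the real UEX type strings nor the schema's enum values appear in this PR text; `TypeWarning` and `mapTerminalType` are hypothetical names.

```typescript
// Placeholder entries only: both the keys and the values are hypothetical.
const TERMINAL_TYPE_MAP: ReadonlyMap<string, string> = new Map([
  ['commodity', 'COMMODITY'],
  ['item', 'ITEM'],
]);

interface TypeWarning {
  severity: 'warn';
  message: string;
}

// Returns the mapped enum value, or null after recording a warning, so the caller
// can skip the row instead of sending a value the CHECK constraint would reject.
function mapTerminalType(uexType: string, warnings: TypeWarning[]): string | null {
  const mapped = TERMINAL_TYPE_MAP.get(uexType);
  if (mapped === undefined) {
    warnings.push({ severity: 'warn', message: `Unknown terminal type: ${uexType}` });
    return null;
  }
  return mapped;
}
```

A `Map` is used rather than a plain object so that strings such as `constructor` cannot accidentally match an inherited property.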
… EtlRun; fix mock names
- Replace station_etl_run_state skip guard with query on station_etl_run (the old table is dropped by CatalogEtlSchemaMigration1748000000001)
- runStep() now creates a real EtlRun row (UUID), passes its runId to the step, and saves an error EtlWarning on failure; warnings no longer have an invalid run_id or missing FK row
- Remove INSERT INTO station_etl_run_state at end of each step (run state is now owned by runStep)
- Align service spec mock names: 'terminals-sync' / 'terminal-distances-sync'
- Add runStep unit tests: success path, failure path, unknown-step throws
- Replace station_etl_run NOT EXISTS skip guard with MAX(synced_at) on
target table — eliminates false-positive skips on first deployment
where an empty table now correctly returns NULL and bypasses the guard
- Wrap runStep() in advisoryLockService.withLock('catalog_etl') to
prevent concurrent scheduler invocations from racing with runEtl()
- Rethrow caught step errors after persisting warning + saving failed
run state — scheduler's try/catch now receives the error for logging
- Update runStep tests: assert advisory lock is acquired, assert step
failure rejects (not resolves), add ConflictException lock-held case
terminals-sync:
- Add poi_id FK resolution: load SELECT uex_id, id FROM station_poi and resolve record.id_poi to local BIGINT id; stored as null when not found (same pattern as other nullable secondary FKs)
- Add poi_id to INSERT column list and ON CONFLICT DO UPDATE SET
- Fix misleading inline comment on secondary FK block

terminal-distances-sync:
- Correct endpoint from /terminal_distances to /terminals_distances per schema docs; API response uses terminal_code_origin / terminal_code_destination (string codes), not integer IDs
- Resolve distances by station_terminal.code lookup instead of uex_id
- Replace per-row INSERT loop with batched multi-row upserts (UPSERT_BATCH_SIZE=500) to handle the ~500k-row dataset efficiently

tests:
- Update terminal-distances-sync spec for new endpoint/field shapes
- Update terminals-sync spec for poi FK map, poi_id param index, shifted parameter positions for secondary FKs and boolean flags
… in batches

Skip guard: both steps now write 'epoch' into synced_at on every INSERT and ON CONFLICT UPDATE, then issue a single UPDATE ... SET synced_at = NOW() after the loop completes. MAX(synced_at) is therefore only non-epoch once the entire run has succeeded, so a mid-run failure never advances the guard and the next scheduled run always retries the full load.

Memory: terminal-distances-sync no longer accumulates a validRows array for the whole dataset before batching. Records are now streamed: params and placeholders are built incrementally and flushed every UPSERT_BATCH_SIZE rows, keeping peak memory proportional to the batch size rather than the full dataset.

Tests: added assertions that INSERTs contain 'epoch' and that the final synced_at UPDATE fires after the loop for both steps.
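The streamed batching described above could be approximated like this. It is a simplified sketch under several assumptions: the `distance` column name, the `DistanceRow` shape, and the synchronous `flush` callback are hypothetical (a real step would presumably await a database query), and the synced_at handling is omitted.

```typescript
// Hypothetical row shape; only the two FK column names come from the PR text.
interface DistanceRow {
  originId: number;
  destinationId: number;
  distance: number;
}

type Flush = (sql: string, params: number[]) => void;

const UPSERT_BATCH_SIZE = 500;

// Builds placeholders and params incrementally and flushes every `batchSize` rows,
// so at most one batch is held in memory at a time. Returns the number of batches sent.
function streamUpserts(rows: Iterable<DistanceRow>, flush: Flush, batchSize: number = UPSERT_BATCH_SIZE): number {
  let params: number[] = [];
  let placeholders: string[] = [];
  let batches = 0;

  const flushBatch = (): void => {
    if (placeholders.length === 0) return;
    flush(
      'INSERT INTO station_terminal_distance (terminal_origin_id, terminal_destination_id, distance) ' +
        `VALUES ${placeholders.join(', ')} ` +
        'ON CONFLICT (terminal_origin_id, terminal_destination_id) DO UPDATE SET distance = EXCLUDED.distance',
      params,
    );
    batches += 1;
    params = [];
    placeholders = [];
  };

  for (const row of rows) {
    const base = params.length; // parameter numbering restarts with each batch
    placeholders.push(`($${base + 1}, $${base + 2}, $${base + 3})`);
    params.push(row.originId, row.destinationId, row.distance);
    if (placeholders.length >= batchSize) flushBatch();
  }
  flushBatch(); // send the final partial batch, if any
  return batches;
}
```

Because the function accepts any `Iterable`, the caller can pass a generator that validates and yields records one at a time instead of materialising the full dataset first.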
…ial failure

A run that fails mid-way previously left untouched rows with their prior non-epoch synced_at, causing MAX(synced_at) to still reflect the last successful run and the next hourly schedule to skip. Both terminals-sync and terminal-distances-sync now issue an UPDATE ... SET synced_at = 'epoch' before fetching data from UEX. This ensures that if the run fails at any point after that, all rows (touched or not) are at epoch, MAX(synced_at) WHERE synced_at > '1970-01-01' returns NULL, and the guard is bypassed on the next run.

Tests updated to assert the epoch-reset fires and precedes the first INSERT.
… in scheduler
- Replace '> 1970-01-01' with '> epoch' in both skip guard queries; the DATE cast is timezone-sensitive and can include epoch rows on non-UTC DB servers, whereas 'epoch' is an unambiguous PG epoch synonym
- Catch ConflictException in both cron handlers and log at debug rather than error; a lock conflict is expected concurrency, not a step failure
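The log-level split for the cron handlers could be expressed as a small helper along these lines. The `ConflictException` class here is a local stand-in for the one exported by @nestjs/common, so that the sketch has no dependencies; `logLevelFor` is a hypothetical name.

```typescript
// Local stand-in for ConflictException from @nestjs/common (assumption: the real
// handlers catch that class; this copy only exists to keep the sketch self-contained).
class ConflictException extends Error {
  constructor(message: string) {
    super(message);
    // Keeps instanceof working when compiled to older targets.
    Object.setPrototypeOf(this, ConflictException.prototype);
  }
}

type LogLevel = 'debug' | 'error';

// A held advisory lock is expected concurrency, so it is logged at debug;
// anything else is treated as a genuine step failure and logged at error.
function logLevelFor(err: unknown): LogLevel {
  return err instanceof ConflictException ? 'debug' : 'error';
}
```

A cron handler would call this inside its `catch` block and pass the result to whichever logger it uses.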
…mocks
- Skip guard now queries station_etl_run (status=completed, steps_failed=0, no error warning for this step) instead of MAX(synced_at) on the data table; removes both full-table epoch-reset and NOW()-advance UPDATEs that were rewriting ~500k rows on every terminal-distances run
- INSERTs now write synced_at=NOW() directly; no epoch sentinel needed
- Fix duplicate @nestjs/common imports in catalog-etl.scheduler.ts
- Tighten terminal-distances spec mock: FROM station_terminal && !station_terminal_distance to prevent substring collision masking SQL changes
- Update PR description: correct cron schedule (minute 5 for distances), correct endpoint/field names, accurate test plan
The skip guard previously lived inside each step's execute(), which caused runStep() to create a completed EtlRun record on every cron tick, even when returning early. Each early return refreshed completed_at, so the guard never expired and the step could not run again after the first cycle. Move the guard to CatalogEtlScheduler.shouldSkip() via a new CatalogEtlService.getLastSuccessfulStepRun() method so no run record is created on a skip. Each step's execute() now starts immediately with the API fetch. Tests updated to drop skip-guard describe blocks and the station_etl_run mock branch; DataSource mock added to service spec.
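The scheduler-side decision can be reduced to a pure function, sketched below. The signature is an assumption (the real shouldSkip() presumably obtains the timestamp from getLastSuccessfulStepRun()), and treating a run that is exactly 12 hours old as eligible to re-run is a choice made for this sketch.

```typescript
const SKIP_WINDOW_MS = 12 * 60 * 60 * 1000; // 12 hours

// Returns true when the last successful run is recent enough to skip this tick.
// A null timestamp means no successful run exists yet, so the step always runs.
function shouldSkip(lastSuccessfulCompletedAt: Date | null, now: Date, windowMs: number = SKIP_WINDOW_MS): boolean {
  if (lastSuccessfulCompletedAt === null) return false;
  return now.getTime() - lastSuccessfulCompletedAt.getTime() < windowMs;
}
```

Because the function only reads a timestamp and never writes a run record, a skipped tick cannot refresh completed_at and extend the guard.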
…th CategoriesSyncStep
- Add nullable step_name column to station_etl_run (migration 1779710000000) and EtlRun entity so runStep() records which step the single-step run executed
- Set stepName in runStep() create call so every scheduler-triggered run is tagged with its step name from the start
- Replace the NOT EXISTS warning-based skip guard query with a direct WHERE step_name = $1 filter, preventing historical full-ETL runs (which have no step_name) from falsely satisfying the guard on first deploy
- Add index idx_etl_run_step_name for efficient skip guard lookups
- Update runStep spec assertion to verify stepName is set on the created run
- Merge conflict: include CategoriesSyncStep (from main/ISSUE-196) alongside TerminalsSyncStep and TerminalDistancesSyncStep in module, service, and spec
b618fc8 to a518a31
Add NOT EXISTS check against station_etl_warning so that completed runs which recorded severity='error' warnings (without throwing) are not counted as successful — the skip guard would otherwise hold for 12h after a run that silently logged errors.
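To make the combined predicate concrete, here is an in-memory model of what the guard query selects. It is not the SQL from the PR: the `RunRow` and `WarningRow` shapes are hypothetical simplifications of station_etl_run and station_etl_warning, and the function name is borrowed loosely from getLastSuccessfulStepRun().

```typescript
// Hypothetical, simplified row shapes for the two tables.
interface RunRow {
  id: string;
  stepName: string | null; // null for historical full-ETL runs
  status: string;
  stepsFailed: number;
  completedAt: Date | null;
}

interface WarningRow {
  runId: string;
  severity: 'warn' | 'error';
}

// Returns the completion time of the newest run for `stepName` that completed with
// no failed steps and no error-severity warnings, or null if there is none.
function lastSuccessfulStepRun(runs: RunRow[], warnings: WarningRow[], stepName: string): Date | null {
  const runsWithErrors = new Set(warnings.filter((w) => w.severity === 'error').map((w) => w.runId));
  let latest: Date | null = null;
  for (const run of runs) {
    if (run.stepName !== stepName) continue;
    if (run.status !== 'completed' || run.stepsFailed !== 0) continue;
    if (run.completedAt === null || runsWithErrors.has(run.id)) continue;
    if (latest === null || run.completedAt.getTime() > latest.getTime()) latest = run.completedAt;
  }
  return latest;
}
```

In this model a run that completed but logged an error warning is ignored, so the guard falls back to the previous clean run, or to null when none exists.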
This was referenced May 26, 2026
Closes #195
Summary
- TerminalsSyncStep: fetches GET /terminals, resolves location FK (space station → outpost → city priority order), 44-column upsert with ON CONFLICT (uex_id) DO UPDATE. Unresolvable primary-location FKs emit severity=warn and the row still upserts with the FK nulled. poi_id and secondary FKs (star_system, planet, orbit, moon, faction, company) are nullable and silently nulled when not found.
- TerminalDistancesSyncStep: fetches GET /terminals_distances, resolves terminal_origin_id and terminal_destination_id from station_terminal by code string, batched upsert (500 rows/batch) into station_terminal_distance by (terminal_origin_id, terminal_destination_id). Unknown codes emit severity=warn and the row is skipped.
- Skip guard: queries station_etl_run for the most recent completed run with steps_failed=0 and no severity=error warning for this step name; skips and logs at DEBUG if within 12 hours. No full-table synced_at rewrites; the skip guard is backed by the run lifecycle table.
- CatalogEtlService.runStep(name): acquires the same catalog_etl advisory lock as runEtl(), creates a real EtlRun row, executes the step, and persists an EtlWarning on failure before rethrowing.
- CatalogEtlScheduler: terminals-sync fires at minute 0 (@Cron('0 * * * *')); terminal-distances-sync fires at minute 5 (@Cron('5 * * * *')) to ensure terminals are populated first. ConflictException from the advisory lock is logged at DEBUG (expected concurrency); all other errors are logged at ERROR.
- Both steps are registered in CatalogEtlModule providers and added to the ETL_STEPS pipeline after jump-points-sync.

Test plan
- pnpm test --filter backend passes (175 catalog-etl tests green)
- station_etl_run returning completed_at within 12h → UEX client not called
- completed_at > 12h ago → full upsert cycle runs
- … last_completed) → runs unconditionally (first-deploy safe)
- ConflictException from runStep is caught and logged at debug in scheduler
- runStep('unknown-step') → throws Error: Unknown ETL step: unknown-step