
mw-train-ingestor-job

Node.js + TypeScript service that periodically fetches train position data from an external API, calculates real-time positions using GeoJSON routes and Turf.js, and persists the results in PostgreSQL via Prisma.


Quick start

cp .env.example .env
# Edit .env — set DATABASE_URL, EXTERNAL_SERVICE_URL and API_KEY_MG_TRENES at minimum
npm install
npx prisma generate
npx prisma migrate deploy   # skip if ENABLE_DATABASE=false
npx prisma db seed          # skip if ENABLE_DATABASE=false
npm run job

ENABLE_DATABASE=false by default. The job still fetches real data from the external API but skips all DB writes and logs the payload instead. Set ENABLE_DATABASE=true and provide DATABASE_URL to persist data.

The database runs on a separate VPS — there is no local db service in Docker Compose. Provide the remote DATABASE_URL via environment variable or .env file.


Environment variables

Copy the example file and fill in the values:

cp .env.example .env
| Variable | Required | Description |
|---|---|---|
| NODE_ENV | No | development or production. Default: development |
| SCHEDULE_STR | No | node-cron expression used only by npm run dev. Default: */30 * * * * * |
| EXTERNAL_SERVICE_URL | Yes | External train API endpoint |
| API_KEY_MG_TRENES | Yes | Bearer token for the external API |
| SPEED | No | Average train speed in km/h used to interpolate position. Default: 60 |
| ENABLE_DATABASE | No | true / 1 enables PostgreSQL, false / 0 runs in mock mode. Default: false |
| POSTGRES_USER | If ENABLE_DATABASE=true | Database user |
| POSTGRES_PASSWORD | If ENABLE_DATABASE=true | Database password |
| POSTGRES_HOST | If ENABLE_DATABASE=true | Hostname or IP of the remote database VPS |
| POSTGRES_PORT | No | Database port. Default: 5432 |
| POSTGRES_DB | If ENABLE_DATABASE=true | Database name |
| DATABASE_URL | If ENABLE_DATABASE=true | Full Prisma/PostgreSQL connection string. Overrides individual POSTGRES_* vars |

If your tables live in a non-default schema, append ?schema=<your_schema> to DATABASE_URL:

DATABASE_URL="postgresql://user:pass@host:5432/db?schema=etl_example"

Mock mode (ENABLE_DATABASE=false): the job fetches real data from the external API and logs it, but skips all DB writes. No database required.


Switching the database per environment

The only value you need to change to point to a different database is DATABASE_URL. The POSTGRES_* variables are conveniences for building it — Prisma and the app only read DATABASE_URL.
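As a rough sketch, the POSTGRES_* convenience variables map onto DATABASE_URL like this (the helper and its names are illustrative, not actual app code):

```typescript
// Hypothetical helper showing how the POSTGRES_* parts combine into the
// DATABASE_URL that Prisma actually reads.
interface PostgresParts {
  user: string;
  password: string;
  host: string;
  port?: number;     // defaults to 5432, matching POSTGRES_PORT
  database: string;
  schema?: string;   // appended as ?schema=... when tables live outside "public"
}

function buildDatabaseUrl(p: PostgresParts): string {
  const base = `postgresql://${p.user}:${encodeURIComponent(p.password)}@${p.host}:${p.port ?? 5432}/${p.database}`;
  return p.schema ? `${base}?schema=${p.schema}` : base;
}

// buildDatabaseUrl({ user: "postgres", password: "pass", host: "10.238.192.89",
//                    database: "middleware", schema: "etl_example" })
// → "postgresql://postgres:pass@10.238.192.89:5432/middleware?schema=etl_example"
```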

How it works internally

src/db/prisma.ts creates the client like this:

import { Pool } from 'pg';
import { PrismaPg } from '@prisma/adapter-pg';
import { PrismaClient } from '@prisma/client';

const pool    = new Pool({ connectionString: envs.DATABASE_URL });
const adapter = new PrismaPg(pool);
export const prisma = new PrismaClient({ adapter });

If ENABLE_DATABASE=0, prisma is null and no connection is opened.

Configuration per environment

| Environment | DATABASE_URL | Notes |
|---|---|---|
| Remote VPS | postgresql://postgres:pass@10.238.192.89:5432/middleware?schema=etl_example | Production DB on external VPS |
| CI / staging | postgresql://ci_user:ci_pass@staging-host:5432/cde_trains | Injected as a secret in the pipeline |
| Mock / no DB | (any value or empty) | ENABLE_DATABASE=0 — no connection is opened |

Steps to point to the remote database

1. Edit DATABASE_URL in your .env:

POSTGRES_USER=postgres
POSTGRES_PASSWORD=your_password
POSTGRES_HOST=10.238.192.89
POSTGRES_PORT=5432
POSTGRES_DB=middleware

DATABASE_URL="postgresql://postgres:your_password@10.238.192.89:5432/middleware?schema=etl_example"

2. Verify the user has the minimum required permissions:

-- Connected as superuser on the target database:
GRANT CONNECT ON DATABASE middleware       TO postgres;
GRANT USAGE   ON SCHEMA etl_example       TO postgres;
GRANT SELECT, INSERT, UPDATE, DELETE
      ON ALL TABLES IN SCHEMA etl_example TO postgres;
GRANT USAGE, SELECT
      ON ALL SEQUENCES IN SCHEMA etl_example TO postgres;

-- For permissions to apply to future tables (migrations) as well:
ALTER DEFAULT PRIVILEGES IN SCHEMA etl_example
  GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES    TO postgres;
ALTER DEFAULT PRIVILEGES IN SCHEMA etl_example
  GRANT USAGE, SELECT                   ON SEQUENCES TO postgres;

If the user also runs migrations (prisma migrate deploy), they additionally need CREATE on the schema:

GRANT CREATE ON SCHEMA etl_example TO postgres;

3. Apply migrations and seed if the database is new:

# Create all tables
npx prisma migrate deploy

# Insert route and stations (idempotent — safe to run multiple times)
npx prisma db seed

4. Verify the connection before starting:

npm run job
# You should see: "Database connection verified successfully."

Requirements on the remote VPS

Before connecting, make sure the remote VPS has:

  • Port 5432 open in the firewall for the IP of the server running the job
  • pg_hba.conf with a line that allows connections from that IP:
    host    middleware    postgres    <job_server_IP>/32    md5
    
  • postgresql.conf with listen_addresses = '*' (or the specific IP)

In Docker / Portainer

Never put credentials in the image. Pass them as environment variables at runtime:

docker run --rm \
  -e ENABLE_DATABASE=1 \
  -e DATABASE_URL="postgresql://postgres:your_password@10.238.192.89:5432/middleware?schema=etl_example" \
  mw-train-ingestor-job

In Portainer, configure them under Stacks → Environment variables or mount a .env as a secret.


Project structure

src/
├── config/
│   └── env.ts                 Environment variables via env-var
├── db/
│   └── prisma.ts              Prisma singleton + startup validation
├── jobs/
│   └── trainJob.ts            Orchestrates one full execution cycle
├── logger/
│   └── logger.ts              Winston logger with getLogger(name)
├── processing/
│   └── trainProcessor.ts      Parses API response, calculates positions with Turf.js
├── repositories/
│   ├── infrastructure.repository.ts   Route + station queries
│   └── train.repository.ts            Batch, snapshot and device persistence
├── scheduler/
│   └── jobScheduler.ts        node-cron wrapper used by npm run dev
├── services/
│   └── apiService.ts          HTTP client for the external train API (with retry)
├── types.ts                   Shared TypeScript interfaces and enums
├── index.ts                   Dev entry point — starts the scheduler
└── job.ts                     One-shot entry point — runs once and exits
prisma/
├── schema.prisma              Source of truth for the DB schema
├── prisma.config.ts           Prisma 7 config with pg adapter
└── seed.ts                    Idempotent seed (route and stations)

Running the job

Prerequisites

npm install
npx prisma generate
npx prisma migrate deploy   # skip if ENABLE_DATABASE=false
npx prisma db seed          # skip if ENABLE_DATABASE=false

One-shot mode (production)

Runs trainJob once and exits. This is the execution model used by Docker and Portainer jobs.

npm run job

Expected log flow:

2026-03-05 08:45:10 | info  | job        | One-shot job starting.
2026-03-05 08:45:11 | info  | app        | Database connection verified successfully.
2026-03-05 08:45:11 | info  | trainJob   | Train job started.
2026-03-05 08:45:11 | info  | trainJob   | Batch created: 9d6855d4-33f3-46cf-bb3f-ccf13869b1bc
2026-03-05 08:45:11 | info  | trainJob   | Fetching data from external API for app="MG_trenes"...
2026-03-05 08:45:12 | info  | trainJob   | External API response received (15420 chars).
2026-03-05 08:45:12 | info  | trainJob   | Payload parsed. executionDate="2026-03-05T07:44:48.000Z" trains=5
2026-03-05 08:45:12 | info  | trainJob   | Raw data persisted for batch="9d6855d4-...".
2026-03-05 08:45:12 | info  | trainJob   | Processing complete: 5 train snapshots generated.
2026-03-05 08:45:12 | info  | trainJob   | Snapshots persisted for batch="9d6855d4-...".
2026-03-05 08:45:12 | info  | trainJob   | Batch "9d6855d4-..." closed with status SUCCESS (records=5).
2026-03-05 08:45:12 | info  | trainJob   | Train job finished successfully.
2026-03-05 08:45:12 | info  | job        | One-shot job finished successfully.

Development mode

Starts the scheduler — re-runs the job on every SCHEDULE_STR tick. Restarts automatically on file changes via tsx watch.

npm run dev

Docker

docker build -t mw-train-ingestor-job .
docker run --rm --env-file .env mw-train-ingestor-job

The container exits automatically after the job completes. Scheduling is handled externally by a Portainer Job.

There is no local db service — the container connects directly to the remote PostgreSQL VPS via DATABASE_URL.


External API resilience

apiService classifies every error as either retryable or permanent and retries automatically when appropriate.

Retry policy

| Attempt | Delay before this attempt |
|---|---|
| 1 | (immediate) |
| 2 | 5 s |
| 3 | 15 s |
| 4 | 30 s |

After 4 failed attempts the error is re-thrown and the batch closes with status ERROR.
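The policy above can be sketched as follows — a simplified, dependency-free stand-in for apiService's actual retry loop (the delay values come from the table; everything else is illustrative):

```typescript
// Delays before attempts 1..4, per the table above (0 = immediate first try).
const RETRY_DELAYS_MS = [0, 5_000, 15_000, 30_000];

// Retries retryable errors up to 4 attempts, fails fast on permanent ones,
// and re-throws the last error once the schedule is exhausted.
async function fetchWithRetry<T>(
  attempt: (n: number) => Promise<T>,
  isRetryable: (err: unknown) => boolean,
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i < RETRY_DELAYS_MS.length; i++) {
    if (RETRY_DELAYS_MS[i] > 0) {
      await new Promise((resolve) => setTimeout(resolve, RETRY_DELAYS_MS[i]));
    }
    try {
      return await attempt(i + 1);
    } catch (err) {
      lastError = err;
      if (!isRetryable(err)) throw err; // permanent error: no further attempts
    }
  }
  throw lastError; // all 4 attempts failed → batch closes with status ERROR
}
```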

Error classification

| HTTP / network condition | Retryable | Notes |
|---|---|---|
| Network timeout (ECONNABORTED, ETIMEDOUT) | Yes | Axios-level, no response received |
| DNS failure (ENOTFOUND, EAI_AGAIN) | No | Configuration issue — check EXTERNAL_SERVICE_URL |
| TLS certificate error | No | Check NODE_ENV / rejectUnauthorized |
| 401 / 403 — authentication failed | No | Check API_KEY_MG_TRENES |
| 404 — remote ASP internal timeout (80072ee2) | Yes | Server-side timeout disguised as 404; see note below |
| 404 — genuine endpoint not found | No | Wrong EXTERNAL_SERVICE_URL or app value |
| 429 — rate limited | Yes | Too many requests |
| 5xx — server error | Yes | Upstream instability |
| Empty or unexpected response body (HTTP 200) | Yes | Payload issue |

Note on the ASP 404 timeout: The external API occasionally returns HTTP 404 with an HTML body containing msxml3.dll error '80072ee2'. This is an internal timeout on their side — not a configuration error. The service detects this pattern, logs a warn, and retries automatically.
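A minimal sketch of this detection (the exact matching logic inside apiService may differ):

```typescript
// Heuristic: a 404 whose HTML body carries the classic ASP/msxml3 "80072ee2"
// code is treated as a retryable upstream timeout, not a missing endpoint.
function isAspInternalTimeout(status: number, body: string): boolean {
  return status === 404 && body.includes("80072ee2");
}

isAspInternalTimeout(404, "msxml3.dll error '80072ee2'"); // retryable upstream timeout
isAspInternalTimeout(404, "The resource cannot be found"); // genuine 404
```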

All error events are logged via Winston (apiService logger) with full context: app name, URL, HTTP status, and whether the error is retryable.


Database

Stack

PostgreSQL accessed via Prisma 7 using the native pg driver through @prisma/adapter-pg. The client is initialised once as a module-level singleton in src/db/prisma.ts:

const pool    = new Pool({ connectionString: DATABASE_URL });
const adapter = new PrismaPg(pool);
export const prisma = new PrismaClient({ adapter });

prisma is null when ENABLE_DATABASE=false. All repository functions guard with a mock-mode log before any DB call.
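The guard can be pictured like this (the function and parameter names are illustrative, not the real repository code):

```typescript
// Illustrative mock-mode guard: when prisma is null (ENABLE_DATABASE=false),
// log the would-be write and skip the DB call entirely.
function persistWithGuard<T>(
  prisma: object | null,
  payload: T,
  write: (p: T) => void,
  log: (msg: string) => void,
): boolean {
  if (!prisma) {
    log(`[MOCK] Skipping DB write: ${JSON.stringify(payload)}`);
    return false; // nothing persisted in mock mode
  }
  write(payload);
  return true;
}
```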

📐 DB diagram: dbdiagram.io/d/db-marzo-2026

Schema and naming conventions

The full schema is in prisma/schema.prisma. Conventions applied across all models:

  • Table names: snake_case with domain prefix (cfg_, rail_, job_)
  • Config IDs: VARCHAR(10) — short, readable, manually assigned (e.g. ST007)
  • Operational IDs: UUID auto-generated by Postgres (@default(uuid()))
  • Timestamps: TIMESTAMPTZ for all operational date fields

Models

| Domain | Table | Description |
|---|---|---|
| cfg_ | cfg_device | Registered devices. Each row identifies a physical device by resource + capability (e.g. TRAIN + POSITION) |
| rail_ | rail_station | Train stations with name and latitude / longitude coordinates |
| rail_ | rail_route | Named routes with a GeoJSON FeatureCollection containing the LineString geometry |
| job_ | job_execution_batch | Record of each job execution (start, end, status) |
| job_ | job_raw_data | Raw payload received from the external API, per batch |
| job_ | job_device_snapshot | One snapshot per device per batch — records resource, capability and status |
| job_ | job_train_position | Position detail linked 1:1 to a job_device_snapshot — stores coordinates, departure/next station and minutes to next station |

Key schema design decisions

Device identity — cfg_device uses a (idExternal, resource, capability) unique key instead of a single external-ID column. This allows the same external ID to appear under different resource/capability combinations (e.g. a future elevator sensor reusing the same ID namespace).
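A pure illustration of the effect of the compound key (the values are made up):

```typescript
// With (idExternal, resource, capability) as the unique key, the same external
// ID can back two distinct device rows — which a single unique external-ID
// column would forbid.
type DeviceKey = { idExternal: string; resource: string; capability: string };
const keyOf = (d: DeviceKey) => `${d.idExternal}|${d.resource}|${d.capability}`;

const registry = new Map<string, DeviceKey>();
const train: DeviceKey = { idExternal: "EXT-01", resource: "TRAIN", capability: "POSITION" };
const lift: DeviceKey  = { idExternal: "EXT-01", resource: "ELEVATOR", capability: "ELECTRICAL" };
registry.set(keyOf(train), train);
registry.set(keyOf(lift), lift);
// registry.size === 2: same external ID, two distinct devices
```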

Snapshot + position split — position data has been extracted from the snapshot row into a dedicated job_train_position table linked 1:1 via idSnapshot. The snapshot table stays generic (valid for any device type), while type-specific detail tables (job_train_position, and future job_elevator_electrical, etc.) carry the payload. This avoids a wide table with many nullable columns.

Station coordinates — rail_station stores latitude and longitude as plain Float columns instead of a geoJson blob. The repository converts them to [longitude, latitude] arrays (the GeoJSON / Turf convention) before passing them to trainProcessor.
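That conversion amounts to the following (a sketch — the repository's actual helper may look different):

```typescript
// GeoJSON / Turf positions are [longitude, latitude] — the reverse of the
// lat/lon order stored in rail_station.
interface Station { name: string; latitude: number; longitude: number }

function toPosition(s: Station): [number, number] {
  return [s.longitude, s.latitude];
}

toPosition({ name: "Central", latitude: -34.6, longitude: -58.4 }); // → [-58.4, -34.6]
```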

Route geometry — rail_route uses a geoJson column containing a FeatureCollection with a LineString feature, which Turf reads directly for position interpolation.
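Conceptually, the interpolation Turf performs boils down to walking the LineString a given distance. A dependency-free planar sketch (real Turf uses geodesic distances, so treat this only as an approximation of the idea):

```typescript
type Position = [number, number]; // [longitude, latitude]

// Walk a LineString `distance` units along its coordinates using planar
// segment lengths — a simplified stand-in for Turf's along().
function along(coords: Position[], distance: number): Position {
  let remaining = distance;
  for (let i = 0; i < coords.length - 1; i++) {
    const [x1, y1] = coords[i];
    const [x2, y2] = coords[i + 1];
    const seg = Math.hypot(x2 - x1, y2 - y1);
    if (remaining <= seg) {
      const t = seg === 0 ? 0 : remaining / seg;
      return [x1 + t * (x2 - x1), y1 + t * (y2 - y1)];
    }
    remaining -= seg;
  }
  return coords[coords.length - 1]; // past the end: clamp to the last vertex
}

along([[0, 0], [10, 0], [10, 10]], 15); // → [10, 5]
```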

Migrations

# Generate client after schema changes
npx prisma generate

# Apply migrations in development
npx prisma migrate dev

# Apply migrations in production (also runs automatically in Docker on startup)
npx prisma migrate deploy

# Seed route and stations (idempotent — safe to run multiple times)
npx prisma db seed

# Generate equivalent SQL for review
npx prisma migrate diff --from-empty --to-schema prisma/schema.prisma --script > docs/db/schema.sql

How the job works

Each execution cycle runs the following steps in order:

  1. Open batch — creates a job_execution_batch record with status PENDING
  2. Fetch — calls the external API via apiService, with automatic retry on transient errors (up to 4 attempts)
  3. Parse — strips zero-width characters, parses JSON, extracts cargaInicio timestamp
  4. Persist raw data — inserts the raw payload into job_raw_data linked to the batch
  5. Process — resolves station coordinates from rail_station, interpolates train positions along the rail_route LineString using Turf.js
  6. Persist snapshots — upserts devices in cfg_device, resolves station IDs, inserts one job_device_snapshot + one job_train_position per train
  7. Close batch — updates the batch record with status SUCCESS, device counts and end timestamp
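Step 3's zero-width stripping can be sketched as follows (the exact character set the job strips is an assumption):

```typescript
// Remove zero-width spaces/joiners and BOMs that would otherwise break
// JSON.parse on the raw payload.
function sanitizePayload(raw: string): string {
  return raw.replace(/[\u200B-\u200D\uFEFF]/g, "");
}

JSON.parse(sanitizePayload('\uFEFF{"cargaInicio":"2026-03-05T07:44:48.000Z"}')); // parses cleanly
```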

On any error, the batch is closed with status ERROR and the full error message is recorded before the process exits with code 1. ApiServiceError includes additional context (HTTP status, retryable flag) which is included in the error log.
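The open/close bracketing around the fallible middle steps can be condensed into a sketch like this (the signatures are illustrative, not the actual trainJob code):

```typescript
// Every cycle opens a batch first and guarantees it is closed with SUCCESS or
// ERROR — mirroring steps 1 and 7 around steps 2–6.
type Status = "SUCCESS" | "ERROR";

async function runCycle(deps: {
  openBatch: () => Promise<string>;
  work: (batchId: string) => Promise<number>; // steps 2–6, returns record count
  closeBatch: (batchId: string, status: Status, detail?: string) => Promise<void>;
}): Promise<number> {
  const batchId = await deps.openBatch();
  try {
    const records = await deps.work(batchId);
    await deps.closeBatch(batchId, "SUCCESS");
    return records;
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    await deps.closeBatch(batchId, "ERROR", message);
    throw err; // the caller then exits the process with code 1
  }
}
```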


Relation to mw-train-api

This repo is the write side of the system. It has no HTTP server and no API endpoints. The data it writes is read by mw-train-api, which exposes it via REST. Both repos share the same PostgreSQL database and Prisma schema — neither depends on the other at runtime.


Troubleshooting

Cannot find module '@prisma/client' or Prisma type errors — Run npx prisma generate after every schema change or fresh npm install.

ENABLE_DATABASE=true but the job exits immediately with a DB error — Check that DATABASE_URL is reachable from the machine running the job. Verify credentials, host, port, and that the remote VPS firewall allows connections from your IP.

Remote DB unreachable (connection refused or timeout) — Verify that port 5432 is open on the remote VPS firewall, pg_hba.conf allows your IP, and postgresql.conf has listen_addresses set to '*' or the specific IP.

npm run dev exits before starting the scheduler — With ENABLE_DATABASE=true, startup validates the DB connection. If it is unreachable, the process exits before the scheduler starts. Use ENABLE_DATABASE=false for mock mode, or fix connectivity first.

Job runs but no data is persisted — Confirm ENABLE_DATABASE=true in .env. If you see [MOCK] lines in the logs, the service is still running in mock mode.

Tables not found / schema errors — If your tables are in a non-default schema, make sure DATABASE_URL includes ?schema=etl_example (or your schema name). Run npx prisma migrate deploy to ensure all migrations are applied.

EXTERNAL_SERVICE_URL returns 401 — API_KEY_MG_TRENES is missing or invalid. The client sends it as a Bearer token in the Authorization header.
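The header the client sends has this shape (a sketch, not the actual apiService code):

```typescript
// Bearer-token Authorization header built from API_KEY_MG_TRENES.
function authHeaders(apiKey: string): Record<string, string> {
  return { Authorization: `Bearer ${apiKey}` };
}

authHeaders("abc123"); // → { Authorization: "Bearer abc123" }
```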

Batch closes with ERROR after retries (remote ASP timeout) — The external API sometimes returns HTTP 404 with ASP error 80072ee2 (an internal timeout on their infrastructure). The service retries this automatically, up to 4 attempts. If all attempts fail, check the apiService logs for warn lines showing each retry. This is a transient issue on the upstream server — try again later or contact the API provider.

Portainer job shows exit code 1 — Check the container log for the [job] Fatal error: line. Common causes: DB unreachable, invalid API key, missing required env variable, or an upstream API that keeps timing out.

Old trains are not marked INACTIVE — The job sets status: INACTIVE on the job_device_snapshot of any device not present in the latest payload. If this is not happening, verify that the last job_execution_batch finished with status SUCCESS. Partial or errored batches do not trigger the deactivation step.
