Broker-driven CSV import system for customer data. The local stack now implements the current end-to-end runtime plus API-driven seed tooling, Postgres-backed inspection CLIs, compose smoke coverage, and focused workflow verification so local work can start from real, inspectable state instead of reverse-engineering.
For the current local setup, you only need Docker with Docker Compose.
cp .env.example .env
npm run compose:up
.env.example is the local config contract. Copy it to .env, keep .env out of git, and treat it as the only local secret file for this repo.
If you want the local log navigation stack as well:
npm run compose:up:all
That profile keeps the default application stack unchanged and adds Grafana, Loki, and Alloy for local log exploration.
To stop the stack:
npm run compose:down
Additional Compose shortcuts:
- npm run compose:up: start the non-observability stack
- npm run compose:down: stop and remove the non-observability stack
- npm run compose:up:all: start the full stack including observability
- npm run compose:down:all: stop and remove the full stack including observability
- npm run compose:up:observability: start only loki, grafana, and alloy
- npm run compose:down:observability: stop and remove only loki, grafana, and alloy
Useful local endpoints:
- API service: http://localhost:3000 by default
- Grafana: http://localhost:3004 by default when the observability profile is enabled
- PostgreSQL: postgresql://csv_importer:csv_importer@localhost:5432/csv_importer by default
- Structurizr UI: http://localhost:8080 by default
- RabbitMQ management: http://localhost:15672 by default
- RabbitMQ broker: amqp://localhost:5672 by default
- Dashboard: http://localhost:3005 by default
RabbitMQ uses the credentials from .env. The service containers now stay up behind local /health readiness endpoints that verify the dependencies each service actually owns: import-service, parser-service, and customer-service verify authenticated Postgres and RabbitMQ connectivity, while api-service verifies that import-service is reachable over its internal HTTP boundary.
On stack startup, a one-shot flyway container applies the shared Postgres schema from db/migrations before the application services boot.
Useful database commands:
npm run db:migrate
npm run db:validate
npm run db:info
Useful developer-tooling commands:
npm run seed:import
npm run inspect:jobs -- --limit 10
npm run inspect:recovery -- --job-id <job-id>
npm run inspect:staged-rows -- --job-id <job-id>
npm run inspect:outcomes -- --job-id <job-id>
npm run recover:job -- --job-id <job-id>
npm run test:contracts
npm run test:smoke
npm run test:devtools
npm run test:e2e
npm run test:integration
For npm run test:e2e, set NO_COLOR=1 to disable ANSI colors or
E2E_VERBOSE_COMPOSE=1 to stream unfiltered docker compose up output.
For the workflow-oriented script guide, see
scripts/workflow/README.md.
The local stack uses one root .env file shared by Docker Compose and the service containers.
Config is grouped into:
- APP_ENV and IMPORT_STORAGE_ROOT
- root Postgres host, port, database, user, and password for Flyway plus local admin tooling
- service-owned Postgres connection variables for import-service, parser-service, and customer-service
- POSTGRES_HOST_PORT, API_SERVICE_HOST_PORT, RABBITMQ_HOST_PORT, RABBITMQ_MANAGEMENT_HOST_PORT, and STRUCTURIZR_PORT for host-published local ports
- RabbitMQ host, ports, user, and password
- BROKER_RETRY_DELAYS_MS for the shared delayed retry budget and queue TTLs
- IMPORT_SERVICE_BASE_URL for the internal api-service to import-service boundary
- per-service *_SERVICE_NAME, *_SERVICE_LOG_LEVEL, and *_SERVICE_PORT
- GRAFANA_PORT, GRAFANA_ADMIN_USER, and GRAFANA_ADMIN_PASSWORD for the optional local Grafana UI
- PARSER_CONSUMER_ENABLED and CUSTOMER_CONSUMER_ENABLED to control which background consumers run locally
The four application services default to:
api-service 3000, import-service 3001, parser-service 3002, and
customer-service 3003.
import-service, parser-service, and customer-service derive their Postgres and RabbitMQ connection URLs from those primitive variables at startup so the connection settings cannot drift. api-service derives its internal base URL for import-service the same way. Startup plus /health checks validate those dependencies with real authenticated handshakes rather than only checking open ports.
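As a minimal sketch of that derivation, assuming a hypothetical PostgresParts shape and a hypothetical buildPostgresUrl helper (neither name is taken from the repo), a connection URL can be assembled from the primitive values in one place. The example values mirror the default local PostgreSQL endpoint listed earlier.

```typescript
// Hypothetical shape: these field names are illustrative, not the repo's real config type.
interface PostgresParts {
  host: string;
  port: number;
  database: string;
  user: string;
  password: string;
}

// Build one connection URL from primitive parts so that host, port, and
// credentials are defined once and cannot drift apart.
function buildPostgresUrl(parts: PostgresParts): string {
  // Percent-encode credentials so characters like "@" or ":" cannot break the URL.
  const user = encodeURIComponent(parts.user);
  const password = encodeURIComponent(parts.password);
  return `postgresql://${user}:${password}@${parts.host}:${parts.port}/${parts.database}`;
}

// Values taken from the default local endpoint documented in this README.
const localUrl = buildPostgresUrl({
  host: "localhost",
  port: 5432,
  database: "csv_importer",
  user: "csv_importer",
  password: "csv_importer",
});
```

Deriving the URL at startup, rather than storing a second full URL next to the primitives, is what keeps the two from disagreeing.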
The shared upload storage root defaults to /data/imports and is backed by a Docker volume in local development.
For a quick import through the current local workflow, use the seed helper:
npm run seed:import
That command uploads csv/happy-path.csv through the real API boundary,
waits for terminal state, and prints the accepted job plus terminal status and
summary.
Other supported fixtures:
npm run seed:import -- --fixture partial-failure
npm run seed:import -- --fixture parse-failure
npm run seed:import -- --fixture single-row
To inspect what the workflow persisted for one import job:
npm run inspect:jobs -- --job-id <job-id>
npm run inspect:recovery -- --job-id <job-id>
npm run inspect:staged-rows -- --job-id <job-id>
npm run inspect:outcomes -- --job-id <job-id>
If a job is in dead_lettered, the operator recovery helpers now let you
inspect the active dead letters and trigger one job-scoped replay:
npm run inspect:recovery -- --job-id <job-id>
npm run recover:job -- --job-id <job-id>
The compatibility upload helper still exists if you want to post an arbitrary CSV directly:
npm run import:post -- ./path/to/file.csv
If a required variable is missing or malformed, the affected service exits on startup with a clear validation error instead of running with partial configuration.
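The fail-fast behavior for missing or malformed variables can be sketched as follows. This is an illustration under stated assumptions, not the services' actual validator: the helper names, the rule kinds, and the error wording are hypothetical. Only the variable names APP_ENV and RABBITMQ_HOST_PORT come from this README.

```typescript
type Env = Record<string, string | undefined>;
type Kind = "text" | "port";

// Hypothetical rule set: the variable names appear in this README, the rules are assumed.
const exampleRules: Record<string, Kind> = {
  APP_ENV: "text",
  RABBITMQ_HOST_PORT: "port",
};

// Collect every problem first so one startup attempt reports all of them.
function collectConfigErrors(env: Env, rules: Record<string, Kind>): string[] {
  const errors: string[] = [];
  for (const name of Object.keys(rules)) {
    const raw = env[name];
    if (raw === undefined || raw.trim() === "") {
      errors.push(`${name} is required`);
      continue;
    }
    if (rules[name] === "port") {
      const port = /^\d+$/.test(raw) ? Number(raw) : NaN;
      if (!(port >= 1 && port <= 65535)) {
        errors.push(`${name} must be an integer port between 1 and 65535, got "${raw}"`);
      }
    }
  }
  return errors;
}

// Throw instead of returning a partially valid configuration.
function loadConfigOrThrow(env: Env, rules: Record<string, Kind>): Env {
  const errors = collectConfigErrors(env, rules);
  if (errors.length > 0) {
    throw new Error(`Invalid configuration:\n- ${errors.join("\n- ")}`);
  }
  return env;
}
```

A service entry point would call something like loadConfigOrThrow before opening any connections and exit non-zero if it throws.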
If you only want to run the architecture workspace:
docker compose up structurizr
The repo includes an opt-in local observability profile built on:
- Grafana Alloy for Docker log collection
- Loki for log storage and queries
- Grafana for Explore and Logs Drilldown
This profile only ingests logs from the four application services in version 1:
- api-service
- import-service
- parser-service
- customer-service
Infrastructure containers such as postgres, rabbitmq, flyway, and structurizr stay out of Loki in this first iteration so the log views stay focused on import workflow debugging.
Typical local workflow:
- start the stack with docker compose --profile observability up --build
- or use npm run compose:up:all for the full stack and npm run compose:up:observability for the observability services only
- open Grafana at http://localhost:${GRAFANA_PORT} or http://localhost:3004
- use Explore or Logs Drilldown with the Loki data source
- filter first by the service, level, and event labels
- inspect correlation_id and import_job_id from structured metadata or the raw JSON log body
The application services still emit the same structured JSON logs to stdout. Alloy ships those logs to Loki without changing the application log schema.
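To make the label-first filtering concrete, here is a rough sketch of what one structured log line and a label filter could look like. The field names service, level, event, correlation_id, and import_job_id come from this README. The LogEntry type, the allowed level values, the sample event name, and both helper functions are assumptions for illustration only.

```typescript
// Assumed log shape: only the five field names are taken from this README.
interface LogEntry {
  service: string;
  level: "debug" | "info" | "warn" | "error";
  event: string;
  correlation_id?: string;
  import_job_id?: string;
}

// One JSON object per line, as a service would write to stdout.
function formatLogLine(entry: LogEntry): string {
  return JSON.stringify(entry);
}

// Mimics filtering by the service, level, and event labels before
// looking at correlation_id or import_job_id in the body.
function matchesLabels(
  line: string,
  wanted: Partial<Pick<LogEntry, "service" | "level" | "event">>
): boolean {
  const parsed = JSON.parse(line) as Partial<LogEntry>;
  return (
    (wanted.service === undefined || parsed.service === wanted.service) &&
    (wanted.level === undefined || parsed.level === wanted.level) &&
    (wanted.event === undefined || parsed.event === wanted.event)
  );
}

// Illustrative values only; the ids are placeholders.
const sampleLine = formatLogLine({
  service: "parser-service",
  level: "info",
  event: "import.job.parse.started",
  correlation_id: "example-correlation-id",
  import_job_id: "example-job-id",
});
```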
For the detailed capture path and the workflow between Alloy, Loki, and
Grafana, read
docs/architecture/observability-profile.md.
- The repo includes the current end-to-end runtime plus local seed, inspection, and verification tooling.
- The local Docker Compose stack includes Postgres, Flyway, RabbitMQ, the four services, and Structurizr.
- An optional Docker Compose observability profile adds Grafana Alloy, Loki, and Grafana for local log navigation without changing the default stack.
- The shared Postgres instance is managed by Flyway SQL migrations under db/migrations and is now split into service-owned schemas: import_service, parser_service, customer_service, and operations.
- api-service accepts CSV uploads, stores files in shared storage, forwards accepted import creation to import-service over an internal HTTP boundary, and proxies public status, summary, failure, and recovery reads back to import-service.
- import-service now owns accepted-job persistence, the import.job.created transactional outbox, synchronous read and recovery APIs, and broker-safe operator replay orchestration.
- parser-service stages normalized rows in parser_service.parsed_rows, writes import.job.parse.succeeded plus payload-carrying import.row.process messages into parser_service.outbox_messages, and relays those success-path messages to RabbitMQ in publish order.
- customer-service now treats the RabbitMQ row message as the authoritative source of row content, never reads parser-owned tables, and reads or writes only customer_service tables.
- import-service, parser-service, and customer-service share the current retry and recovery contract: retryable runtime failures publish delayed retry copies with broker confirms, non-retriable failures reject to service-local DLQs, and successful retried deliveries resolve their recovery rows.
- RabbitMQ now uses the business topic exchange csv-importer.v1 plus the internal direct exchange csv-importer.internal.v1, with per-service .retry.1, .retry.2, .retry.3, and .dlq queues derived from BROKER_RETRY_DELAYS_MS.
- PostgreSQL now also stores message_recovery_states, dead_letter_messages, operator_recovery_actions, and operator_recovery_action_messages so retry visibility, dead-letter inspection, and replay audit history survive consumer restarts.
- Shared JSON message schemas, broker topology helpers, publish/consume validation, the import and parser outbox relays, and the recovery ledger are in place.
- npm run seed:import now provides fixture-driven end-to-end imports through the real API path, and npm run import:post remains available as the lower-level upload helper.
- npm run inspect:jobs, npm run inspect:recovery, npm run inspect:staged-rows, npm run inspect:outcomes, and npm run recover:job now cover the main local inspection and operator-recovery workflows.
- npm run test:smoke now validates the full current schema baseline through Flyway version 15, including the service-owned schemas, and npm run test:devtools verifies the seed plus inspect workflow against an isolated compose stack.
- docs/architecture/runtime.md is the current runtime writeup.
- npm run test:contracts proves a real import.job.created publish/consume flow through RabbitMQ.
- npm run test:e2e exercises the full stack through happy-path, malformed-CSV, partial-row-failure, duplicate-delivery, transient retry recovery, retry exhaustion to DLQ, and permanent business failures that stay on the normal failure path.
- npm run test:integration chains the contracts, end-to-end, and developer-tooling verification scripts into one end-to-end developer workflow.
- Implementation is being driven by the roadmap in docs/architecture/roadmap.md.
- The proposed production-oriented follow-on plan is in docs/architecture/roadmap.v2.md.
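The retry queues derived from BROKER_RETRY_DELAYS_MS, and the split between retryable and non-retriable failures, can be sketched roughly as below. Several things here are assumptions rather than repo facts: that the variable holds comma-separated millisecond values, that queue names are prefixed with the service name, and all three function names. The .retry.N and .dlq suffixes are the only parts taken from this README.

```typescript
interface RetryQueue {
  name: string;
  ttlMs: number;
}

// Assumption: the variable looks like "1000,5000,30000" (comma-separated milliseconds).
function parseRetryDelays(raw: string): number[] {
  return raw.split(",").map((part) => {
    const trimmed = part.trim();
    if (!/^\d+$/.test(trimmed)) {
      throw new Error(`Invalid retry delay: "${part}"`);
    }
    return Number(trimmed);
  });
}

// One retry queue per configured delay; the queue TTL is the delay itself.
// Assumption: queue names are prefixed with the service name.
function retryQueuesFor(service: string, delaysMs: number[]): RetryQueue[] {
  return delaysMs.map((ttlMs, index) => ({
    name: `${service}.retry.${index + 1}`,
    ttlMs,
  }));
}

// retriesUsed is how many delayed retries this message has already consumed.
// Non-retryable failures and exhausted budgets both end in the service-local DLQ.
function nextQueue(
  service: string,
  delaysMs: number[],
  retriesUsed: number,
  retryable: boolean
): string {
  if (!retryable || retriesUsed >= delaysMs.length) {
    return `${service}.dlq`;
  }
  return `${service}.retry.${retriesUsed + 1}`;
}
```

With three configured delays, a retryable failure moves through .retry.1, .retry.2, and .retry.3 before landing in .dlq, which matches the retry-exhaustion case that npm run test:e2e is described as covering.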
Core directories:
- db/migrations: Flyway SQL migrations for the shared Postgres instance and service-owned schemas
- services/api-service: upload boundary plus public proxy to import-service
- services/import-service: import job persistence, state tracking, internal read API, and operator recovery orchestration
- services/parser-service: Rust CSV parsing, staging, and parser outbox relay
- services/customer-service: payload-driven customer matching and writes
- services/shared: shared message contracts, validation, logging, and Node broker runtime
For Node HTTP services, prefer Fastify for new or significantly refactored
service boundaries. api-service is the current reference implementation for
that pattern; other Node services have not been migrated yet.
- scripts/workflow: seed/import helpers and Postgres inspection CLIs
- scripts/verification: smoke, contracts, end-to-end, and tooling verification scripts
- scripts/lib: shared script helpers
- scripts/compose-stack.sh: compose wrapper kept in shell because it is mostly docker compose orchestration
- observability: local Grafana Alloy, Loki, and Grafana configuration
- docs/architecture: architecture docs and ADRs
- docs/architecture/structurizr: Structurizr workspace, generated static diagrams, and local cache
Read these next:
- docs/architecture/runtime.md: current runtime flow, retry/DLQ topology, recovery ledger, and public recovery overlay
- docs/architecture/observability-profile.md: local observability profile, Alloy log collection flow, and Grafana usage
- docs/architecture/design-doc.md: architecture intent, scope, and constraints
- docs/architecture/roadmap.md: implementation order and milestone history
- docs/architecture/roadmap.v2.md: production-oriented next direction for simplification, checkpointing, recovery, and operator visibility
- docs/architecture/open-questions.md: unresolved design decisions
- docs/architecture/structurizr/workspace.dsl: C4 model and static architecture views
- docs/architecture/adr/0001-use-single-repo-without-nx.md: current repo structure decision
- docs/architecture/adr/0002-use-structured-json-logs-and-correlation-ids.md: logging and correlation conventions
- docs/architecture/adr/0003-use-flyway-for-shared-postgres-schema-migrations.md: schema migration workflow for the shared Postgres database
- docs/architecture/adr/0004-use-a-shared-rabbitmq-topology-and-contract-validation-at-broker-boundaries.md: RabbitMQ topology and contract validation rules
- docs/architecture/adr/0005-use-a-transactional-outbox-for-api-job-acceptance.md: original durable import.job.created publication decision before the import boundary redesign
- docs/architecture/adr/0007-add-broker-managed-retries-dead-letter-queues-and-a-recovery-ledger.md: delayed retry, DLQ, and recovery-ledger rules that extend ADR 0004
- docs/architecture/adr/0008-add-postgres-backed-operator-recovery-inspection-and-replay.md: operator recovery inspection, replay, and superseded dead-letter rules that extend ADR 0007
- docs/architecture/adr/0009-adopt-service-owned-schemas-and-payload-carrying-parser-handoffs.md: service-owned schemas, import-service workflow ownership, parser outbox, and payload-carrying import.row.process
The project imports customer records from CSV files into a CRM. Files are accepted by the API boundary, processed asynchronously, parsed into normalized rows, and then applied to customer records with progress and row-level outcomes tracked along the way.
The architecture is intentionally distributed and broker-driven so the project can practice service boundaries, asynchronous workflows, idempotency, and observability instead of optimizing for the simplest possible implementation.
The current runtime now covers upload acceptance through the public API,
internal acceptance and read ownership in import-service, durable
import.job.created publication from the import outbox, parser-side
import.job.parse.started publication plus parser-side success-path outbox
relay, durable parser staging, payload-carrying row handoff to
customer-service, import-service-side row outcome aggregation, public failure
inspection through the API proxy, consumer-side retry plus DLQ visibility for
runtime failures, and job-scoped operator recovery inspection plus replay.
For v1, the system keeps the scope narrow: RabbitMQ handles the async workflow, Postgres stores durable state, the parser runs in Rust, and customer matching is email-only. The detailed domain rules and architectural rationale live in the docs rather than being duplicated here.