Phase 3 of #139. Depends on Phase 1 (seam) and reuses migrations from Phase 2 (SQLite schema).
Goal
Implement PostgresAdapter — the production-grade server target. Multi-host aggregation is the first-class goal of this phase: N hosts running relayburn ingest concurrently against the same Postgres DB must converge to one canonical row per record, with no duplication and no manual coordination.
Selected via RELAYBURN_STORAGE=postgres + RELAYBURN_DATABASE_URL=postgres://….
Why dedup just works across hosts
The existing index-sidecar.ts hashes are content-addressed:
turnIdHash = SHA256(source | sessionId | messageId)
relationshipIdHash, toolResultEventIdHash, userTurnIdHash, turnContentFingerprint similarly.
Two hosts ingesting the same Claude session file produce identical hashes, so a PRIMARY KEY on the hash column + INSERT … ON CONFLICT DO NOTHING is sufficient. No client-side coordination needed.
Implementation notes
- Runtime dep:
pg. Loaded via dynamic import() so it's only required when the adapter is actually selected (file/sqlite users don't need it installed).
- Schema: same logical layout as the SqliteAdapter, generated from the shared
migrations/ directory with small dialect tweaks (e.g. BLOB → BYTEA, INTEGER PRIMARY KEY → BIGSERIAL). Versioned via a schema_versions table; migrations run automatically on init().
- Connection pooling via
pg.Pool.
withLock(name, fn) uses pg_advisory_xact_lock(hashtext($name)) inside a transaction — gives cross-host coordination without polling.
- Streaming
query* paths use cursor-based result iteration (pg-query-stream or manual cursor).
Files
- New:
packages/ledger/src/adapters/postgres-adapter.ts.
- Modified:
packages/ledger/src/adapters/migrations/001_initial.sql + dialect templating to emit Postgres SQL.
- Modified:
packages/ledger/src/adapters/factory.ts — wire up postgres branch.
- Modified:
packages/ledger/package.json — add pg to optionalDependencies.
Verification
- Parameterized adapter test suite (from Phase 2) passes against Postgres (testcontainers in CI;
pg-mem for fast unit tests).
- Multi-writer test: spawn two
relayburn ingest processes against overlapping Claude session files into the same DB; assert final turn count equals the union of host inputs and content rows have no duplicates.
- Advisory-lock smoke test: 10 concurrent writers calling
withLock('ingest', …) serialize correctly.
- Manual:
RELAYBURN_STORAGE=postgres RELAYBURN_DATABASE_URL=… relayburn analyze --json produces metrics matching file/sqlite runs within rounding.
Phase 3 of #139. Depends on Phase 1 (seam) and reuses migrations from Phase 2 (SQLite schema).
Goal
Implement
PostgresAdapter— the production-grade server target. Multi-host aggregation is the first-class goal of this phase: N hosts runningrelayburn ingestconcurrently against the same Postgres DB must converge to one canonical row per record, with no duplication and no manual coordination.Selected via
RELAYBURN_STORAGE=postgres+RELAYBURN_DATABASE_URL=postgres://….Why dedup just works across hosts
The existing
index-sidecar.tshashes are content-addressed:turnIdHash = SHA256(source | sessionId | messageId)relationshipIdHash,toolResultEventIdHash,userTurnIdHash,turnContentFingerprintsimilarly.Two hosts ingesting the same Claude session file produce identical hashes, so a PRIMARY KEY on the hash column +
INSERT … ON CONFLICT DO NOTHINGis sufficient. No client-side coordination needed.Implementation notes
pg. Loaded via dynamicimport()so it's only required when the adapter is actually selected (file/sqlite users don't need it installed).migrations/directory with small dialect tweaks (e.g.BLOB→BYTEA,INTEGER PRIMARY KEY→BIGSERIAL). Versioned via aschema_versionstable; migrations run automatically oninit().pg.Pool.withLock(name, fn)usespg_advisory_xact_lock(hashtext($name))inside a transaction — gives cross-host coordination without polling.query*paths use cursor-based result iteration (pg-query-streamor manual cursor).Files
packages/ledger/src/adapters/postgres-adapter.ts.packages/ledger/src/adapters/migrations/001_initial.sql+ dialect templating to emit Postgres SQL.packages/ledger/src/adapters/factory.ts— wire uppostgresbranch.packages/ledger/package.json— addpgto optionalDependencies.Verification
pg-memfor fast unit tests).relayburn ingestprocesses against overlapping Claude session files into the same DB; assert final turn count equals the union of host inputs and content rows have no duplicates.withLock('ingest', …)serialize correctly.RELAYBURN_STORAGE=postgres RELAYBURN_DATABASE_URL=… relayburn analyze --jsonproduces metrics matching file/sqlite runs within rounding.