diff --git a/backend/dist/config/db.js b/backend/dist/config/db.js index c39e1d01..acd5e740 100644 --- a/backend/dist/config/db.js +++ b/backend/dist/config/db.js @@ -39,7 +39,12 @@ exports.pool = new pg_1.Pool({ // --------------------------------------------------------------------------- // Pool event listeners — structured logging for diagnostics // --------------------------------------------------------------------------- -exports.pool.on("connect", () => { +exports.pool.on("connect", (client) => { + client + .query("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED") + .catch((err) => { + console.error("[POOL] Failed to configure transaction isolation:", err.message); + }); if (process.env.NODE_ENV !== "production") { console.log(`[POOL] New client connected | total=${exports.pool.totalCount} idle=${exports.pool.idleCount} waiting=${exports.pool.waitingCount}`); } diff --git a/backend/dist/index.js b/backend/dist/index.js index 3dddd337..0de864e7 100644 --- a/backend/dist/index.js +++ b/backend/dist/index.js @@ -17,6 +17,7 @@ const activity_1 = __importDefault(require("./routes/activity")); const uploads_1 = __importDefault(require("./routes/uploads")); const bulk_1 = __importDefault(require("./routes/bulk")); const pool_1 = __importDefault(require("./routes/pool")); +const state_1 = __importDefault(require("./routes/state")); dotenv_1.default.config(); const app = (0, express_1.default)(); const port = process.env.PORT || 3001; @@ -35,6 +36,7 @@ app.use("/api/v1/activity", activity_1.default); app.use("/api/v1/uploads", uploads_1.default); app.use("/api/v1/bulk", bulk_1.default); app.use("/api/v1/pool", pool_1.default); +app.use("/api/v1/state", state_1.default); // Basic healthcheck route app.get("/health", async (req, res) => { const startTime = Date.now(); diff --git a/backend/dist/routes/state.js b/backend/dist/routes/state.js new file mode 100644 index 00000000..afb19b64 --- /dev/null +++ b/backend/dist/routes/state.js @@ -0,0 +1,51 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const express_1 = require("express"); +const zod_1 = require("zod"); +const db_1 = require("../config/db"); +const tracing_1 = require("../utils/tracing"); +const router = (0, express_1.Router)(); +const recoveryQuerySchema = zod_1.z.object({ + status: zod_1.z.enum(["pending", "committed", "failed", "abandoned"]).optional(), + limit: zod_1.z.coerce.number().int().min(1).max(200).default(50), +}); +/** + * GET /api/v1/state/write-recovery + * + * Lists durable write-recovery rows for interrupted or retryable database + * mutations. The query is intentionally bounded and ordered by the indexed + * status/updated_at tuple from the migration to avoid table scans under load. + */ +router.get("/write-recovery", async (req, res) => { + try { + const query = recoveryQuerySchema.parse(req.query); + const params = [query.limit]; + let sql = ` + SELECT id, idempotency_key, operation, entity_type, entity_id, status, + attempts, last_error, recovery_payload, created_at, updated_at + FROM write_recovery_records + `; + if (query.status) { + params.unshift(query.status); + sql += " WHERE status = $1 ORDER BY updated_at DESC, id DESC LIMIT $2"; + } + else { + sql += " ORDER BY updated_at DESC, id DESC LIMIT $1"; + } + const result = await db_1.pool.query(sql, params); + tracing_1.logger.info("Write recovery state queried", { + status: query.status || "any", + limit: query.limit, + returned: result.rowCount, + }); + res.status(200).json(result.rows); + } + catch (error) { + if (error instanceof zod_1.z.ZodError) { + return res.status(400).json({ error: error.issues }); + } + tracing_1.logger.error("Write recovery state query failed", { error: error.message }); + res.status(500).json({ error: "Failed to retrieve write recovery state" }); + } +}); +exports.default = router; diff --git a/backend/migrations/20260527000002_write_recovery_records.sql b/backend/migrations/20260527000002_write_recovery_records.sql new file mode 100644 index 00000000..1ea9d83a --- /dev/null +++ b/backend/migrations/20260527000002_write_recovery_records.sql @@ -0,0 +1,28 @@ +-- Durable ledger for database writes that may be interrupted after the request is accepted. +-- A pending/failed row gives operators and retry workers a stable idempotency key to inspect. +CREATE TABLE IF NOT EXISTS write_recovery_records ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + idempotency_key TEXT NOT NULL UNIQUE, + operation TEXT NOT NULL, + entity_type TEXT NOT NULL, + entity_id UUID, + status TEXT NOT NULL DEFAULT 'pending', + attempts INT NOT NULL DEFAULT 0, + last_error TEXT, + recovery_payload JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT write_recovery_status_check + CHECK (status IN ('pending', 'committed', 'failed', 'abandoned')) +); + +CREATE INDEX IF NOT EXISTS idx_write_recovery_status_updated + ON write_recovery_records (status, updated_at DESC, id DESC); + +CREATE INDEX IF NOT EXISTS idx_write_recovery_entity + ON write_recovery_records (entity_type, entity_id) + WHERE entity_id IS NOT NULL; + +CREATE TRIGGER write_recovery_records_updated_at + BEFORE UPDATE ON write_recovery_records + FOR EACH ROW EXECUTE FUNCTION set_updated_at(); diff --git a/backend/src/config/db.ts b/backend/src/config/db.ts index 6b3b71e2..f772b9ec 100644 --- a/backend/src/config/db.ts +++ b/backend/src/config/db.ts @@ -34,7 +34,13 @@ export const pool = new Pool({ // --------------------------------------------------------------------------- // Pool event listeners — structured logging for diagnostics // --------------------------------------------------------------------------- -pool.on("connect", () => { +pool.on("connect", (client: PoolClient) => { + client + .query("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED") + .catch((err) => { + console.error("[POOL] Failed to configure transaction isolation:", err.message); + }); + if (process.env.NODE_ENV !== "production") { console.log( `[POOL] New client connected | total=${pool.totalCount} idle=${pool.idleCount} waiting=${pool.waitingCount}` diff --git a/backend/src/index.ts b/backend/src/index.ts index b0f6d997..45f7a541 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -12,6 +12,7 @@ import activityRoutes from "./routes/activity"; import uploadsRoutes from "./routes/uploads"; import bulkRoutes from "./routes/bulk"; import poolRoutes from "./routes/pool"; +import stateRoutes from "./routes/state"; dotenv.config(); @@ -56,6 +57,7 @@ app.use("/api/v1/activity", activityRoutes); app.use("/api/v1/uploads", uploadsRoutes); app.use("/api/v1/bulk", bulkRoutes); app.use("/api/v1/pool", poolRoutes); +app.use("/api/v1/state", stateRoutes); // Health check endpoint with database connectivity verification app.get("/health", async (req: Request, res: Response) => { diff --git a/backend/src/routes/state.ts b/backend/src/routes/state.ts new file mode 100644 index 00000000..8d3e7546 --- /dev/null +++ b/backend/src/routes/state.ts @@ -0,0 +1,57 @@ +import { Router, Request, Response } from "express"; +import { z } from "zod"; +import { pool } from "../config/db"; +import { logger } from "../utils/tracing"; + +const router = Router(); + +const recoveryQuerySchema = z.object({ + status: z.enum(["pending", "committed", "failed", "abandoned"]).optional(), + limit: z.coerce.number().int().min(1).max(200).default(50), +}); + +/** + * GET /api/v1/state/write-recovery + * + * Lists durable write-recovery rows for interrupted or retryable database + * mutations. The query is intentionally bounded and ordered by the indexed + * status/updated_at tuple from the migration to avoid table scans under load. + */ +router.get("/write-recovery", async (req: Request, res: Response) => { + try { + const query = recoveryQuerySchema.parse(req.query); + const params: Array = [query.limit]; + + let sql = ` + SELECT id, idempotency_key, operation, entity_type, entity_id, status, + attempts, last_error, recovery_payload, created_at, updated_at + FROM write_recovery_records + `; + + if (query.status) { + params.unshift(query.status); + sql += " WHERE status = $1 ORDER BY updated_at DESC, id DESC LIMIT $2"; + } else { + sql += " ORDER BY updated_at DESC, id DESC LIMIT $1"; + } + + const result = await pool.query(sql, params); + + logger.info("Write recovery state queried", { + status: query.status || "any", + limit: query.limit, + returned: result.rowCount, + }); + + res.status(200).json(result.rows); + } catch (error: any) { + if (error instanceof z.ZodError) { + return res.status(400).json({ error: error.issues }); + } + + logger.error("Write recovery state query failed", { error: error.message }); + res.status(500).json({ error: "Failed to retrieve write recovery state" }); + } +}); + +export default router;