From 12cbbea1708a90429842ad5444c7d44a50d470eb Mon Sep 17 00:00:00 2001 From: rajashish147 Date: Fri, 27 Mar 2026 14:36:53 +0530 Subject: [PATCH 1/2] Phase 25 : implement webhook delivery system with BullMQ - Add webhook.queue.ts for managing asynchronous webhook delivery jobs. - Introduce webhook.worker.ts to handle delivery attempts, including retries and error handling. - Create integration tests for the webhooks admin API, covering CRUD operations and delivery management. - Add unit tests for HMAC signature generation, URL validation, and retry delay calculations. - Define database schema for webhooks, webhook events, and delivery tracking in a new SQL migration. --- .vscode/settings.json | 3 +- apps/api/src/db/query.ts | 3 + .../modules/employees/employees.controller.ts | 11 + .../modules/webhooks/webhooks.controller.ts | 90 ++++ .../modules/webhooks/webhooks.repository.ts | 181 +++++++ .../src/modules/webhooks/webhooks.routes.ts | 130 ++++++ .../src/modules/webhooks/webhooks.schema.ts | 101 ++++ .../src/modules/webhooks/webhooks.service.ts | 145 ++++++ apps/api/src/routes/index.ts | 2 + apps/api/src/types/database.ts | 137 +++++- apps/api/src/utils/event-bus.ts | 16 + apps/api/src/workers/startup.ts | 8 +- apps/api/src/workers/webhook-event.service.ts | 232 +++++++++ apps/api/src/workers/webhook.queue.ts | 159 +++++++ apps/api/src/workers/webhook.worker.ts | 344 ++++++++++++++ .../admin/webhooks.integration.test.ts | 440 ++++++++++++++++++ .../api/tests/unit/utils/webhook.unit.test.ts | 154 ++++++ .../20260326000200_phase25_webhooks.sql | 116 +++++ 18 files changed, 2269 insertions(+), 3 deletions(-) create mode 100644 apps/api/src/modules/webhooks/webhooks.controller.ts create mode 100644 apps/api/src/modules/webhooks/webhooks.repository.ts create mode 100644 apps/api/src/modules/webhooks/webhooks.routes.ts create mode 100644 apps/api/src/modules/webhooks/webhooks.schema.ts create mode 100644 apps/api/src/modules/webhooks/webhooks.service.ts create mode 100644 apps/api/src/workers/webhook-event.service.ts create mode 100644 apps/api/src/workers/webhook.queue.ts create mode 100644 apps/api/src/workers/webhook.worker.ts create mode 100644 apps/api/tests/integration/admin/webhooks.integration.test.ts create mode 100644 apps/api/tests/unit/utils/webhook.unit.test.ts create mode 100644 supabase/migrations/20260326000200_phase25_webhooks.sql diff --git a/.vscode/settings.json b/.vscode/settings.json index 3b66410..ab39889 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ { - "git.ignoreLimitWarning": true + "git.ignoreLimitWarning": true, + "codeQL.githubDatabase.update": "never" } \ No newline at end of file diff --git a/apps/api/src/db/query.ts b/apps/api/src/db/query.ts index 1433317..8ea90aa 100644 --- a/apps/api/src/db/query.ts +++ b/apps/api/src/db/query.ts @@ -37,6 +37,9 @@ export type OrgScopedTable = Extract< | "org_daily_metrics" | "org_dashboard_snapshot" | "session_summaries" + | "webhook_deliveries" + | "webhook_events" + | "webhooks" >; export function orgTable( diff --git a/apps/api/src/modules/employees/employees.controller.ts b/apps/api/src/modules/employees/employees.controller.ts index 7c80d95..11c37d6 100644 --- a/apps/api/src/modules/employees/employees.controller.ts +++ b/apps/api/src/modules/employees/employees.controller.ts @@ -7,6 +7,7 @@ import { } from "./employees.schema.js"; import { ok, fail, paginated, handleError } from "../../utils/response.js"; import { NotFoundError } from "../../utils/errors.js"; +import { emitEvent } from "../../utils/event-bus.js"; export const employeesController = { /** @@ -29,6 +30,16 @@ export const employeesController = { "Employee created", ); + emitEvent("employee.created", { + organization_id: request.organizationId, + data: { + employee_id: employee.id, + employee_code: employee.employee_code, + name: employee.name, + created_at: employee.created_at, + }, + }); + reply.status(201).send(ok(employee)); } catch (error) { handleError(error, request, reply, "Unexpected error creating employee"); diff --git a/apps/api/src/modules/webhooks/webhooks.controller.ts b/apps/api/src/modules/webhooks/webhooks.controller.ts new file mode 100644 index 0000000..d1a01c3 --- /dev/null +++ b/apps/api/src/modules/webhooks/webhooks.controller.ts @@ -0,0 +1,90 @@ +/** + * webhooks.controller.ts — HTTP handler layer for the webhooks admin API. + * + * Delegates all business logic to webhooksService. + * Uses the standard ok / paginated / fail / handleError response helpers. + */ + +import type { FastifyRequest, FastifyReply } from "fastify"; +import { webhooksService } from "./webhooks.service.js"; +import { + createWebhookBodySchema, + updateWebhookBodySchema, + deliveryListQuerySchema, +} from "./webhooks.schema.js"; +import { ok, paginated, handleError } from "../../utils/response.js"; + +export const webhooksController = { + // ─── Webhook CRUD ────────────────────────────────────────────────────────── + + async create(request: FastifyRequest, reply: FastifyReply): Promise { + try { + const body = createWebhookBodySchema.parse(request.body); + const webhook = await webhooksService.createWebhook(request, body); + reply.status(201).send(ok(webhook)); + } catch (error) { + handleError(error, request, reply, "Failed to create webhook"); + } + }, + + async list(request: FastifyRequest, reply: FastifyReply): Promise { + try { + const webhooks = await webhooksService.listWebhooks(request); + reply.status(200).send(ok(webhooks)); + } catch (error) { + handleError(error, request, reply, "Failed to list webhooks"); + } + }, + + async update( + request: FastifyRequest<{ Params: { id: string } }>, + reply: FastifyReply, + ): Promise { + try { + const { id } = request.params; + const body = updateWebhookBodySchema.parse(request.body); + const webhook = await webhooksService.updateWebhook(request, id, body); + reply.status(200).send(ok(webhook)); + } catch (error) { + handleError(error, request, reply, "Failed to update webhook"); + } + }, + + async remove( + request: FastifyRequest<{ Params: { id: string } }>, + reply: FastifyReply, + ): Promise { + try { + const { id } = request.params; + await webhooksService.deleteWebhook(request, id); + reply.status(204).send(); + } catch (error) { + handleError(error, request, reply, "Failed to delete webhook"); + } + }, + + // ─── Deliveries ──────────────────────────────────────────────────────────── + + async listDeliveries(request: FastifyRequest, reply: FastifyReply): Promise { + try { + const query = deliveryListQuerySchema.parse(request.query); + const { data, total } = await webhooksService.listDeliveries(request, query); + reply.status(200).send(paginated(data, query.page, query.limit, total)); + } catch (error) { + handleError(error, request, reply, "Failed to list webhook deliveries"); + } + }, + + async retryDelivery( + request: FastifyRequest<{ Params: { id: string } }>, + reply: FastifyReply, + ): Promise { + try { + const { id } = request.params; + const delivery = await webhooksService.retryDelivery(request, id); + reply.status(200).send(ok(delivery)); + } catch (error) { + handleError(error, request, reply, "Failed to retry delivery"); + } + }, +}; diff --git a/apps/api/src/modules/webhooks/webhooks.repository.ts b/apps/api/src/modules/webhooks/webhooks.repository.ts new file mode 100644 index 0000000..860a74f --- /dev/null +++ b/apps/api/src/modules/webhooks/webhooks.repository.ts @@ -0,0 +1,181 @@ +/** + * webhooks.repository.ts — Data access layer for webhooks, webhook_events, + * and webhook_deliveries tables. + * + * All queries are org-scoped via orgTable() to enforce tenant isolation. + * INSERT and UPSERT operations set organization_id explicitly and call + * supabaseServiceClient directly (matching existing repository conventions). + */ + +import type { FastifyRequest } from "fastify"; +import { orgTable } from "../../db/query.js"; +import { supabaseServiceClient as supabase } from "../../config/supabase.js"; +import type { + CreateWebhookBody, + UpdateWebhookBody, + WebhookPublic, + WebhookDelivery, + DeliveryListQuery, +} from "./webhooks.schema.js"; + +// ─── Webhook CRUD ───────────────────────────────────────────────────────────── + +export const webhooksRepository = { + /** + * Create a new webhook for the request's organization. + * The secret is stored but never returned in listing/get responses. + */ + async create( + request: FastifyRequest, + body: CreateWebhookBody, + ): Promise { + const { data, error } = await supabase + .from("webhooks") + .insert({ + organization_id: request.organizationId, + url: body.url, + secret: body.secret, + events: body.events as string[], + is_active: true, + }) + .select("id, organization_id, url, is_active, events, created_at, updated_at") + .single(); + + if (error) throw new Error(`Failed to create webhook: ${error.message}`); + return data as WebhookPublic; + }, + + /** Return all webhooks for the request's organization (secret excluded). */ + async list(request: FastifyRequest): Promise { + const { data, error } = await orgTable(request, "webhooks") + .select("id, organization_id, url, is_active, events, created_at, updated_at") + .order("created_at", { ascending: false }); + + if (error) throw new Error(`Failed to list webhooks: ${error.message}`); + return (data ?? []) as WebhookPublic[]; + }, + + /** Fetch a single webhook by id, org-scoped. Returns null if not found. */ + async findById( + request: FastifyRequest, + webhookId: string, + ): Promise { + const { data, error } = await orgTable(request, "webhooks") + .select("id, organization_id, url, is_active, events, created_at, updated_at") + .eq("id", webhookId) + .limit(1) + .maybeSingle(); + + if (error) throw new Error(`Failed to fetch webhook: ${error.message}`); + return (data as WebhookPublic | null) ?? null; + }, + + /** Update a webhook's mutable fields (url, events, is_active, secret). */ + async update( + request: FastifyRequest, + webhookId: string, + body: UpdateWebhookBody, + ): Promise { + const patch: Record = {}; + if (body.url !== undefined) patch.url = body.url; + if (body.events !== undefined) patch.events = body.events; + if (body.is_active !== undefined) patch.is_active = body.is_active; + if (body.secret !== undefined) patch.secret = body.secret; + + const { data, error } = await orgTable(request, "webhooks") + .update(patch) + .eq("id", webhookId) + .select("id, organization_id, url, is_active, events, created_at, updated_at") + .single(); + + if (error) throw new Error(`Failed to update webhook: ${error.message}`); + return data as WebhookPublic; + }, + + /** Soft-delete: permanently remove the webhook row (and cascade deliveries). */ + async delete(request: FastifyRequest, webhookId: string): Promise { + const { error } = await orgTable(request, "webhooks") + .delete() + .eq("id", webhookId); + + if (error) throw new Error(`Failed to delete webhook: ${error.message}`); + }, + + // ─── Deliveries ───────────────────────────────────────────────────────────── + + /** Paginated list of delivery attempts for the request's org. */ + async listDeliveries( + request: FastifyRequest, + query: DeliveryListQuery, + ): Promise<{ data: WebhookDelivery[]; total: number }> { + const from = (query.page - 1) * query.limit; + const to = from + query.limit - 1; + + let q = orgTable(request, "webhook_deliveries") + .select("*", { count: "exact" }) + .order("created_at", { ascending: false }) + .range(from, to); + + // Chainable filters — orgTable returns a select builder + if (query.webhook_id) { + q = (q as ReturnType).eq("webhook_id", query.webhook_id); + } + if (query.status) { + q = (q as ReturnType).eq("status", query.status); + } + + const { data, error, count } = await q; + + if (error) throw new Error(`Failed to list deliveries: ${error.message}`); + return { data: (data ?? []) as WebhookDelivery[], total: count ?? 0 }; + }, + + /** Fetch a single delivery row by id. */ + async findDeliveryById( + request: FastifyRequest, + deliveryId: string, + ): Promise { + const { data, error } = await orgTable(request, "webhook_deliveries") + .select("*") + .eq("id", deliveryId) + .limit(1) + .maybeSingle(); + + if (error) throw new Error(`Failed to fetch delivery: ${error.message}`); + return (data as WebhookDelivery | null) ?? null; + }, + + /** + * Fetch a webhook's url and secret for delivery — only the fields + * needed by the retry / queue path. + */ + async findWebhookSecretById( + request: FastifyRequest, + webhookId: string, + ): Promise<{ id: string; url: string; secret: string } | null> { + const { data, error } = await orgTable(request, "webhooks") + .select("id, url, secret") + .eq("id", webhookId) + .limit(1) + .maybeSingle(); + + if (error) throw new Error(`Failed to fetch webhook secret: ${error.message}`); + return (data as { id: string; url: string; secret: string } | null) ?? null; + }, + + /** Reset a delivery to pending with an updated next_retry_at. */ + async resetDeliveryForRetry( + request: FastifyRequest, + deliveryId: string, + nextRetryAt: string, + ): Promise { + const { data, error } = await orgTable(request, "webhook_deliveries") + .update({ status: "pending", next_retry_at: nextRetryAt }) + .eq("id", deliveryId) + .select("*") + .single(); + + if (error) throw new Error(`Failed to reset delivery: ${error.message}`); + return data as WebhookDelivery; + }, +}; diff --git a/apps/api/src/modules/webhooks/webhooks.routes.ts b/apps/api/src/modules/webhooks/webhooks.routes.ts new file mode 100644 index 0000000..3257969 --- /dev/null +++ b/apps/api/src/modules/webhooks/webhooks.routes.ts @@ -0,0 +1,130 @@ +/** + * webhooks.routes.ts — Admin API routes for webhook management. + * + * All routes require ADMIN role. + * + * GET /admin/webhooks — list webhooks (no secrets) + * POST /admin/webhooks — register a new webhook + * PATCH /admin/webhooks/:id — update url / events / active / secret + * DELETE /admin/webhooks/:id — remove webhook and all deliveries + * + * GET /admin/webhook-deliveries — list delivery attempts + * POST /admin/webhook-deliveries/:id/retry — manually retry a failed delivery + */ + +import type { FastifyInstance } from "fastify"; +import { z } from "zod"; +import { authenticate } from "../../middleware/auth.js"; +import { requireRole } from "../../middleware/role-guard.js"; +import { webhooksController } from "./webhooks.controller.js"; +import { + createWebhookBodySchema, + updateWebhookBodySchema, + deliveryListQuerySchema, + webhookPublicSchema, + webhookDeliverySchema, +} from "./webhooks.schema.js"; + +const webhookResponse = z.object({ + success: z.literal(true), + data: webhookPublicSchema, +}); + +const webhookListResponse = z.object({ + success: z.literal(true), + data: z.array(webhookPublicSchema), +}); + +const deliveryResponse = z.object({ + success: z.literal(true), + data: webhookDeliverySchema, +}); + +export async function webhooksRoutes(app: FastifyInstance): Promise { + // ─── Webhooks ────────────────────────────────────────────────────────────── + + app.get( + "/admin/webhooks", + { + schema: { + tags: ["admin", "webhooks"], + summary: "List registered webhooks (secrets omitted)", + response: { 200: webhookListResponse }, + }, + preValidation: [authenticate, requireRole("ADMIN")], + }, + webhooksController.list, + ); + + app.post( + "/admin/webhooks", + { + schema: { + tags: ["admin", "webhooks"], + summary: "Register a new webhook endpoint", + body: createWebhookBodySchema, + response: { 201: webhookResponse }, + }, + preValidation: [authenticate, requireRole("ADMIN")], + }, + webhooksController.create, + ); + + app.patch<{ Params: { id: string } }>( + "/admin/webhooks/:id", + { + schema: { + tags: ["admin", "webhooks"], + summary: "Update webhook url, events, active state, or secret", + params: z.object({ id: z.string().uuid() }), + body: updateWebhookBodySchema, + response: { 200: webhookResponse }, + }, + preValidation: [authenticate, requireRole("ADMIN")], + }, + webhooksController.update, + ); + + app.delete<{ Params: { id: string } }>( + "/admin/webhooks/:id", + { + schema: { + tags: ["admin", "webhooks"], + summary: "Delete a webhook and all its delivery history", + params: z.object({ id: z.string().uuid() }), + response: { 204: z.null().describe("No content") }, + }, + preValidation: [authenticate, requireRole("ADMIN")], + }, + webhooksController.remove, + ); + + // ─── Deliveries ──────────────────────────────────────────────────────────── + + app.get( + "/admin/webhook-deliveries", + { + schema: { + tags: ["admin", "webhooks"], + summary: "List webhook delivery attempts for this organization", + querystring: deliveryListQuerySchema, + }, + preValidation: [authenticate, requireRole("ADMIN")], + }, + webhooksController.listDeliveries, + ); + + app.post<{ Params: { id: string } }>( + "/admin/webhook-deliveries/:id/retry", + { + schema: { + tags: ["admin", "webhooks"], + summary: "Manually retry a failed or succeeded delivery", + params: z.object({ id: z.string().uuid() }), + response: { 200: deliveryResponse }, + }, + preValidation: [authenticate, requireRole("ADMIN")], + }, + webhooksController.retryDelivery, + ); +} diff --git a/apps/api/src/modules/webhooks/webhooks.schema.ts b/apps/api/src/modules/webhooks/webhooks.schema.ts new file mode 100644 index 0000000..2b13571 --- /dev/null +++ b/apps/api/src/modules/webhooks/webhooks.schema.ts @@ -0,0 +1,101 @@ +/** + * webhooks.schema.ts — Zod schemas for the webhooks admin API. + * + * WEBHOOK_EVENT_TYPES is the canonical list of events that can be subscribed + * to. These must stay in sync with the EventDataMap keys in event-bus.ts. + */ + +import { z } from "zod"; + +// ─── Event type constants ──────────────────────────────────────────────────── + +export const WEBHOOK_EVENT_TYPES = [ + "employee.checked_in", + "employee.checked_out", + "expense.created", + "expense.approved", + "expense.rejected", + "employee.created", +] as const; + +export type WebhookEventType = typeof WEBHOOK_EVENT_TYPES[number]; + +// ─── Row shape ──────────────────────────────────────────────────────────────── + +/** Public-safe webhook row — secret is stripped before sending to clients. */ +export const webhookPublicSchema = z.object({ + id: z.string().uuid(), + organization_id: z.string().uuid(), + url: z.string().url(), + is_active: z.boolean(), + events: z.array(z.string()), + created_at: z.string(), + updated_at: z.string(), +}); + +export type WebhookPublic = z.infer; + +// ─── Request bodies ─────────────────────────────────────────────────────────── + +export const createWebhookBodySchema = z.object({ + url: z + .string() + .min(1, "url is required") + .url("url must be a valid URL"), + events: z + .array(z.enum(WEBHOOK_EVENT_TYPES)) + .min(1, "events must contain at least one event type"), + secret: z + .string() + .min(16, "secret must be at least 16 characters") + .max(256, "secret must be at most 256 characters"), +}); +export type CreateWebhookBody = z.infer; + +export const updateWebhookBodySchema = z + .object({ + url: z.string().url().optional(), + events: z.array(z.enum(WEBHOOK_EVENT_TYPES)).min(1).optional(), + is_active: z.boolean().optional(), + secret: z + .string() + .min(16, "secret must be at least 16 characters") + .max(256) + .optional(), + }) + .refine( + (v) => + v.url !== undefined || + v.events !== undefined || + v.is_active !== undefined || + v.secret !== undefined, + { message: "At least one field must be provided for update" }, + ); +export type UpdateWebhookBody = z.infer; + +// ─── Delivery row ───────────────────────────────────────────────────────────── + +export const webhookDeliverySchema = z.object({ + id: z.string().uuid(), + webhook_id: z.string().uuid(), + event_id: z.string().uuid(), + organization_id: z.string().uuid(), + status: z.enum(["pending", "success", "failed"]), + attempt_count: z.number(), + response_status: z.number().nullable(), + response_body: z.string().nullable(), + last_attempt_at: z.string().nullable(), + next_retry_at: z.string().nullable(), + created_at: z.string(), +}); +export type WebhookDelivery = z.infer; + +// ─── Query params ────────────────────────────────────────────────────────────── + +export const deliveryListQuerySchema = z.object({ + page: z.coerce.number().int().positive().default(1), + limit: z.coerce.number().int().min(1).max(100).default(20), + webhook_id: z.string().uuid().optional(), + status: z.enum(["pending", "success", "failed"]).optional(), +}); +export type DeliveryListQuery = z.infer; diff --git a/apps/api/src/modules/webhooks/webhooks.service.ts b/apps/api/src/modules/webhooks/webhooks.service.ts new file mode 100644 index 0000000..19434d5 --- /dev/null +++ b/apps/api/src/modules/webhooks/webhooks.service.ts @@ -0,0 +1,145 @@ +/** + * webhooks.service.ts — Business logic for webhook management and delivery retry. + * + * Rules: + * - URL validation (HTTPS-only, no private/loopback) is enforced before any + * write so invalid webhooks never reach the database. + * - Secret never returned to callers after creation — not even in update responses. + * - Manual retry re-enqueues a BullMQ job using attempt_count+1 so the + * existing retry delay schedule applies. + * - All queries are org-scoped through the repository. + */ + +import type { FastifyRequest } from "fastify"; +import { webhooksRepository } from "./webhooks.repository.js"; +import { validateWebhookUrl, InvalidWebhookUrlError } from "../../utils/url-validator.js"; +import { BadRequestError, NotFoundError } from "../../utils/errors.js"; +import { + enqueueWebhookDelivery, + WEBHOOK_MAX_ATTEMPTS, +} from "../../workers/webhook.queue.js"; +import type { + CreateWebhookBody, + UpdateWebhookBody, + WebhookPublic, + WebhookDelivery, + DeliveryListQuery, +} from "./webhooks.schema.js"; + +export const webhooksService = { + // ─── Webhook CRUD ────────────────────────────────────────────────────────── + + async createWebhook( + request: FastifyRequest, + body: CreateWebhookBody, + ): Promise { + try { + validateWebhookUrl(body.url); + } catch (err) { + if (err instanceof InvalidWebhookUrlError) { + throw new BadRequestError(err.message); + } + throw err; + } + + return webhooksRepository.create(request, body); + }, + + async listWebhooks(request: FastifyRequest): Promise { + return webhooksRepository.list(request); + }, + + async updateWebhook( + request: FastifyRequest, + webhookId: string, + body: UpdateWebhookBody, + ): Promise { + const existing = await webhooksRepository.findById(request, webhookId); + if (!existing) throw new NotFoundError("Webhook not found"); + + if (body.url) { + try { + validateWebhookUrl(body.url); + } catch (err) { + if (err instanceof InvalidWebhookUrlError) { + throw new BadRequestError(err.message); + } + throw err; + } + } + + return webhooksRepository.update(request, webhookId, body); + }, + + async deleteWebhook( + request: FastifyRequest, + webhookId: string, + ): Promise { + const existing = await webhooksRepository.findById(request, webhookId); + if (!existing) throw new NotFoundError("Webhook not found"); + return webhooksRepository.delete(request, webhookId); + }, + + // ─── Deliveries ──────────────────────────────────────────────────────────── + + async listDeliveries( + request: FastifyRequest, + query: DeliveryListQuery, + ): Promise<{ data: WebhookDelivery[]; total: number }> { + return webhooksRepository.listDeliveries(request, query); + }, + + /** + * Manually retry a delivery. + * + * Resets the delivery to `pending` and enqueues a new BullMQ job. + * The attempt_count is preserved so the existing retry schedule continues. + * This is intended for admin-initiated retries after investigating a failure. + * + * @throws {NotFoundError} if the delivery doesn't belong to this org. + * @throws {BadRequestError} if the delivery has not yet reached a terminal state. + */ + async retryDelivery( + request: FastifyRequest, + deliveryId: string, + ): Promise { + const delivery = await webhooksRepository.findDeliveryById(request, deliveryId); + if (!delivery) throw new NotFoundError("Delivery not found"); + + if (delivery.status === "pending") { + throw new BadRequestError("Delivery is already pending — retry not needed"); + } + + // Allow re-try even after max attempts — admin override + const nextAttempt = delivery.attempt_count + 1; + // Manual admin retries use immediate delay (no jitter for predictability) + const delayMs = 0; + + // Fetch the webhook to get current URL + secret (may have changed since creation) + const webhook = await webhooksRepository.findWebhookSecretById(request, delivery.webhook_id); + if (!webhook) throw new NotFoundError("Webhook not found"); + + // Reset delivery to pending + const nextRetryAt = new Date(Date.now() + delayMs).toISOString(); + const updated = await webhooksRepository.resetDeliveryForRetry(request, deliveryId, nextRetryAt); + + await enqueueWebhookDelivery( + { + delivery_id: deliveryId, + webhook_id: delivery.webhook_id, + event_id: delivery.event_id, + url: webhook.url, + secret: webhook.secret, + attempt_number: nextAttempt, + }, + delayMs, + ); + + request.log.info( + { deliveryId, webhookId: delivery.webhook_id, nextAttempt, delayMs }, + "webhooks.service: manual retry enqueued", + ); + + return updated; + }, +}; diff --git a/apps/api/src/routes/index.ts b/apps/api/src/routes/index.ts index e4d9117..bdc50e5 100644 --- a/apps/api/src/routes/index.ts +++ b/apps/api/src/routes/index.ts @@ -14,6 +14,7 @@ import { profileRoutes } from "../modules/profile/profile.routes.js"; import { adminDashboardRoutes } from "../modules/admin/dashboard.routes.js"; import { adminMapRoutes } from "../modules/admin/map.routes.js"; import { eventsRoutes } from "./events.routes.js"; +import { webhooksRoutes } from "../modules/webhooks/webhooks.routes.js"; export async function registerRoutes(app: FastifyInstance): Promise { await app.register(healthRoutes); @@ -31,4 +32,5 @@ export async function registerRoutes(app: FastifyInstance): Promise { await app.register(adminDashboardRoutes); await app.register(adminMapRoutes); await app.register(eventsRoutes); + await app.register(webhooksRoutes); } diff --git a/apps/api/src/types/database.ts b/apps/api/src/types/database.ts index 31af41f..a8e0ae5 100644 --- a/apps/api/src/types/database.ts +++ b/apps/api/src/types/database.ts @@ -629,8 +629,143 @@ export type Database = { referencedColumns: ["id"] }, ] + } webhook_deliveries: { + Row: { + id: string + webhook_id: string + event_id: string + organization_id: string + status: string + attempt_count: number + response_status: number | null + response_body: string | null + last_attempt_at: string | null + next_retry_at: string | null + created_at: string + } + Insert: { + id?: string + webhook_id: string + event_id: string + organization_id: string + status?: string + attempt_count?: number + response_status?: number | null + response_body?: string | null + last_attempt_at?: string | null + next_retry_at?: string | null + created_at?: string + } + Update: { + id?: string + webhook_id?: string + event_id?: string + organization_id?: string + status?: string + attempt_count?: number + response_status?: number | null + response_body?: string | null + last_attempt_at?: string | null + next_retry_at?: string | null + created_at?: string + } + Relationships: [ + { + foreignKeyName: "webhook_deliveries_webhook_id_fkey" + columns: ["webhook_id"] + isOneToOne: false + referencedRelation: "webhooks" + referencedColumns: ["id"] + }, + { + foreignKeyName: "webhook_deliveries_event_id_fkey" + columns: ["event_id"] + isOneToOne: false + referencedRelation: "webhook_events" + referencedColumns: ["id"] + }, + { + foreignKeyName: "webhook_deliveries_organization_id_fkey" + columns: ["organization_id"] + isOneToOne: false + referencedRelation: "organizations" + referencedColumns: ["id"] + }, + ] } - } + webhook_events: { + Row: { + id: string + organization_id: string + event_type: string + payload: Json + created_at: string + } + Insert: { + id?: string + organization_id: string + event_type: string + payload?: Json + created_at?: string + } + Update: { + id?: string + organization_id?: string + event_type?: string + payload?: Json + created_at?: string + } + Relationships: [ + { + foreignKeyName: "webhook_events_organization_id_fkey" + columns: ["organization_id"] + isOneToOne: false + referencedRelation: "organizations" + referencedColumns: ["id"] + }, + ] + } + webhooks: { + Row: { + id: string + organization_id: string + url: string + secret: string + is_active: boolean + events: string[] + created_at: string + updated_at: string + } + Insert: { + id?: string + organization_id: string + url: string + secret: string + is_active?: boolean + events?: string[] + created_at?: string + updated_at?: string + } + Update: { + id?: string + organization_id?: string + url?: string + secret?: string + is_active?: boolean + events?: string[] + created_at?: string + updated_at?: string + } + Relationships: [ + { + foreignKeyName: "webhooks_organization_id_fkey" + columns: ["organization_id"] + isOneToOne: false + referencedRelation: "organizations" + referencedColumns: ["id"] + }, + ] + } } Views: { [_ in never]: never } diff --git a/apps/api/src/utils/event-bus.ts b/apps/api/src/utils/event-bus.ts index a1515ad..effe288 100644 --- a/apps/api/src/utils/event-bus.ts +++ b/apps/api/src/utils/event-bus.ts @@ -164,6 +164,22 @@ export type EventDataMap = { /** Admin-supplied rejection reason; undefined if not provided */ rejection_comment: string | undefined; }; + + // ── Employees ────────────────────────────────────────────────────────────── + + /** + * Fired when an admin creates a new employee record. + * + * Contains: the new employee's id, code, and name so subscribers can + * react to org roster changes without an additional API call. + */ + "employee.created": { + employee_id: string; + employee_code: string; + name: string; + /** ISO timestamp — copied from employees.created_at */ + created_at: string; + }; }; export type EventName = keyof EventDataMap; diff --git a/apps/api/src/workers/startup.ts b/apps/api/src/workers/startup.ts index c93f902..ffe90f4 100644 --- a/apps/api/src/workers/startup.ts +++ b/apps/api/src/workers/startup.ts @@ -46,12 +46,18 @@ export function areWorkersStarted(): boolean { * This keeps worker lifecycle out of module-import side effects. */ export async function startWorkers(app: FastifyInstance): Promise { - const [{ startDistanceWorker }, { startAnalyticsWorker }] = await Promise.all([ + const [ + { startDistanceWorker }, + { startAnalyticsWorker }, + { startWebhookWorker }, + ] = await Promise.all([ import("./distance.worker.js"), import("./analytics.worker.js"), + import("./webhook.worker.js"), ]); startDistanceWorker(app); startAnalyticsWorker(app); + startWebhookWorker(app); workersStarted = true; } diff --git a/apps/api/src/workers/webhook-event.service.ts b/apps/api/src/workers/webhook-event.service.ts new file mode 100644 index 0000000..384f2b5 --- /dev/null +++ b/apps/api/src/workers/webhook-event.service.ts @@ -0,0 +1,232 @@ +/** + * webhook-event.service.ts — Domain event → webhook delivery fan-out. + * + * This service has two responsibilities: + * + * 1. PERSISTENCE: When a domain event is emitted and WORKERS_ENABLED=true, + * insert a row into webhook_events so there is a permanent, queryable + * audit trail of every event regardless of webhook registration status. + * + * 2. FAN-OUT: Find all active webhooks for the event's org that subscribe to + * the event type, create a webhook_delivery row for each, and enqueue a + * BullMQ delivery job. + * + * Wiring happens in webhook.worker.ts (subscribeToEventBus()), which calls + * registerWebhookListener() once per event name at startup. + * + * Safety contracts: + * - This module MUST NOT throw into the event bus — all errors are caught + * and logged so a database or Redis failure never crashes the emitting + * request lifecycle. + * - No synchronous HTTP calls — delivery is always async via BullMQ. + * - worker-only: the listener is only registered when shouldStartWorkers() + * returns true (WORKERS_ENABLED=true AND not in test env). + */ + +import type { FastifyBaseLogger } from "fastify"; +import { supabaseServiceClient as supabase } from "../config/supabase.js"; +import { eventBus, type EventName, type EventEnvelope } from "../utils/event-bus.js"; +import { + enqueueWebhookDelivery, + WEBHOOK_RETRY_DELAYS_MS, + type WebhookDeliveryJobData, +} from "./webhook.queue.js"; + +// ─── DB row shapes (subset) ─────────────────────────────────────────────────── + +interface WebhookRow { + id: string; + url: string; + secret: string; +} + +// ─── Core fan-out logic ─────────────────────────────────────────────────────── + +/** + * Process a domain event envelope: + * 1. Persist it in webhook_events. + * 2. Fan out to matching active webhooks. + */ +export async function processEventForWebhooks( + envelope: EventEnvelope, + log: FastifyBaseLogger, +): Promise { + const { id: eventId, type, organization_id: orgId } = envelope; + + // ── 1. Persist the event ────────────────────────────────────────────────── + const { data: eventRow, error: insertError } = await supabase + .from("webhook_events") + .insert({ + id: eventId, + organization_id: orgId, + event_type: type, + payload: envelope as unknown as Record, + }) + .select("id") + .single(); + + if (insertError) { + log.error( + { eventId, eventType: type, orgId, error: insertError.message }, + "webhook-event.service: failed to persist webhook_event", + ); + // Do not abort — we still attempt fan-out even if persistence failed, + // so active webhooks receive the event. The missing audit row is an + // observability gap, not a correctness failure. + } + + const persistedEventId = eventRow?.id ?? eventId; + + // ── 2. Find matching active webhooks ────────────────────────────────────── + const { data: webhooks, error: fetchError } = await supabase + .from("webhooks") + .select("id, url, secret") + .eq("organization_id", orgId) + .eq("is_active", true) + .contains("events", [type]); + + if (fetchError) { + log.error( + { eventId, eventType: type, orgId, error: fetchError.message }, + "webhook-event.service: failed to fetch matching webhooks", + ); + return; + } + + if (!webhooks || webhooks.length === 0) { + log.debug( + { eventId, eventType: type, orgId }, + "webhook-event.service: no matching webhooks, skipping fan-out", + ); + return; + } + + log.info( + { eventId, eventType: type, orgId, webhookCount: webhooks.length }, + "webhook-event.service: fanning out to webhooks", + ); + + // ── 3. Create delivery rows and enqueue jobs ────────────────────────────── + await Promise.allSettled( + (webhooks as WebhookRow[]).map((webhook) => + createAndEnqueueDelivery(persistedEventId, webhook, orgId, type, log), + ), + ); +} + +async function createAndEnqueueDelivery( + eventId: string, + webhook: WebhookRow, + orgId: string, + eventType: string, + log: FastifyBaseLogger, +): Promise { + // Insert the delivery row — ON CONFLICT DO NOTHING ensures idempotency + // if the fan-out function is called more than once for the same event + // (e.g. due to a process restart mid-flight). + const { data: delivery, error: deliveryInsertError } = await supabase + .from("webhook_deliveries") + .insert({ + webhook_id: webhook.id, + event_id: eventId, + organization_id: orgId, + status: "pending", + attempt_count: 0, + }) + .select("id") + .single(); + + if (deliveryInsertError) { + // Unique constraint violation = already created. Log and skip. + if (deliveryInsertError.code === "23505") { + log.debug( + { eventId, webhookId: webhook.id }, + "webhook-event.service: delivery already exists, skipping duplicate", + ); + return; + } + log.error( + { eventId, webhookId: webhook.id, error: deliveryInsertError.message }, + "webhook-event.service: failed to create delivery row", + ); + return; + } + + if (!delivery) { + log.error( + { eventId, webhookId: webhook.id }, + "webhook-event.service: delivery insert returned no data", + ); + return; + } + + const jobData: WebhookDeliveryJobData = { + delivery_id: delivery.id, + webhook_id: webhook.id, + event_id: eventId, + url: webhook.url, + secret: webhook.secret, + attempt_number: 1, + }; + + try { + await enqueueWebhookDelivery(jobData, WEBHOOK_RETRY_DELAYS_MS[0]); + log.info( + { deliveryId: delivery.id, webhookId: webhook.id, eventType, orgId }, + "webhook-event.service: enqueued delivery job", + ); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + log.error( + { deliveryId: delivery.id, webhookId: webhook.id, error: message }, + "webhook-event.service: failed to enqueue delivery job", + ); + // Mark delivery as failed so the admin UI shows the issue. + await supabase + .from("webhook_deliveries") + .update({ + status: "failed", + response_body: `Enqueue error: ${message}`, + last_attempt_at: new Date().toISOString(), + }) + .eq("id", delivery.id) + .then(() => undefined); // fire-and-forget + } +} + +// ─── Event bus subscription ─────────────────────────────────────────────────── + +const EVENT_NAMES: ReadonlyArray = [ + "employee.checked_in", + "employee.checked_out", + "expense.created", + "expense.approved", + "expense.rejected", + "employee.created", +]; + +/** + * Subscribe to all known domain events and route them through + * processEventForWebhooks(). + * + * Called once at worker startup (webhook.worker.ts → startWebhookWorker()). + * Must NOT be called in test environments or when WORKERS_ENABLED=false. + */ +export function subscribeToEventBus(log: FastifyBaseLogger): void { + for (const eventName of EVENT_NAMES) { + eventBus.on(eventName, (envelope) => { + processEventForWebhooks(envelope, log).catch((err: unknown) => { + const message = err instanceof Error ? err.message : String(err); + log.error( + { event: eventName, error: message }, + "webhook-event.service: unhandled error in event fan-out", + ); + }); + }); + } + + log.info( + { events: EVENT_NAMES }, + "webhook-event.service: subscribed to domain event bus", + ); +} diff --git a/apps/api/src/workers/webhook.queue.ts b/apps/api/src/workers/webhook.queue.ts new file mode 100644 index 0000000..6cb80a5 --- /dev/null +++ b/apps/api/src/workers/webhook.queue.ts @@ -0,0 +1,159 @@ +/** + * webhook.queue.ts — BullMQ queue for async webhook delivery. + * + * Design follows the existing distance.queue.ts lazy-singleton pattern: + * - Queue object is created on first use, not at import time. + * - This prevents Redis connections from being opened in CI / test + * environments where Redis is not available. + * + * Job payload contains everything the worker needs to sign and deliver + * the request without additional DB round-trips in the hot path. + */ + +import { Queue } from "bullmq"; +import { getRedisConnectionOptions } from "../config/redis.js"; +import { env } from "../config/env.js"; +import { QueueOverloadedError } from "../utils/errors.js"; + +// ─── Job Payload ────────────────────────────────────────────────────────────── + +export interface WebhookDeliveryJobData { + /** Delivery row id in webhook_deliveries — used for idempotent updates */ + delivery_id: string; + /** Webhook registration id */ + webhook_id: string; + /** Event row id in webhook_events */ + event_id: string; + /** Target endpoint URL (HTTPS, already validated at registration) */ + url: string; + /** + * Per-webhook signing secret. + * + * NOTE: This travels through Redis. In a high-security environment the + * secret should instead be fetched from the DB inside the worker on each + * attempt. We accept the Redis-in-transit risk here because the Redis + * connection is TLS-encrypted in production (rediss://) and the secret is + * only used for HMAC signing — it does NOT grant DB access. + */ + secret: string; + /** Current delivery attempt number (1-based) */ + attempt_number: number; +} + +// ─── Queue name constant ────────────────────────────────────────────────────── + +export const WEBHOOK_QUEUE_NAME = "webhook-delivery" as const; + +// ─── Retry back-off delays (milliseconds) ──────────────────────────────────── +// +// Attempt 1 → immediate (delay = 0, handled as first-try in BullMQ) +// Attempt 2 → 30 s +// Attempt 3 → 2 min +// Attempt 4 → 10 min +// Attempt 5 → 1 h +// +// This matches the spec. BullMQ's built-in exponential backoff is not used +// here because the spec defines specific absolute delays (not a geometric +// series), so we supply a custom `delay` per job via the retry handler. + +export const WEBHOOK_RETRY_DELAYS_MS: ReadonlyArray = [ + 0, // attempt 1 — immediate + 30_000, // attempt 2 — 30 s + 120_000, // attempt 3 — 2 min + 600_000, // attempt 4 — 10 min + 3_600_000, // attempt 5 — 1 h +]; + +export const WEBHOOK_MAX_ATTEMPTS = WEBHOOK_RETRY_DELAYS_MS.length; + +/** + * Calculate retry delay with ±10% jitter to prevent thundering herd. + * + * Without jitter, 100 failed deliveries all retry at the same time, + * creating a synchronized spike that can cascade. Jitter spreads retries + * across a window, stabilizing the system. + * + * Example: baseDelay=30s → 27-33s range (±10% jitter) + * + * @param attemptNumber 1-based attempt number (1=first retry, 2=second, etc.) + * @returns delay in milliseconds for this retry + */ +export function calculateRetryDelay(attemptNumber: number): number { + const baseDelay = WEBHOOK_RETRY_DELAYS_MS[attemptNumber - 1]; + // ±10% jitter: add/subtract up to 10% of base delay + const jitterRange = baseDelay * 0.1; + const jitterMs = jitterRange * (Math.random() * 2 - 1); // [-jitterRange, +jitterRange] + return Math.round(baseDelay + jitterMs); +} + +// ─── Lazy Queue Singleton ───────────────────────────────────────────────────── + +let _webhookQueue: Queue | undefined; + +function getWebhookQueue(): Queue { + if (_webhookQueue) return _webhookQueue; + + _webhookQueue = new Queue(WEBHOOK_QUEUE_NAME, { + connection: getRedisConnectionOptions(), + defaultJobOptions: { + // Each job is attempted once by BullMQ — retry scheduling is managed + // manually by the worker so we can record attempt state in the DB and + // implement exact delays without relying on BullMQ's built-in backoff. + attempts: 1, + removeOnComplete: true, + removeOnFail: { count: 500 }, + }, + }); + + return _webhookQueue; +} + +// ─── Public API ─────────────────────────────────────────────────────────────── + +/** + * Enqueue a webhook delivery job. + * + * The job ID is `delivery:{delivery_id}:{attempt_number}` to ensure + * each attempt is a distinct job while allowing the delivery_id to + * correlate all attempts for a single delivery row. + * + * @throws {QueueOverloadedError} when the queue depth exceeds MAX_QUEUE_DEPTH. + */ +export async function enqueueWebhookDelivery( + data: WebhookDeliveryJobData, + delayMs = 0, +): Promise { + const queue = getWebhookQueue(); + + const [waiting, delayed] = await Promise.all([ + queue.getWaitingCount(), + queue.getDelayedCount(), + ]); + + const depth = waiting + delayed; + if (depth >= env.MAX_QUEUE_DEPTH) { + throw new QueueOverloadedError(WEBHOOK_QUEUE_NAME, depth, env.MAX_QUEUE_DEPTH); + } + + await queue.add( + "deliver", + data, + { + jobId: `delivery:${data.delivery_id}:${data.attempt_number}`, + delay: delayMs, + }, + ); +} + +/** + * Return the combined waiting + delayed count. + * Exposed for metrics and admin health checks. + */ +export async function getWebhookQueueDepth(): Promise { + const queue = getWebhookQueue(); + const [waiting, delayed] = await Promise.all([ + queue.getWaitingCount(), + queue.getDelayedCount(), + ]); + return waiting + delayed; +} diff --git a/apps/api/src/workers/webhook.worker.ts b/apps/api/src/workers/webhook.worker.ts new file mode 100644 index 0000000..e11dd55 --- /dev/null +++ b/apps/api/src/workers/webhook.worker.ts @@ -0,0 +1,344 @@ +/** + * webhook.worker.ts — BullMQ worker for async webhook delivery. + * + * Lifecycle per job: + * 1. Fetch the event payload from webhook_events. + * 2. Serialize the envelope to a stable JSON string. + * 3. Generate HMAC-SHA256 signature over the raw body. + * 4. POST to the webhook URL with a 5 s timeout. + * 5. On success → mark delivery as `success`. + * 6. On failure → schedule a retry (exponential delays) up to MAX_ATTEMPTS. + * After max attempts → mark delivery as `failed`. + * + * Security: + * - DNS rebinding defence: The hostname is resolved immediately before the + * HTTP request and checked against private IP ranges. + * - Request timeout enforced at 5 s. + * - Signature is HMAC-SHA256(secret, rawBody), header: X-FieldTrack-Signature. + * + * Worker gate: `startWebhookWorker()` is only called when + * `shouldStartWorkers()` returns true (WORKERS_ENABLED=true AND not test env). + */ + +import { Worker } from "bullmq"; +import type { Job } from "bullmq"; +import type { FastifyInstance } from "fastify"; +import dns from "node:dns/promises"; +import { redisConnectionOptions } from "../config/redis.js"; +import { supabaseServiceClient as supabase } from "../config/supabase.js"; +import { generateSignature } from "../utils/hmac.js"; +import { subscribeToEventBus } from "./webhook-event.service.js"; +import { + WEBHOOK_QUEUE_NAME, + WEBHOOK_MAX_ATTEMPTS, + enqueueWebhookDelivery, + calculateRetryDelay, + type WebhookDeliveryJobData, +} from "./webhook.queue.js"; + +// ─── Private IP ranges (DNS rebinding defence) ─────────────────────────────── + +const PRIVATE_IP_PATTERNS: ReadonlyArray = [ + /^127\./, + /^10\./, + /^172\.(1[6-9]|2\d|3[01])\./, + /^192\.168\./, + /^169\.254\./, + /^::1$/, + /^fc[0-9a-f]{2}:/i, + /^fe80:/i, + /^0\.0\.0\.0$/, +]; + +function isPrivateAddress(ip: string): boolean { + const lower = ip.toLowerCase(); + return PRIVATE_IP_PATTERNS.some((re) => re.test(lower)); +} + +// ─── HTTP delivery ──────────────────────────────────────────────────────────── + +const DELIVERY_TIMEOUT_MS = 5_000; + +/** + * Perform one HTTP delivery attempt. + * + * Returns `{ status: number; body: string }` on any completed response + * (including 4xx/5xx — those are caller's problem to interpret). + * + * Throws on network error, timeout, or SSRF block. + */ +async function deliverWebhook( + url: string, + rawBody: string, + signature: string, +): Promise<{ status: number; body: string }> { + // ── DNS rebinding defence ────────────────────────────────────────────────── + const parsed = new URL(url); + let resolvedAddress: string; + try { + const { address } = await dns.lookup(parsed.hostname, { family: 4 }); + resolvedAddress = address; + } catch { + // IPv6 fallback + try { + const { address } = await dns.lookup(parsed.hostname, { family: 6 }); + resolvedAddress = address; + } catch { + throw new Error(`DNS lookup failed for hostname: ${parsed.hostname}`); + } + } + + if (isPrivateAddress(resolvedAddress)) { + throw new Error( + `SSRF blocked: ${parsed.hostname} resolved to private address ${resolvedAddress}`, + ); + } + + // ── HTTP POST with timeout ───────────────────────────────────────────────── + const controller = new AbortController(); + const timeoutHandle = setTimeout( + () => controller.abort(), + DELIVERY_TIMEOUT_MS, + ); + + try { + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-FieldTrack-Signature": signature, + "X-FieldTrack-Event": "webhook-delivery", + "User-Agent": "FieldTrack-Webhooks/1.0", + }, + body: rawBody, + signal: controller.signal, + }); + + const body = await response.text().catch(() => ""); + return { status: response.status, body: body.slice(0, 4_096) }; + } finally { + clearTimeout(timeoutHandle); + } +} + +// ─── Delivery result handler ────────────────────────────────────────────────── + +/** + * Mark a delivery row as succeeded in the database. + */ +async function markSuccess( + deliveryId: string, + responseStatus: number, + responseBody: string, +): Promise { + await supabase + .from("webhook_deliveries") + .update({ + status: "success", + response_status: responseStatus, + response_body: responseBody, + last_attempt_at: new Date().toISOString(), + }) + .eq("id", deliveryId); +} + +/** + * Increment attempt count, record response, and schedule the next retry. + * If max attempts reached, marks status as `failed`. + */ +async function scheduleRetryOrFail( + deliveryId: string, + webhook_id: string, + event_id: string, + url: string, + secret: string, + attemptNumber: number, + responseStatus: number | null, + responseBody: string, + app: FastifyInstance, +): Promise { + const nextAttempt = attemptNumber + 1; + const canRetry = nextAttempt <= WEBHOOK_MAX_ATTEMPTS; + const delayMs = canRetry ? calculateRetryDelay(nextAttempt) : 0; + const nextRetryAt = canRetry + ? new Date(Date.now() + delayMs).toISOString() + : null; + + await supabase + .from("webhook_deliveries") + .update({ + status: canRetry ? "pending" : "failed", + attempt_count: attemptNumber, + response_status: responseStatus, + response_body: responseBody.slice(0, 4_096), + last_attempt_at: new Date().toISOString(), + next_retry_at: nextRetryAt, + }) + .eq("id", deliveryId); + + if (canRetry) { + try { + await enqueueWebhookDelivery( + { + delivery_id: deliveryId, + webhook_id, + event_id, + url, + secret, + attempt_number: nextAttempt, + }, + delayMs, + ); + app.log.info( + { deliveryId, webhookId: webhook_id, attemptNumber, nextAttempt, delayMs }, + "webhook.worker: scheduled retry with jitter", + ); + } catch (enqueueErr: unknown) { + const msg = enqueueErr instanceof Error ? enqueueErr.message : String(enqueueErr); + app.log.error( + { deliveryId, webhookId: webhook_id, error: msg }, + "webhook.worker: failed to enqueue retry — delivery marked failed", + ); + // Cannot retry — mark as failed to avoid phantom pending records. + await supabase + .from("webhook_deliveries") + .update({ status: "failed" }) + .eq("id", deliveryId); + } + } else { + app.log.warn( + { deliveryId, webhookId: webhook_id, attemptNumber }, + "webhook.worker: max attempts reached, delivery permanently failed", + ); + } +} + +// ─── Worker ─────────────────────────────────────────────────────────────────── + +let workerStarted = false; + +export function startWebhookWorker(app: FastifyInstance): Worker | null { + if (workerStarted) { + app.log.warn("startWebhookWorker called more than once — ignoring duplicate start"); + return null; + } + + workerStarted = true; + + // Subscribe to the in-process event bus so domain events are persisted and + // fanned out to registered webhooks as soon as they are emitted. + subscribeToEventBus(app.log); + + const worker = new Worker( + WEBHOOK_QUEUE_NAME, + async (job: Job): Promise => { + const { delivery_id, webhook_id, event_id, url, secret, attempt_number } = + job.data; + + app.log.info( + { deliveryId: delivery_id, webhookId: webhook_id, attemptNumber: attempt_number }, + "webhook.worker: processing delivery job", + ); + + // ── Fetch event payload ────────────────────────────────────────────── + const { data: eventRow, error: fetchError } = await supabase + .from("webhook_events") + .select("payload") + .eq("id", event_id) + .single(); + + if (fetchError || !eventRow) { + app.log.error( + { eventId: event_id, error: fetchError?.message }, + "webhook.worker: cannot fetch event payload — marking delivery failed", + ); + await supabase + .from("webhook_deliveries") + .update({ + status: "failed", + response_body: "Event payload not found", + last_attempt_at: new Date().toISOString(), + }) + .eq("id", delivery_id); + return; + } + + // ── Build and sign the request body ─────────────────────────────────── + const rawBody = JSON.stringify(eventRow.payload); + const signature = generateSignature(secret, rawBody); + + // ── Deliver ─────────────────────────────────────────────────────────── + try { + const { status, body } = await deliverWebhook(url, rawBody, signature); + const succeeded = status >= 200 && status < 300; + + if (succeeded) { + await markSuccess(delivery_id, status, body); + app.log.info( + { deliveryId: delivery_id, webhookId: webhook_id, responseStatus: status }, + "webhook.worker: delivery succeeded", + ); + } else { + app.log.warn( + { + deliveryId: delivery_id, + webhookId: webhook_id, + responseStatus: status, + attemptNumber: attempt_number, + }, + "webhook.worker: delivery got non-2xx response, scheduling retry", + ); + await scheduleRetryOrFail( + delivery_id, + webhook_id, + event_id, + url, + secret, + attempt_number, + status, + body, + app, + ); + } + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + app.log.error( + { + deliveryId: delivery_id, + webhookId: webhook_id, + attemptNumber: attempt_number, + error: message, + }, + "webhook.worker: delivery attempt threw error, scheduling retry", + ); + await scheduleRetryOrFail( + delivery_id, + webhook_id, + event_id, + url, + secret, + attempt_number, + null, + message, + app, + ); + } + }, + { + connection: redisConnectionOptions, + concurrency: 5, + lockDuration: 30_000, + }, + ); + + worker.on("failed", (job, err) => { + const jobId = job?.id ?? "(unknown)"; + app.log.error( + { jobId, error: err.message }, + "webhook.worker: BullMQ job permanently failed", + ); + }); + + app.log.info("webhook.worker: started"); + return worker; +} diff --git a/apps/api/tests/integration/admin/webhooks.integration.test.ts b/apps/api/tests/integration/admin/webhooks.integration.test.ts new file mode 100644 index 0000000..70a40a0 --- /dev/null +++ b/apps/api/tests/integration/admin/webhooks.integration.test.ts @@ -0,0 +1,440 @@ +/** + * webhooks.integration.test.ts — Integration tests for the webhooks admin API. + * + * Tests cover: + * - POST /admin/webhooks — create (auth, validation, SSRF) + * - GET /admin/webhooks — list + * - PATCH /admin/webhooks/:id — update + * - DELETE /admin/webhooks/:id — delete + * - GET /admin/webhook-deliveries — list deliveries + * - POST /admin/webhook-deliveries/:id/retry — manual retry + */ + +import { describe, it, expect, vi, beforeAll, afterAll, beforeEach } from "vitest"; +import type { FastifyInstance } from "fastify"; + +// ─── Module mocks (hoisted) ─────────────────────────────────────────────────── + +vi.mock("../../../src/config/redis.js", () => ({ + redisClient: { on: vi.fn(), quit: vi.fn(), disconnect: vi.fn() }, + getRedisConnectionOptions: vi.fn().mockReturnValue({}), + redisConnectionOptions: {}, +})); + +vi.mock("../../../src/workers/distance.queue.js", () => ({ + enqueueDistanceJob: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("../../../src/workers/analytics.queue.js", () => ({ + enqueueAnalyticsJob: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("../../../src/workers/webhook.queue.js", () => ({ + enqueueWebhookDelivery: vi.fn().mockResolvedValue(undefined), + WEBHOOK_QUEUE_NAME: "webhook-delivery", + WEBHOOK_RETRY_DELAYS_MS: [0, 30_000, 120_000, 600_000, 3_600_000], + WEBHOOK_MAX_ATTEMPTS: 5, + getWebhookQueueDepth: vi.fn().mockResolvedValue(0), +})); + +vi.mock("../../../src/modules/webhooks/webhooks.repository.js", () => ({ + webhooksRepository: { + create: vi.fn(), + list: vi.fn(), + findById: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + listDeliveries: vi.fn(), + findDeliveryById: vi.fn(), + findWebhookSecretById: vi.fn(), + resetDeliveryForRetry: vi.fn(), + }, +})); + +vi.mock("../../../src/config/supabase.js", () => ({ + supabaseServiceClient: { + from: vi.fn().mockReturnValue({ + select: vi.fn().mockReturnThis(), + insert: vi.fn().mockReturnThis(), + update: vi.fn().mockReturnThis(), + delete: vi.fn().mockReturnThis(), + eq: vi.fn().mockReturnThis(), + single: vi.fn().mockResolvedValue({ data: null, error: null }), + }), + }, +})); + +vi.mock("../../../src/auth/jwtVerifier.js", () => ({ + verifySupabaseToken: vi.fn().mockImplementation(async (token: string) => { + const parts = token.split("."); + if (parts.length !== 3) throw new Error("Invalid JWT structure"); + return JSON.parse(Buffer.from(parts[1], "base64url").toString("utf8")); + }), +})); + +import { + buildTestApp, + signAdminToken, + signEmployeeToken, + TEST_ORG_ID, +} from "../../setup/test-server.js"; +import { webhooksRepository } from "../../../src/modules/webhooks/webhooks.repository.js"; + +// ─── Shared fixtures ────────────────────────────────────────────────────────── + +const WEBHOOK_ID = "dddddddd-dddd-4ddd-8ddd-dddddddddddd"; +const DELIVERY_ID = "eeeeeeee-eeee-4eee-8eee-eeeeeeeeeeee"; +const EVENT_ID = "ffffffff-ffff-4fff-8fff-ffffffffffff"; +const now = new Date().toISOString(); + +const webhookRow = { + id: WEBHOOK_ID, + organization_id: TEST_ORG_ID, + url: "https://example.com/hook", + is_active: true, + events: ["expense.created", "expense.approved"], + created_at: now, + updated_at: now, +}; + +const deliveryRow = { + id: DELIVERY_ID, + webhook_id: WEBHOOK_ID, + event_id: EVENT_ID, + organization_id: TEST_ORG_ID, + status: "failed" as const, + attempt_count: 3, + response_status: 500, + response_body: "Internal Server Error", + last_attempt_at: now, + next_retry_at: null, + created_at: now, +}; + +// ─── Test suite ─────────────────────────────────────────────────────────────── + +describe("Webhooks Admin API", () => { + let app: FastifyInstance; + let adminToken: string; + let employeeToken: string; + + beforeAll(async () => { + app = await buildTestApp(); + adminToken = signAdminToken(app); + employeeToken = signEmployeeToken(app); + }); + + afterAll(async () => { + await app.close(); + }); + + beforeEach(() => { + vi.clearAllMocks(); + }); + + // ─── GET /admin/webhooks ───────────────────────────────────────────────────── + + describe("GET /admin/webhooks", () => { + it("returns 200 with list for ADMIN", async () => { + vi.mocked(webhooksRepository.list).mockResolvedValueOnce([webhookRow]); + + const res = await app.inject({ + method: "GET", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${adminToken}` }, + }); + + expect(res.statusCode).toBe(200); + const body = res.json<{ success: boolean; data: typeof webhookRow[] }>(); + expect(body.success).toBe(true); + expect(body.data).toHaveLength(1); + expect(body.data[0].id).toBe(WEBHOOK_ID); + // Secret MUST NOT appear in the response + expect(JSON.stringify(body)).not.toContain("secret"); + }); + + it("returns 403 for EMPLOYEE role", async () => { + const res = await app.inject({ + method: "GET", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${employeeToken}` }, + }); + expect(res.statusCode).toBe(403); + }); + + it("returns 401 with no token", async () => { + const res = await app.inject({ method: "GET", url: "/admin/webhooks" }); + expect(res.statusCode).toBe(401); + }); + }); + + // ─── POST /admin/webhooks ──────────────────────────────────────────────────── + + describe("POST /admin/webhooks", () => { + it("creates a webhook and returns 201", async () => { + vi.mocked(webhooksRepository.create).mockResolvedValueOnce(webhookRow); + + const res = await app.inject({ + method: "POST", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${adminToken}`, "content-type": "application/json" }, + body: JSON.stringify({ + url: "https://example.com/hook", + events: ["expense.created"], + secret: "super-secret-value-at-least-16-chars", + }), + }); + + expect(res.statusCode).toBe(201); + const body = res.json<{ success: boolean; data: typeof webhookRow }>(); + expect(body.success).toBe(true); + expect(body.data.url).toBe("https://example.com/hook"); + }); + + it("rejects HTTP (non-HTTPS) URLs with 400", async () => { + const res = await app.inject({ + method: "POST", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${adminToken}`, "content-type": "application/json" }, + body: JSON.stringify({ + url: "http://example.com/hook", + events: ["expense.created"], + secret: "super-secret-value-at-least-16-chars", + }), + }); + expect(res.statusCode).toBe(400); + }); + + it("rejects private/loopback URLs with 400", async () => { + const res = await app.inject({ + method: "POST", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${adminToken}`, "content-type": "application/json" }, + body: JSON.stringify({ + url: "https://192.168.1.1/hook", + events: ["expense.created"], + secret: "super-secret-value-at-least-16-chars", + }), + }); + expect(res.statusCode).toBe(400); + }); + + it("rejects empty events array", async () => { + const res = await app.inject({ + method: "POST", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${adminToken}`, "content-type": "application/json" }, + body: JSON.stringify({ + url: "https://example.com/hook", + events: [], + secret: "super-secret-value-at-least-16-chars", + }), + }); + expect(res.statusCode).toBe(400); + }); + + it("rejects short secrets (<16 chars)", async () => { + const res = await app.inject({ + method: "POST", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${adminToken}`, "content-type": "application/json" }, + body: JSON.stringify({ + url: "https://example.com/hook", + events: ["expense.created"], + secret: "short", + }), + }); + expect(res.statusCode).toBe(400); + }); + + it("rejects unknown event types", async () => { + const res = await app.inject({ + method: "POST", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${adminToken}`, "content-type": "application/json" }, + body: JSON.stringify({ + url: "https://example.com/hook", + events: ["unknown.event"], + secret: "super-secret-value-at-least-16-chars", + }), + }); + expect(res.statusCode).toBe(400); + }); + + it("returns 403 for EMPLOYEE role", async () => { + const res = await app.inject({ + method: "POST", + url: "/admin/webhooks", + headers: { authorization: `Bearer ${employeeToken}`, "content-type": "application/json" }, + body: JSON.stringify({ + url: "https://example.com/hook", + events: ["expense.created"], + secret: "super-secret-value-at-least-16-chars", + }), + }); + expect(res.statusCode).toBe(403); + }); + }); + + // ─── PATCH /admin/webhooks/:id ─────────────────────────────────────────────── + + describe("PATCH /admin/webhooks/:id", () => { + it("updates a webhook", async () => { + vi.mocked(webhooksRepository.findById).mockResolvedValueOnce(webhookRow); + vi.mocked(webhooksRepository.update).mockResolvedValueOnce({ + ...webhookRow, + is_active: false, + }); + + const res = await app.inject({ + method: "PATCH", + url: `/admin/webhooks/${WEBHOOK_ID}`, + headers: { authorization: `Bearer ${adminToken}`, "content-type": "application/json" }, + body: JSON.stringify({ is_active: false }), + }); + + expect(res.statusCode).toBe(200); + const body = res.json<{ success: boolean; data: { is_active: boolean } }>(); + expect(body.data.is_active).toBe(false); + }); + + it("returns 404 when webhook not found", async () => { + vi.mocked(webhooksRepository.findById).mockResolvedValueOnce(null); + + const res = await app.inject({ + method: "PATCH", + url: `/admin/webhooks/${WEBHOOK_ID}`, + headers: { authorization: `Bearer ${adminToken}`, "content-type": "application/json" }, + body: JSON.stringify({ is_active: false }), + }); + + expect(res.statusCode).toBe(404); + }); + }); + + // ─── DELETE /admin/webhooks/:id ────────────────────────────────────────────── + + describe("DELETE /admin/webhooks/:id", () => { + it("deletes a webhook and returns 204", async () => { + vi.mocked(webhooksRepository.findById).mockResolvedValueOnce(webhookRow); + vi.mocked(webhooksRepository.delete).mockResolvedValueOnce(undefined); + + const res = await app.inject({ + method: "DELETE", + url: `/admin/webhooks/${WEBHOOK_ID}`, + headers: { authorization: `Bearer ${adminToken}` }, + }); + + expect(res.statusCode).toBe(204); + }); + + it("returns 404 when webhook not found", async () => { + vi.mocked(webhooksRepository.findById).mockResolvedValueOnce(null); + + const res = await app.inject({ + method: "DELETE", + url: `/admin/webhooks/${WEBHOOK_ID}`, + headers: { authorization: `Bearer ${adminToken}` }, + }); + + expect(res.statusCode).toBe(404); + }); + }); + + // ─── GET /admin/webhook-deliveries ────────────────────────────────────────── + + describe("GET /admin/webhook-deliveries", () => { + it("returns paginated delivery list", async () => { + vi.mocked(webhooksRepository.listDeliveries).mockResolvedValueOnce({ + data: [deliveryRow], + total: 1, + }); + + const res = await app.inject({ + method: "GET", + url: "/admin/webhook-deliveries", + headers: { authorization: `Bearer ${adminToken}` }, + }); + + expect(res.statusCode).toBe(200); + const body = res.json<{ success: boolean; data: typeof deliveryRow[]; pagination: unknown }>(); + expect(body.success).toBe(true); + expect(body.data).toHaveLength(1); + expect(body.data[0].status).toBe("failed"); + }); + + it("accepts webhook_id filter", async () => { + vi.mocked(webhooksRepository.listDeliveries).mockResolvedValueOnce({ + data: [], + total: 0, + }); + + const res = await app.inject({ + method: "GET", + url: `/admin/webhook-deliveries?webhook_id=${WEBHOOK_ID}`, + headers: { authorization: `Bearer ${adminToken}` }, + }); + + expect(res.statusCode).toBe(200); + }); + }); + + // ─── POST /admin/webhook-deliveries/:id/retry ─────────────────────────────── + + describe("POST /admin/webhook-deliveries/:id/retry", () => { + it("re-enqueues a failed delivery", async () => { + const { enqueueWebhookDelivery } = await import( + "../../../src/workers/webhook.queue.js" + ); + + const webhookWithSecret = { + id: WEBHOOK_ID, + url: "https://example.com/hook", + secret: "s3cr3t_value_long_enough", + }; + const updatedDelivery = { ...deliveryRow, status: "pending" }; + + vi.mocked(webhooksRepository.findDeliveryById).mockResolvedValueOnce(deliveryRow); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + vi.mocked((webhooksRepository as any).findWebhookSecretById).mockResolvedValueOnce(webhookWithSecret); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + vi.mocked((webhooksRepository as any).resetDeliveryForRetry).mockResolvedValueOnce(updatedDelivery); + + const res = await app.inject({ + method: "POST", + url: `/admin/webhook-deliveries/${DELIVERY_ID}/retry`, + headers: { authorization: `Bearer ${adminToken}` }, + }); + + expect(res.statusCode).toBe(200); + expect(vi.mocked(enqueueWebhookDelivery)).toHaveBeenCalledOnce(); + }); + + it("returns 404 when delivery not found", async () => { + vi.mocked(webhooksRepository.findDeliveryById).mockResolvedValueOnce(null); + + const res = await app.inject({ + method: "POST", + url: `/admin/webhook-deliveries/${DELIVERY_ID}/retry`, + headers: { authorization: `Bearer ${adminToken}` }, + }); + + expect(res.statusCode).toBe(404); + }); + + it("returns 400 when delivery is already pending", async () => { + vi.mocked(webhooksRepository.findDeliveryById).mockResolvedValueOnce({ + ...deliveryRow, + status: "pending", + }); + + const res = await app.inject({ + method: "POST", + url: `/admin/webhook-deliveries/${DELIVERY_ID}/retry`, + headers: { authorization: `Bearer ${adminToken}` }, + }); + + expect(res.statusCode).toBe(400); + }); + }); +}); diff --git a/apps/api/tests/unit/utils/webhook.unit.test.ts b/apps/api/tests/unit/utils/webhook.unit.test.ts new file mode 100644 index 0000000..54d8b02 --- /dev/null +++ b/apps/api/tests/unit/utils/webhook.unit.test.ts @@ -0,0 +1,154 @@ +/** + * webhook.unit.test.ts — Unit tests for Phase 25 webhook system. + * + * Covers: + * - HMAC signature generation and verification (utils/hmac.ts) + * - URL validation (utils/url-validator.ts) + * - Retry delay schedule (workers/webhook.queue.ts) + * - WEBHOOK_EVENT_TYPES schema coverage + * + * Note: processEventForWebhooks fan-out logic is covered by integration tests + * (tests/integration/admin/webhooks.integration.test.ts) where the full module + * graph can be wired up without vi.doMock / vi.resetModules complications. + */ + +import { describe, it, expect } from "vitest"; + +// ─── hmac.ts ───────────────────────────────────────────────────────────────── + +describe("generateSignature", () => { + it("should produce sha256= prefixed hex string", async () => { + const { generateSignature } = await import("../../../src/utils/hmac.js"); + const sig = generateSignature("secret", "hello"); + expect(sig).toMatch(/^sha256=[0-9a-f]{64}$/); + }); + + it("should produce consistent output for same inputs", async () => { + const { generateSignature } = await import("../../../src/utils/hmac.js"); + expect(generateSignature("s", "p")).toBe(generateSignature("s", "p")); + }); + + it("should produce different signatures for different secrets", async () => { + const { generateSignature } = await import("../../../src/utils/hmac.js"); + expect(generateSignature("secret1", "payload")).not.toBe( + generateSignature("secret2", "payload"), + ); + }); + + it("should produce different signatures for different payloads", async () => { + const { generateSignature } = await import("../../../src/utils/hmac.js"); + expect(generateSignature("secret", "payload1")).not.toBe( + generateSignature("secret", "payload2"), + ); + }); +}); + +describe("verifySignature", () => { + it("should return true for a correctly generated signature", async () => { + const { generateSignature, verifySignature } = await import("../../../src/utils/hmac.js"); + const secret = "my-signing-secret"; + const payload = JSON.stringify({ id: "evt_123", type: "expense.created" }); + const sig = generateSignature(secret, payload); + expect(verifySignature(secret, payload, sig)).toBe(true); + }); + + it("should return false for a tampered payload", async () => { + const { generateSignature, verifySignature } = await import("../../../src/utils/hmac.js"); + const secret = "my-signing-secret"; + const sig = generateSignature(secret, '{"amount":100}'); + expect(verifySignature(secret, '{"amount":999}', sig)).toBe(false); + }); + + it("should return false for a different secret", async () => { + const { generateSignature, verifySignature } = await import("../../../src/utils/hmac.js"); + const payload = "test-payload"; + const sig = generateSignature("secret-a", payload); + expect(verifySignature("secret-b", payload, sig)).toBe(false); + }); + + it("should return false for length-differing strings without crashing", async () => { + const { verifySignature } = await import("../../../src/utils/hmac.js"); + expect(verifySignature("s", "p", "sha256=short")).toBe(false); + }); +}); + +// ─── url-validator.ts ───────────────────────────────────────────────────────── + +describe("validateWebhookUrl", () => { + it("should pass for a public HTTPS URL", async () => { + const { validateWebhookUrl } = await import("../../../src/utils/url-validator.js"); + expect(() => validateWebhookUrl("https://example.com/webhook")).not.toThrow(); + }); + + it("should reject http:// URLs", async () => { + const { validateWebhookUrl } = await import("../../../src/utils/url-validator.js"); + expect(() => validateWebhookUrl("http://example.com/webhook")).toThrow( + /Only HTTPS URLs are permitted/, + ); + }); + + it("should reject localhost URLs", async () => { + const { validateWebhookUrl } = await import("../../../src/utils/url-validator.js"); + expect(() => validateWebhookUrl("https://localhost/hook")).toThrow( + /private.*not permitted/i, + ); + }); + + it("should reject 127.x.x.x IPs", async () => { + const { validateWebhookUrl } = await import("../../../src/utils/url-validator.js"); + expect(() => validateWebhookUrl("https://127.0.0.1/hook")).toThrow(); + }); + + it("should reject AWS metadata endpoint", async () => { + const { validateWebhookUrl } = await import("../../../src/utils/url-validator.js"); + expect(() => validateWebhookUrl("https://169.254.169.254/latest/meta-data")).toThrow(); + }); + + it("should reject private 192.168.x.x IPs", async () => { + const { validateWebhookUrl } = await import("../../../src/utils/url-validator.js"); + expect(() => validateWebhookUrl("https://192.168.1.100/hook")).toThrow(); + }); + + it("should reject malformed URLs", async () => { + const { validateWebhookUrl } = await import("../../../src/utils/url-validator.js"); + expect(() => validateWebhookUrl("not-a-url")).toThrow(); + }); +}); + +// ─── webhook.queue.ts — retry schedule ─────────────────────────────────────── + +describe("WEBHOOK_RETRY_DELAYS_MS", () => { + it("should have 5 delay slots matching spec", async () => { + const { WEBHOOK_RETRY_DELAYS_MS, WEBHOOK_MAX_ATTEMPTS } = await import( + "../../../src/workers/webhook.queue.js" + ); + expect(WEBHOOK_MAX_ATTEMPTS).toBe(5); + expect(WEBHOOK_RETRY_DELAYS_MS).toHaveLength(5); + expect(WEBHOOK_RETRY_DELAYS_MS[0]).toBe(0); // attempt 1 immediate + expect(WEBHOOK_RETRY_DELAYS_MS[1]).toBe(30_000); // attempt 2 → 30 s + expect(WEBHOOK_RETRY_DELAYS_MS[2]).toBe(120_000); // attempt 3 → 2 min + expect(WEBHOOK_RETRY_DELAYS_MS[3]).toBe(600_000); // attempt 4 → 10 min + expect(WEBHOOK_RETRY_DELAYS_MS[4]).toBe(3_600_000); // attempt 5 → 1 h + }); +}); + +// ─── WEBHOOK_EVENT_TYPES coverage ───────────────────────────────────────────── + +describe("WEBHOOK_EVENT_TYPES", () => { + it("should include all required event types", async () => { + const { WEBHOOK_EVENT_TYPES } = await import( + "../../../src/modules/webhooks/webhooks.schema.js" + ); + const required = [ + "employee.checked_in", + "employee.checked_out", + "expense.created", + "expense.approved", + "expense.rejected", + "employee.created", + ] as const; + for (const evt of required) { + expect(WEBHOOK_EVENT_TYPES).toContain(evt); + } + }); +}); diff --git a/supabase/migrations/20260326000200_phase25_webhooks.sql b/supabase/migrations/20260326000200_phase25_webhooks.sql new file mode 100644 index 0000000..f2b0f41 --- /dev/null +++ b/supabase/migrations/20260326000200_phase25_webhooks.sql @@ -0,0 +1,116 @@ +-- ============================================================================ +-- Migration: Phase 25 — Webhooks & Integrations +-- +-- Creates three tables: +-- webhooks — per-org webhook registrations (url + secret + event filter) +-- webhook_events — immutable log of every domain event emitted +-- webhook_deliveries — delivery attempt tracking (status, retry, response) +-- +-- Design notes: +-- • organization_id on every table — enforces tenant isolation at the DB layer. +-- • webhooks.secret is stored as plaintext (ideally encrypted-at-rest by Supabase +-- vault in a future phase); never returned in API responses. +-- • webhook_deliveries.status uses a check constraint for safe state transitions. +-- • Indexes target the query patterns used by the delivery worker and admin UI. +-- ============================================================================ + +BEGIN; + +-- ─── webhooks ───────────────────────────────────────────────────────────────── + +CREATE TABLE IF NOT EXISTS public.webhooks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + organization_id UUID NOT NULL REFERENCES public.organizations(id) ON DELETE CASCADE, + url TEXT NOT NULL, + secret TEXT NOT NULL, + is_active BOOLEAN NOT NULL DEFAULT TRUE, + events TEXT[] NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Enforce HTTPS-only URLs at the database layer (defence-in-depth behind the +-- application-layer validateWebhookUrl() check). +ALTER TABLE public.webhooks + ADD CONSTRAINT webhooks_url_https + CHECK (url LIKE 'https://%'); + +-- Org lookup — used by emitEvent fan-out: WHERE org_id = $1 AND is_active = true +CREATE INDEX IF NOT EXISTS idx_webhooks_org_active + ON public.webhooks (organization_id, is_active); + +-- Trigger: keep updated_at current on every row update. +CREATE OR REPLACE FUNCTION public.set_updated_at() +RETURNS TRIGGER LANGUAGE plpgsql AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_trigger + WHERE tgname = 'trg_webhooks_updated_at' + AND tgrelid = 'public.webhooks'::regclass + ) THEN + CREATE TRIGGER trg_webhooks_updated_at + BEFORE UPDATE ON public.webhooks + FOR EACH ROW EXECUTE FUNCTION public.set_updated_at(); + END IF; +END +$$; + +-- ─── webhook_events ─────────────────────────────────────────────────────────── + +CREATE TABLE IF NOT EXISTS public.webhook_events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + organization_id UUID NOT NULL REFERENCES public.organizations(id) ON DELETE CASCADE, + event_type TEXT NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Delivery worker looks up the event payload by id. +CREATE INDEX IF NOT EXISTS idx_webhook_events_org_type + ON public.webhook_events (organization_id, event_type, created_at DESC); + +-- ─── webhook_deliveries ─────────────────────────────────────────────────────── + +CREATE TABLE IF NOT EXISTS public.webhook_deliveries ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + webhook_id UUID NOT NULL REFERENCES public.webhooks(id) ON DELETE CASCADE, + event_id UUID NOT NULL REFERENCES public.webhook_events(id) ON DELETE CASCADE, + organization_id UUID NOT NULL REFERENCES public.organizations(id) ON DELETE CASCADE, + status TEXT NOT NULL DEFAULT 'pending', + attempt_count INTEGER NOT NULL DEFAULT 0, + response_status INTEGER, + response_body TEXT, + last_attempt_at TIMESTAMPTZ, + next_retry_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + CONSTRAINT webhook_deliveries_status_check + CHECK (status IN ('pending', 'success', 'failed')), + + -- At-most-once delivery per (event, webhook) pair. + -- The delivery worker uses INSERT … ON CONFLICT DO NOTHING for safe replay. + CONSTRAINT webhook_deliveries_event_webhook_unique + UNIQUE (event_id, webhook_id) +); + +-- Admin UI query: WHERE org_id = $1 ORDER BY created_at DESC +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_org_created + ON public.webhook_deliveries (organization_id, created_at DESC); + +-- Retry worker query: WHERE status = 'pending' AND next_retry_at <= NOW() +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_pending_retry + ON public.webhook_deliveries (status, next_retry_at) + WHERE status = 'pending'; + +-- Lookup by webhook for admin drilldown +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_webhook_id + ON public.webhook_deliveries (webhook_id, created_at DESC); + +COMMIT; From ca8fa0765180c248222a60243ca8ba46d8df9efd Mon Sep 17 00:00:00 2001 From: rajashish147 Date: Fri, 27 Mar 2026 14:40:14 +0530 Subject: [PATCH 2/2] refactor: clean up webhook service imports and adjust database type definition --- apps/api/src/modules/webhooks/webhooks.service.ts | 5 +---- apps/api/src/types/database.ts | 3 ++- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/apps/api/src/modules/webhooks/webhooks.service.ts b/apps/api/src/modules/webhooks/webhooks.service.ts index 19434d5..da7ad3b 100644 --- a/apps/api/src/modules/webhooks/webhooks.service.ts +++ b/apps/api/src/modules/webhooks/webhooks.service.ts @@ -14,10 +14,7 @@ import type { FastifyRequest } from "fastify"; import { webhooksRepository } from "./webhooks.repository.js"; import { validateWebhookUrl, InvalidWebhookUrlError } from "../../utils/url-validator.js"; import { BadRequestError, NotFoundError } from "../../utils/errors.js"; -import { - enqueueWebhookDelivery, - WEBHOOK_MAX_ATTEMPTS, -} from "../../workers/webhook.queue.js"; +import { enqueueWebhookDelivery } from "../../workers/webhook.queue.js"; import type { CreateWebhookBody, UpdateWebhookBody, diff --git a/apps/api/src/types/database.ts b/apps/api/src/types/database.ts index a8e0ae5..2fd5fd3 100644 --- a/apps/api/src/types/database.ts +++ b/apps/api/src/types/database.ts @@ -629,7 +629,8 @@ export type Database = { referencedColumns: ["id"] }, ] - } webhook_deliveries: { + } + webhook_deliveries: { Row: { id: string webhook_id: string