feat(webhook): PgWebhookDeliverySupervisor — Postgres-backed multi-worker delivery #360

Merged
bokelley merged 2 commits into main from claude/issue-352-pg-webhook-supervisor
May 2, 2026
Contributor

@bokelley bokelley commented May 2, 2026

Closes #352

Summary

  • src/adcp/webhook_supervisor_pg.py (new, ~800 LOC) — PgWebhookDeliverySupervisor, a Postgres-backed implementation of the WebhookDeliverySupervisor Protocol introduced in feat(decisioning): WebhookDeliverySupervisor + SQLAlchemy A2A stores example #348. Designed for multi-process/multi-worker deployments where the in-memory supervisor cannot provide durability.
  • tests/test_webhook_supervisor_pg.py (new, ~650 LOC) — 28 unit tests using fully-mocked psycopg3 (no real database required).
  • src/adcp/webhook_supervisor.py (patched) — Protocol send_mcp() was missing breaker_key and notification_type kwargs that existed on InMemoryWebhookDeliverySupervisor; fixed.
  • src/adcp/webhooks.py (patched) — Re-exports PgWebhookDeliverySupervisor for discoverability (from adcp.webhooks import PgWebhookDeliverySupervisor).

Design highlights

| Concern | Approach |
| --- | --- |
| Multi-worker job lease | FOR UPDATE SKIP LOCKED LIMIT 1; the connection is held open during the HTTP send, so a crash rolls back the lease |
| Circuit breaker | State stored in adcp_webhook_circuit_state; OPEN gates writes in send_mcp() without touching the queue |
| Half-open atomicity | INSERT … ON CONFLICT DO UPDATE … RETURNING serializes concurrent success_count increments so exactly one worker triggers the CLOSED transition |
| Idempotency-key compliance | First attempt stores sent_body + idempotency_key in the queue row; retries call sender.resend(prev) to replay the exact same bytes |
| SQL injection | Table names validated with an ASCII regex at construction time (identical to the PgReplayStore pattern); all runtime values use parameterized queries |
| Optional dependency | Guarded by the PG_AVAILABLE flag; uses the existing [pg] extra (psycopg[binary]>=3.1.0, psycopg-pool>=3.2.0) |
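For readers unfamiliar with the PgReplayStore-style guard, the table-name check can be sketched roughly as follows. This is an illustration only: the exact regex, the 63-character cap, and the helper name `validate_table_name` are assumptions, not taken from the PR. Identifiers cannot be bound as query parameters, which is why the name is checked once before it is interpolated into SQL.

```python
import re

# Assumed pattern: ASCII letter or underscore first, then letters, digits, or
# underscores, capped at PostgreSQL's 63-character identifier limit.
_SAFE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,62}")


def validate_table_name(name: str) -> str:
    """Return name unchanged if it is a safe identifier, else raise ValueError."""
    if not isinstance(name, str) or _SAFE_IDENTIFIER.fullmatch(name) is None:
        raise ValueError(f"unsafe table name: {name!r}")
    return name


# Hypothetical usage at construction time:
queue_table = validate_table_name("adcp_webhook_delivery_queue")
```

Validating at construction time means a bad override fails fast, before any query is built.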

Required DDL (included in create_schema())

CREATE TABLE IF NOT EXISTS adcp_webhook_circuit_state (
    breaker_key      TEXT PRIMARY KEY,
    state            TEXT NOT NULL DEFAULT 'closed',
    failure_count    INT  NOT NULL DEFAULT 0,
    success_count    INT  NOT NULL DEFAULT 0,
    last_failure_at  TIMESTAMPTZ
);

CREATE TABLE IF NOT EXISTS adcp_webhook_delivery_queue (
    id               BIGSERIAL PRIMARY KEY,
    url              TEXT        NOT NULL,
    task_id          TEXT        NOT NULL,
    status           TEXT        NOT NULL,
    task_type        TEXT,
    result           JSONB,
    token            TEXT,
    sequence_key     TEXT,
    breaker_key      TEXT,
    notification_type TEXT,
    attempt          INT         NOT NULL DEFAULT 0,
    max_attempts     INT         NOT NULL DEFAULT 3,
    sent_body        BYTEA,
    idempotency_key  TEXT,
    enqueued_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    process_after    TIMESTAMPTZ NOT NULL DEFAULT now(),
    -- Queue-row lifecycle state; distinct from the task "status" column above.
    status_col       TEXT        NOT NULL DEFAULT 'pending'
);
CREATE INDEX IF NOT EXISTS adcp_wdq_pending
    ON adcp_webhook_delivery_queue (process_after)
    WHERE status_col = 'pending';

CREATE TABLE IF NOT EXISTS adcp_webhook_delivery_log (
    id          BIGSERIAL   PRIMARY KEY,
    job_id      BIGINT,
    url         TEXT        NOT NULL,
    task_id     TEXT        NOT NULL,
    attempt     INT         NOT NULL,
    ok          BOOLEAN     NOT NULL,
    status_code INT,
    error       TEXT,
    logged_at   TIMESTAMPTZ NOT NULL DEFAULT now()
);
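Against the queue table above, the lease-holds-through-HTTP pattern might look like the sketch below. It is not the code from webhook_supervisor_pg.py: `deliver_one`, the `send` callback, and the `'delivered'` state value are hypothetical, and `conn` is assumed to be a psycopg3 connection (only `conn.transaction()` and `conn.execute()` are used).

```python
from typing import Any, Callable, Optional

COLUMNS = ("id", "url", "task_id", "attempt", "max_attempts")

# Pick the oldest due job that no other worker currently has locked.
LEASE_SQL = """
SELECT id, url, task_id, attempt, max_attempts
  FROM adcp_webhook_delivery_queue
 WHERE status_col = 'pending'
   AND process_after <= now()
 ORDER BY process_after
 LIMIT 1
   FOR UPDATE SKIP LOCKED
"""


def deliver_one(conn: Any, send: Callable[[dict], bool]) -> Optional[bool]:
    """Lease one due job, send it, and record the outcome in one transaction.

    Returns None when no job is due, otherwise the result of send(job).
    """
    with conn.transaction():
        row = conn.execute(LEASE_SQL).fetchone()
        if row is None:
            return None
        job = dict(zip(COLUMNS, row))
        ok = send(job)  # the row lock is held while the HTTP call runs
        # 'delivered' is an assumed state name; failures stay 'pending' here.
        new_state = "delivered" if ok else "pending"
        conn.execute(
            "UPDATE adcp_webhook_delivery_queue "
            "SET attempt = attempt + 1, status_col = %s WHERE id = %s",
            (new_state, job["id"]),
        )
        return ok
```

If the worker dies or `send` raises inside the `with` block, the transaction rolls back and the row lock is released, so another worker can pick the job up. The trade-off is that each in-flight delivery pins one database connection for the duration of the HTTP call.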

What was tested

  • ruff check — clean
  • mypy src/adcp/ — clean
  • pytest tests/test_webhook_supervisor_pg.py -v — 28/28 passed (fully mocked psycopg3, no DB)
  • The 28 tests cover: construction validation, create_schema() DDL, circuit breaker gating, half-open timeout transition, FOR UPDATE SKIP LOCKED poll, success/failure/final-failure paths, resend-with-stored-body, sink error swallowing, log-write fault isolation, and the run_worker() lifecycle

Pre-PR expert review sign-offs

  • code-reviewer (pre-PR): No blockers. Flagged one vacuous test (fetchone raise path unreachable in _log_attempt_via_conn) and one dead inner function — both fixed in the final commit.
  • Core logic verified: FOR UPDATE SKIP LOCKED lease-holds-through-HTTP pattern, atomic UPSERT RETURNING for half-open, and safe-identifier validation all confirmed correct.
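The atomic UPSERT RETURNING step for half-open can be sketched as below, under stated assumptions: `record_half_open_success`, `close_threshold`, and the `'half_open'` state string are hypothetical names, and `conn` is assumed to be a psycopg3 connection. The actual SQL in the PR may differ.

```python
from typing import Any

# The row lock taken by ON CONFLICT DO UPDATE serializes concurrent increments,
# so every caller gets a distinct success_count back from RETURNING.
HALF_OPEN_SUCCESS_SQL = """
INSERT INTO adcp_webhook_circuit_state (breaker_key, state, success_count)
VALUES (%s, 'half_open', 1)
ON CONFLICT (breaker_key) DO UPDATE
    SET success_count = adcp_webhook_circuit_state.success_count + 1
RETURNING success_count
"""

CLOSE_SQL = """
UPDATE adcp_webhook_circuit_state
   SET state = 'closed', failure_count = 0
 WHERE breaker_key = %s
"""


def record_half_open_success(conn: Any, breaker_key: str, close_threshold: int) -> bool:
    """Count one half-open success; return True only for the call that closes."""
    row = conn.execute(HALF_OPEN_SUCCESS_SQL, (breaker_key,)).fetchone()
    if row[0] != close_threshold:
        return False
    # Only the worker that observed exactly the threshold value gets here.
    conn.execute(CLOSE_SQL, (breaker_key,))
    return True
```

Because each worker compares against the value it received from RETURNING rather than re-reading the row, two workers finishing at the same time cannot both conclude that they crossed the threshold.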

Triage-managed PR — opened by the Claude triage agent in response to issue #352. Implementation follows the PgReplayStore pattern established in src/adcp/signing/pg/replay_store.py.


Generated by Claude Code

claude added 2 commits May 2, 2026 16:15
Adds a Postgres-backed WebhookDeliverySupervisor that fulfils issue #352:
cross-process circuit-breaker state, durable retry queue with FOR UPDATE
SKIP LOCKED worker leasing, and per-attempt delivery audit log.

Three tables (bootstrap via create_schema(), idempotent):
- adcp_webhook_circuit_state: shared breaker state, atomic half-open via UPDATE RETURNING
- adcp_webhook_delivery_queue: BIGSERIAL PK is the sequence number; partial
  index on (status_str, scheduled_at) WHERE IN ('pending','retry') for FOR UPDATE SKIP LOCKED queries
- adcp_webhook_delivery_log: per-attempt audit trail

Key design choices:
- send_mcp() always returns None (fire-and-forget enqueue); run_worker() delivers
- Retry path calls sender.resend() with stored sent_body/idempotency_key so the
  same idempotency_key is replayed per spec ("Publishers MUST reuse the same key")
- breaker_key param for multi-tenant URL isolation (consistent with InMemory)
- Safe-identifier regex for table-name overrides (same guard as PgReplayStore)
- worker_started warning on first send_mcp() before run_worker() is running
- DB connection held during HTTP send so crashed workers auto-release via rollback
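The store-then-replay retry rule from the list above can be illustrated with a small in-memory sketch. `QueueRow`, `attempt_delivery`, and the `post` callback are hypothetical stand-ins for the queue row and for sender.resend(); how the real sender derives the key and serializes the body is not shown in this PR, so the UUID key and JSON encoding here are assumptions.

```python
import json
import uuid
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class QueueRow:
    payload: dict
    attempt: int = 0
    sent_body: Optional[bytes] = None      # mirrors the sent_body column
    idempotency_key: Optional[str] = None  # mirrors the idempotency_key column


def attempt_delivery(row: QueueRow, post: Callable[[bytes, str], bool]) -> bool:
    """Send the row; build body and key on the first attempt, replay them after."""
    if row.sent_body is None:
        # First attempt: freeze the exact bytes and the key on the row.
        row.idempotency_key = uuid.uuid4().hex
        row.sent_body = json.dumps(row.payload, sort_keys=True).encode("utf-8")
    row.attempt += 1
    # Retries reuse the stored bytes and key, never a re-serialized payload.
    return post(row.sent_body, row.idempotency_key)
```

Storing the bytes rather than re-serializing on retry keeps the request identical even if the in-memory payload or serializer output changes between attempts.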

28 new unit tests, all passing. ruff clean, no mypy errors in new files.

https://claude.ai/code/session_01DoCRKpgu6jrVNARmazguok
… test

- WebhookDeliverySupervisor Protocol was missing breaker_key and
  notification_type kwargs that exist on InMemoryWebhookDeliverySupervisor;
  typed call-sites against the Protocol were broken
- Re-export PgWebhookDeliverySupervisor from adcp.webhooks so it is
  discoverable without knowing the internal module path
- test_log_insert_failure_does_not_crash_worker was passing vacuously
  because fetchone on the log cursor was set to raise but _log_attempt_via_conn
  never calls fetchone; fix by raising directly from conn.execute side-effect
- Remove dead _send inner function from test_warning_emitted_only_once

https://claude.ai/code/session_01DoCRKpgu6jrVNARmazguok
@bokelley bokelley force-pushed the claude/issue-352-pg-webhook-supervisor branch from 871ce82 to e8b2af8 Compare May 2, 2026 20:17
@bokelley bokelley marked this pull request as ready for review May 2, 2026 20:21
@bokelley bokelley merged commit 5f1f3a0 into main May 2, 2026
12 checks passed
@bokelley bokelley deleted the claude/issue-352-pg-webhook-supervisor branch May 2, 2026 20:22
Development

Successfully merging this pull request may close these issues.

feat(webhook): Postgres-backed WebhookDeliverySupervisor for multi-worker durability
