
feat: complete PgBackend for IdempotencyStore (multi-worker durable dedup) #548

@bokelley


Summary

adcp.server.idempotency.backends.PgBackend is documented as "a scaffold for a SQLAlchemy/asyncpg-backed store that can be wrapped with adopter logic"; in other words, it can't actually be wired up yet. MemoryBackend is the only working backend.

Adopters running two or more worker processes have no first-party path to durable idempotency replay. Sellers that declare IdempotencySupported(supported=True, replay_ttl_seconds=86400) in their capabilities can't dedupe across workers, so they silently lie to buyers. salesagent is in this state today and would test-ship PgBackend the moment it lands.

Migration / schema strategy (answering @bokelley's pre-implementation question)

The SDK ships its own create_schema() method, not an Alembic migration. This matches PgWebhookDeliverySupervisor.create_schema(), which already exists in webhook_supervisor_pg.py. Adopters call it once at boot, and because it is idempotent, repeat calls are harmless. Specifically:

```python
from adcp.server.idempotency import IdempotencyStore, PgBackend


async def on_startup(my_pool):                   # hypothetical hook, e.g. serve()'s lifespan startup
    backend = PgBackend(pool=my_pool)            # asyncpg or psycopg pool
    await backend.create_schema()                # idempotent CREATE TABLE IF NOT EXISTS
    store = IdempotencyStore(backend=backend, ttl_seconds=86400)
    # … wire to serve() as today
    return store
```

Why not Alembic:

  1. No coupling to adopter migration tooling. salesagent uses Alembic for its own schema (tenants, principals, products, media_buys, etc.). If the SDK shipped Alembic migrations, two migration ladders would coexist (adcp_* migrations alongside salesagent's) in a shared revision-id space with no shared head. Adopters not on Alembic (Django ORM, raw SQL, dbmate) couldn't use the SDK's migrations at all.

  2. Idempotent boot-time creation matches the existing precedent. PgWebhookDeliverySupervisor.create_schema() is the established pattern and it works well. New adopters call it from their serve() lifespan startup. The SQL ships as a plain .sql file next to the Python source, so adopters who DO use Alembic can copy that SQL into a new revision when they want explicit version control. (salesagent will likely do exactly this: copy adcp/server/idempotency/idempotency.sql into a new salesagent Alembic revision so the schema is captured in our migration history.)

  3. Adopter-driven schema evolution. SDK schema changes (e.g. adding an index in v4.4) go in a new create_schema_v4_4() method or a versioned bump inside create_schema(). Adopters explicitly opt into the migration on their next deploy. No surprise schema drift.
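Points 2 and 3 could be combined as a small version ladder inside create_schema(). This is a minimal sketch, not SDK code. SCHEMA_STEPS and the adcp_schema_versions bookkeeping table are hypothetical. The sketch uses the stdlib sqlite3 module, with SQLite-friendly column types, only so it runs without a Postgres server. PgBackend would send the equivalent Postgres DDL through its asyncpg/psycopg pool.

```python
from __future__ import annotations

import sqlite3

# Hypothetical ladder of (version, DDL). Step 1 mirrors the proposed table with
# SQLite-friendly types; step 2 stands in for a later bump such as a v4.4 index.
SCHEMA_STEPS: list[tuple[int, str]] = [
    (1, """
        CREATE TABLE IF NOT EXISTS adcp_idempotency_records (
            tenant_id        TEXT    NOT NULL,
            caller_identity  TEXT    NOT NULL,
            idempotency_key  TEXT    NOT NULL,
            response_payload TEXT    NOT NULL,
            response_status  INTEGER NOT NULL,
            created_at       REAL    NOT NULL,
            expires_at       REAL    NOT NULL,
            PRIMARY KEY (tenant_id, caller_identity, idempotency_key)
        )
    """),
    (2, """
        CREATE INDEX IF NOT EXISTS idx_adcp_idempotency_expiry
            ON adcp_idempotency_records (expires_at)
    """),
]


def create_schema(conn: sqlite3.Connection, target: int | None = None) -> int:
    """Apply every step newer than the recorded version; safe to call on every boot."""
    if target is None:
        target = SCHEMA_STEPS[-1][0]
    conn.execute(
        "CREATE TABLE IF NOT EXISTS adcp_schema_versions ("
        " component TEXT PRIMARY KEY, version INTEGER NOT NULL)"
    )
    row = conn.execute(
        "SELECT version FROM adcp_schema_versions WHERE component = 'idempotency'"
    ).fetchone()
    current = row[0] if row else 0
    for version, ddl in SCHEMA_STEPS:
        if current < version <= target:
            # Python's sqlite3 runs DDL outside a transaction; IF NOT EXISTS keeps
            # a partially applied ladder safe to re-run.
            conn.execute(ddl)
            current = version
    # Record the version reached (upsert syntax needs SQLite >= 3.24; Postgres accepts it too).
    conn.execute(
        "INSERT INTO adcp_schema_versions (component, version) VALUES ('idempotency', ?)"
        " ON CONFLICT (component) DO UPDATE SET version = excluded.version",
        (current,),
    )
    conn.commit()
    return current
```

On Postgres, DDL is transactional, so the whole ladder could run inside one transaction. If several workers boot at once, concurrent CREATE TABLE IF NOT EXISTS calls can still collide. Taking pg_advisory_xact_lock(...) at the start of that transaction is one way to serialize them.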

Acceptable variants if the team disagrees:

  • Ship Alembic migrations under adcp/server/idempotency/alembic/ with a documented config that adopters can include via Alembic's version_locations. More work for adopters; cleaner for SDK-managed evolution.
  • Ship both: create_schema() for adopters who want it idempotent, plus the Alembic revision for adopters who want explicit version control. Both read the same SQL file.

I'd start with the boot-time create_schema() approach (the existing PgWebhookDeliverySupervisor pattern) and only add Alembic later if adopter feedback asks for it.

Schema sketch

This mirrors the structure of MemoryBackend, which I only skimmed. The store is keyed by (tenant_id, caller_identity, idempotency_key) and holds a wire-shape response payload plus an expiry:

```sql
CREATE TABLE IF NOT EXISTS adcp_idempotency_records (
    tenant_id           TEXT        NOT NULL,
    caller_identity     TEXT        NOT NULL,
    idempotency_key     TEXT        NOT NULL,
    response_payload    JSONB       NOT NULL,    -- the canonical response to replay
    response_status     SMALLINT    NOT NULL,    -- HTTP / JSON-RPC outcome
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    expires_at          TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (tenant_id, caller_identity, idempotency_key)
);

CREATE INDEX IF NOT EXISTS idx_adcp_idempotency_expiry
    ON adcp_idempotency_records (expires_at);
```

Eviction can work in one of two ways:
  • A background task deletes rows where expires_at < now(), optionally minus a retention grace window.
  • Lazy on read: expired entries are treated as absent, and a periodic job reaps them later.
Both work; lazy-on-read is simpler and matches MemoryBackend's behavior.
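Here is a minimal sketch of lazy-on-read plus a periodic reaper, assuming the table from the schema sketch above. get_live() and reap_expired() are hypothetical names. The sketch uses sqlite3 with epoch-second REAL timestamps as a stand-in for Postgres, so it runs standalone. The Postgres queries would compare TIMESTAMPTZ columns against now() instead of a Python timestamp.

```python
import json
import sqlite3
import time

# Stand-in for the proposed table: SQLite types, epoch-second REAL timestamps.
DDL = """
CREATE TABLE IF NOT EXISTS adcp_idempotency_records (
    tenant_id TEXT NOT NULL, caller_identity TEXT NOT NULL, idempotency_key TEXT NOT NULL,
    response_payload TEXT NOT NULL, response_status INTEGER NOT NULL,
    created_at REAL NOT NULL, expires_at REAL NOT NULL,
    PRIMARY KEY (tenant_id, caller_identity, idempotency_key)
)
"""


def get_live(conn, tenant_id, caller_identity, key, now=None):
    """Lazy-on-read: an expired row is treated as absent but left for the reaper."""
    now = time.time() if now is None else now
    row = conn.execute(
        "SELECT response_payload, response_status FROM adcp_idempotency_records"
        " WHERE tenant_id = ? AND caller_identity = ? AND idempotency_key = ?"
        " AND expires_at > ?",
        (tenant_id, caller_identity, key, now),
    ).fetchone()
    return None if row is None else (json.loads(row[0]), row[1])


def reap_expired(conn, now=None, grace_seconds=0.0):
    """Periodic job: delete rows that expired more than grace_seconds ago."""
    now = time.time() if now is None else now
    cur = conn.execute(
        "DELETE FROM adcp_idempotency_records WHERE expires_at < ?",
        (now - grace_seconds,),
    )
    conn.commit()
    # Number of rows reaped. On Postgres, idx_adcp_idempotency_expiry serves this range scan.
    return cur.rowcount
```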

Critical correctness contract

The most important property: the idempotency record must commit in the same transaction as the handler's business writes. Otherwise a crash between "handler succeeded" and "idempotency record persisted" loses dedup, and the retry re-executes the handler's writes. The MemoryBackend docstring already calls this out as the backend's choice (in-memory has no atomicity guarantee; PG can have one).

Recommended: PgBackend exposes a record(tenant_id, caller_identity, key, response, *, conn: AsyncConnection) method that takes the same asyncpg/psycopg connection the handler used for its business writes. That way adopters can write the record inside their existing transaction. The higher-level IdempotencyStore can default to a fresh connection when adopters don't need that guarantee.
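Here is a minimal sketch of that contract. It uses synchronous sqlite3 as a stand-in for an asyncpg/psycopg connection so it runs without a database server. record() follows the proposed signature, plus hypothetical status and ttl_seconds keywords that fill the schema's response_status and expires_at columns. lookup(), create_media_buy(), and the media_buys table are illustrative only. The key design choice is that record() never commits; the handler's transaction does.

```python
import json
import sqlite3
import time

SCHEMA = """
CREATE TABLE media_buys (media_buy_id TEXT PRIMARY KEY, budget REAL NOT NULL);
CREATE TABLE adcp_idempotency_records (
    tenant_id TEXT NOT NULL, caller_identity TEXT NOT NULL, idempotency_key TEXT NOT NULL,
    response_payload TEXT NOT NULL, response_status INTEGER NOT NULL,
    created_at REAL NOT NULL, expires_at REAL NOT NULL,
    PRIMARY KEY (tenant_id, caller_identity, idempotency_key)
);
"""


def record(tenant_id, caller_identity, key, response, *, conn, status=200, ttl_seconds=86400):
    """Write the replay row on the caller's connection. Deliberately no commit:
    the caller's transaction decides whether this row persists."""
    now = time.time()
    conn.execute(
        "INSERT INTO adcp_idempotency_records VALUES (?, ?, ?, ?, ?, ?, ?)",
        (tenant_id, caller_identity, key, json.dumps(response), status, now, now + ttl_seconds),
    )


def lookup(conn, tenant_id, caller_identity, key):
    """Return the stored response for a live (unexpired) record, or None."""
    row = conn.execute(
        "SELECT response_payload FROM adcp_idempotency_records"
        " WHERE tenant_id = ? AND caller_identity = ? AND idempotency_key = ?"
        " AND expires_at > ?",
        (tenant_id, caller_identity, key, time.time()),
    ).fetchone()
    return None if row is None else json.loads(row[0])


def create_media_buy(conn, tenant_id, caller_identity, key, buy_id, budget, *, crash=False):
    """Hypothetical handler: business write and idempotency record share one transaction."""
    replay = lookup(conn, tenant_id, caller_identity, key)
    if replay is not None:
        return replay
    with conn:  # sqlite3: commits on success, rolls back if the block raises
        conn.execute("INSERT INTO media_buys VALUES (?, ?)", (buy_id, budget))
        if crash:
            raise RuntimeError("simulated crash after the business write, before the record")
        response = {"media_buy_id": buy_id, "status": "created"}
        record(tenant_id, caller_identity, key, response, conn=conn)
    return response
```

On Postgres, the same shape also covers two workers racing on one key. Both miss the lookup, but the second worker's record INSERT waits on the first worker's uncommitted row and then fails on the primary key. That failure rolls back the second worker's business write as well, and the store can re-run the lookup and replay the winner's response.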

Memory-leak lens

The current MemoryBackend is unbounded: entries land in a dict and are TTL-evicted only on read. Adopters running long-lived processes with high idempotency_key cardinality could see slow growth here. MemoryBackend should have a hard size ceiling in addition to the TTL (e.g., cap at 10K entries and evict the oldest on overflow). That's a separate small fix worth landing as part of this thread.

salesagent's current production memory-leak shape (~3-4 day OOM cycle, linear ramp to 12 GB) doesn't match an idempotency cache profile (entries should turn over faster than replay_ttl_seconds=86400), but the SDK shipping a bounded MemoryBackend is a defensive default regardless.
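Here is a sketch of the bounded variant, assuming the policy described above: cap at a fixed entry count and evict the oldest entry on overflow. BoundedTTLCache is a hypothetical name, not the current MemoryBackend API. Every entry shares one TTL, and put() re-inserts at the end, so insertion order equals expiry order. That lets put() also sweep expired entries off the front. The sweep matters because purely lazy eviction never frees keys that are never read again.

```python
from __future__ import annotations

import time
from collections import OrderedDict
from typing import Any, Callable, Hashable


class BoundedTTLCache:
    """Hypothetical bounded store: per-entry TTL plus a hard cap on entry count."""

    def __init__(self, max_entries: int = 10_000, ttl_seconds: float = 86_400.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._data: OrderedDict[Hashable, tuple[float, Any]] = OrderedDict()
        self._max = max_entries
        self._ttl = ttl_seconds
        self._clock = clock

    def put(self, key: Hashable, value: Any) -> None:
        now = self._clock()
        # One shared TTL means the front of the dict expires first: sweep it.
        while self._data:
            expires_at, _ = next(iter(self._data.values()))
            if expires_at > now:
                break
            self._data.popitem(last=False)
        self._data.pop(key, None)  # re-insert so a rewritten key moves to the back
        self._data[key] = (now + self._ttl, value)
        while len(self._data) > self._max:
            self._data.popitem(last=False)  # overflow: evict the oldest entry

    def get(self, key: Hashable) -> Any | None:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if expires_at <= self._clock():
            del self._data[key]  # lazy TTL eviction on read
            return None
        return value

    def __len__(self) -> int:
        return len(self._data)
```

Keys would be (tenant_id, caller_identity, idempotency_key) tuples, matching the composite key used elsewhere in this proposal.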

What salesagent ships when this lands

  • Wire PgBackend(pool=salesagent_pool) to the existing IdempotencyStore plumbing
  • Adopt the create_schema() SQL into a salesagent Alembic revision so the schema is captured in salesagent's migration history
  • Stop declaring IdempotencySupported(supported=True) while not actually deduping (a current footgun in salesagent's core/platforms/mock.py and gam.py)

🤖 Filed via Claude Code
