Skip to content

balyakin/pgrelay

Repository files navigation

PgRelay

PostgreSQL-backed transactional outbox and reliable background jobs for Python services.

PgRelay is for the awkward space between "just call the webhook in the request" and "we need a separate queueing platform". It stores jobs in the same PostgreSQL database your application already commits to, then runs them from a small asyncio worker. The SDK writes into your existing SQLAlchemy AsyncSession, so a domain row and the job that publishes it can commit or roll back together.

It is intentionally not an exactly-once system. PgRelay gives you at-least-once delivery, retries, leases, dead-letter jobs, replay, and enough operator API to see what happened. Any external side effect still needs an idempotency key on the receiving side.

PgRelay admin API screenshot placeholder

The project currently ships an admin API and CLI rather than a separate browser console. The image above is a local placeholder for the admin surface, included so repository pages and generated docs have something concrete to render. Replace it with a real console or /docs screenshot when that UI exists.

What You Get

  • Transactional enqueue from a caller-owned sqlalchemy.ext.asyncio.AsyncSession
  • HTTP jobs and Python handler jobs
  • PostgreSQL queues with pause/resume and per-queue concurrency limits
  • Worker leases, lease heartbeats, expired lease recovery, retries, and dead-letter state
  • Idempotency keys and dedupe keys
  • Replay and cancel operations for job recovery
  • FastAPI admin API with health, readiness, jobs, attempts, queues, stats, and worker heartbeats
  • Typer CLI for migrations, workers, replay, drain, purge, and environment checks
  • Worker counters and histograms built with prometheus-client

When It Fits

PgRelay is a good fit when your service already uses PostgreSQL and SQLAlchemy async, and the job is part of the same business fact you are committing. Typical examples are webhook delivery, search indexing, billing sync, email requests, or a small internal handler that should run after a transaction is durable.

It is not trying to replace Kafka, Temporal, Celery, SQS, or a workflow engine. If you need fan-out streams, long-running durable workflows, cross-language workers, hosted scheduling, or exactly-once effects in another system, use the tool built for that job.

Quick Start

Requirements:

  • Python 3.12 or 3.13
  • PostgreSQL 15+
  • Poetry 2.x for local development
  • Docker Compose if you want the one-command local stack

Run the local stack:

poetry install --with dev
docker compose up --build

The compose file starts PostgreSQL, runs migrations, then launches the admin API and one worker. The API listens on http://localhost:8090.

Check it:

curl http://localhost:8090/healthz
curl http://localhost:8090/readyz
curl -H "Authorization: Bearer dev-token-change-me" http://localhost:8090/v1/jobs

FastAPI's OpenAPI UI is available at:

http://localhost:8090/docs

The default token is for local development only. Set PGRELAY_API_AUTH_TOKENS before running anything that is reachable outside your machine.

Enqueue Inside Your Transaction

The important detail is ownership: PgRelay does not call commit() and does not close your session. Your application keeps the transaction boundary.

from pgrelay.sdk.client import PgRelayClient

client = PgRelayClient.from_env()

async with session_factory() as session:
    async with session.begin():
        session.add(order)

        await client.enqueue_http(
            session=session,
            name="order.webhook",
            url="https://example.com/webhooks/orders",
            json_body={"order_id": str(order.id)},
            idempotency_key=f"order-webhook:{order.id}",
            trace_id=request_id,
        )

If the transaction rolls back, the job rolls back with it. If the transaction commits, a worker can claim the job after commit.

Python handler jobs use the same pattern:

await client.enqueue_handler(
    session=session,
    name="orders.recalculate_totals",
    payload={"order_id": str(order.id)},
    queue_name="default",
    idempotency_key=f"recalculate:{order.id}",
)

CLI

pgrelay migrate upgrade
pgrelay migrate downgrade REVISION
pgrelay api
pgrelay worker
pgrelay replay JOB_ID
pgrelay replay JOB_ID --force
pgrelay drain default --timeout-seconds 300
pgrelay purge
pgrelay doctor

The CLI reads PGRELAY_* settings from the environment or .env.

Admin API

Unauthenticated endpoints:

Method Path Purpose
GET /healthz Process liveness
GET /readyz Database readiness

Authenticated endpoints require Authorization: Bearer <token>:

Method Path Purpose
POST /v1/jobs Enqueue a job through the admin API
GET /v1/jobs List jobs without payload fields
GET /v1/jobs/{job_id} Read one job, including payload
GET /v1/jobs/{job_id}/attempts Read attempt history
POST /v1/jobs/{job_id}/replay Create a fresh pending job from an existing one
POST /v1/jobs/{job_id}/cancel Cancel a pending job
GET /v1/queues List queues
PUT /v1/queues/{queue_name} Create or update a queue
POST /v1/queues/{queue_name}/pause Pause a queue
POST /v1/queues/{queue_name}/resume Resume a queue
GET /v1/stats Read queue and job stats
GET /v1/workers Read worker heartbeat rows

Example:

curl -H "Authorization: Bearer dev-token-change-me" \
  "http://localhost:8090/v1/jobs?status=pending&limit=25"

Configuration

PgRelay uses pydantic-settings with the PGRELAY_ prefix. The sample file is .env.example.

The settings you will usually touch first:

Setting Why it matters
PGRELAY_DATABASE_URL Runtime database URL. It must use postgresql+asyncpg://.
PGRELAY_API_AUTH_TOKENS Comma-separated bearer tokens for the admin API. Required in production.
PGRELAY_WORKER_QUEUES Comma-separated queue names a worker should claim from.
PGRELAY_WORKER_CONCURRENCY Maximum in-flight jobs per worker process.
PGRELAY_WORKER_LEASE_SECONDS Lease duration before another worker may recover a stuck job.
PGRELAY_HTTP_ALLOWED_HOSTS Allowlist for HTTP job targets.
PGRELAY_BLOCK_PRIVATE_NETWORK_TARGETS Blocks HTTP jobs from reaching private network targets by default.
PGRELAY_RETENTION_SUCCEEDED_DAYS How long succeeded jobs are kept before purge.
PGRELAY_RETENTION_DEAD_LETTER_DAYS How long dead-letter jobs are kept before purge.

In production, do not use the development token from docker-compose.yml. PgRelay refuses to start with that token when PGRELAY_ENV=prod.

Job Lifecycle

pending -> leased -> succeeded
                  -> pending      (retryable failure before max_attempts)
                  -> dead_letter  (max attempts reached or permanent failure)
pending -> cancelled
dead_letter/cancelled -> pending  (replay creates a new job id)

Workers claim pending jobs with PostgreSQL row locks and SKIP LOCKED, then heartbeat the lease while the job runs. If a worker dies, lease recovery returns the job to pending or moves it to dead_letter when attempts are exhausted.

Operational Notes

  • Design receivers to handle duplicate delivery. At-least-once is the contract.
  • Use stable idempotency_key values for effects that should not be queued twice.
  • Keep HTTP job hosts allowlisted. The worker follows no redirects and blocks private network targets by default.
  • Watch dead-letter jobs. They are usually either a receiver problem, a bad payload, or a missing idempotency rule.
  • Run more worker processes for throughput, but size the database pool so each worker has room to claim, heartbeat, and finish jobs.
  • Treat payloads as operational data. Job list endpoints omit payloads, but detail endpoints return them to authorized callers.

For state transitions, guarantees, and failure modes, see Architecture.

Project Status

This repository is currently at 0.1.0. The core API, worker, SDK, migrations, and local Docker stack are present, but the project should still be treated as young. Pin versions, test against your own failure modes, and expect the edges to be sharper than a mature hosted queue.

Planned next steps are intentionally modest: better wakeups with PostgreSQL LISTEN/NOTIFY, batch enqueue, and OpenTelemetry integration.

License

MIT. See LICENSE.

Development

This project was developed with AI assistance and is maintained by the author.

Releases

No releases published

Packages

 
 
 

Contributors