distributed-task-queue
A minimal distributed task queue built with Go, RabbitMQ, and Postgres. It exposes an HTTP API for enqueuing tasks, ships workers that process them with retries and backoff, and publishes Prometheus metrics for visibility.
- Reliable enqueue with idempotency keys and per-type defaults
- RabbitMQ topology with priorities, retry queues (TTL), and a DLQ
- Postgres persistence with simple task state machine and event log
- Worker example with per-attempt backoff strategies and prefetch control
- Health checks and a focused Prometheus `/metrics` endpoint
Architecture:

- Postgres stores task rows and an event log.
  - Tasks transition: `ENQUEUED → RUNNING → SUCCEEDED | FAILED` (with scheduled retries).
  - Idempotency via a unique `idempotency_key` on `tasks`.
- RabbitMQ handles delivery:
  - A main direct exchange routes to queues (e.g., `tasks.default`, `tasks.high`).
  - Per-priority retry queues (e.g., `tasks.retry.default`) dead-letter back to the main exchange after a TTL.
  - A DLX (`tasks.dlx`) routes terminal failures to a DLQ for inspection.
- API service:
  - `POST /enqueue` writes to Postgres and publishes a small message envelope to RabbitMQ.
  - `GET /tasks/{id}` returns current task state.
  - `GET /healthz` checks DB + RabbitMQ topology.
  - `GET /metrics` exposes Prometheus metrics from a custom registry.
- Worker(s):
  - Consume from each priority queue, lock the task row, mark it running, execute the handler, and record the result or schedule a retry/failure.
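The state machine above is small enough to sketch directly. The transition table below is illustrative only; status names match this README, but the authoritative transitions live in the store layer and the DB triggers.

```go
package main

import "fmt"

// allowed encodes the task state machine from the architecture notes:
// ENQUEUED → RUNNING → SUCCEEDED | FAILED, with a scheduled retry moving a
// RUNNING task back to ENQUEUED. SUCCEEDED and FAILED are terminal.
var allowed = map[string][]string{
	"ENQUEUED": {"RUNNING"},
	"RUNNING":  {"SUCCEEDED", "FAILED", "ENQUEUED"}, // ENQUEUED = scheduled retry
}

// canTransition reports whether moving a task from `from` to `to` is valid.
func canTransition(from, to string) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("ENQUEUED", "RUNNING"))  // true
	fmt.Println(canTransition("SUCCEEDED", "RUNNING")) // false: terminal state
}
```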
Key code entry points:
- API bootstrap: `cmd/api/main.go`
- Enqueue handler: `internal/api/enqueue.go`
- Task read handler: `internal/api/tasks.go`
- Health handler: `internal/api/health.go`
- Metrics: `internal/metrics/metrics.go`
- RMQ topology: `internal/rmq/topology.go`; initializer: `cmd/rmq-init/main.go`
- Worker example: `examples/worker/main.go`
Tables (see `db/*.sql`):

- `task_type`: registry of allowed task types with defaults and an optional schema (`db/type_registry.sql`).
- `tasks`: persisted tasks with status, attempts, result, payload, and `idempotency_key` (`db/schema.sql`).
- `task_events`: append-only per-task event log with triggers on insert/update (`db/events.sql`).

Statuses: `ENQUEUED`, `RUNNING`, `SUCCEEDED`, `FAILED` (and `DLQ`, enumerated for completeness).
Triggers populate `task_events` on inserts and on status changes, including RETRY notes with `last_error`.
Prerequisites:
- Go 1.23+
- Docker + Docker Compose
- Make
- Copy env and adjust as needed:

  ```sh
  cp .env.example .env
  # edit .env as needed (DB, RMQ, QUEUES, etc.)
  ```
- Start Postgres and RabbitMQ:

  ```sh
  make up
  ```

- Apply the database schema:

  ```sh
  make db-apply
  ```

- Seed a sample task type (optional but handy for testing):

  ```sh
  make db-seed-type
  ```

- Ensure RabbitMQ topology (exchanges/queues/bindings based on env):

  ```sh
  make init
  ```

- Run the API:

  ```sh
  make api
  # listens on HTTP_PORT (e.g., :8080)
  ```

- Run a worker in another terminal:

  ```sh
  go run examples/worker/main.go
  ```

- Enqueue a task:

  ```sh
  curl -s -X POST localhost:8080/enqueue \
    -H 'content-type: application/json' \
    -d '{
      "type": "email.send.v1",
      "payload": {"to":"a@b.com","subj":"hi"},
      "idempotency_key": "demo-1"
    }'
  ```

- Fetch task status:

  ```sh
  curl -s localhost:8080/tasks/<task_id>
  ```
RabbitMQ Management UI: http://localhost:15672 (default creds in .env).
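For Go clients, the enqueue call from the quickstart can be sketched as follows. The endpoint and body fields come from the API described in this README; `marshalEnqueue` and the struct are hypothetical helpers, not part of the repo.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// enqueueBody mirrors the curl example above. Field names follow the API
// description in this README.
type enqueueBody struct {
	Type           string          `json:"type"`
	Payload        json.RawMessage `json:"payload"`
	IdempotencyKey string          `json:"idempotency_key,omitempty"`
}

// marshalEnqueue builds the JSON request body for POST /enqueue.
func marshalEnqueue(typ string, payload any, key string) ([]byte, error) {
	p, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	return json.Marshal(enqueueBody{Type: typ, Payload: p, IdempotencyKey: key})
}

func main() {
	body, _ := marshalEnqueue("email.send.v1",
		map[string]string{"to": "a@b.com", "subj": "hi"}, "demo-1")
	fmt.Println(string(body))
	// With the API running (make api), the POST itself would be:
	resp, err := http.Post("http://localhost:8080/enqueue", "application/json", bytes.NewReader(body))
	if err == nil {
		defer resp.Body.Close()
		fmt.Println(resp.Status)
	}
}
```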
Required (API/worker):
- `HTTP_PORT`: API listen address, e.g. `:8080` (`cmd/api/main.go`)
- `DB_DSN`: Postgres DSN, e.g. `postgres://postgres:postgres@localhost:5432/appdb?sslmode=disable` (`cmd/api/main.go`)
- `RMQ_USER`, `RMQ_PASS`, `RMQ_HOST`, `RMQ_PORT`, `RMQ_VHOST`: RabbitMQ connection (`internal/rmq/url.go`)
- `RMQ_NAMESPACE`: e.g. `tasks` (used for exchange/queue names) (`internal/rmq/topology.go`)
- `QUEUES`: CSV list of routing keys, e.g. `default,high` (`internal/rmq/topology.go`)
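As a sketch of how the `RMQ_*` parts combine into a connection URL (presumably what `internal/rmq/url.go` does, though the exact code may differ), note that the default `/` vhost must be percent-escaped:

```go
package main

import (
	"fmt"
	"net/url"
)

// amqpURL assembles an AMQP connection URL from the RMQ_* env parts.
// The vhost is path-escaped, so the default "/" becomes "%2F".
func amqpURL(user, pass, host, port, vhost string) string {
	return fmt.Sprintf("amqp://%s:%s@%s:%s/%s",
		url.QueryEscape(user), url.QueryEscape(pass), host, port, url.PathEscape(vhost))
}

func main() {
	fmt.Println(amqpURL("guest", "guest", "localhost", "5672", "/"))
	// amqp://guest:guest@localhost:5672/%2F
}
```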
Backoff (worker):
- Strategy selected via `BACKOFF_STRATEGY`: `list` (default) | `fixed` | `exponential` (`internal/backoff/backoff.go:56`)
  - list: `BACKOFFS="5s,30s,2m,10m,1h"`
  - fixed: `BACKOFF_FIXED="30s"`
  - exponential: `BACKOFF_BASE`, `BACKOFF_FACTOR`, `BACKOFF_MAX`, `BACKOFF_JITTER`
- `WORKER_PREFETCH`: unacked message prefetch per consumer (default `32`).
Compose-only helpers (for `make up`):

- `PG_USER`, `PG_PASSWORD`, `PG_DATABASE` for the containerized Postgres.
The initializer (`cmd/rmq-init`) is idempotent and derives names from `RMQ_NAMESPACE` and `QUEUES`:
- Exchanges: `<ns>.direct` (main), `<ns>.dlx` (dead-letter)
- Queues: `<ns>.<rk>` for each routing key; `<ns>.dlq` for global dead letters
- Retry queues: `<ns>.retry.<rk>` with `x-dead-letter-exchange=<ns>.direct`, so messages re-enter the main flow after TTL
The worker publishes retries directly to the retry queue with a per-message TTL; the queue then dead-letters back to the main exchange with the original routing key.
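The naming scheme can be sketched as a pure function of `RMQ_NAMESPACE` and the `QUEUES` CSV. This mirrors the description above, not the actual `cmd/rmq-init` code:

```go
package main

import (
	"fmt"
	"strings"
)

// topology holds the exchange/queue names the initializer would declare.
type topology struct {
	MainExchange, DLX, DLQ string
	Queues, RetryQueues    []string
}

// topologyNames derives all names from the namespace and the routing keys,
// following the <ns>.* scheme described above.
func topologyNames(ns, queuesCSV string) topology {
	t := topology{
		MainExchange: ns + ".direct",
		DLX:          ns + ".dlx",
		DLQ:          ns + ".dlq",
	}
	for _, rk := range strings.Split(queuesCSV, ",") {
		rk = strings.TrimSpace(rk)
		t.Queues = append(t.Queues, ns+"."+rk)
		t.RetryQueues = append(t.RetryQueues, ns+".retry."+rk)
	}
	return t
}

func main() {
	fmt.Printf("%+v\n", topologyNames("tasks", "default,high"))
}
```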
HTTP API:

- `GET /` → service info
- `GET /healthz` → checks DB ping and the existence of exchanges/queues (`internal/api/health.go`)
- `POST /enqueue` → create or coalesce a task
  - Body:
    - `type` (string, required)
    - `payload` (JSON, required)
    - `queue` (string, optional; must be one of `QUEUES` if provided)
    - `max_attempts` (int, optional; default from `task_type`)
    - `idempotency_key` (string, optional; coalesces duplicate requests)
  - On success: HTTP 201 with `{id, status, queue}`
  - Errors: 400 on validation/unknown type, 503 on RMQ publish failure, 500 on DB errors
  - Source: `internal/api/enqueue.go`
- `GET /tasks/{id}` → current task state with a parsed `result` field (`internal/api/tasks.go`)
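The 400-level validation rules above can be sketched as follows; `validateEnqueue` is a hypothetical helper, not the code in `internal/api/enqueue.go`:

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// validateEnqueue applies the body rules from the API description: type and
// payload are required, and queue (if set) must be one of the QUEUES CSV.
func validateEnqueue(typ string, payload []byte, queue, queuesCSV string) error {
	if typ == "" {
		return fmt.Errorf("type is required")
	}
	if len(payload) == 0 {
		return fmt.Errorf("payload is required")
	}
	if queue != "" && !slices.Contains(strings.Split(queuesCSV, ","), queue) {
		return fmt.Errorf("unknown queue %q", queue)
	}
	return nil
}

func main() {
	fmt.Println(validateEnqueue("email.send.v1", []byte(`{}`), "high", "default,high")) // <nil>
	fmt.Println(validateEnqueue("", nil, "", "default,high"))
}
```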
Worker flow:

- Consumes from every priority queue (`examples/worker/main.go`).
- For each message `{id, type}`:
  - Begin a DB tx; `SELECT ... FOR UPDATE` the task row (`internal/store/tasks_worker.go`).
  - If already `SUCCEEDED`, ack and skip (idempotent re-consume).
  - Guard `attempts < max_attempts`.
  - Mark `RUNNING` and increment attempts.
  - Execute the handler (`examples/worker/main.go`):
    - On success: `SUCCEEDED` with `result` JSON → ack.
    - On error with attempts left: set `ENQUEUED` + `last_error`, commit, publish to the retry queue with a TTL (backoff), ack.
    - On terminal error: mark `FAILED`, publish to the DLX with the routing key, ack.
The default demo handler implements `email.send.v1` with a stub response.
Endpoint: `GET /metrics` exposes a custom registry containing only app metrics (`internal/metrics/metrics.go`).
- Registered in `metrics.MustRegisterAll()` (`cmd/api/main.go`).
- Exposed via `metrics.Expose(mux, "GET /metrics")` (`cmd/api/main.go`).
Metrics:
- `dq_enqueue_total{type,queue,status}`: counter of enqueue attempts (`ok`|`error`).
- `dq_enqueue_latency_seconds{type,queue}`: histogram of enqueue handler latency.
Examples (PromQL):
- Error rate by type/queue: `sum(rate(dq_enqueue_total{status="error"}[5m])) by (type, queue)`
- P95 latency: `histogram_quantile(0.95, sum(rate(dq_enqueue_latency_seconds_bucket[5m])) by (le, type, queue))`
Note: runtime/process metrics are not exported by default. To include them, register `prometheus.NewGoCollector()` and `prometheus.NewProcessCollector(...)` with the custom registry in `internal/metrics/metrics.go`.
Make targets:

- `make up` / `make down` / `make destroy` – manage Postgres and RabbitMQ containers
- `make db-apply` – apply schema and triggers
- `make db-seed-type` – seed a sample type (`email.send.v1` → `high`)
- `make init` – ensure RabbitMQ exchanges/queues/bindings (idempotent)
- `make api` – run the API locally
- `make logs` / `make ps` / `make config` – inspect containers
- `make db-shell` – open psql inside the container
Notes:

- Idempotency: repeated `POST /enqueue` with the same `idempotency_key` returns the original task id/queue/status without duplicating work (`internal/store/tasks_enqueue.go`).
- Observability: instrument more endpoints by adding counters/histograms to `internal/metrics/metrics.go` and registering them.
- Resiliency: the worker uses transactional writes-before-ack to avoid losing results in the face of failures.
- Cleanup: data volumes are preserved across `make down`; use `make destroy` for a fresh slate.
Roadmap:

- Worker SDK (`pkg/worker`):
  - A small runtime that manages DB and AMQP connections, sets QoS/prefetch, and handles graceful shutdown. It lets you register handlers per task type, applies the built-in backoff strategies (list, fixed, exponential) to publish TTL-based retries, and performs transactional state transitions (RUNNING, SUCCEEDED, RETRY, FAILED) via the store. It would also expose basic metrics, such as handler latency and success/failure/retry counters, through a Prometheus registerer.
  - This would reduce boilerplate and repeated wiring across worker apps, standardize retries and metrics, and define a clear worker contract so compatible workers can be implemented in any language.
License: MIT