rivet

A Postgres-backed task queue for Go. At-least-once delivery, exponential backoff retries, visibility timeouts to recover crashed workers, and SELECT FOR UPDATE SKIP LOCKED for safe concurrent dequeue.

No Redis. No separate broker. One table in your existing database.

Why Postgres

Most Go queues lean on Redis (Asynq) or RabbitMQ. Those work, but they add a second piece of infrastructure to operate, monitor, and back up. If your app already runs on Postgres, a queue on the same database means:

  • One backup and restore strategy
  • Transactional enqueue: insert a row and enqueue a job in the same transaction; either both happen or neither does
  • Familiar tooling: psql, pg_dump, EXPLAIN, Grafana dashboards
  • No new failure mode to learn

The two features that make this work, FOR UPDATE SKIP LOCKED for contention-free dequeue and a partial index on the ready state, are both available in Postgres 9.5 and later (SKIP LOCKED arrived in 9.5; partial indexes are much older). Rivet leans on them.
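
To make the transactional-enqueue point concrete, here is a minimal sketch using database/sql directly. The users and jobs tables, their columns, and the 'ready' status value are hypothetical stand-ins, not Rivet's actual schema, and Rivet's own Enqueue may expose transactions differently. The recorder type is a fake that lets the sketch run without a database.

```go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
)

// execer is the subset of *sql.Tx this sketch needs; *sql.Tx satisfies it.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

// Hypothetical statements: table and column names are assumptions,
// not Rivet's actual schema.
const (
	insertUserSQL = `INSERT INTO users (email) VALUES ($1)`
	insertJobSQL  = `INSERT INTO jobs (kind, payload, status) VALUES ($1, $2, 'ready')`
)

// signUp writes the user row and the welcome-email job through the same
// handle, so inside a transaction they commit or roll back together.
func signUp(ctx context.Context, tx execer, email string) error {
	if _, err := tx.ExecContext(ctx, insertUserSQL, email); err != nil {
		return fmt.Errorf("insert user: %w", err)
	}
	payload, err := json.Marshal(map[string]string{"to": email})
	if err != nil {
		return fmt.Errorf("encode payload: %w", err)
	}
	if _, err := tx.ExecContext(ctx, insertJobSQL, "send_email", string(payload)); err != nil {
		return fmt.Errorf("enqueue job: %w", err)
	}
	return nil
}

// signUpTx runs signUp inside a real database/sql transaction.
func signUpTx(ctx context.Context, db *sql.DB, email string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // returns sql.ErrTxDone (ignored) after a successful Commit
	if err := signUp(ctx, tx, email); err != nil {
		return err
	}
	return tx.Commit()
}

// recorder is a fake execer so the sketch runs without a database.
type recorder struct{ queries []string }

func (r *recorder) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
	r.queries = append(r.queries, q)
	return nil, nil
}

// dryRun exercises signUp against the fake and returns the statements issued.
func dryRun(email string) ([]string, error) {
	r := &recorder{}
	err := signUp(context.Background(), r, email)
	return r.queries, err
}

func main() {
	queries, err := dryRun("a@b.com")
	if err != nil {
		panic(err)
	}
	for _, q := range queries {
		fmt.Println(q)
	}
}
```

If the user insert fails, the job row is never written; if the job insert fails, the deferred Rollback discards the user row too.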

Status

Early. The store and worker are functional. Integration tests cover the happy path, retry-then-dead, SKIP LOCKED under contention, and reaping stuck jobs. There is no web UI, no cron scheduler, and no Redis backend yet. The Store interface is designed so Redis can be added later without touching client/worker code.

Quickstart

# 1. Install Postgres locally (Arch shown; adjust for your distro).
sudo pacman -S postgresql
sudo -iu postgres initdb -D /var/lib/postgres/data
sudo systemctl enable --now postgresql
sudo -iu postgres createuser -s $USER
createdb rivet_dev

# 2. Apply migrations.
psql rivet_dev -f migrations/001_initial.sql

# 3. Run the example. Enqueues 5 emails and one flaky job, then processes
#    them with a 2-goroutine worker pool.
DATABASE_URL=postgres://$USER@localhost/rivet_dev?sslmode=disable \
    go run ./examples/basic

You should see jobs processed in the log, and the flaky job retrying twice before succeeding on the third attempt.

Library usage

import "github.com/Yatsuiii/rivet/pkg/rivet"

store, err := rivet.NewPostgresStore(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
    log.Fatal(err) // e.g. bad DATABASE_URL or Postgres unreachable
}
defer store.Close()

// Enqueue from anywhere in your app.
store.Enqueue(ctx, rivet.EnqueueParams{
    Kind:    "send_email",
    Payload: json.RawMessage(`{"to":"a@b.com"}`),
})

// Run a worker pool elsewhere (or in the same process).
w := rivet.NewWorker(store, rivet.Config{Concurrency: 4})
w.Register("send_email", func(ctx context.Context, j *rivet.Job) error {
    // ... do work ...
    return nil
})
w.Run(ctx) // blocks until ctx cancelled
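
A handler will usually start by decoding the job's payload. The sketch below shows one way to do that. It assumes the job exposes its JSON as a Payload field, which the snippet above does not confirm, so Job here is a local stand-in rather than rivet.Job. It also assumes that returning an error sends the job down Rivet's retry path.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// Job is a local stand-in for rivet.Job; the Payload field is an assumption.
type Job struct {
	Kind    string
	Payload json.RawMessage
}

// emailPayload mirrors the JSON enqueued for "send_email" jobs.
type emailPayload struct {
	To string `json:"to"`
}

var errMissingTo = errors.New(`payload is missing "to"`)

// handleSendEmail decodes and validates the payload before doing any work.
func handleSendEmail(ctx context.Context, j *Job) error {
	var p emailPayload
	if err := json.Unmarshal(j.Payload, &p); err != nil {
		return fmt.Errorf("decode %s payload: %w", j.Kind, err)
	}
	if p.To == "" {
		return errMissingTo
	}
	if err := ctx.Err(); err != nil {
		return err // context already cancelled; do not start the send
	}
	// ... send the email to p.To ...
	return nil
}

// tryPayload runs the handler once against a raw JSON payload.
func tryPayload(raw string) error {
	j := &Job{Kind: "send_email", Payload: json.RawMessage(raw)}
	return handleSendEmail(context.Background(), j)
}

func main() {
	fmt.Println(tryPayload(`{"to":"a@b.com"}`))
	fmt.Println(tryPayload(`{"to":""}`))
	fmt.Println(tryPayload(`not json`))
}
```

Because delivery is at-least-once, a handler like this may run more than once for the same job, so the work it does should be safe to repeat.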

CLI

For operator tasks; production worker code should use the library directly.

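go build -o bin/rivet ./cmd/rivet

# Set DATABASE_URL for the commands that talk to the database;
# go build itself does not need it.
export DATABASE_URL=postgres://...
bin/rivet enqueue send_email -payload '{"to":"a@b.com"}'
bin/rivet inspect            # counts by queue and status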

Design choices

Dequeue uses SKIP LOCKED. Multiple workers polling the same queue do not block each other. Each worker's dequeue transaction skips rows that another worker has already locked, so it either claims a different row or gets nothing and sleeps. No coordination service required.
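
A common way to write such a dequeue is a single UPDATE whose subquery selects one ready row with FOR UPDATE SKIP LOCKED, backed by a partial index on ready rows. The sketch below illustrates that pattern and is not Rivet's actual query. The jobs table, the index name, the queue, run_at and attempts columns, and the 'ready' and 'running' status values are all assumptions; only status and locked_until are named in this README.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
)

// readyIndexSQL is a partial index: only ready rows are indexed, so the
// dequeue scan stays small however many finished jobs pile up.
// Index, table, and column names are hypothetical.
const readyIndexSQL = `
CREATE INDEX IF NOT EXISTS jobs_ready_idx
    ON jobs (queue, run_at)
    WHERE status = 'ready'`

// claimSQL picks one ready job, skipping rows locked by other workers,
// and marks it running in the same statement. Names are hypothetical.
const claimSQL = `
UPDATE jobs
SET status = 'running',
    locked_until = NOW() + make_interval(secs => $2),
    attempts = attempts + 1
WHERE id = (
    SELECT id FROM jobs
    WHERE queue = $1 AND status = 'ready' AND run_at <= NOW()
    ORDER BY run_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, kind, payload`

// claimed holds the columns returned by claimSQL.
type claimed struct {
	ID      int64
	Kind    string
	Payload []byte
}

// claimOne returns the claimed job, or nil when the queue is empty or
// every ready row is currently locked by another worker.
func claimOne(ctx context.Context, db *sql.DB, queue string, visibilitySecs float64) (*claimed, error) {
	var c claimed
	err := db.QueryRowContext(ctx, claimSQL, queue, visibilitySecs).
		Scan(&c.ID, &c.Kind, &c.Payload)
	if errors.Is(err, sql.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	return &c, nil
}

func main() {
	// No database connection here: just print the statements.
	fmt.Println(strings.TrimSpace(readyIndexSQL))
	fmt.Println(strings.TrimSpace(claimSQL))
}
```

Without SKIP LOCKED, a second worker running the same subquery would wait on the first worker's row lock instead of moving on to the next ready row.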

Visibility timeout, not heartbeats. When a worker claims a job, the row's locked_until is set to NOW() + visibility. If the worker dies mid-job, the reaper goroutine (running on every worker) finds the expired lock and moves the job back to retry. There is no heartbeat to maintain. The trade-off: a job that runs longer than the visibility window looks crashed to the reaper, so long jobs must be chunked or the visibility raised.
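
The claim-then-reap cycle can be modelled in memory to show the timing rule. This is an illustration of the idea, not Rivet's reaper: the job struct and the "running" status name are assumptions, and the real reaper works against the jobs table rather than a slice.

```go
package main

import (
	"fmt"
	"time"
)

// job is an in-memory stand-in for a row in the jobs table.
type job struct {
	ID          int
	Status      string
	LockedUntil time.Time
}

// claim marks a job running and sets its lock to now + visibility.
func claim(j *job, now time.Time, visibility time.Duration) {
	j.Status = "running"
	j.LockedUntil = now.Add(visibility)
}

// reap moves running jobs whose lock has expired back to retry and
// returns how many it moved.
func reap(jobs []*job, now time.Time) int {
	moved := 0
	for _, j := range jobs {
		if j.Status == "running" && !j.LockedUntil.After(now) {
			j.Status = "retry"
			moved++
		}
	}
	return moved
}

func main() {
	start := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	j := &job{ID: 1, Status: "ready"}
	claim(j, start, 30*time.Second)

	// 10s in: the lock is still valid, so the reaper leaves the job alone.
	fmt.Println(reap([]*job{j}, start.Add(10*time.Second)), j.Status)

	// 31s in: the lock has expired, so the job goes back to retry.
	fmt.Println(reap([]*job{j}, start.Add(31*time.Second)), j.Status)
}
```

The reaper cannot tell a crashed worker from a slow one, which is why a job that outlives its visibility window may be handed to a second worker.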

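Status is TEXT with a CHECK constraint, not a Postgres ENUM. ENUMs are awkward to evolve: before Postgres 12, ALTER TYPE ... ADD VALUE could not run inside a transaction block, and a value cannot be removed without recreating the type. With TEXT + CHECK, changing the allowed statuses is an ordinary migration that swaps the constraint.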

Exponential backoff without jitter. The retry delay is base * 2^(attempts-1), capped at a max. Jitter is omitted in v1; if a thundering herd shows up, it goes in.
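
As a worked example of the formula, here is a small sketch. The function name and signature are illustrative rather than Rivet's API, and treating attempts below 1 as 1 is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns base * 2^(attempts-1), capped at max.
// attempts below 1 are treated as 1.
func backoff(base, max time.Duration, attempts int) time.Duration {
	d := base
	for i := 1; i < attempts; i++ {
		d *= 2
		// d <= 0 guards against overflow after many doublings.
		if d >= max || d <= 0 {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	for attempts := 1; attempts <= 8; attempts++ {
		fmt.Println(attempts, backoff(time.Second, time.Minute, attempts))
	}
}
```

With a 1s base and a 60s cap, attempts 1 through 6 wait 1s, 2s, 4s, 8s, 16s, and 32s; from attempt 7 on the uncapped value (64s and up) exceeds the cap, so the delay stays at 60s.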

Worker writes use a detached context. Handler cancellation should not prevent the final status from committing. When the worker context is cancelled, in-flight handlers receive cancellation but the MarkSucceeded or MarkRetry call uses context.Background() so the result lands.
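
The pattern looks roughly like the sketch below. The finish function and its callback shapes are hypothetical, not Rivet's internals. The 5-second timeout on the detached context is an addition of this sketch, so that an unreachable database cannot stall shutdown forever; the description above only says context.Background() is used.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// finish runs the handler under the worker's context, then records the
// outcome under a context that the worker's cancellation cannot reach.
func finish(
	workerCtx context.Context,
	handler func(context.Context) error,
	mark func(ctx context.Context, handlerErr error) error,
) error {
	handlerErr := handler(workerCtx)

	// Detached from workerCtx, but bounded by a timeout (an assumption
	// of this sketch, not something the README specifies).
	writeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return mark(writeCtx, handlerErr)
}

// demo cancels the worker context before the handler runs and reports
// what the handler and the status write each observed.
func demo() (handlerSawCancel, writeCtxLive bool) {
	workerCtx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown

	handler := func(ctx context.Context) error { return ctx.Err() }
	mark := func(ctx context.Context, handlerErr error) error {
		handlerSawCancel = errors.Is(handlerErr, context.Canceled)
		writeCtxLive = ctx.Err() == nil
		return nil
	}
	_ = finish(workerCtx, handler, mark)
	return handlerSawCancel, writeCtxLive
}

func main() {
	sawCancel, live := demo()
	fmt.Println("handler saw cancellation:", sawCancel)
	fmt.Println("status write context still live:", live)
}
```

On Go 1.21 and later, context.WithoutCancel(workerCtx) is an alternative that drops cancellation while keeping the parent's values.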

What's NOT here yet

  • Cron / scheduled-job UI (you can delay a single enqueue by setting Delay, but there is no recurring-schedule abstraction)
  • Web dashboard
  • Redis backend (the Store interface is ready for it)
  • Job dependencies / DAG-style workflows
  • Per-queue rate limits
  • Prometheus metrics adapter (the worker only logs; today, metrics would have to go through a slog handler)

Pull requests welcome on any of these.

Running tests

Unit tests are pure Go; the command go test ./... runs them. The interesting tests are the integration tests, which hit a real Postgres and are gated behind a build tag:

RIVET_TEST_DATABASE_URL=postgres://$USER@localhost/rivet_test?sslmode=disable \
    go test -tags=integration ./pkg/rivet -v

The integration suite drops and recreates the schema on each run. Point it at a database you do not mind losing.

License

MIT.
