pulse

A reliable, Postgres-backed distributed job queue in Go — gRPC server, typed SDK, operator CLI, web dashboard.

What it solves

Background work is easy until something fails: a worker crashes mid-job, a downstream dependency flakes, a burst of low-value work buries an urgent task, or ops needs to touch the database while nothing is running. pulse runs jobs across a pool of workers so that:

  • A job is never lost. Submitted work is durable in Postgres before the submit returns.
  • Failures retry with backoff, then dead-letter. Retries are spaced by attempts² (quadratic backoff), the number of attempts is capped, and the last error is preserved for inspection, so poison jobs can't loop forever.
  • Crashed workers are recovered automatically. Claims carry a lease; workers heartbeat to keep it; a watchdog routes lapsed leases back through the retry path. Delivery is deliberately at-least-once, not exactly-once, so handlers receive job_id + attempt to use as an idempotency key.
  • Urgent work jumps the queue. Per-job priority, FIFO within a priority.
  • Work can be scheduled — once (At/After), on an interval (Every), or by Cron, exactly-once per occurrence, safe across server replicas and restarts.
  • Dispatch can be paused for a maintenance window: submits keep landing, running jobs finish, the backlog drains on resume — and the switch survives restarts.

How it works

One jobs row holds each job's state, retry policy, and worker lease. Everything concurrency-critical is a handful of SQL statements:

  • Claims are FOR UPDATE SKIP LOCKED batches over a partial index: concurrent workers take disjoint batches — there is nothing to race — and a dispatch tick costs O(batch) whether the backlog is 1k or 1M jobs.
  • State transitions are guarded UPDATEs: the legality of every transition ("can't complete a job that isn't running") lives in its WHERE clause, enforced by the database, race-free by construction. Any number of workers and any number of server replicas are safe against one Postgres.
  • Handlers run in your process, not the server's: workers stream assignments over gRPC, execute your registered function locally, and report back. The server only ever moves data.

Architecture

   YOUR APP — the SDK                              PULSE SERVER — pulsed
   ┌──────────────────────┐                       ┌─────────────────────────┐
   │ Enqueue(name, args)  │ ──────  SubmitJob ──► │ INSERT job (PENDING)    │
   │ Register(name, fn)   │ ──────  StreamJobs ─► │ claim batch → send      │
   │ Run()                │ ◄─────  assignment ── │  (SKIP LOCKED + lease)  │
   │   fn() runs LOCALLY  │                       │                         │
   │   ReportResult       │ ───  ReportResult ──► │ complete / retry / DLQ  │
   │   Heartbeat (~10s)   │ ──────  Heartbeat ──► │ extend lease (fenced)   │
   └──────────────────────┘                       └─────────────────────────┘
     handlers run in your process           one jobs table holds state,
     — only data crosses the wire           retry policy, and the lease

   background loops inside the server:
     • watchdog   → reaps lapsed leases back into the retry path
     • scheduler  → fires due schedules → inserts new jobs
     • pause loop → converges the dispatch gate to the durable pause switch

Full package layout, request flows, and guarantees: ARCHITECTURE.md. Every non-trivial decision has a record in docs/adr/. Browsable docs: pulse.beteg.dev.

Setup

Configuration — the server takes everything from environment variables:

variable                        required  description
DB_HOST                         yes       Postgres URL, e.g. postgres://user:pass@host:5432/pulse (migrations run on boot)
PULSE_GRPC_ADDR                 no        listen port (default 50051)
PULSE_AUTH_USERS                no        enables auth: "user:pass,user:$2a$10$…", plaintext or bcrypt (pulse passwd); unset = open
PULSE_TLS_CERT / PULSE_TLS_KEY  no        serve TLS from this certificate/key pair (set both or neither; clients use WithTLS / --tls)
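The rules in the table above (one required variable, a default port, TLS files as a pair) translate into a small loader. This is a sketch of how these variables could be validated, not pulsed's actual configuration code; the Config type and loadConfig are hypothetical, and getenv is injected so the function is easy to test.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// Config mirrors the environment table (hypothetical type).
type Config struct {
	DBURL     string
	GRPCAddr  string
	AuthUsers string // empty = auth disabled (open server)
	TLSCert   string
	TLSKey    string
}

func loadConfig(getenv func(string) string) (Config, error) {
	c := Config{
		DBURL:     getenv("DB_HOST"),
		GRPCAddr:  getenv("PULSE_GRPC_ADDR"),
		AuthUsers: getenv("PULSE_AUTH_USERS"),
		TLSCert:   getenv("PULSE_TLS_CERT"),
		TLSKey:    getenv("PULSE_TLS_KEY"),
	}
	if c.DBURL == "" {
		return c, errors.New("DB_HOST is required")
	}
	if c.GRPCAddr == "" {
		c.GRPCAddr = "50051" // documented default
	}
	// TLS needs both files or neither.
	if (c.TLSCert == "") != (c.TLSKey == "") {
		return c, errors.New("set both PULSE_TLS_CERT and PULSE_TLS_KEY, or neither")
	}
	return c, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		os.Exit(1)
	}
	fmt.Printf("listening on %s, auth=%v, tls=%v\n", cfg.GRPCAddr, cfg.AuthUsers != "", cfg.TLSCert != "")
}
```

Failing fast on a half-configured TLS pair is usually preferable to silently serving plaintext.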

Run the server — pick one:

# docker compose: postgres + pulsed on :50051 + web dashboard on :8080
curl -fsSL https://raw.githubusercontent.com/bete7512/pulse/main/docker-compose.yml -o docker-compose.yml
docker compose up            # pulls bete7512/pulse; from a clone, `up --build` builds locally

# plain docker with the published image (or build your own: docker build -t pulse .)
docker run -p 50051:50051 \
  -e DB_HOST=postgres://user:pass@host:5432/pulse \
  -e PULSE_AUTH_USERS='ops:s3cret' \
  bete7512/pulse

# from source: .env with DB_HOST=postgres://user:pass@localhost:5432/pulse
make migrate_up && make run

Use the SDK (go get github.com/bete7512/pulse):

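import (
	"context"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/bete7512/pulse"
)

type EmailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// ctx is cancelled on Ctrl-C, which stops p.Run below
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

p, err := pulse.New("localhost:50051", pulse.WithConcurrency(20))
if err != nil {
	log.Fatal(err)
}
defer p.Close()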

// register a handler — a plain typed function:
pulse.Register(p, "send-email", func(ctx context.Context, a EmailArgs) error {
	return sendEmail(a.To, a.Subject)
})

// enqueue by name; jump the queue with a priority (default 0):
pulse.Enqueue(ctx, p, "send-email", EmailArgs{To: "a@b.com", Subject: "Welcome"})
pulse.Enqueue(ctx, p, "send-email", EmailArgs{To: "vip@b.com"}, pulse.WithPriority(10))

// schedule work — once, on an interval, or by cron:
p.Schedule(ctx, "reconcile", payload, pulse.Every(5*time.Minute))
p.Schedule(ctx, "rollup", payload, pulse.Cron("0 * * * *"))

p.Run(ctx) // process jobs until ctx is cancelled

Authentication (optional): NATS-style username/password. Set PULSE_AUTH_USERS="ops:s3cret,worker:hunter2" on the server and pass pulse.WithUserPass(user, pass) in clients (pair it with pulse.WithTLS in production). To avoid storing plaintext, generate bcrypt hashes with pulse passwd <password> and use them instead: PULSE_AUTH_USERS='ops:$2a$10$…'. Clients still send the plaintext password. Leave the variable unset for an open server during local development.
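The PULSE_AUTH_USERS format can be parsed as shown below. This is a sketch under assumptions, not pulse's parser: entries are comma-separated user:secret pairs split at the first colon (bcrypt hashes never contain colons or commas), and a secret starting with $2a$, $2b$ or $2y$ is treated as a bcrypt hash. Checking such a hash would use golang.org/x/crypto/bcrypt's CompareHashAndPassword; only the plaintext path is implemented here to keep the example stdlib-only.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"strings"
)

type credential struct {
	secret string
	bcrypt bool // secret is a bcrypt hash rather than plaintext
}

// parseAuthUsers turns "ops:s3cret,worker:$2a$10$..." into a user table.
func parseAuthUsers(spec string) (map[string]credential, error) {
	users := map[string]credential{}
	for _, entry := range strings.Split(spec, ",") {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		user, secret, ok := strings.Cut(entry, ":")
		if !ok || user == "" || secret == "" {
			return nil, fmt.Errorf("bad PULSE_AUTH_USERS entry %q", entry)
		}
		isHash := strings.HasPrefix(secret, "$2a$") ||
			strings.HasPrefix(secret, "$2b$") ||
			strings.HasPrefix(secret, "$2y$")
		users[user] = credential{secret: secret, bcrypt: isHash}
	}
	return users, nil
}

// checkPlain compares a plaintext secret in constant time; bcrypt entries
// need bcrypt.CompareHashAndPassword instead and are rejected here.
func checkPlain(c credential, password string) bool {
	return !c.bcrypt && subtle.ConstantTimeCompare([]byte(c.secret), []byte(password)) == 1
}

func main() {
	users, err := parseAuthUsers("ops:s3cret,worker:hunter2")
	if err != nil {
		panic(err)
	}
	fmt.Println(checkPlain(users["ops"], "s3cret"))   // true
	fmt.Println(checkPlain(users["worker"], "wrong")) // false
}
```

Since clients always send the plaintext password, the server can accept either storage form per user without any client-side change.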

See examples/main.go for a runnable tour (handlers, retries with the attempt idempotency key, priority, schedules, a long job kept alive by heartbeats), and docs/crash-recovery.md for a captured kill -9 recovery run.

Benchmarks

2,000 jobs across 8 workers, one laptop, no-op handler (BENCHMARKS.md):

metric                                   value
submit rate                              4,169 jobs/s
end-to-end throughput                    1,223 jobs/s
dispatch latency p50 / p95               773 ms / 1.18 s
claim conflicts (aborted transactions)   0
database transactions per job            ~1.6

Claims stay contention-free at any worker count (SKIP LOCKED batches are disjoint by construction) and O(batch) at any backlog depth (partial-index LIMIT).

CLI

go install github.com/bete7512/pulse/cli/pulse@latest

# one-time: save the server URL and credentials (~/.config/pulse/config.json)
$ pulse save creds --url localhost:50051 --username ops --password s3cret

$ pulse submit send-email '{"to":"a@b.com"}' --priority 5
019f2ec0-afad-7082-b27f-f7083ccb543d

$ pulse jobs list --status pending
ID                                    TOPIC       STATUS   PRIO  ATTEMPTS  SUBMITTED            ERROR
019f2ec0-afad-7082-b27f-f7083ccb543d  send-email  PENDING  5     0         2026-07-04 23:10:07

$ pulse dispatch pause -m "db maintenance"
dispatch paused

$ pulse dispatch status
dispatch: paused since 2026-07-04 23:12:41
reason:   db maintenance

$ pulse dispatch resume
dispatch resumed

$ pulse config view
config:   /home/bete/.config/pulse/config.json
url:      localhost:50051
tls:      false
username: ops
password: <redacted>

$ pulse stats
TOPIC         PENDING  RUNNING  RETRYING  COMPLETED  DEAD  CANCELED
flaky-report  2        1        3         1204       6     0
send-email    14       8        0         18342      2     5
slow-job      0        4        0         311        0     1
TOTAL         16       13       3         19857      8     6

oldest waiting job: 2026-07-05 17:41:02 (12s)

Also:

  • pulse jobs get|cancel|requeue <id>
  • pulse jobs list --status dead_lettered --topic report: the dead-letter inbox, showing each job's last error; requeue puts a job back in line with a fresh retry budget
  • pulse schedules list
  • pulse passwd <password>

Connection precedence: --addr > PULSE_ADDR/PULSE_USER/PULSE_PASSWORD > saved config > localhost:50051.
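The connection precedence can be expressed as "first non-empty source wins". A minimal sketch, assuming precedence is resolved per field (the CLI might instead pick one whole source); conn, resolve and firstNonEmpty are hypothetical names, and the environment is injected as a function for testability.

```go
package main

import "fmt"

// conn is the resolved connection target (hypothetical type).
type conn struct{ Addr, User, Password string }

func firstNonEmpty(vals ...string) string {
	for _, v := range vals {
		if v != "" {
			return v
		}
	}
	return ""
}

// resolve applies: --addr flag > PULSE_ADDR/PULSE_USER/PULSE_PASSWORD >
// saved config > localhost:50051 (the last only for the address).
func resolve(flagAddr string, getenv func(string) string, saved conn) conn {
	return conn{
		Addr:     firstNonEmpty(flagAddr, getenv("PULSE_ADDR"), saved.Addr, "localhost:50051"),
		User:     firstNonEmpty(getenv("PULSE_USER"), saved.User),
		Password: firstNonEmpty(getenv("PULSE_PASSWORD"), saved.Password),
	}
}

func main() {
	saved := conn{Addr: "pulse.internal:50051", User: "ops", Password: "s3cret"}
	noEnv := func(string) string { return "" }
	fmt.Println(resolve("", noEnv, saved).Addr)               // pulse.internal:50051
	fmt.Println(resolve("localhost:6000", noEnv, saved).Addr) // localhost:6000
	fmt.Println(resolve("", noEnv, conn{}).Addr)              // localhost:50051
}
```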

Web dashboard

pulseui is a self-contained dashboard binary: queue composition per topic and status, a throughput chart (completed vs dead-lettered, last hour), a filterable job browser with payloads and per-job cancel/requeue, schedule controls, and the dispatch pause switch. It is server-rendered Go templates + htmx — no Node, no build step — and talks to pulsed only through the SDK, exactly like any other client.

docker compose up            # included: dashboard on http://localhost:8080
docker run -p 8080:8080 -e PULSE_ADDR=host:50051 bete7512/pulse-ui
go run ./ui/pulseui          # from a clone

variable                     default          description
PULSE_ADDR                   localhost:50051  pulsed gRPC address
PULSE_USER / PULSE_PASSWORD  (none)           fixed service credentials; setting them skips the login flow
PULSE_UI_PORT                8080             HTTP listen port

Sign-in follows the server. Without env credentials, the dashboard probes pulsed: an open server needs no login, while a server with PULSE_AUTH_USERS set gets a login page. Credentials are verified against pulsed itself, and each browser session holds its own connection, so cancel, requeue, and pause actions appear in the server's audit log under the signed-in username rather than a shared service account.

Development

make migrate_up               # apply DB migrations (goose)
make run                      # run the server (cmd/pulsed)
go test -race ./...           # tests (integration via TEST_DB_URL)
go run ./cmd/loadgen --jobs 2000 --streams 8   # load test against a running server

Stack: Go · gRPC · Postgres (pgx) · goose.

License & contributing

MIT — see LICENSE. Contributions welcome: CONTRIBUTING.md.
