River is a robust high-performance job processing system for Postgres (and SQLite). This repository is a behavior-faithful Crystal port preserving upstream semantics exactly.
Upstream ref: v0.37.1, pinned via git submodule at vendor/river/
Being built for Postgres, River encourages the use of the same database for application data and job queue. By enqueueing jobs transactionally along with other database changes, whole classes of distributed systems problems are avoided. Jobs are guaranteed to be enqueued if their transaction commits, are removed if their transaction rolls back, and aren't visible for work until commit.
# shard.yml
dependencies:
jobs:
github: dsisnero/jobsRequires Crystal >= 1.20.2 and PostgreSQL (or SQLite).
require "jobs"
require "db"
require "pg"
# Define a job
struct SortArgs
include JSON::Serializable
include Jobs::Core::JobArgs
property strings : Array(String)
def initialize(@strings : Array(String) = [] of String); end
def kind : String; "sort"; end
end
class SortWorker
include Jobs::Core::Worker(SortArgs)
def work(job : Jobs::Core::Job(SortArgs)) : Nil
job.args.strings.sort!
puts "Sorted: #{job.args.strings}"
end
end
# Setup
db = DB.open("postgres://localhost/river_test?sslmode=disable")
driver = RealPostgresDriver.new(db)
workers = Jobs::Core::Workers.new
workers.add(SortWorker.new)
config = Jobs::Client::Config.new(
queues: {"default" => Jobs::Client::QueueConfig.new(max_workers: 100)},
workers: workers,
).with_defaults
client = Jobs::Client::Client.new(config, driver)
# Insert and work
client.insert("sort", SortArgs.new(strings: ["whale", "tiger", "bear"]).to_json.to_slice)
client.start
# Subscribe to events
ch, cancel = client.subscribe(Jobs::Core::EventKind::JobCompleted)
spawn do
loop do
event = ch.receive
puts "Job ##{event.job.try(&.id)} completed"
end
end
# Graceful shutdown
client.stop
cancel.call# Build
crystal build src/cli/jobs.cr -o bin/jobs
# Run migrations
./bin/jobs up --database-url postgres://localhost/river_test?sslmode=disable
./bin/jobs list --database-url postgres://localhost/river_test?sslmode=disable
./bin/jobs validate --database-url postgres://localhost/river_test?sslmode=disable
./bin/jobs versionSee CLI Tools for full usage.
| Document | Description |
|---|---|
| Getting Started | First job, client setup, subscriptions |
| Job Insertion | Batch insert, unique jobs, scheduled, listing |
| Workers | Workers, hooks, middleware, retry policies, snoozing |
| Periodic Jobs | Cron-style scheduling, intervals |
| Subscriptions | Event monitoring and metrics |
| Error Handling | Error handlers, retries, cancellation |
| Graceful Shutdown | Soft/hard stop, shutdown sequences |
| CLI Tools | Migration commands |
| State Machine | Job state transitions diagram |
| Architecture | Package structure and design |
| Development | Setup, testing, contributing |
| Testing | Test patterns and helpers |
| Coding Guidelines | Code conventions |
| PR Workflow | Pull request process |
src/jobs/
├── client.cr # Config, QueueConfig, validation
├── client_cls.cr # Client: insert, subscribe, start/stop
├── core/ # Job, Worker, RetryPolicy, Event, etc.
├── driver/ # Driver interface, params, real_pg, sqlite
├── internal/ # Producer, JobExecutor, JobCompleter, etc.
│ ├── maintenance.cr # QueueCleaner, JobCleaner, JobRescuer, Reindexer, etc.
│ └── leadership.cr # Elector (leader election)
├── migrate/ # Migration framework + SQL files
├── shared/ # StartStop, CircuitBreaker, ServiceUtil, TimeUtil
└── types/ # JobRow, JobState, error types
- Transactional enqueueing — Jobs enqueued in a transaction are guaranteed to exist after commit and not before.
- Job Args + Workers — Define jobs with
JobArgsfor serialization andWorkerfor execution. - Multiple queues, priority, scheduling — Queue isolation, periodic/cron jobs, scheduled jobs, and snoozing.
- Middleware + Hooks — Global and per-job lifecycle interception.
- Error handling — Error handlers, retry policies (exponential backoff: attempt^4 seconds), and cancellation support.
- Graceful shutdown — Soft stop (wait for jobs) and hard stop modes.
- Subscriptions — Event-driven monitoring for logging, metrics, and alerts.
- Maintenance services — All 7 services: QueueCleaner, JobCleaner, JobRescuer, Reindexer, JobScheduler, PeriodicJobEnqueuer, QueueMaintainer.
- Leader election — Postgres advisory lock-based leader election.
- Unique jobs — By args, period, queue, and state (SHA256 key generation).
- Migrations — CLI tool (
jobs) and programmatic migration API. - Dual drivers — PostgreSQL (production) and SQLite (in-memory testing).
| Driver | File | Status |
|---|---|---|
| PostgreSQL | src/jobs/driver/real_pg.cr |
Complete (993 lines) |
| SQLite | src/jobs/driver/sqlite.cr |
Complete (48 methods, 25 TDD specs) |
make install # Install dependencies
make format # Format Crystal source
make lint # Run ameba linter
make test # Run Crystal specs
make clean # Remove build artifactsMIT — Port of River upstream code.