Shigoto 仕事

PostgreSQL-backed background job processing for Erlang.

Shigoto (仕事, "work") is a feature-rich job queue built on PostgreSQL's FOR UPDATE SKIP LOCKED for safe multi-node operation. No Redis or external broker needed — if you have PostgreSQL, you have a job queue.
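
The claiming pattern behind FOR UPDATE SKIP LOCKED can be sketched as follows. This is only an illustration of the general technique, not Shigoto's actual query: the jobs table, its columns, and the claim_sketch module are hypothetical, and the call assumes pgo:query/3 with a pool option.

```erlang
-module(claim_sketch).
-export([claim_sql/0, claim/3]).

%% SQL for claiming up to $2 available jobs from queue $1.
%% Table and column names are hypothetical, not Shigoto's schema.
%% SKIP LOCKED makes concurrent claimers skip rows that another
%% transaction has already locked, so no job is claimed twice.
claim_sql() ->
    <<"UPDATE jobs SET state = 'executing' "
      "WHERE id IN ("
      "  SELECT id FROM jobs "
      "  WHERE state = 'available' AND queue = $1 "
      "  ORDER BY priority DESC, id "
      "  LIMIT $2 "
      "  FOR UPDATE SKIP LOCKED"
      ") RETURNING id">>.

%% Runs the claim against a pgo pool (requires a running pool and the table).
claim(Pool, Queue, Limit) ->
    pgo:query(claim_sql(), [Queue, Limit], #{pool => Pool}).
```

Because locked rows are skipped rather than waited on, several nodes can poll the same queue at once without blocking each other.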

Features

Core

  • PostgreSQL-backed — Jobs stored in PostgreSQL via pgo. Transactional enqueue using the same pool as your application.
  • Multi-node safe — FOR UPDATE SKIP LOCKED ensures each job is claimed by exactly one node. No coordination required.
  • Priority queues — Higher priority jobs are claimed first. Multiple queues with independent concurrency limits.
  • Dynamic queues — Add and remove queues at runtime without restart.
  • Exponential backoff — Failed jobs retry with min(attempt^4 + jitter, 1800) second delays, or provide a custom backoff/2 callback.
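
The default retry delay described above can be sketched as a pure function. This is a minimal illustration of the formula, not Shigoto's source: the backoff_sketch module, the jitter range of 0 to 29 seconds, and the timeout error term are assumptions.

```erlang
-module(backoff_sketch).
-export([delay/1, delay/2, backoff/2]).

%% Upper bound on the retry delay in seconds (from the formula above).
-define(MAX_DELAY, 1800).

%% Default delay with random jitter. The 0..29 second jitter range is
%% an assumption; the README does not state the range.
delay(Attempt) ->
    delay(Attempt, rand:uniform(30) - 1).

%% Deterministic variant: min(Attempt^4 + Jitter, 1800).
delay(Attempt, Jitter) when is_integer(Attempt), Attempt >= 1 ->
    min(Attempt * Attempt * Attempt * Attempt + Jitter, ?MAX_DELAY).

%% Example custom backoff/2 callback with the (Attempt, Error) -> Seconds
%% shape: retry a (hypothetical) timeout error quickly, and put every
%% other error on the default curve without jitter.
backoff(_Attempt, timeout) -> 5;
backoff(Attempt, _Error) -> delay(Attempt, 0).
```

With zero jitter the delays grow as 1, 16, 81, 256, 625, 1296 seconds and are capped at 1800 from the seventh attempt onward.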

Scheduling

  • Cron scheduling — Built-in 5-field cron parser with leader election via advisory locks. Catches up on missed intervals after restarts.
  • Scheduled jobs — Enqueue jobs for future execution with scheduled_at.
  • Job snoozing — Workers can return {snooze, Seconds} to reschedule without consuming a retry attempt.
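
Snoozing might look like the worker below, which postpones itself until a timestamp carried in its args. It is a sketch under stated assumptions: the my_delayed_worker module and the <<"not_before">> key are hypothetical, and only the {snooze, Seconds} return value comes from the feature description above.

```erlang
-module(my_delayed_worker).
-behaviour(shigoto_worker).
-export([perform/1]).

%% <<"not_before">> is a hypothetical arg holding a Unix timestamp in seconds.
perform(#{<<"not_before">> := NotBefore}) ->
    Now = erlang:system_time(second),
    case Now >= NotBefore of
        true ->
            %% Real work would go here.
            ok;
        false ->
            %% Reschedule for the remaining time without consuming
            %% a retry attempt.
            {snooze, NotBefore - Now}
    end.
```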

Reliability

  • Job dependencies — depends_on chains ensure jobs execute in order. Cycle detection prevents deadlocks.
  • Batches — Group jobs with completion callbacks. Batch state tracks completed/discarded counts.
  • Unique jobs — Prevent duplicates with configurable keys, states, time windows, debounce, and field replacement on conflict.
  • Stale job rescue — Heartbeat-based detection of zombie jobs with automatic rescheduling.
  • Graceful shutdown — Waits for in-flight jobs to finish before stopping.

Resilience (seki integration)

  • Rate limiting — Per-worker token bucket, sliding window, GCRA, or leaky bucket.
  • Circuit breaking — Auto-opens on repeated failures, configurable per worker.
  • Bulkhead — Per-worker concurrency limits (local node) and global concurrency limits (across nodes via PostgreSQL).
  • Load shedding — CoDel-based system-level protection, shedding low-priority jobs first.

Operations

  • Encryption — AES-256-GCM encryption for job args at rest with key rotation support.
  • Middleware — Composable before/after hooks for logging, metrics, authorization.
  • Telemetry — 16+ telemetry events covering job lifecycle, queue operations, resilience, batches, and cron.
  • Health check — shigoto:health/0 reports pool status, job counts, stale jobs, and queue health.
  • Bulk operations — insert_all/1, cancel_by/2, retry_by/2 for batch operations.
  • Auto-archival — Old jobs archived to shigoto_jobs_archive table, then pruned.
  • Dashboard — shigoto_board provides a real-time web dashboard.

Testing

  • Synchronous drain — drain_queue/1 processes all jobs synchronously for deterministic tests.
  • 74 tests — Comprehensive test coverage across 3 Common Test suites.

Quick Start

Add to your deps:

{deps, [
    {shigoto, {git, "https://github.com/Taure/shigoto.git", {branch, "main"}}}
]}.

Configure in sys.config:

{shigoto, [
    {pool, my_app_db},
    {queues, [{<<"default">>, 10}, {<<"emails">>, 5}]},
    {poll_interval, 5000},
    {cron, [
        {<<"daily_cleanup">>, <<"0 3 * * *">>, my_cleanup_worker, #{}}
    ]}
]}

Run the migration:

shigoto_migration:up(my_app_db).

Define a worker:

-module(my_email_worker).
-behaviour(shigoto_worker).
-export([perform/1]).

%% send_email/2 stands in for your application's own mail-sending function.
perform(#{<<"to">> := To, <<"subject">> := Subject}) ->
    send_email(To, Subject),
    ok.

Enqueue jobs:

%% Simple insert
shigoto:insert(#{
    worker => my_email_worker,
    args => #{<<"to">> => <<"user@example.com">>, <<"subject">> => <<"Welcome">>}
}).

%% Scheduled for later
shigoto:insert(#{
    worker => my_cleanup_worker,
    scheduled_at => {{2026, 3, 20}, {3, 0, 0}}
}).

%% With priority and queue
shigoto:insert(#{
    worker => my_urgent_worker,
    args => #{},
    priority => 10,
    queue => <<"critical">>
}).

Job Lifecycle

available → executing → completed
                     ↘ retryable → available (retry with backoff)
                     ↘ discarded (max attempts reached)
                     ↘ snoozed → available (rescheduled, attempt preserved)

Jobs can also be cancelled via shigoto:cancel/2 and retried via shigoto:retry/2.
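
The transitions out of the executing state can be summarised as a small pure function. This is a sketch of the diagram above, not Shigoto's implementation: the lifecycle_sketch module is hypothetical, and comparing the attempt number against the maximum with >= is an assumption about how attempts are counted.

```erlang
-module(lifecycle_sketch).
-export([next_state/3]).

%% next_state(Result, Attempt, MaxAttempts) -> state after execution.
%% Result is the value returned by the worker's perform/1.
next_state(ok, _Attempt, _Max) ->
    completed;
next_state({snooze, _Seconds}, _Attempt, _Max) ->
    %% Snoozed jobs become available again later; attempt is preserved.
    snoozed;
next_state({error, _Reason}, Attempt, Max) when Attempt >= Max ->
    %% Max attempts reached: the job is given up on.
    discarded;
next_state({error, _Reason}, _Attempt, _Max) ->
    %% Becomes available again once the backoff delay has passed.
    retryable.
```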

Worker Callbacks

All callbacks are optional except perform/1:

Callback              Default        Description
perform/1             required       Execute the job. Return ok, {error, Reason}, or {snooze, Seconds}
max_attempts/0        3              Maximum retry attempts before discarding
queue/0               <<"default">>  Default queue name
priority/0            0              Default priority (higher = claimed first)
timeout/0             300000         Execution timeout in milliseconds
unique/0              -              Uniqueness constraints
tags/0                []             Default tags for filtering
backoff/2             exponential    Custom retry delay: (Attempt, Error) -> Seconds
rate_limit/0          -              Seki rate limiter config
concurrency/0         -              Max concurrent executions per node (seki bulkhead)
global_concurrency/0  -              Max concurrent executions across all nodes
circuit_breaker/0     -              Per-worker circuit breaker thresholds
middleware/0          []             Worker-specific middleware chain
on_discard/2          -              Called when a job is permanently discarded
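
A worker that overrides a few of the optional callbacks might look like this. It is a sketch: the my_invoice_worker module, the <<"billing">> queue, and all chosen values are hypothetical, and only callbacks whose return types are clear from the table are shown.

```erlang
-module(my_invoice_worker).
-behaviour(shigoto_worker).
-export([perform/1, max_attempts/0, queue/0, priority/0, timeout/0, backoff/2]).

%% Required: run the job. Unexpected args are reported as an error.
perform(#{<<"invoice_id">> := _Id}) ->
    ok;
perform(_Other) ->
    {error, invalid_args}.

%% Try up to 5 times instead of the default 3.
max_attempts() -> 5.

%% Route to a dedicated queue (hypothetical name).
queue() -> <<"billing">>.

%% Higher priority jobs are claimed first; the default is 0.
priority() -> 5.

%% Execution timeout in milliseconds (default 300000).
timeout() -> 60000.

%% Linear backoff in seconds, capped at 10 minutes.
backoff(Attempt, _Error) -> min(30 * Attempt, 600).
```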

Configuration

Option              Default                Description
pool                required               pgo pool name
queues              [{<<"default">>, 10}]  Queue names and concurrency limits
poll_interval       5000                   Milliseconds between polling
cron                []                     {Name, Schedule, Worker, Args} tuples
prune_after_days    14                     Days to keep completed/discarded jobs
shutdown_timeout    15000                  Milliseconds to wait for in-flight jobs
middleware          []                     Global middleware chain
encryption_key      -                      32-byte AES-256-GCM key
encryption_keys     []                     Ordered key list for rotation (newest first)
heartbeat_interval  30000                  Stale job detection interval
load_shedding       -                      CoDel config map for seki
queue_weights       #{}                    Weighted polling distribution
fair_queues         []                     Queues using partition-key fair claiming
notifier            -                      LISTEN/NOTIFY connection config
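
As an illustration, a sys.config that tunes several of these options could look like the fragment below. The values are arbitrary examples rather than recommendations, and only options with plain integer or list values in the table are shown, since the map formats of the remaining options are not documented here.

```erlang
[
 {shigoto, [
    {pool, my_app_db},
    {queues, [{<<"default">>, 10}, {<<"emails">>, 5}]},
    %% Poll every second instead of the default 5000 ms.
    {poll_interval, 1000},
    %% Keep completed/discarded jobs for a week instead of 14 days.
    {prune_after_days, 7},
    %% Give in-flight jobs up to 30 seconds to finish on shutdown.
    {shutdown_timeout, 30000},
    {heartbeat_interval, 15000}
 ]}
].
```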

Supervision Tree

shigoto_sup (one_for_one)
  ├─ shigoto_executor_sup    — simple_one_for_one for job execution
  ├─ shigoto_queue_sup       — one gen_server per queue
  │    ├─ shigoto_queue:default
  │    └─ shigoto_queue:emails
  ├─ shigoto_cron            — cron scheduling with leader election
  ├─ shigoto_pruner          — hourly archival and cleanup
  ├─ shigoto_heartbeat       — periodic heartbeat updates
  └─ shigoto_notifier        — LISTEN/NOTIFY (optional)

Requirements

  • Erlang/OTP 27+
  • PostgreSQL 9.5+

License

MIT

About

PostgreSQL-backed background job processing for the Nova ecosystem
