postgres-js-queue

A minimalist PostgreSQL queuing library for Node.js. Built for high performance and simplicity, designed to be used with a postgres.js instance.

Features

Core

  • Minimal Runtime Footprint: Lightweight and fast; runtime validation is opt-in.
  • Type-Safe Channels: Each queue instance is bound to a specific payload type and channel.
  • Runtime Validation: Optional schema-based validation using Zod, Valibot, ArkType, or any Standard Schema-compliant library.
  • Atomic Processing: Uses SKIP LOCKED for reliable, concurrent-safe job fetching.
  • Multi-Queue Architecture: Run multiple independent queues on the same database.
  • Migration CLI: Built-in tool to set up your database schema using standard PG* env vars.
  • TypeScript First: Written in TypeScript with the strictest configuration for maximum type safety.

Additional Capabilities

  • Broadcasting: One-to-many message distribution via the broadcast: true option
  • Unified Listener API: Single listen() method handles both queued jobs and broadcasts
  • Stale Job Recovery: Reclaims stale active jobs when workers crash
  • Reconciliation Polling: Periodic sweep to process ready jobs even if NOTIFY is missed

Operational Features

  • Listener Heartbeat: Track active workers with UUIDv7-based health monitoring
  • Delayed Jobs: Schedule jobs to run at a specific time or after a delay
  • Automatic Retries: Exponential backoff retry policy for failed jobs
  • Graceful Shutdown: Ensure in-flight jobs complete before process exit

Installation

# pnpm
pnpm add postgres-js-queue

# bun
bun add postgres-js-queue

# npm
npm install postgres-js-queue

Migrations

This package includes a CLI tool to set up the necessary PostgreSQL schema and manage your own migrations.

Generate a Migration

Create a new migration file in the migrations/ directory.

npx postgres-js-queue generate

Run Migrations

Apply all pending migrations in the migrations/ directory.

Queue storage model:

  • queue.id is uuid (generated by PostgreSQL).
  • queue.channel is persisted and used for channel-specific fetching.
  • broadcasts is used for broadcast fan-out with LISTEN/NOTIFY.

Using pnpm

PGHOST=localhost PGDATABASE=db pnpm dlx postgres-js-queue migrate

Using bun

PGHOST=localhost PGDATABASE=db bunx postgres-js-queue migrate

Using npx

PGHOST=localhost PGDATABASE=db npx postgres-js-queue migrate

You can also use a connection string:

DATABASE_URL=postgres://user:pass@localhost:5432/db npx postgres-js-queue migrate

Note: Use the full postgres-js-queue binary name for commands.

Quick Start

Basic Usage (Type-only, compile-time validation)

import postgres from 'postgres';
import Queue from 'postgres-js-queue';

interface EmailJob {
  to: string;
  body: string;
}

const sql = postgres('postgres://...');
const emailQueue = Queue<EmailJob>(sql, { channel: 'emails' });

// Enqueue a job (exactly-once processing)
await emailQueue.enqueue({ to: 'alice@example.com', body: 'Hello!' });

// Broadcast a message (all listeners receive it)
await emailQueue.enqueue(
  { to: 'all@example.com', body: 'System maintenance in 10 mins' },
  { broadcast: true }
);

// Single listener handles both queued jobs and broadcasts
await emailQueue.listen(async (payload, job) => {
  // job: { id: string (UUIDv7), type: 'queue' | 'broadcast', createdAt: Date }
  console.log(`Processing ${job.type} job ${job.id}`);
  console.log('Sending email to:', payload.to);
  
  // Job is automatically marked as 'completed' on success
  // or 'failed' if an error is thrown (queued jobs only)
});

With Runtime Validation (Zod)

import postgres from 'postgres';
import Queue from 'postgres-js-queue';
import { z } from 'zod';

// Define schema
const emailJobSchema = z.object({
  to: z.string().email(),
  body: z.string().min(1)
});

const sql = postgres('postgres://...');

// Type is automatically inferred from schema
const emailQueue = Queue(sql, { 
  channel: 'emails',
  schema: emailJobSchema 
});

// Runtime validation on enqueue
await emailQueue.enqueue({ 
  to: 'alice@example.com', 
  body: 'Hello!' 
}); // ✅ Valid

await emailQueue.enqueue({ 
  to: 'invalid-email', 
  body: '' 
}); // ❌ Throws ValidationError

// Payload is validated and strongly typed
await emailQueue.listen(async (payload, job) => {
  // payload is inferred as { to: string; body: string }
  // job: { id: string, type: 'queue' | 'broadcast', createdAt: Date }
  console.log(payload.to); // Typed as string; the email format was checked at runtime by the schema
});

With Runtime Validation (Valibot)

import postgres from 'postgres';
import Queue from 'postgres-js-queue';
import * as v from 'valibot';

const reportJobSchema = v.object({
  report_id: v.number(),
  format: v.picklist(['pdf', 'csv'])
});

const sql = postgres('postgres://...');
const reportQueue = Queue(sql, { 
  channel: 'reports',
  schema: reportJobSchema 
});

await reportQueue.enqueue({ report_id: 123, format: 'pdf' });

await reportQueue.listen(async (payload, job) => {
  // payload.format is 'pdf' | 'csv' (type-safe)
  // job: { id: string, type: 'queue' | 'broadcast', createdAt: Date }
  console.log('Processing report job:', job.id);
});

API Reference

Queue<T>(sql, options?)

Creates a new queue instance bound to a specific payload type and channel.

Parameters:

  • sql: A postgres.js instance
  • options (optional):
    • channel (string): PostgreSQL LISTEN/NOTIFY channel. If omitted, a unique ID is auto-generated.
    • schema (StandardSchema): Optional validation schema. Supports Zod, Valibot, ArkType, and any Standard Schema-compliant library.

Returns: QueueInstance<T>

queue.enqueue(payload, options?)

Adds a job to the queue or broadcasts a message to all listeners.

Parameters:

  • payload (T): Job data. If a schema was provided, this is validated at runtime.
  • options (optional):
    • broadcast (boolean): If true, sends the message to all active listeners instead of queueing it for exactly-once processing (default: false)
    • runAt (Date): Schedule the job to run at a specific time
    • delay (number): Delay execution by this many milliseconds

Returns: Promise<void>

Throws: ValidationError if schema validation fails.

Behavior:

  • Queue mode (broadcast: false, default):

    • Inserts job into the queue table with status = 'pending'
    • Sends NOTIFY to wake up listeners
    • One listener atomically claims the job using SKIP LOCKED
    • Job is tracked through pending → active → completed/failed states
  • Broadcast mode (broadcast: true):

    • Inserts into broadcasts table and triggers NOTIFY
    • All active listeners receive the message simultaneously
    • No queue status transitions (fire-and-forget semantics)
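The queue-mode claim step described above can be sketched as a single statement. This is a hypothetical illustration, not the library's actual SQL: the `created_at` column and the `buildClaimQuery` helper are assumptions, while `queue`, `channel`, and `status` are names used in this README.

```typescript
// Hypothetical sketch: NOT the library's real SQL. The `created_at` column and
// this helper are assumptions; `queue`, `channel`, and `status` come from the README.
interface ParameterizedQuery {
  text: string;
  values: string[];
}

function buildClaimQuery(channel: string): ParameterizedQuery {
  // The inner SELECT locks one pending row; SKIP LOCKED makes concurrent
  // workers skip rows another transaction has already locked instead of waiting.
  const text = `
    UPDATE queue
    SET status = 'active'
    WHERE id = (
      SELECT id
      FROM queue
      WHERE channel = $1 AND status = 'pending'
      ORDER BY created_at
      LIMIT 1
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id`;
  return { text, values: [channel] };
}

const claim = buildClaimQuery('emails');
console.log(claim.text.includes('FOR UPDATE SKIP LOCKED')); // true
```

Because each worker only ever locks a row nobody else holds, two workers running this statement at the same time should claim different jobs rather than block each other.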

queue.listen(handler)

Starts listening for messages on the queue's channel. Handles both queued jobs (exactly-once) and broadcasts (at-least-once).

Parameters:

  • handler: async (payload: T, job: Job) => void
    • payload: The message data (validated if schema was provided)
    • job: Minimal job metadata
      • id (string): UUIDv7 identifier
      • type ('queue' | 'broadcast'): Delivery mode
      • createdAt (Date): When the job/broadcast was created

Returns: Promise<{ stop: () => Promise<void>; unlisten: () => Promise<void> }> (resolves when the listener is active)

Behavior:

  • Uses PostgreSQL LISTEN/NOTIFY for instant wakeup
  • Includes reconciliation sweeps (configurable) to process ready jobs even if a NOTIFY event is missed
  • stop() gracefully shuts down the listener: it stops accepting new jobs, waits for the current in-flight handler to finish, then unsubscribes.
  • unlisten() is an alias of stop() for backwards compatibility.
  • For queued jobs: Workers race to atomically claim jobs using SKIP LOCKED. If the handler completes successfully, the job is marked as completed. If the handler throws an error, the job is marked as failed (or retried based on retry config).
  • For broadcasts: All active listeners receive the message. Broadcast payloads are persisted in broadcasts, but there are no queue-style status transitions.
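Since one handler receives both delivery modes, it may help to branch on `job.type`. The sketch below uses a local copy of the `Job` shape documented here; the `Notice` payload and the `handleNotice` function are hypothetical names for illustration.

```typescript
// Local copy of the Job shape documented in this README.
interface Job {
  id: string;
  type: 'queue' | 'broadcast';
  createdAt: Date;
}

// Hypothetical payload type, for illustration only.
interface Notice {
  message: string;
}

// Returns a label so the branch taken is easy to observe.
function handleNotice(payload: Notice, job: Job): string {
  if (job.type === 'broadcast') {
    // Every active listener runs this branch for the same message.
    return `broadcast ${job.id}: ${payload.message}`;
  }
  // Only the worker that claimed the job runs this branch.
  return `queued ${job.id}: ${payload.message}`;
}

console.log(handleNotice({ message: 'hi' }, { id: 'a1', type: 'queue', createdAt: new Date() }));
// queued a1: hi
```

A handler passed to listen() could call a function like this internally and keep side effects that must happen only once out of the broadcast branch.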

Multi-Queue Architecture

You can run multiple independent queues on the same database. Each queue has its own channel and can have different payload types, validation schemas, and configurations:

import { z } from 'zod';

// Email queue with Zod validation
const emailJobSchema = z.object({
  to: z.string().email(),
  subject: z.string(),
  body: z.string()
});

const emailQueue = Queue(sql, {
  channel: 'emails',
  schema: emailJobSchema
});

// Report and notification queues without a runtime schema
const reportQueue = Queue(sql, { channel: 'reports' });

const notificationQueue = Queue(sql, { channel: 'notifications' });

// Each queue processes only its own jobs
await emailQueue.listen(async (payload, job) => {
  // job: { id: string, type: 'queue' | 'broadcast', createdAt: Date }
  await sendEmail(payload);
});

await reportQueue.listen(async (payload, job) => {
  await generateReport(payload);
});

await notificationQueue.listen(async (payload, job) => {
  await sendNotification(payload);
});

// Queue jobs
await emailQueue.enqueue({ to: 'alice@example.com', subject: 'Hi', body: '...' });
await reportQueue.enqueue({ reportId: 123, format: 'pdf' });

// Broadcast to all notification listeners
await notificationQueue.enqueue(
  { type: 'system_alert', message: 'Maintenance mode' },
  { broadcast: true }
);

Additional Features

Auto-Generated Channels

For testing or ephemeral queues, you can omit the channel option:

const tempQueue = Queue<MyJob>(sql); // Auto-generates a unique channel ID

await tempQueue.enqueue({ data: 'test' });
await tempQueue.listen(async (payload) => {
  // Only this queue instance will receive these jobs
});

Broadcasting

Broadcasting allows sending messages to all active listeners simultaneously using the broadcast: true option.

interface WorkerCommand {
  type: 'shutdown' | 'pause' | 'resume' | 'process';
  data?: any;
}

const workerQueue = Queue<WorkerCommand>(sql, { 
  channel: 'workers' 
});

// Single listener handles both job processing and broadcast commands
await workerQueue.listen(async (payload, job) => {
  // job: { id: string (UUIDv7), type: 'queue' | 'broadcast', createdAt: Date }
  
  if (payload.type === 'shutdown') {
    console.log('Shutdown command received by all workers');
    console.log('Broadcast ID:', job.id); // Even broadcasts have IDs for tracing
    // Initiate graceful shutdown
    await cleanup();
    process.exit(0);
  }
  
  if (payload.type === 'pause') {
    console.log('Pausing job processing');
    // Set a flag to pause
    return;
  }
  
  if (payload.type === 'process') {
    // Normal job processing (job.type === 'queue' means only one worker handles this)
    console.log(`Processing ${job.type} job:`, job.id);
    await processWork(payload.data);
  }
});

// Queue a job for exactly-once processing
await workerQueue.enqueue({ type: 'process', data: { taskId: 123 } });
// ☝️ ONE worker will process this

// Broadcast to ALL active workers
await workerQueue.enqueue(
  { type: 'shutdown' },
  { broadcast: true }
);
// ☝️ ALL workers receive this immediately

// Another broadcast example: pause all workers
await workerQueue.enqueue(
  { type: 'pause' },
  { broadcast: true }
);

Use Cases for Broadcasting:

  • Graceful shutdown signals to all workers
  • Configuration updates pushed to all listeners
  • Rate limit notifications affecting all workers
  • Maintenance mode toggles

Key Differences:

| Mode | Delivery | State Tracking | Use Case |
| --- | --- | --- | --- |
| Queue (broadcast: false) | Exactly-once (one worker via SKIP LOCKED) | Full job lifecycle (pending → active → completed/failed) | Normal job processing |
| Broadcast (broadcast: true) | At-least-once (all active listeners via NOTIFY) | Persisted in broadcasts (no status lifecycle) | Signals, commands, notifications |

Listener Heartbeat & Health Monitoring

Track active listeners using a heartbeat system with UUIDv7 for optimal read performance:

const emailQueue = Queue<EmailJob>(sql, { 
  channel: 'emails',
  heartbeat: {
    enabled: true,
    interval: 5000, // ms (default: 5s)
    timeout: 15000  // ms (default: 15s - mark stale after this)
  }
});

await emailQueue.listen(async (payload, job) => {
  // Heartbeat is automatically sent in the background
  // If this worker crashes, it will be marked as stale after 15s
});

// Query active listeners
const activeListeners = await sql`
  SELECT id, channel, last_seen_at 
  FROM active_listeners 
  WHERE channel = 'emails' 
  AND last_seen_at > now() - interval '15 seconds'
`;

console.log(`Active email workers: ${activeListeners.length}`);

Benefits:

  • Detect Dead Workers: Know when a worker has crashed or disconnected.
  • Load Balancing: See how many workers are processing each channel.
  • UUIDv7 Performance: The active_listeners table uses UUIDv7 as the primary key for sequential inserts and efficient time-ordered queries (leveraging PostgreSQL 17+ optimizations).
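The staleness rule implied by the query above (a listener counts as active only while its last heartbeat is newer than the timeout) can be sketched in plain TypeScript. `HeartbeatConfig`, `assertValidHeartbeat`, and `isStale` are hypothetical helpers; the library's internal checks may differ.

```typescript
// Hypothetical helpers; the library's internal checks may differ.
interface HeartbeatConfig {
  interval: number; // ms between heartbeats
  timeout: number;  // ms after which a listener counts as stale
}

function assertValidHeartbeat(config: HeartbeatConfig): void {
  // The type definitions in this README note that timeout must exceed interval.
  if (config.timeout <= config.interval) {
    throw new RangeError('heartbeat.timeout must be greater than heartbeat.interval');
  }
}

function isStale(lastSeenAt: Date, now: Date, config: HeartbeatConfig): boolean {
  // Mirrors `last_seen_at > now() - interval`: anything not strictly newer is stale.
  return now.getTime() - lastSeenAt.getTime() >= config.timeout;
}

const config: HeartbeatConfig = { interval: 5000, timeout: 15000 };
assertValidHeartbeat(config);

const now = new Date('2024-01-01T00:00:20Z');
console.log(isStale(new Date('2024-01-01T00:00:10Z'), now, config)); // false (seen 10s ago)
console.log(isStale(new Date('2024-01-01T00:00:00Z'), now, config)); // true (seen 20s ago)
```

Keeping the timeout at a multiple of the interval (here 3x) leaves room for a delayed or dropped heartbeat before a healthy worker is reported as stale.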

Delayed Jobs

Schedule jobs to run at a specific time in the future:

const emailQueue = Queue<EmailJob>(sql, { channel: 'emails' });

// Run immediately (default)
await emailQueue.enqueue({ to: 'alice@example.com', body: 'Now!' });

// Run in 1 hour
await emailQueue.enqueue(
  { to: 'bob@example.com', body: 'Later!' },
  { runAt: new Date(Date.now() + 3600000) }
);

// Run after a delay
await emailQueue.enqueue(
  { to: 'carol@example.com', body: 'After delay' },
  { delay: 60000 } // 60 seconds
);
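Both scheduling options reduce to one "ready at" timestamp. The sketch below shows one way that could be computed. How the library combines `runAt` and `delay` is not documented here, so the precedence (an explicit `runAt` wins) and the `resolveRunAt` helper are assumptions.

```typescript
// Hypothetical sketch; how the library combines `runAt` and `delay` is not documented here.
interface ScheduleOptions {
  runAt?: Date;
  delay?: number; // ms
}

// Assumption: an explicit runAt wins over delay; with neither, the job is ready now.
function resolveRunAt(options: ScheduleOptions, now: Date): Date {
  if (options.runAt) return options.runAt;
  if (options.delay !== undefined) return new Date(now.getTime() + options.delay);
  return now;
}

const base = new Date('2024-01-01T00:00:00.000Z');
console.log(resolveRunAt({ delay: 60000 }, base).toISOString());
// 2024-01-01T00:01:00.000Z
```

Passing `now` in as a parameter, rather than calling `Date.now()` inside, keeps the function deterministic and easy to test.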

Job Retries with Exponential Backoff

Automatically retry failed jobs with configurable backoff:

const reportQueue = Queue<ReportJob>(sql, { 
  channel: 'reports',
  retry: {
    maxAttempts: 3,
    backoff: 'exponential', // 1s, 2s, 4s, 8s...
    // or custom: backoff: (attempt) => attempt * 1000
  }
});

await reportQueue.listen(async (payload, job) => {
  // If this throws, the job is retried until maxAttempts (3) is reached
  await generateReport(payload);
});

// Check job status including retry info
const jobs = await sql`
  SELECT id, status, attempts, retry_after 
  FROM queue 
  WHERE channel = 'reports'
`;
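The `1s, 2s, 4s, 8s...` progression, combined with the documented `baseDelay` (1000) and `maxDelay` (60000) defaults, suggests a capped doubling schedule. The following is a minimal sketch of that arithmetic, assuming a 1-based attempt counter and that the cap also applies to custom backoff functions; `retryDelay` and `RetryPolicy` are hypothetical names, not the library's internals.

```typescript
type Backoff = 'exponential' | ((attempt: number) => number);

interface RetryPolicy {
  backoff: Backoff;
  baseDelay: number; // ms
  maxDelay: number;  // ms
}

// Assumption: attempt is 1-based and the cap applies to custom functions too.
function retryDelay(attempt: number, policy: RetryPolicy): number {
  const raw =
    policy.backoff === 'exponential'
      ? policy.baseDelay * 2 ** (attempt - 1)
      : policy.backoff(attempt);
  return Math.min(raw, policy.maxDelay);
}

const policy: RetryPolicy = { backoff: 'exponential', baseDelay: 1000, maxDelay: 60000 };
console.log([1, 2, 3, 4, 7].map((n) => retryDelay(n, policy)));
// [ 1000, 2000, 4000, 8000, 60000 ]
```

The seventh attempt would be 64000 ms uncapped, so the cap is what keeps long retry chains from waiting indefinitely.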

Graceful Shutdown

Ensure in-flight jobs complete before the process exits:

const queue = Queue<MyJob>(sql, { channel: 'jobs' });

const listener = await queue.listen(async (payload, job) => {
  // Long-running job
  await processJob(payload);
});

// On SIGTERM/SIGINT
process.on('SIGTERM', async () => {
  console.log('Shutting down gracefully...');
  await listener.stop(); // Waits for current job to finish
  await sql.end();
  process.exit(0);
});

Configuration Reference

Complete Type Definitions

import type { Sql } from 'postgres';
import type { StandardSchemaV1 as StandardSchema } from '@standard-schema/spec';

// Queue factory function
function Queue<T>(
  sql: Sql,
  options?: QueueOptions
): QueueInstance<T>;

function Queue<TSchema extends StandardSchema>(
  sql: Sql,
  options: QueueOptions & { schema: TSchema }
): QueueInstance<StandardSchema.InferOutput<TSchema>>;

// Configuration options
interface QueueOptions {
  // Channel name (auto-generated UUID if omitted)
  channel?: string;
  
  // Optional runtime validation schema (Standard Schema-compliant)
  // Supports: Zod, Valibot, ArkType, TypeBox, Yup (with adapter)
  schema?: StandardSchema;

  // Optional listener heartbeat tracking
  heartbeat?: boolean | {
    enabled?: boolean; // default: true
    interval?: number; // default: 5000
    timeout?: number; // default: 15000 (must be > interval)
  };

  // Optional retry strategy for failed queue jobs
  retry?: {
    maxAttempts?: number; // default: 1
    backoff?: 'exponential' | ((attempt: number) => number);
    baseDelay?: number; // default: 1000
    maxDelay?: number; // default: 60000
  };

  // Optional listener hardening for crash recovery and missed notifications
  recovery?: boolean | {
    enabled?: boolean; // default: true
    leaseTimeoutMs?: number; // default: 300000 (5 min)
    reconcileIntervalMs?: number; // default: 1000
  };
}

// Enqueue options
interface EnqueueOptions {
  // Broadcast to all listeners instead of queueing (default: false)
  broadcast?: boolean;
  
  // Schedule for future execution
  runAt?: Date;
  delay?: number; // ms
}

// Queue instance
interface QueueInstance<T> {
  // Add a job to the queue or broadcast a message
  enqueue(payload: T, options?: EnqueueOptions): Promise<void>;

  // Backwards-compatible alias of enqueue
  push(payload: T, options?: EnqueueOptions): Promise<void>;
  
  // Listen for jobs and broadcasts
  listen(
    handler: (payload: T, job: Job) => Promise<void> | void
  ): Promise<{ stop: () => Promise<void>; unlisten: () => Promise<void> }>;
}

// Job metadata (minimal, always present)
interface Job {
  id: string; // UUIDv7
  type: 'queue' | 'broadcast';
  createdAt: Date;
}

// Validation error
class ValidationError extends Error {
  issues: ReadonlyArray<{ message: string; path?: ReadonlyArray<PropertyKey | { key: PropertyKey }> }>;
}
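When a ValidationError is caught, its `issues` can be turned into readable messages. The sketch below works only on the issue shape given in the definition above; `formatIssue` is a hypothetical helper, not part of the library.

```typescript
// Issue shape copied from the ValidationError definition in this README.
type PathSegment = PropertyKey | { key: PropertyKey };

interface Issue {
  message: string;
  path?: ReadonlyArray<PathSegment>;
}

function formatIssue(issue: Issue): string {
  // A path segment is either a bare key or an object wrapping the key.
  const path = (issue.path ?? [])
    .map((segment) => (typeof segment === 'object' ? segment.key : segment))
    .map((key) => String(key))
    .join('.');
  return path ? `${path}: ${issue.message}` : issue.message;
}

console.log(formatIssue({ message: 'Invalid email', path: ['to'] }));
// to: Invalid email
```

In a `catch` block around enqueue(), something like `error.issues.map(formatIssue)` could then produce one line per failed field.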

Example: Current Configuration

import postgres from 'postgres';
import Queue from 'postgres-js-queue';
import { z } from 'zod';

const emailJobSchema = z.object({
  to: z.string().email(),
  subject: z.string().min(1),
  body: z.string(),
  priority: z.enum(['low', 'medium', 'high']).default('medium')
});

const sql = postgres('postgres://...');

const emailQueue = Queue(sql, {
  channel: 'emails',
  schema: emailJobSchema
});

// Type is inferred as z.infer<typeof emailJobSchema>
await emailQueue.enqueue({
  to: 'user@example.com',
  subject: 'Welcome',
  body: 'Thanks for signing up!'
});

// Broadcast (no retry, no state tracking)
await emailQueue.enqueue(
  { to: 'all@example.com', subject: 'Maintenance', body: 'System down' },
  { broadcast: true }
);

const listener = await emailQueue.listen(async (payload, job) => {
  // job: { id: string (UUIDv7), type: 'queue' | 'broadcast', createdAt: Date }
  console.log('Sending email to:', payload.to);
  console.log('Job ID:', job.id, 'Type:', job.type);
  
  await sendEmail(payload);
});

// Stop listening when needed
process.on('SIGTERM', async () => {
  await listener.stop();
  await sql.end();
});
