Ray Fung edited this page Feb 26, 2026 · 3 revisions

Queue

Razy provides a job queue system for deferring time-consuming tasks (sending emails, processing images, generating reports) to be executed asynchronously. Jobs are dispatched to a persistent store and processed by a worker loop.


Quick Start

use Razy\Queue\QueueManager;
use Razy\Queue\Store\DatabaseStore;
use Razy\Queue\JobHandlerInterface;

// 1. Create a store
$store = new DatabaseStore($database);
$store->ensureStorage();  // create razy_jobs table if needed

// 2. Create queue manager
$queue = new QueueManager($store);

// 3. Register a handler
$queue->registerHandler('send_email', SendEmailHandler::class);

// 4. Dispatch a job
$queue->dispatch('send_email', [
    'to'      => 'alice@example.com',
    'subject' => 'Welcome!',
    'body'    => 'Thanks for signing up.',
]);

// 5. Process pending jobs
$queue->process();

Architecture

┌─────────────┐     dispatch()     ┌──────────────┐     push()     ┌────────────────┐
│ Application │ ──────────────── → │ QueueManager │ ──────────── → │  QueueStore    │
│ Code        │                    │              │                │  (Database)    │
└─────────────┘                    │              │                └────────────────┘
                                   │  process()   │     reserve()       ↑
                                   │ ─────────── →│ ── ─── ── ─── ── ── ┘
                                   │              │
                                   │  resolve     │     ┌────────────────┐
                                   │  handler ── →│ ── →│ JobHandler     │
                                   │              │     │ handle($job)   │
                                   └──────────────┘     └────────────────┘

QueueManager

The central class that coordinates dispatching, processing, and event handling.

Dispatching Jobs

$queue = new QueueManager($store);

// Dispatch to queue (processed later)
$queue->dispatch('send_email', [
    'to'      => 'bob@example.com',
    'subject' => 'Invoice #123',
]);

// Dispatch with delay (seconds)
$queue->dispatch('send_reminder', ['user_id' => 42], delay: 3600);

// Dispatch with max attempts
$queue->dispatch('process_image', ['path' => '/uploads/photo.jpg'], maxAttempts: 5);

// Dispatch and execute immediately (synchronous)
$queue->dispatchNow('send_email', [
    'to'      => 'urgent@example.com',
    'subject' => 'Alert!',
]);

Processing Jobs

// Process all pending jobs (FIFO)
$processed = $queue->process();
// Returns number of jobs processed

// Process a batch of N jobs
$processed = $queue->processBatch(limit: 10);

// Process a specific job by ID
$queue->processJob($jobId);

The processing flow for each job:

  1. Reserve — mark job status as Reserved (prevents other workers picking it up)
  2. Resolve handler — find the registered handler class for the job type
  3. Execute — call $handler->handle($job)
  4. Complete — on success, mark status as Completed
  5. Fail — on exception, call $handler->failed($job, $e), then:
    • If attempts < maxAttempts → Release back to queue (status Pending)
    • If exhausted → Bury the job (status Buried)
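The flow above can be sketched without the framework. The block below is a minimal illustration under stated assumptions, not Razy's implementation: `runOnce()`, the array-shaped job, and the lowercase status strings are hypothetical stand-ins, the handler is passed in directly (so step 2 is skipped), and the attempt counter is assumed to be incremented at reservation time.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for one pass of the processing flow.
 * $job is a plain array here, not Razy's Job class.
 *
 * $handle plays the role of $handler->handle($job),
 * $failed plays the role of $handler->failed($job, $e).
 */
function runOnce(array $job, callable $handle, callable $failed): array
{
    // 1. Reserve: mark the job so other workers skip it (assumption:
    //    the attempt is counted at this point).
    $job['status'] = 'reserved';
    $job['attempts']++;

    try {
        // 3. Execute the handler.
        $handle($job);
        // 4. Complete on success.
        $job['status'] = 'completed';
    } catch (Throwable $e) {
        // 5. Fail: notify the handler, then retry or bury.
        $job['error'] = $e->getMessage();
        $failed($job, $e);
        $job['status'] = $job['attempts'] < $job['maxAttempts']
            ? 'pending'   // released back to the queue
            : 'buried';   // attempts exhausted
    }

    return $job;
}

// A handler that always throws, to exercise the retry path.
$alwaysFails = function (array $job): void {
    throw new RuntimeException('SMTP timeout');
};
$noop = function (array $job, Throwable $e): void {};

$job = ['status' => 'pending', 'attempts' => 0, 'maxAttempts' => 2, 'error' => null];

$job = runOnce($job, $alwaysFails, $noop);
echo $job['status'], PHP_EOL; // pending (1 of 2 attempts used)

$job = runOnce($job, $alwaysFails, $noop);
echo $job['status'], PHP_EOL; // buried (2 of 2 attempts used)
```

With `maxAttempts` set to 2, the first failure releases the job and the second buries it, which matches the two branches of step 5.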

Managing Jobs

// Find a job by ID
$job = $queue->find($jobId);

// Count jobs (optionally filtered by status)
$count   = $queue->count();
$pending = $queue->count(status: JobStatus::Pending);

// Delete a specific job
$queue->delete($jobId);

// Clear all jobs (or by status)
$queue->clear();
$queue->clear(status: JobStatus::Buried);

// Ensure storage exists (create tables)
$queue->ensureStorage();

Events

$queue->on('dispatched', function (Job $job) {
    Log::info("Job dispatched: {$job->type}");
});

$queue->on('reserved', function (Job $job) {
    Log::info("Job reserved: {$job->id}");
});

$queue->on('completed', function (Job $job) {
    Log::info("Job completed: {$job->id}");
});

$queue->on('failed', function (Job $job, \Throwable $e) {
    Log::error("Job failed: {$job->id}: {$e->getMessage()}");
});

$queue->on('buried', function (Job $job) {
    Log::warning("Job buried: {$job->id} (exhausted attempts)");
});

$queue->on('released', function (Job $job) {
    Log::info("Job released for retry: {$job->id}, attempt {$job->attempts}");
});

Job Handlers

JobHandlerInterface

Every handler must implement this interface:

use Razy\Queue\JobHandlerInterface;
use Razy\Queue\Job;

class SendEmailHandler implements JobHandlerInterface
{
    public function handle(Job $job): void
    {
        $payload = $job->payload;
        
        $mailer = new Mailer();
        $mailer->send(
            to:      $payload['to'],
            subject: $payload['subject'],
            body:    $payload['body'] ?? '',
        );
    }

    public function failed(Job $job, \Throwable $e): void
    {
        // Called when the job fails
        Log::error("Email to {$job->payload['to']} failed: {$e->getMessage()}");
        
        // Optionally notify admin
        Notification::send('admin@example.com', "Job #{$job->id} failed");
    }
}

Handler Resolution

Register handlers by job type:

// String class name (instantiated on demand)
$queue->registerHandler('send_email', SendEmailHandler::class);

// Closure handler
$queue->registerHandler('log_event', function (Job $job) {
    file_put_contents('events.log', json_encode($job->payload) . "\n", FILE_APPEND);
});

// With dependency injection (custom resolver)
$queue->setHandlerResolver(function (string $handlerClass) use ($container) {
    return $container->get($handlerClass);
});
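To make the three registration styles concrete, here is a self-contained sketch of how a type-to-handler lookup with an optional resolver could work. `HandlerRegistry`, `UppercaseHandler`, and their methods are hypothetical names invented for this illustration; Razy's internal resolution logic may differ, and the handlers here take a plain payload array rather than a `Job`.

```php
<?php
declare(strict_types=1);

/** Hypothetical registry; class and method names are illustrative only. */
final class HandlerRegistry
{
    /** @var array<string, string|Closure> */
    private array $handlers = [];
    private ?Closure $resolver = null;

    public function register(string $type, string|Closure $handler): void
    {
        $this->handlers[$type] = $handler;
    }

    public function setResolver(Closure $resolver): void
    {
        $this->resolver = $resolver;
    }

    /** Returns a callable that accepts the job payload. */
    public function resolve(string $type): callable
    {
        $handler = $this->handlers[$type]
            ?? throw new InvalidArgumentException("No handler registered for '{$type}'");

        if ($handler instanceof Closure) {
            return $handler; // closures are used as-is
        }

        // Class names go through the custom resolver if one is set, else plain `new`.
        $instance = $this->resolver ? ($this->resolver)($handler) : new $handler();

        return [$instance, 'handle'];
    }
}

final class UppercaseHandler
{
    public function __construct(private string $prefix = '') {}

    public function handle(array $payload): string
    {
        return $this->prefix . strtoupper($payload['text']);
    }
}

$registry = new HandlerRegistry();
$registry->register('shout', UppercaseHandler::class);
$registry->register('echo', fn (array $payload): string => $payload['text']);

// Without a resolver the class is instantiated with no arguments.
$plain = $registry->resolve('shout')(['text' => 'hi']);

// A resolver can supply constructor dependencies, as a DI container would.
$registry->setResolver(fn (string $class) => new $class('>> '));
$resolved = $registry->resolve('shout')(['text' => 'hi']);

echo $plain, PHP_EOL;    // HI
echo $resolved, PHP_EOL; // >> HI
```

Deferring instantiation until a job of that type is actually processed keeps worker start-up cheap and lets the resolver decide how dependencies are wired.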

Job Value Object

The Job class is a value object representing a queued job. It carries 13 properties:

use Razy\Queue\Job;

$job->id;           // string — unique identifier
$job->type;         // string — handler type key (e.g., 'send_email')
$job->payload;      // array  — job data
$job->status;       // JobStatus enum
$job->attempts;     // int    — current attempt count
$job->maxAttempts;  // int    — maximum attempts before burying
$job->createdAt;    // string — ISO timestamp
$job->updatedAt;    // string — ISO timestamp
$job->reservedAt;   // ?string
$job->completedAt;  // ?string
$job->failedAt;     // ?string
$job->delay;        // int    — delay in seconds
$job->error;        // ?string — last error message

State Transitions

$job->incrementAttempts();       // attempts++
$job->hasExhaustedAttempts();    // attempts >= maxAttempts
$job->markReserved();            // status = Reserved, reservedAt = now
$job->markCompleted();           // status = Completed, completedAt = now
$job->markFailed($e);            // status = Failed, failedAt = now, error = message ($e is a \Throwable)
$job->markBuried();              // status = Buried

Job Status Lifecycle

      dispatch()
           │
           ▼
      ┌──────────┐
      │ Pending  │ ◄──── release (retry)
      └────┬─────┘
           │ reserve()
           ▼
      ┌──────────┐
      │ Reserved │
      └────┬─────┘
           │
     ┌─────┴──────┐
     │            │
     ▼ success    ▼ failure
┌───────────┐ ┌────────┐
│ Completed │ │ Failed │
└───────────┘ └───┬────┘
                  │
            ┌─────┴─────┐
            │           │
            ▼ retry     ▼ exhausted
        (Pending)   ┌────────┐
                    │ Buried │
                    └────────┘
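The lifecycle in the diagram can also be written down as a transition table. The sketch below is an assumption-labelled illustration: the `Status` enum is a hypothetical mirror of the five statuses (using the string values listed in the API reference), and `next()` and `canMoveTo()` are invented helpers, not methods of Razy's `JobStatus`.

```php
<?php
declare(strict_types=1);

// Hypothetical mirror of the diagram; not Razy's JobStatus enum.
enum Status: string
{
    case Pending = 'pending';
    case Reserved = 'reserved';
    case Completed = 'completed';
    case Failed = 'failed';
    case Buried = 'buried';

    /** @return list<self> statuses reachable in one step */
    public function next(): array
    {
        return match ($this) {
            self::Pending   => [self::Reserved],
            self::Reserved  => [self::Completed, self::Failed],
            self::Failed    => [self::Pending, self::Buried],
            self::Completed, self::Buried => [], // terminal states
        };
    }

    public function canMoveTo(self $target): bool
    {
        return in_array($target, $this->next(), true);
    }
}

var_dump(Status::Pending->canMoveTo(Status::Reserved)); // bool(true)
var_dump(Status::Reserved->canMoveTo(Status::Buried));  // bool(false)
var_dump(Status::Failed->canMoveTo(Status::Pending));   // bool(true)
```

A table like this is handy in tests for a custom store, since any transition outside it (for example Reserved straight to Buried) points to a bug.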

Serialisation

$array = $job->toArray();
$restored = Job::fromArray($array);

Queue Stores

DatabaseStore

Persists jobs in a database table using the Razy Database driver.

use Razy\Queue\Store\DatabaseStore;

$store = new DatabaseStore($database);

// Create the table (if not exists)
$store->ensureStorage();

Table schema (razy_jobs):

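| Column | Type | Description |
|--------|------|-------------|
| id | VARCHAR(36) PK | UUID |
| type | VARCHAR(255) | Handler type |
| payload | TEXT | JSON payload |
| status | VARCHAR(20) | Pending/Reserved/Completed/Failed/Buried |
| attempts | INT | Attempt counter |
| max_attempts | INT | Max allowed attempts |
| delay | INT | Delay in seconds |
| error | TEXT | Last error message |
| created_at | DATETIME | Created timestamp |
| updated_at | DATETIME | Updated timestamp |
| reserved_at | DATETIME | Reserved timestamp |
| completed_at | DATETIME | Completed timestamp |
| failed_at | DATETIME | Failed timestamp |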

The DatabaseStore uses FIFO reservation: when reserving a job, it selects the oldest pending job whose delay has elapsed, locks it with FOR UPDATE (MySQL/PostgreSQL) or equivalent, and updates its status atomically.
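The selection rule described above (oldest pending job whose delay has elapsed) can be illustrated in memory. This is a sketch only: `reserveNext()` and the array-shaped jobs are hypothetical, timestamps are Unix seconds for simplicity, the delay is assumed to count from the creation time, and the row locking a real database store needs is left out.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory version of the selection rule. Each job is a plain
 * array with 'id', 'status', 'createdAt' (Unix seconds), and 'delay' (seconds).
 * A real store would also need a lock or transaction around this step.
 */
function reserveNext(array &$jobs, int $now): ?array
{
    $candidate = null;

    foreach ($jobs as $index => $job) {
        // Assumption: a job becomes due at createdAt + delay.
        $ready = $job['status'] === 'pending'
            && $job['createdAt'] + $job['delay'] <= $now;

        // Keep the oldest ready job seen so far.
        if ($ready && ($candidate === null || $job['createdAt'] < $jobs[$candidate]['createdAt'])) {
            $candidate = $index;
        }
    }

    if ($candidate === null) {
        return null; // nothing is due yet
    }

    $jobs[$candidate]['status'] = 'reserved';

    return $jobs[$candidate];
}

$jobs = [
    ['id' => 'a', 'status' => 'pending', 'createdAt' => 100, 'delay' => 3600],
    ['id' => 'b', 'status' => 'pending', 'createdAt' => 200, 'delay' => 0],
    ['id' => 'c', 'status' => 'pending', 'createdAt' => 150, 'delay' => 0],
];

$first  = reserveNext($jobs, 1000); // 'a' is older but still delayed
$second = reserveNext($jobs, 1000);
$third  = reserveNext($jobs, 1000);

echo $first['id'], ' ', $second['id'], ' ', var_export($third, true), PHP_EOL; // c b NULL
```

Job `a` was created first but is skipped because its one-hour delay has not elapsed, so `c` and then `b` are reserved in creation order.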

Custom Store

Implement QueueStoreInterface for other backends (Redis, SQS, etc.):

use Razy\Queue\QueueStoreInterface;
use Razy\Queue\Job;
use Razy\Queue\JobStatus;

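class RedisStore implements QueueStoreInterface
{
    // Assumes the phpredis extension; the client is injected.
    public function __construct(private \Redis $redis) {}

    public function push(Job $job): void
    {
        $this->redis->hSet('jobs', $job->id, json_encode($job->toArray()));
        // Sorted-set scores must be numeric, so convert the ISO timestamp.
        $this->redis->zAdd('pending', (float) strtotime($job->createdAt), $job->id);
    }

    public function reserve(): ?Job
    {
        // Pop the oldest entry; zPopMin returns [member => score], or an empty array.
        // Note: this simplified example does not honour $job->delay.
        $popped = $this->redis->zPopMin('pending');
        if (!$popped) return null;
        $id = (string) array_key_first($popped);

        $data = json_decode($this->redis->hGet('jobs', $id), true);
        $job = Job::fromArray($data);
        $job->markReserved();
        $this->redis->hSet('jobs', $id, json_encode($job->toArray()));
        return $job;
    }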

    public function complete(Job $job): void { /* ... */ }
    public function release(Job $job): void { /* ... */ }
    public function bury(Job $job): void { /* ... */ }
    public function delete(string $id): void { /* ... */ }
    public function find(string $id): ?Job { /* ... */ }
    public function count(?JobStatus $status = null): int { /* ... */ }
    public function clear(?JobStatus $status = null): void { /* ... */ }
    public function ensureStorage(): void { /* ... */ }
}

Worker Loop Example

A simple CLI worker that processes jobs continuously:

#!/usr/bin/env php
<?php
require __DIR__ . '/vendor/autoload.php';

use Razy\Queue\QueueManager;
use Razy\Queue\Store\DatabaseStore;

$db = new \Razy\Database('mysql:host=127.0.0.1;dbname=app', 'root', 'secret');
$store = new DatabaseStore($db);
$queue = new QueueManager($store);

// Register handlers
$queue->registerHandler('send_email', SendEmailHandler::class);
$queue->registerHandler('process_image', ProcessImageHandler::class);

// Event logging (print is an expression, so it is valid in an arrow function; echo is not)
$queue->on('completed', fn($job) => print "[OK] {$job->type} #{$job->id}\n");
$queue->on('failed', fn($job, $e) => print "[FAIL] {$job->type}: {$e->getMessage()}\n");
$queue->on('buried', fn($job) => print "[BURIED] {$job->type} #{$job->id}\n");

echo "Worker started. Listening for jobs...\n";

while (true) {
    $processed = $queue->processBatch(limit: 10);
    
    if ($processed === 0) {
        // No work — sleep to avoid busy loop
        usleep(500_000);  // 500ms
    }
}
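The loop above runs until the process is killed. One way to make it stoppable and testable is to pass the stop condition and the idle behaviour in as callables. The sketch below is illustrative only: `runWorker()` and its three parameters are hypothetical, and the queue is simulated with a list of batch sizes rather than a real `QueueManager`.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical loop helper: polls until $shouldStop() returns true.
 * $processBatch stands in for fn () => $queue->processBatch(limit: 10).
 * Returns the total number of jobs processed.
 */
function runWorker(callable $processBatch, callable $shouldStop, callable $idle): int
{
    $total = 0;

    while (!$shouldStop()) {
        $processed = $processBatch();
        $total += $processed;

        if ($processed === 0) {
            $idle(); // no work: back off instead of spinning
        }
    }

    return $total;
}

// Simulated queue: two batches of work, then two empty polls.
$batches = [10, 4, 0, 0];
$polls = 0;
$idles = 0;

$total = runWorker(
    processBatch: function () use (&$batches, &$polls): int {
        $polls++;
        return array_shift($batches) ?? 0;
    },
    shouldStop: function () use (&$polls): bool {
        return $polls >= 4; // a real worker would check a shutdown flag here
    },
    idle: function () use (&$idles): void {
        $idles++;           // a real worker would call usleep(500_000)
    },
);

echo "{$total} jobs, {$idles} idle polls", PHP_EOL; // 14 jobs, 2 idle polls
```

In a real worker the stop check could read a flag set by a signal handler (for example via the pcntl extension, if it is available), so that a stop request lets the current batch finish before the process exits.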

Supervisor Configuration

For production, use a process manager like Supervisor:

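[program:razy-worker]
command=php /var/www/app/worker.php
; process_name must include %(process_num) when numprocs > 1
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
numprocs=2
stderr_logfile=/var/log/razy-worker.err.log
stdout_logfile=/var/log/razy-worker.out.log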

API Reference

QueueManager

| Method | Signature | Description |
|--------|-----------|-------------|
| __construct | (QueueStoreInterface $store) | |
| dispatch | (string $type, array $payload, int $delay = 0, int $maxAttempts = 3): Job | Queued job |
| dispatchNow | (string $type, array $payload): void | Synchronous execution |
| process | (): int | Jobs processed |
| processBatch | (int $limit): int | Jobs processed |
| processJob | (string $id): void | Process specific job |
| find | (string $id): ?Job | Find by ID |
| count | (?JobStatus $status = null): int | Job count |
| delete | (string $id): void | Remove job |
| clear | (?JobStatus $status = null): void | Remove jobs |
| ensureStorage | (): void | Create storage |
| registerHandler | (string $type, string\|Closure $handler): void | Register handler |
| setHandlerResolver | (Closure $resolver): void | DI resolver |
| on | (string $event, Closure $listener): void | Register event listener |

Job

| Property | Type | Description |
|----------|------|-------------|
| id | string | UUID |
| type | string | Handler type key |
| payload | array | Job data |
| status | JobStatus | Current status |
| attempts | int | Attempts so far |
| maxAttempts | int | Max attempts |
| delay | int | Delay (seconds) |
| error | ?string | Last error |
| createdAt | string | Created timestamp |
| updatedAt | string | Updated timestamp |
| reservedAt | ?string | Reserved timestamp |
| completedAt | ?string | Completed timestamp |
| failedAt | ?string | Failed timestamp |

| Method | Signature | Description |
|--------|-----------|-------------|
| incrementAttempts | (): void | Bump counter |
| hasExhaustedAttempts | (): bool | At limit? |
| markReserved | (): void | Set Reserved |
| markCompleted | (): void | Set Completed |
| markFailed | (\Throwable $e): void | Set Failed |
| markBuried | (): void | Set Buried |
| toArray | (): array | Serialise |
| fromArray | (array $data): static | Deserialise |

JobStatus Enum

| Case | Value |
|------|-------|
| Pending | 'pending' |
| Reserved | 'reserved' |
| Completed | 'completed' |
| Failed | 'failed' |
| Buried | 'buried' |

QueueStoreInterface

| Method | Signature | Description |
|--------|-----------|-------------|
| push | (Job $job): void | Add to store |
| reserve | (): ?Job | Reserve next |
| complete | (Job $job): void | Mark done |
| release | (Job $job): void | Back to queue |
| bury | (Job $job): void | Mark buried |
| delete | (string $id): void | Remove |
| find | (string $id): ?Job | Lookup |
| count | (?JobStatus $status): int | Count |
| clear | (?JobStatus $status): void | Purge |
| ensureStorage | (): void | Init storage |
