Skip to content
Ray Fung edited this page Feb 26, 2026 · 3 revisions

Queue

Razy provides a job queue system for deferring time-consuming tasks (sending emails, processing images, generating reports) to be executed asynchronously. Jobs are dispatched to a persistent store and processed by a worker loop.


Table of Contents


Quick Start

use Razy\Queue\QueueManager;

use Razy\Queue\Store\DatabaseStore;

use Razy\Queue\JobHandlerInterface;



// 1. Create a store

$store = new DatabaseStore($database);

$store->ensureStorage();  // create razy_jobs table if needed



// 2. Create queue manager

$queue = new QueueManager($store);



// 3. Register a handler

$queue->registerHandler('send_email', SendEmailHandler::class);



// 4. Dispatch a job

$queue->dispatch('send_email', [

    'to'      => 'alice@example.com',

    'subject' => 'Welcome!',

    'body'    => 'Thanks for signing up.',

]);



// 5. Process pending jobs

$queue->process();

Architecture


│?→→→→→→→→→→→→→    dispatch()     →?│→→→→→→→→→→→→→    push()     →?→→→→→→→→→→→→→→→│

│ Application →→→→→→→→→→→→→→→→→ →→QueueManager →→→→→→→→→→→→→ →→ QueueStore    │

│ Code        →                   →             →               → (Database)    │

→?→→→→→→→→→→→→→                   →             →               →?→→→→→→→→→→→→→→→→

                                   │ process()   →    reserve()        │

┌───────────────────────────────────┐

                                   │             │

                                   │ resolve     →    →?→→→→→→→→→→→→→→→│

                                   │ handler →→ →? →→ →? JobHandler     │

                                   │             →    →handle($job)   │

┌───────────────────────────────────┐


QueueManager

The central class that coordinates dispatching, processing, and event handling.

Dispatching Jobs

$queue = new QueueManager($store);



// Dispatch to queue (processed later)

$queue->dispatch('send_email', [

    'to'      => 'bob@example.com',

    'subject' => 'Invoice #123',

]);



// Dispatch with delay (seconds)

$queue->dispatch('send_reminder', ['user_id' => 42], delay: 3600);



// Dispatch with max attempts

$queue->dispatch('process_image', ['path' => '/uploads/photo.jpg'], maxAttempts: 5);



// Dispatch and execute immediately (synchronous)

$queue->dispatchNow('send_email', [

    'to'      => 'urgent@example.com',

    'subject' => 'Alert!',

]);

Processing Jobs

// Process all pending jobs (FIFO)

$processed = $queue->process();

// Returns number of jobs processed



// Process a batch of N jobs

$processed = $queue->processBatch(limit: 10);



// Process a specific job by ID

$queue->processJob($jobId);

The processing flow for each job:

  1. Reserve — mark job status as Reserved (prevents other workers picking it up)

  2. Resolve handler — find the registered handler class for the job type

  3. Execute — call $handler->handle($job)

  4. Complete — on success, mark status as Completed

  5. Fail — on exception, call $handler->failed($job, $e), then:

    • If attempts < maxAttempts → Release back to queue (status Pending)

    • If exhausted → Bury the job (status Buried)

Managing Jobs

// Find a job by ID

$job = $queue->find($jobId);



// Count pending jobs

$pending = $queue->count();

$pending = $queue->count(status: JobStatus::Pending);



// Delete a specific job

$queue->delete($jobId);



// Clear all jobs (or by status)

$queue->clear();

$queue->clear(status: JobStatus::Buried);



// Ensure storage exists (create tables)

$queue->ensureStorage();

Events

$queue->on('dispatched', function (Job $job) {

    Log::info("Job dispatched: {$job->type}");

});



$queue->on('reserved', function (Job $job) {

    Log::info("Job reserved: {$job->id}");

});



$queue->on('completed', function (Job $job) {

    Log::info("Job completed: {$job->id}");

});



$queue->on('failed', function (Job $job, \Throwable $e) {

    Log::error("Job failed: {$job->id}{$e->getMessage()}");

});



$queue->on('buried', function (Job $job) {

    Log::warning("Job buried: {$job->id} (exhausted attempts)");

});



$queue->on('released', function (Job $job) {

    Log::info("Job released for retry: {$job->id}, attempt {$job->attempts}");

});

Job Handlers

JobHandlerInterface

Every handler must implement this interface:

use Razy\Queue\JobHandlerInterface;

use Razy\Queue\Job;



class SendEmailHandler implements JobHandlerInterface

{

    public function handle(Job $job): void

    {

        $payload = $job->payload;

        

        $mailer = new Mailer();

        $mailer->send(

            to:      $payload['to'],

            subject: $payload['subject'],

            body:    $payload['body'] ?? '',

        );

    }



    public function failed(Job $job, \Throwable $e): void

    {

        // Called when the job fails

        Log::error("Email to {$job->payload['to']} failed: {$e->getMessage()}");

        

        // Optionally notify admin

        Notification::send('admin@example.com', "Job #{$job->id} failed");

    }

}

Handler Resolution

Register handlers by job type:

// String class name (instantiated on demand)

$queue->registerHandler('send_email', SendEmailHandler::class);



// Closure handler

$queue->registerHandler('log_event', function (Job $job) {

    file_put_contents('events.log', json_encode($job->payload) . "\n", FILE_APPEND);

});



// With dependency injection (custom resolver)

$queue->setHandlerResolver(function (string $handlerClass) use ($container) {

    return $container->get($handlerClass);

});

Job Value Object

The Job class is an immutable value object representing a queued job. It carries 13 properties:

use Razy\Queue\Job;



$job->id;           // string → unique identifier

$job->type;         // string → handler type key (e.g., 'send_email')

$job->payload;      // array  → job data

$job->status;       // JobStatus enum

$job->attempts;     // int    → current attempt count

$job->maxAttempts;  // int    → maximum attempts before burying

$job->createdAt;    // string → ISO timestamp

$job->updatedAt;    // string → ISO timestamp

$job->reservedAt;   // ?string

$job->completedAt;  // ?string

$job->failedAt;     // ?string

$job->delay;        // int    → delay in seconds

$job->error;        // ?string → last error message

State Transitions

$job->incrementAttempts();       // attempts++

$job->hasExhaustedAttempts();    // attempts >= maxAttempts

$job->markReserved();            // status = Reserved, reservedAt = now

$job->markCompleted();           // status = Completed, completedAt = now

$job->markFailed(\Throwable $e); // status = Failed, failedAt = now, error = message

$job->markBuried();              // status = Buried

Job Status Lifecycle


   dispatch()

      │

      │

      ┌┴┴──┴──┴─┐

  │Pending  │→?→→→ release (retry)

      └┬┬──┬──┬─┘

       →reserve()

       │

  ┌───┴┴┴──┴┐

  │Reserved │

  └───┬┬┬──┬┘

       │

      ┌┴┴──┴──┴─┐

  │        │

  →success →failure

  ┌─────────┐

│Compltd→→Failed │

  └───┬┬┬──┬┘

              │

      ┌┴┴──┴──┴─┐

        │           │

        →retry      →exhausted

    (Pending)    →?→→→→→→→→

                 │Buried │

      ┌┴┴──┴──┴─┐

Serialisation

$array = $job->toArray();

$restored = Job::fromArray($array);

Queue Stores

DatabaseStore

Persists jobs in a database table using the Razy Database driver.

use Razy\Queue\Store\DatabaseStore;



$store = new DatabaseStore($database);



// Create the table (if not exists)

$store->ensureStorage();

Table schema (razy_jobs):

| Column | Type | Description |

| --- | --- | --- |

| id | VARCHAR(36) PK | UUID |

| type | VARCHAR(255) | Handler type |

| payload | TEXT | JSON payload |

| status | VARCHAR(20) | Pending/Reserved/Completed/Failed/Buried |

| attempts | INT | Attempt counter |

| max_attempts | INT | Max allowed attempts |

| delay | INT | Delay in seconds |

| error | TEXT | Last error message |

| created_at | DATETIME | |

| updated_at | DATETIME | |

| reserved_at | DATETIME | |

| completed_at | DATETIME | |

| failed_at | DATETIME | |

The DatabaseStore uses LRU reservation: when reserving a job, it selects the oldest pending job whose delay has elapsed, locks it with FOR UPDATE (MySQL/PostgreSQL) or equivalent, and updates its status atomically.

Custom Store

Implement QueueStoreInterface for other backends (Redis, SQS, etc.):

use Razy\Queue\QueueStoreInterface;

use Razy\Queue\Job;

use Razy\Queue\JobStatus;



class RedisStore implements QueueStoreInterface

{

    public function push(Job $job): void

    {

        $this->redis->hSet('jobs', $job->id, json_encode($job->toArray()));

        $this->redis->zAdd('pending', $job->createdAt, $job->id);

    }



    public function reserve(): ?Job

    {

        // Pop oldest from sorted set

        $id = $this->redis->zPopMin('pending');

        if (!$id) return null;

        

        $data = json_decode($this->redis->hGet('jobs', $id), true);

        $job = Job::fromArray($data);

        $job->markReserved();

        $this->redis->hSet('jobs', $id, json_encode($job->toArray()));

        return $job;

    }



    public function complete(Job $job): void { /* ... */ }

    public function release(Job $job): void { /* ... */ }

    public function bury(Job $job): void { /* ... */ }

    public function delete(string $id): void { /* ... */ }

    public function find(string $id): ?Job { /* ... */ }

    public function count(?JobStatus $status = null): int { /* ... */ }

    public function clear(?JobStatus $status = null): void { /* ... */ }

    public function ensureStorage(): void { /* ... */ }

}

Worker Loop Example

A simple CLI worker that processes jobs continuously:

#!/usr/bin/env php

<?php

require __DIR__ . '/vendor/autoload.php';



use Razy\Queue\QueueManager;

use Razy\Queue\Store\DatabaseStore;



$db = new \Razy\Database('mysql:host=127.0.0.1;dbname=app', 'root', 'secret');

$store = new DatabaseStore($db);

$queue = new QueueManager($store);



// Register handlers

$queue->registerHandler('send_email', SendEmailHandler::class);

$queue->registerHandler('process_image', ProcessImageHandler::class);



// Event logging

$queue->on('completed', fn($job) => echo "[OK] {$job->type} #{$job->id}\n");

$queue->on('failed', fn($job, $e) => echo "[FAIL] {$job->type}: {$e->getMessage()}\n");

$queue->on('buried', fn($job) => echo "[BURIED] {$job->type} #{$job->id}\n");



echo "Worker started. Listening for jobs...\n";



while (true) {

    $processed = $queue->processBatch(limit: 10);

    

    if ($processed === 0) {

        // No work → sleep to avoid busy loop

        usleep(500_000);  // 500ms

    }

}

Supervisor Configuration

For production, use a process manager like Supervisor:

[program:razy-worker]

command=php /var/www/app/worker.php

autostart=true

autorestart=true

numprocs=2

stderr_logfile=/var/log/razy-worker.err.log

stdout_logfile=/var/log/razy-worker.out.log

API Reference

QueueManager

| Method | Signature | Returns |

| --- | --- | --- |

| __construct | (QueueStoreInterface $store) | |

| dispatch | (string $type, array $payload, int $delay = 0, int $maxAttempts = 3): Job | Queued job |

| dispatchNow | (string $type, array $payload): void | Synchronous execution |

| process | (): int | Jobs processed |

| processBatch | (int $limit): int | Jobs processed |

| processJob | (string $id): void | Process specific job |

| find | (string $id): ?Job | Find by ID |

| count | (?JobStatus $status = null): int | Job count |

| delete | (string $id): void | Remove job |

| clear | (?JobStatus $status = null): void | Remove jobs |

| ensureStorage | (): void | Create storage |

| registerHandler | (string $type, string\|Closure $handler): void | Register handler |

| setHandlerResolver | (Closure $resolver): void | DI resolver |

| on | (string $event, Closure $listener): void | Register event |

Job

| Property | Type | Description |

| --- | --- | --- |

| id | string | UUID |

| type | string | Handler type key |

| payload | array | Job data |

| status | JobStatus | Current status |

| attempts | int | Attempts so far |

| maxAttempts | int | Max attempts |

| delay | int | Delay (seconds) |

| error | ?string | Last error |

| createdAt | string | Created timestamp |

| updatedAt | string | Updated timestamp |

| reservedAt | ?string | Reserved timestamp |

| completedAt | ?string | Completed timestamp |

| failedAt | ?string | Failed timestamp |

| Method | Signature | Returns |

| --- | --- | --- |

| incrementAttempts | (): void | Bump counter |

| hasExhaustedAttempts | (): bool | At limit? |

| markReserved | (): void | Set Reserved |

| markCompleted | (): void | Set Completed |

| markFailed | (\Throwable $e): void | Set Failed |

| markBuried | (): void | Set Buried |

| toArray | (): array | Serialise |

| fromArray | (array $data): static | Deserialise |

JobStatus Enum

| Case | Value |

| --- | --- |

| Pending | 'pending' |

| Reserved | 'reserved' |

| Completed | 'completed' |

| Failed | 'failed' |

| Buried | 'buried' |

QueueStoreInterface

| Method | Signature | Returns |

| --- | --- | --- |

| push | (Job $job): void | Add to store |

| reserve | (): ?Job | Reserve next |

| complete | (Job $job): void | Mark done |

| release | (Job $job): void | Back to queue |

| bury | (Job $job): void | Mark buried |

| delete | (string $id): void | Remove |

| find | (string $id): ?Job | Lookup |

| count | (?JobStatus $status): int | Count |

| clear | (?JobStatus $status): void | Purge |

| ensureStorage | (): void | Init storage |

← Previous: ORM

HttpClient

Clone this wiki locally