Quick Start
This walkthrough takes you from nothing to a producer pushing messages and a worker consuming them. It uses the PDO transport so you need no broker — just a database (SQLite here).
A handler implements InitPHP\Queue\Contracts\Handler and is mapped to a message
URN. It receives a read-only BabelQueue\Contracts\InboundMessage. Return normally
to acknowledge the message; throw an exception to fail it (the worker retries, then dead-letters).
use BabelQueue\Contracts\InboundMessage;
use InitPHP\Queue\Contracts\Handler;
final class SendWelcomeEmail implements Handler
{
public function handle(InboundMessage $message): void
{
$data = $message->getData(); // ['user_id' => 42, 'email' => '...']
$traceId = $message->getTraceId(); // correlate with the producer
$meta = $message->getMeta(); // ['id' => ..., 'queue' => ..., 'lang' => ...]
// Do the work. An exception here marks the message as failed.
// mailer()->sendWelcome($data['email']);
}
}
Idempotency. Delivery is at-least-once: a handler may run more than once for the same message (after a crash, for example). Make it safe to repeat.
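One common way to make a handler safe to repeat is to remember which message ids have already been handled and skip duplicates. The sketch below is a minimal illustration under stated assumptions: `ProcessedMessages` and its `once()` method are hypothetical names, not part of InitPHP Queue, and the id would come from something like `$message->getMeta()['id']`. It keeps the ids in memory so the example stays self-contained; a real worker would need durable storage (for example a table with a unique key on the id).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of InitPHP Queue): remembers which message
 * ids were already handled so that a repeated delivery becomes a no-op.
 */
final class ProcessedMessages
{
    /** @var array<string, true> ids that completed successfully */
    private array $seen = [];

    /** Runs $work only the first time $messageId is seen; returns whether it ran. */
    public function once(string $messageId, callable $work): bool
    {
        if (isset($this->seen[$messageId])) {
            return false; // duplicate delivery: skip the side effect
        }
        $work();
        // Record the id only after the work succeeded, so a failure is retried.
        $this->seen[$messageId] = true;
        return true;
    }
}

$processed = new ProcessedMessages();
$sent = [];
$send = function () use (&$sent): void {
    $sent[] = 'jane@example.com'; // stand-in for sending the welcome email
};

$first = $processed->once('msg-1', $send);  // first delivery: work runs
$second = $processed->once('msg-1', $send); // same id redelivered: skipped
```

Inside a handler, the body of `handle()` would be wrapped in a call like `once()`. Recording the id after the work, rather than before, means a crash between the two steps can still cause one repeat, so the side effect itself should tolerate that where possible.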
use InitPHP\Queue\Transport\Pdo\PdoTransport;
$pdo = new PDO('sqlite:' . __DIR__ . '/queue.sqlite');
$transport = new PdoTransport($pdo, table: 'jobs');
$transport->createSchema(); // creates the jobs / jobs_failed tables (dev convenience)
use InitPHP\Queue\Producer\Producer;
$producer = new Producer($transport, defaultQueue: 'emails');
$producer->send('urn:babel:users:registered', [
'user_id' => 42,
'email' => 'jane@example.com',
]);
send() encodes the canonical envelope and publishes it. A consumer in any
language subscribed to the emails queue can read it.
Map URNs to handlers, wrap them in a Dispatcher, and run a Worker.
use InitPHP\Queue\Consumer\Dispatcher;
use InitPHP\Queue\Consumer\Worker;
use InitPHP\Queue\Consumer\WorkerOptions;
use InitPHP\Queue\Routing\HandlerMap;
$handlers = (new HandlerMap())
->register('urn:babel:users:registered', SendWelcomeEmail::class);
$worker = new Worker(
$transport,
new Dispatcher($handlers),
new WorkerOptions(maxAttempts: 3, backoff: [1, 5, 15]),
);
$worker->run('emails'); // loops, processing messages until stopped
To process the queue once and exit (handy in tests or cron):
$worker->runOnce('emails'); // returns bool: was a message processed?
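To make the `maxAttempts: 3, backoff: [1, 5, 15]` options more concrete, here is a sketch of how a retry schedule like this is often interpreted. It is an assumption, not the library's confirmed behavior: `nextRetryDelay()` is a hypothetical function, attempts are counted from 1, the backoff values are taken to be seconds, and the last entry is reused if the list is shorter than the number of retries.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delay in seconds before the next retry, or null
 * when the attempts are exhausted and the message would be dead-lettered.
 *
 * @param int   $failedAttempt number of the attempt that just failed (1-based)
 * @param int[] $backoff       delays indexed by retry number (0-based)
 */
function nextRetryDelay(int $failedAttempt, int $maxAttempts, array $backoff): ?int
{
    if ($failedAttempt >= $maxAttempts) {
        return null; // no attempts left
    }
    if ($backoff === []) {
        return 0; // no schedule configured: retry immediately
    }
    // Clamp to the last entry when the list is shorter than the retry count.
    $index = min($failedAttempt - 1, count($backoff) - 1);
    return $backoff[$index];
}

$afterFirst = nextRetryDelay(1, 3, [1, 5, 15]);  // 1
$afterSecond = nextRetryDelay(2, 3, [1, 5, 15]); // 5
$afterThird = nextRetryDelay(3, 3, [1, 5, 15]);  // null
```

Under this reading, three attempts allow only two retries, so the third backoff value would apply only if maxAttempts were raised to 4 or more.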
Put the wiring in a bootstrap file that returns a configured Worker, and run
it with the CLI:
<?php
// worker.php
require __DIR__ . '/vendor/autoload.php';
use InitPHP\Queue\Consumer\{Dispatcher, Worker, WorkerOptions};
use InitPHP\Queue\Routing\HandlerMap;
use InitPHP\Queue\Transport\Pdo\PdoTransport;
$transport = new PdoTransport(new PDO('sqlite:' . __DIR__ . '/queue.sqlite'), 'jobs');
$handlers = (new HandlerMap())->register('urn:babel:users:registered', SendWelcomeEmail::class);
return new Worker($transport, new Dispatcher($handlers), new WorkerOptions(maxAttempts: 3));
php vendor/bin/queue work --bootstrap=worker.php --queue=emails
# or process exactly one message and exit:
php vendor/bin/queue work --bootstrap=worker.php --queue=emails --once
Run the worker under a process supervisor (systemd, supervisord, a container
restart policy) and set a maxJobs and/or memoryLimitMb limit so that each worker
exits periodically and the supervisor restarts it with a fresh process.
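The idea behind maxJobs and memoryLimitMb can be sketched as a stop check evaluated between messages. This is an illustration only: `shouldStop()` is a hypothetical function, and treating 0 as "no limit" is an assumption of the sketch rather than documented behavior of the worker.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical check, not the library's code: should the worker loop exit
 * so that the supervisor can start a fresh process?
 */
function shouldStop(int $jobsDone, int $maxJobs, int $memoryBytes, int $memoryLimitMb): bool
{
    if ($maxJobs > 0 && $jobsDone >= $maxJobs) {
        return true; // processed enough messages
    }
    if ($memoryLimitMb > 0 && $memoryBytes >= $memoryLimitMb * 1024 * 1024) {
        return true; // memory usage reached the limit
    }
    return false;
}

$jobsDone = 0;
// Stop after 500 messages or once 128 MB of memory is in use.
while (!shouldStop($jobsDone, 500, memory_get_usage(true), 128)) {
    $jobsDone++; // stand-in for processing one message
}
```

Checking between messages, rather than in the middle of one, lets the worker exit cleanly without abandoning work in progress.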
- Core Concepts — the three moving parts and how they interact.
- Handlers & Routing — closures, container resolution, unknown URNs.
- The Worker — every tuning knob.
- Transports — switch from PDO to Redis or RabbitMQ.
InitPHP Queue · GitHub · Packagist · BabelQueue standard · MIT License