Implementations of PMG\Queue\Consumer
pull message out of a driver backend and handle (process) them in some way. The default consumer accomplishes this a message handler <handlers>
.
In all cases $queueName
in the consume should correspond to queues into which your producer <producers>
put messages.
The script to run your consumer might look something like this. Check out the handlers <handlers>
documentation for more information about what $handler
is below.
<?php
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Driver\MemoryDriver;
$driver = new MemoryDriver();
/** @var PMG\Queue\MessageHandler $handler */
$consumer = new DefaultConsumer($driver, $handler);
exit($consumer->run(isset($argv[1]) ? $argv[1] : 'defaultQueue'));
When a message fails -- by throwing an exception or returns false from a MessageHandler
-- the consumer puts it back in the queue to retry up to 5 times by default. This behavior can be adjusted by providing a RetrySpec
as the third argument to DefaultConsumers
constructor. pmg/queue provides a few by default.
Retry specs look at PMG\Queue\Envelope
instances, not raw messages. See the internals documentation <envelopes>
for more info about them.
Use PMG\\Queue\\Retry\\LimitedSpec
.
<?php
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Retry\LimitedSpec;
// five retries by default. This is what the consumer does automatically
$retry = new LimitedSpec();
// Or limit to a specific number of retries
$retry = new LimitedSpec(2);
// $driver and $handler as above
$consumer = new DefaultConsumer($driver, $handler, $retry);
Sometimes you don't want to retry a message, for those cases use PMG\\Queue\\Retry\\NeverSpec
.
<?php
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Retry\NeverSpec;
$retry = new NeverSpec();
// $driver and $handler as above
$consumer = new DefaultConsumer($driver, $handler, $retry);
When something goes wrong DefaultConsumer
logs it with a PSR-3 Logger implementation. The default is to use a NullLogger, but you can provide your own logger as the fourth argument to DefaultConsumer
's constructor.
<?php
use PMG\Queue\DefaultConsumer;
$monolog = new Monolog\Logger('yourApp');
// $driver, $handler, $retry as above
$consumer = new DefaultConsumer($driver, $handler, $retry, $monolog);
Extend PMG\\Queue\\AbstractConsumer
to make things easy and only have to implement the once
method. Here's an example that decorates another Consumer
with events.
<?php
use PMG\Queue\AbstractConsumer;
use PMG\Queue\Consumer;
use PMG\Queue\Message;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
final class EventingConsumer extends AbstractConsumer
{
/** @var Consumer */
private $wrapped;
/** @var EventDispatcherInterface $events */
// constructor that takes a consumer and dispatcher to set the props ^
public function once($queueName)
{
$this->events->dispatch('queue:before_once', new Event());
$this->wrapped->once($queueName);
$this->events->disaptch('queue:after_once', new Event());
}
}