Swarrot is PHP library to consume messages from a broker.
The recommended way to install Swarrot is through
Composer. Require the swarrot/swarrot
package
into your composer.json
file:
{
"require": {
"swarrot/swarrot": "@stable"
}
}
Protip: you should browse the
swarrot/swarrot
page to choose a stable version to use, avoid the @stable
meta constraint.
First, you need to create a message provider to retrieve message from you're broker. For example, with A PeclPackageMessageProvider (retrieve message from an AMQP broker with the pecl amqp package:
use Swarrot\Broker\PeclPackageMessageProvider;
// Create connection
$connection = new \AMQPConnection();
$connection->connect();
$channel = new \AMQPChannel($connection);
// Get the queue to consume
$queue = new \AMQPQueue($channel);
$queue->setName('global');
$messageProvider = new PeclPackageMessageProvider($queue);
Once it's done you need to create a Processor to process messages retrieved
from the broker. This processor must implement
Swarrot\Processor\ProcessorInterface
. For example:
use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;
class Processor implements ProcessorInterface {
public function process(Message $message, array $options) {
echo sprintf("Consume message #%d\n", $message->getId());
}
}
You now have a Swarrot\Broker\MessageProviderInterface
to retrieve messages
and a Processor to process them. So, ask the Swarrot\Consumer
to do it's job :
use Swarrot\Message;
$consumer = new Consumer($messageProvider, $processor);
$consumer->consume();
Using the built in processors or by creating your own, you can extend the behavior of your processor. Let's imagine you want to catch exception during execution to avoid the consumer to stop in production environment, you can use the ExceptionCatcherProcessor like this:
use Swarrot\Processor\ExceptionCatcherProcessor;
use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;
class Processor implements ProcessorInterface {
public function process(Message $message, array $options) {
echo sprintf("Consume message #%d\n", $message->getId());
}
}
$processor = new ExceptionCatcherProcessor(new Processor());
Take a look at this processor's code. It just decorate your own processor with a try/catch block.
Heavily inspired by stackphp/builder you
can use Swarrot\Processor\Stack\Builder
to stack your processors.
Because it can be annoying to chain all you're processors, you can use the
Builder like this:
use Swarrot\Processor\ProcessorInterface;
use Swarrot\Broker\Message;
class Processor implements ProcessorInterface {
public function process(Message $message, array $options) {
echo sprintf("Consume message #%d\n", $message->getId());
}
}
$stack = (new \Swarrot\Processor\Stack\Builder())
->push('Swarrot\Processor\ExceptionCatcherProcessor')
->push('Swarrot\Processor\MaxMessagesProcessor', new Logger())
;
$processor = $stack->resolve(new Processor());
- AckProcessor
- ExceptionCatcherProcessor
- InstantRetryProcessor
- MaxExecutionTimeProcessor (thanks to Remy Lemeunier)
- MaxMessagesProcessor (thanks to Remy Lemeunier)
- RetryProcessor
- SignalHandlerProcessor
To create your own processor and be able to use it with the StackProcessor, you
just need to implement ProcessorInterface
and to take another
ProcessorInterface
as first argument in constructor.
Swarrot is released under the MIT License. See the bundled LICENSE file for details.