Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,94 @@ Add connection to `config/queue.php`:
],
```

### Optional Config

Optionally add queue options to the config of a connection.
Every queue created for this connection, get's the properties.

When you want to prioritize messages when they were delayed, then this is possible by adding extra options.
- When max-priority is omitted, the max priority is set with 2 when used.

```php
'connections' => [
// ...

'rabbitmq' => [
// ...

'options' => [
'queue' => [
// ...

'prioritize_delayed_messages' => false,
'queue_max_priority' => 10,
],
],
],

// ...
],
```

When you want to publish messages against an exchange with routing-key's, then this is possible by adding extra options.
- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key
- When routing-key is omitted the routing-key by default is the `queue` name.
- When using `%s` in the routing-key the queue_name will be substituted.

> Note: when using exchange with routing-key, u probably create your queues with bindings yourself.

```php
'connections' => [
// ...

'rabbitmq' => [
// ...

'options' => [
'queue' => [
// ...

'exchange' => 'application-x',
'exchange_type' => 'topic',
'exchange_routing_key' => '',
],
],
],

// ...
],
```

In Laravel failed jobs are stored into the database. But maybe you want to instruct some other process to also do something with the message.
When you want to instruct RabbitMQ to reroute failed messages to a exchange or a specific queue, then this is possible by adding extra options.
- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key
- When routing-key is omitted, the routing-key by default the `queue` name is substituted with `'.failed'`.
- When using `%s` in the routing-key the queue_name will be substituted.

> Note: When using failed_job exchange with routing-key, u probably need to create your exchange/queue with bindings yourself.

```php
'connections' => [
// ...

'rabbitmq' => [
// ...

'options' => [
'queue' => [
// ...

'reroute_failed' => true,
'failed_exchange' => 'failed-exchange',
'failed_routing_key' => 'application-x.%s',
],
],
],

// ...
],
```

## Laravel Usage

Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues
Expand Down
40 changes: 0 additions & 40 deletions config/rabbitmq.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,44 +36,4 @@
*/
'worker' => env('RABBITMQ_WORKER', 'default'),

/*
* ## Manage the delay strategy from the config.
*
* The delay strategy can be set to:
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\DlxDelayStrategy::class
*
* ### Backoff Strategy
*
* The `DlxDelayStrategy` is BackoffAware and by default a ConstantBackoffStrategy is assigned.
* This ensures the same behavior as if the `RabbitMqDlxDelayStrategy` was assigned.
*
* You can assign different backoffStrategies with extra options, for example:
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ConstantBackoffStrategy::class
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\LinearBackoffStrategy::class
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ExponentialBackoffStrategy::class
* - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\PolynomialBackoffStrategy::class
*
* The options must be an array of key -> value.
*
* For reference about RabbitMQ backoff strategy, see the following article:
* https://m.alphasights.com/exponential-backoff-with-rabbitmq-78386b9bec81
*
* ### First-in First-out concept
*
* U can easily prioritize delayed messages. When set to `true` a message will be set with a higher priority.
* This means that delayed messages are handled first when returning to the queue.
*
* This is useful when your queue has allot of jobs, and you want to make sure, a job will be handled
* as soon as possible. This way RabbitMq handles the jobs and the way they are consumed by workers.
*
*/
'delay' => [
'strategy' => env('RABBITMQ_DELAY_STRATEGY', \Enqueue\AmqpTools\RabbitMqDlxDelayStrategy::class),
'backoff' => [
'strategy' => env('RABBITMQ_DELAY_BACKOFF_STRATEGY', \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ConstantBackoffStrategy::class),
'options' => [],
],
'prioritize'=> env('RABBITMQ_DELAY_PRIORITIZE'),
],

];
2 changes: 1 addition & 1 deletion src/Console/ExchangeDeclareCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public function handle(RabbitMQConnector $connector): void
(bool) $this->option('auto-delete')
);

$this->warn('Exchange declared successfully.');
$this->info('Exchange declared successfully.');
}
}
34 changes: 24 additions & 10 deletions src/Queue/Connectors/RabbitMQConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ public function __construct(Dispatcher $dispatcher)
*/
public function connect(array $config): Queue
{
$connection = $this->createConnection($config);
$connection = $this->createConnection(Arr::except($config, 'options.queue'));

$queue = $this->createQueue(
Arr::get($config, 'worker', 'default'),
$connection,
$config['queue']
$config['queue'],
Arr::get($config, 'options.queue', [])
);

if (! $queue instanceof RabbitMQQueue) {
Expand Down Expand Up @@ -71,29 +72,42 @@ protected function createConnection(array $config): AbstractConnection
/** @var AbstractConnection $connection */
$connection = Arr::get($config, 'connection', AMQPLazyConnection::class);

$hosts = Arr::shuffle(Arr::get($config, 'hosts', []));

// manually disable heartbeat so long-running tasks will not fail
$config['options']['heartbeat'] = 0;
Arr::set($config, 'options.heartbeat', 0);

return $connection::create_connection(
$hosts,
Arr::shuffle(Arr::get($config, 'hosts', [])),
$this->filter(Arr::get($config, 'options', []))
);
}

protected function createQueue(string $worker, AbstractConnection $connection, string $queue)
/**
* Create a queue for the worker.
*
* @param string $worker
* @param AbstractConnection $connection
* @param string $queue
* @param array $options
* @return HorizonRabbitMQQueue|RabbitMQQueue|Queue
*/
protected function createQueue(string $worker, AbstractConnection $connection, string $queue, array $options = [])
{
switch ($worker) {
case 'default':
return new RabbitMQQueue($connection, $queue);
return new RabbitMQQueue($connection, $queue, $options);
case 'horizon':
return new HorizonRabbitMQQueue($connection, $queue);
return new HorizonRabbitMQQueue($connection, $queue, $options);
default:
return new $worker($connection, $queue);
return new $worker($connection, $queue, $options);
}
}

/**
* Recursively filter only null values.
*
* @param array $array
* @return array
*/
private function filter(array $array): array
{
foreach ($array as $index => &$value) {
Expand Down
36 changes: 24 additions & 12 deletions src/Queue/Jobs/RabbitMQJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,25 @@ public function attempts(): int
$headers = Arr::get($this->message->get_properties(), 'application_headers');

if (! $headers) {
return 0;
return 1;
}

$data = $headers->getNativeData();

$laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0);
$xDeathCount = (int) Arr::get($headers->getNativeData(), 'x-death.0.count', 0);

return $laravelAttempts + $xDeathCount;
return ($laravelAttempts) + 1;
}

public function fail($e = null): void
{
parent::fail($e);

// We must tel rabbitMQ this Job is failed
// The message must be rejected when the Job marked as failed, in case rabbitMQ wants to do some extra magic.
// like: Death lettering the message to an other exchange/routing-key.
$this->rabbitmq->reject($this);
}

/**
Expand All @@ -95,7 +105,11 @@ public function delete(): void
{
parent::delete();

$this->rabbitmq->ack($this);
// When delete is called and the Job was not failed, the message must be acknowledged.
// This is because this is a controlled call by a developer. So the message was handled correct.
if (! $this->failed) {
$this->rabbitmq->ack($this);
}

// required for Laravel Horizon
if ($this->rabbitmq instanceof HorizonRabbitMQQueue) {
Expand All @@ -108,17 +122,15 @@ public function delete(): void
*/
public function release($delay = 0): void
{
parent::release($delay);

if ($delay > 0) {
$this->rabbitmq->ack($this);
parent::release();

$this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts());
// Always create a new message when this Job is released
$this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts());

return;
}

$this->rabbitmq->reject($this);
// Releasing a Job means the message was failed to process.
// Because this Job is always recreated and pushed as new message, this Job is correctly handled.
// We must tell rabbitMQ this fact.
$this->rabbitmq->ack($this);
}

/**
Expand Down
Loading