diff --git a/README.md b/README.md index 3e0318c9..7db1d753 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,94 @@ Add connection to `config/queue.php`: ], ``` +### Optional Config + +Optionally add queue options to the config of a connection. +Every queue created for this connection, get's the properties. + +When you want to prioritize messages when they were delayed, then this is possible by adding extra options. +- When max-priority is omitted, the max priority is set with 2 when used. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + 'queue' => [ + // ... + + 'prioritize_delayed_messages' => false, + 'queue_max_priority' => 10, + ], + ], + ], + + // ... +], +``` + +When you want to publish messages against an exchange with routing-key's, then this is possible by adding extra options. +- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key +- When routing-key is omitted the routing-key by default is the `queue` name. +- When using `%s` in the routing-key the queue_name will be substituted. + +> Note: when using exchange with routing-key, u probably create your queues with bindings yourself. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + 'queue' => [ + // ... + + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => '', + ], + ], + ], + + // ... +], +``` + +In Laravel failed jobs are stored into the database. But maybe you want to instruct some other process to also do something with the message. +When you want to instruct RabbitMQ to reroute failed messages to a exchange or a specific queue, then this is possible by adding extra options. +- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key +- When routing-key is omitted, the routing-key by default the `queue` name is substituted with `'.failed'`. +- When using `%s` in the routing-key the queue_name will be substituted. + +> Note: When using failed_job exchange with routing-key, u probably need to create your exchange/queue with bindings yourself. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'options' => [ + 'queue' => [ + // ... + + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s', + ], + ], + ], + + // ... +], +``` + ## Laravel Usage Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues diff --git a/config/rabbitmq.php b/config/rabbitmq.php index afe6f2aa..765d73b8 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -36,44 +36,4 @@ */ 'worker' => env('RABBITMQ_WORKER', 'default'), - /* - * ## Manage the delay strategy from the config. - * - * The delay strategy can be set to: - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\DlxDelayStrategy::class - * - * ### Backoff Strategy - * - * The `DlxDelayStrategy` is BackoffAware and by default a ConstantBackoffStrategy is assigned. - * This ensures the same behavior as if the `RabbitMqDlxDelayStrategy` was assigned. - * - * You can assign different backoffStrategies with extra options, for example: - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ConstantBackoffStrategy::class - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\LinearBackoffStrategy::class - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ExponentialBackoffStrategy::class - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\PolynomialBackoffStrategy::class - * - * The options must be an array of key -> value. - * - * For reference about RabbitMQ backoff strategy, see the following article: - * https://m.alphasights.com/exponential-backoff-with-rabbitmq-78386b9bec81 - * - * ### First-in First-out concept - * - * U can easily prioritize delayed messages. When set to `true` a message will be set with a higher priority. - * This means that delayed messages are handled first when returning to the queue. - * - * This is useful when your queue has allot of jobs, and you want to make sure, a job will be handled - * as soon as possible. This way RabbitMq handles the jobs and the way they are consumed by workers. - * - */ - 'delay' => [ - 'strategy' => env('RABBITMQ_DELAY_STRATEGY', \Enqueue\AmqpTools\RabbitMqDlxDelayStrategy::class), - 'backoff' => [ - 'strategy' => env('RABBITMQ_DELAY_BACKOFF_STRATEGY', \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ConstantBackoffStrategy::class), - 'options' => [], - ], - 'prioritize'=> env('RABBITMQ_DELAY_PRIORITIZE'), - ], - ]; diff --git a/src/Console/ExchangeDeclareCommand.php b/src/Console/ExchangeDeclareCommand.php index 88bbed71..955da0a9 100644 --- a/src/Console/ExchangeDeclareCommand.php +++ b/src/Console/ExchangeDeclareCommand.php @@ -40,6 +40,6 @@ public function handle(RabbitMQConnector $connector): void (bool) $this->option('auto-delete') ); - $this->warn('Exchange declared successfully.'); + $this->info('Exchange declared successfully.'); } } diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index c18cb980..a3b44b98 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -38,12 +38,13 @@ public function __construct(Dispatcher $dispatcher) */ public function connect(array $config): Queue { - $connection = $this->createConnection($config); + $connection = $this->createConnection(Arr::except($config, 'options.queue')); $queue = $this->createQueue( Arr::get($config, 'worker', 'default'), $connection, - $config['queue'] + $config['queue'], + Arr::get($config, 'options.queue', []) ); if (! $queue instanceof RabbitMQQueue) { @@ -71,29 +72,42 @@ protected function createConnection(array $config): AbstractConnection /** @var AbstractConnection $connection */ $connection = Arr::get($config, 'connection', AMQPLazyConnection::class); - $hosts = Arr::shuffle(Arr::get($config, 'hosts', [])); - // manually disable heartbeat so long-running tasks will not fail - $config['options']['heartbeat'] = 0; + Arr::set($config, 'options.heartbeat', 0); return $connection::create_connection( - $hosts, + Arr::shuffle(Arr::get($config, 'hosts', [])), $this->filter(Arr::get($config, 'options', [])) ); } - protected function createQueue(string $worker, AbstractConnection $connection, string $queue) + /** + * Create a queue for the worker. + * + * @param string $worker + * @param AbstractConnection $connection + * @param string $queue + * @param array $options + * @return HorizonRabbitMQQueue|RabbitMQQueue|Queue + */ + protected function createQueue(string $worker, AbstractConnection $connection, string $queue, array $options = []) { switch ($worker) { case 'default': - return new RabbitMQQueue($connection, $queue); + return new RabbitMQQueue($connection, $queue, $options); case 'horizon': - return new HorizonRabbitMQQueue($connection, $queue); + return new HorizonRabbitMQQueue($connection, $queue, $options); default: - return new $worker($connection, $queue); + return new $worker($connection, $queue, $options); } } + /** + * Recursively filter only null values. + * + * @param array $array + * @return array + */ private function filter(array $array): array { foreach ($array as $index => &$value) { diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 410098c6..7f7af1d7 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -75,7 +75,7 @@ public function attempts(): int $headers = Arr::get($this->message->get_properties(), 'application_headers'); if (! $headers) { - return 0; + return 1; } $data = $headers->getNativeData(); @@ -83,7 +83,17 @@ public function attempts(): int $laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0); $xDeathCount = (int) Arr::get($headers->getNativeData(), 'x-death.0.count', 0); - return $laravelAttempts + $xDeathCount; + return ($laravelAttempts) + 1; + } + + public function fail($e = null): void + { + parent::fail($e); + + // We must tel rabbitMQ this Job is failed + // The message must be rejected when the Job marked as failed, in case rabbitMQ wants to do some extra magic. + // like: Death lettering the message to an other exchange/routing-key. + $this->rabbitmq->reject($this); } /** @@ -95,7 +105,11 @@ public function delete(): void { parent::delete(); - $this->rabbitmq->ack($this); + // When delete is called and the Job was not failed, the message must be acknowledged. + // This is because this is a controlled call by a developer. So the message was handled correct. + if (! $this->failed) { + $this->rabbitmq->ack($this); + } // required for Laravel Horizon if ($this->rabbitmq instanceof HorizonRabbitMQQueue) { @@ -108,17 +122,15 @@ public function delete(): void */ public function release($delay = 0): void { - parent::release($delay); - - if ($delay > 0) { - $this->rabbitmq->ack($this); + parent::release(); - $this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts()); + // Always create a new message when this Job is released + $this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts()); - return; - } - - $this->rabbitmq->reject($this); + // Releasing a Job means the message was failed to process. + // Because this Job is always recreated and pushed as new message, this Job is correctly handled. + // We must tell rabbitMQ this fact. + $this->rabbitmq->ack($this); } /** diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 07996dbb..5092ac61 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -8,6 +8,7 @@ use Exception; use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Queue; +use Illuminate\Support\Arr; use Illuminate\Support\Str; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; @@ -68,13 +69,27 @@ class RabbitMQQueue extends Queue implements QueueContract */ protected $currentJob; + /** + * @var array + */ + protected $options; + + /** + * RabbitMQQueue constructor. + * + * @param AbstractConnection $connection + * @param string $default + * @param array $options + */ public function __construct( AbstractConnection $connection, - string $default + string $default, + array $options = [] ) { $this->connection = $connection; $this->channel = $connection->channel(); $this->default = $default; + $this->options = $options; } /** @@ -100,6 +115,8 @@ public function size($queue = null): int /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function push($job, $data = '', $queue = null) { @@ -108,27 +125,38 @@ public function push($job, $data = '', $queue = null) /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function pushRaw($payload, $queue = null, array $options = []) { - $queue = $this->getQueue($queue); + $destination = $this->getQueue($queue); + $exchange = $this->getExchange(); - $this->declareExchange($queue); - $this->declareQueue($queue, true, false, [ - 'x-dead-letter-exchange' => $queue, - 'x-dead-letter-routing-key' => $queue, - ]); - $this->bindQueue($queue, $queue, $queue); + // When a exchange is defined and no exchange is present in RabbitMQ, create an exchange. + if ($exchange && ! $this->isExchangeExists($exchange)) { + $this->declareExchange($exchange, $this->getExchangeType()); + } - [$message, $correlationId] = $this->createMessage($payload); + // When no exchange is defined, create a queue for amq.direct publishing, but only if it not already present. + if (! $exchange && ! $this->isQueueExists($destination)) { + $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + } - $this->channel->basic_publish($message, $queue, $queue, true, false); + [$message, $correlationId] = ($attempts = Arr::get($options, 'attempts')) + ? $this->createMessage($payload, $attempts) + : $this->createMessage($payload); + + // Publish the message + $this->channel->basic_publish($message, $exchange, $this->getRoutingKey($destination), true, false); return $correlationId; } /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function later($delay, $job, $data = '', $queue = null) { @@ -139,56 +167,62 @@ public function later($delay, $job, $data = '', $queue = null) ); } + /** + * @param $delay + * @param $payload + * @param null $queue + * @param int $attempts + * @return mixed + * + * @throws AMQPProtocolChannelException + */ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) { $ttl = $this->secondsUntil($delay) * 1000; - if ($ttl < 0) { - return $this->pushRaw($payload, $queue, []); + // When no ttl just publish a new message to the exchange or queue + if ($ttl <= 0) { + return $this->pushRaw($payload, $queue, ['delay' => $delay, 'attempts' => $attempts]); } - $destinationQueue = $this->getQueue($queue); - $delayedQueue = $this->getQueue($queue).'.delay.'.$ttl; + $destination = $this->getQueue($queue).'.delay.'.$ttl; - $this->declareExchange($destinationQueue); - $this->declareQueue($destinationQueue, true, false, [ - 'x-dead-letter-exchange' => $destinationQueue, - 'x-dead-letter-routing-key' => $destinationQueue, - ]); - $this->declareQueue($delayedQueue, true, false, [ - 'x-dead-letter-exchange' => $destinationQueue, - 'x-dead-letter-routing-key' => $destinationQueue, - 'x-message-ttl' => $ttl, - ]); - $this->bindQueue($destinationQueue, $destinationQueue, $destinationQueue); + $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->basic_publish($message, null, $delayedQueue, true, false); + // Publish directly on the delayQueue, no need to publish trough an exchange. + $this->channel->basic_publish($message, null, $destination, true, false); return $correlationId; } /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function bulk($jobs, $data = '', $queue = null): void { - $queue = $this->getQueue($queue); + $destination = $this->getQueue($queue); + $exchange = $this->getExchange(); + + // When a exchange is defined and no exchange is present in RabbitMQ, create an exchange. + if ($exchange && ! $this->isExchangeExists($exchange)) { + $this->declareExchange($exchange, $this->getExchangeType()); + } + + // When no exchange is defined, create a queue for amq.direct publishing, but only if it not already present. + if (! $exchange && ! $this->isQueueExists($destination)) { + $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + } foreach ((array) $jobs as $job) { [$message] = $this->createMessage( $this->createPayload($job, $queue, $data) ); - $this->declareExchange($queue); - $this->declareQueue($queue, true, false, [ - 'x-dead-letter-exchange' => $queue, - 'x-dead-letter-routing-key' => $queue, - ]); - $this->bindQueue($queue, $queue, $queue); - - $this->channel->batch_basic_publish($message, $queue, $queue); + $this->channel->batch_basic_publish($message, $exchange, $destination); } $this->channel->publish_batch(); @@ -215,9 +249,14 @@ public function pop($queue = null) ); } } catch (AMQPProtocolChannelException $exception) { - // if there is not exchange or queue AMQP will throw exception with code 404 - // we need to catch it and return null + // If there is not exchange or queue AMQP will throw exception with code 404 + // We need to catch it and return null if ($exception->amqp_reply_code === 404) { + + // Because of the channel exception the channel was closed and removed. + // We have to open a new channel. Because else the worker(s) are stuck in a loop, without processing. + $this->channel = $this->connection->channel(); + return null; } @@ -227,22 +266,36 @@ public function pop($queue = null) return null; } + /** + * @return AbstractConnection + */ public function getConnection(): AbstractConnection { return $this->connection; } + /** + * @return AMQPChannel + */ public function getChannel(): AMQPChannel { return $this->channel; } + /** + * Gets a queue/destination, by default the queue option set on the connection. + * @param null $queue + * @return string + */ public function getQueue($queue = null) { return $queue ?: $this->default; } /** + * Checks if the given exchange already present/defined in RabbitMQ. + * Returns false when when the exchange is missing. + * * @param string $exchange * @return bool * @throws AMQPProtocolChannelException @@ -265,11 +318,21 @@ public function isExchangeExists(string $exchange): bool } } + /** + * Declare a exchange in rabbitMQ. + * + * @param string $name + * @param string $type + * @param bool $durable + * @param bool $autoDelete + * @param array $arguments + */ public function declareExchange( string $name, string $type = AMQPExchangeType::DIRECT, bool $durable = true, - bool $autoDelete = false + bool $autoDelete = false, + array $arguments = [] ): void { if (in_array($name, $this->exchanges, true)) { return; @@ -282,11 +345,15 @@ public function declareExchange( $durable, $autoDelete, false, - true + true, + new AMQPTable($arguments) ); } /** + * Checks if the given queue already present/defined in RabbitMQ. + * Returns false when when the queue is missing. + * * @param string $name * @return bool * @throws AMQPProtocolChannelException @@ -311,6 +378,14 @@ public function isQueueExists(?string $name = null): bool } } + /** + * Declare a queue in rabbitMQ. + * + * @param string $name + * @param bool $durable + * @param bool $autoDelete + * @param array $arguments + */ public function declareQueue(string $name, bool $durable = true, bool $autoDelete = false, array $arguments = []): void { if (in_array($name, $this->queues, true)) { @@ -328,6 +403,11 @@ public function declareQueue(string $name, bool $durable = true, bool $autoDelet ); } + /** + * @param string $queue + * @param string $exchange + * @param string $routingKey + */ public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void { if (in_array( @@ -341,6 +421,9 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = $this->channel->queue_bind($queue, $exchange, $routingKey); } + /** + * @param null $queue + */ public function purge($queue = null): void { // create a temporary channel, so the main channel will not be closed on exception @@ -349,16 +432,32 @@ public function purge($queue = null): void $channel->close(); } + /** + * @param RabbitMQJob $job + */ public function ack(RabbitMQJob $job): void { $this->channel->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); } + /** + * Reject current Job. + * + * @param RabbitMQJob $job + * @param bool $requeue + */ public function reject(RabbitMQJob $job, bool $requeue = false): void { $this->channel->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); } + /** + * Create a AMQP message. + * + * @param $payload + * @param int $attempts + * @return array + */ protected function createMessage($payload, int $attempts = 0): array { $properties = [ @@ -370,6 +469,10 @@ protected function createMessage($payload, int $attempts = 0): array $properties['correlation_id'] = $correlationId; } + if ($this->isPrioritizeDelayed()) { + $properties['priority'] = $attempts; + } + $message = new AMQPMessage($payload, $properties); $message->set('application_headers', new AMQPTable([ @@ -384,6 +487,14 @@ protected function createMessage($payload, int $attempts = 0): array ]; } + /** + * Create a payload array from the given job and data. + * + * @param object|string $job + * @param string $queue + * @param string $data + * @return array + */ protected function createPayloadArray($job, $queue, $data = '') { return array_merge(parent::createPayloadArray($job, $queue, $data), [ @@ -416,4 +527,135 @@ public function close(): void // Ignore the exception } } + + /** + * Get the Queue arguments. + * + * @return array + */ + protected function getQueueArguments(string $destination): array + { + $arguments = []; + + // Messages without a priority property are treated as if their priority were 0. + // Messages with a priority which is higher than the queue's maximum, are treated as if they were + // published with the maximum priority. + if ($this->isPrioritizeDelayed()) { + $arguments['x-max-priority'] = $this->getQueueMaxPriority(); + } + + if ($this->isRerouteFailed()) { + $arguments['x-dead-letter-exchange'] = $this->getFailedExchange() ?? ''; + $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); + } + + return $arguments; + } + + /** + * Get the Delay queue arguments. + * + * @param string $destination + * @param int $ttl + * @return array + */ + protected function getDelayQueueArguments(string $destination, int $ttl): array + { + return [ + 'x-dead-letter-exchange' => $this->getExchange() ?? '', + 'x-dead-letter-routing-key' => $this->getRoutingKey($destination), + 'x-message-ttl' => $ttl, + 'x-expires' => $ttl * 2, + ]; + } + + /** + * Returns &true;, if delayed messages should be prioritized. + * + * @return bool + */ + protected function isPrioritizeDelayed(): bool + { + return boolval(Arr::get($this->options, 'prioritize_delayed') ?: false); + } + + /** + * Returns a integer with a default of '2' for when using prioritization on delayed messages. + * If priority queues are desired, we recommend using between 1 and 10. + * Using more priority layers, will consume more CPU resources and would affect runtimes. + * + * @see https://www.rabbitmq.com/priority.html + * @return int + */ + protected function getQueueMaxPriority(): int + { + return intval(Arr::get($this->options, 'queue_max_priority') ?: 2); + } + + /** + * Get the exchange name, or &null; as default value. + * + * @param string $exchange + * @return string|null + */ + protected function getExchange(string $exchange = null): ?string + { + return $exchange ?: Arr::get($this->options, 'exchange') ?: null; + } + + /** + * Get the routing-key for when you use exchanges + * The default routing-key is the given destination. + * + * @param string $destination + * @return string + */ + protected function getRoutingKey(string $destination): string + { + return ltrim(sprintf(Arr::get($this->options, 'exchange_routing_key') ?: '%s', $destination), '.'); + } + + /** + * Get the exchangeType, or AMQPExchangeType::DIRECT as default. + * + * @param string|null $type + * @return string + */ + protected function getExchangeType(string $type = null): string + { + return @constant(AMQPExchangeType::class.'::'.Str::upper($type ?: Arr::get($this->options, 'exchange_type') ?: 'direct')) ?: AMQPExchangeType::DIRECT; + } + + /** + * Returns &true;, if failed messages should be rerouted. + * + * @return bool + */ + protected function isRerouteFailed(): bool + { + return boolval(Arr::get($this->options, 'reroute_failed') ?: false); + } + + /** + * Get the exchange for failed messages. + * + * @param string|null $exchange + * @return string|null + */ + protected function getFailedExchange(string $exchange = null): ?string + { + return $exchange ?: Arr::get($this->options, 'failed_exchange') ?: null; + } + + /** + * Get the routing-key for failed messages + * The default routing-key is the given destination substituted by '.failed'. + * + * @param string $destination + * @return string + */ + protected function getFailedRoutingKey(string $destination): string + { + return ltrim(sprintf(Arr::get($this->options, 'failed_routing_key') ?: '%s.failed', $destination), '.'); + } } diff --git a/src/Queue/Tools/AbstractBackoffStrategy.php b/src/Queue/Tools/AbstractBackoffStrategy.php deleted file mode 100644 index c29bcc47..00000000 --- a/src/Queue/Tools/AbstractBackoffStrategy.php +++ /dev/null @@ -1,15 +0,0 @@ -options = new ParameterBag($options); - } -} diff --git a/src/Queue/Tools/BackoffStrategy.php b/src/Queue/Tools/BackoffStrategy.php deleted file mode 100644 index d9de5d8f..00000000 --- a/src/Queue/Tools/BackoffStrategy.php +++ /dev/null @@ -1,15 +0,0 @@ -backoffStrategy = $backoffStrategy; - - return $this; - } -} diff --git a/src/Queue/Tools/ConstantBackoffStrategy.php b/src/Queue/Tools/ConstantBackoffStrategy.php deleted file mode 100644 index caf86c04..00000000 --- a/src/Queue/Tools/ConstantBackoffStrategy.php +++ /dev/null @@ -1,18 +0,0 @@ -getProperty(RabbitMQJob::ATTEMPT_COUNT_HEADERS_KEY, 2)); - $previousAttempt = $delayedAttempt - 1; - - $delay = $this->calculateDelay($delay, $previousAttempt); - - $delayMessage = $this->createDelayMessage($context, $message); - $delayQueue = $this->createDelayQueue($context, $dest, $message, $delayMessage, $delay); - $producer = $this->createProducer($context, $previousAttempt); - - $producer->send($delayQueue, $delayMessage); - } - - /** - * @param AmqpContext $context - * @param AmqpDestination $dest - * @param AmqpMessage $message - * @param AmqpMessage $delayMessage - * @param int $delay - * @return AmqpQueue - * @throws InvalidDestinationException - */ - protected function createDelayQueue(AmqpContext $context, AmqpDestination $dest, AmqpMessage $message, AmqpMessage $delayMessage, int $delay): AmqpQueue - { - if ($dest instanceof AmqpTopic) { - $routingKey = $message->getRoutingKey() ? '.'.$message->getRoutingKey() : ''; - $name = sprintf('enqueue.%s%s.%s.x.delay', $dest->getTopicName(), $routingKey, $delay); - - $delayQueue = $context->createQueue($name); - $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); - $delayQueue->setArgument('x-message-ttl', $delay); - $delayQueue->setArgument('x-expires', $delay * 2); - $delayQueue->setArgument('x-dead-letter-exchange', $dest->getTopicName()); - $delayQueue->setArgument('x-dead-letter-routing-key', (string) $delayMessage->getRoutingKey()); - } elseif ($dest instanceof AmqpQueue) { - $delayQueue = $context->createQueue('enqueue.'.$dest->getQueueName().'.'.$delay.'.delayed'); - $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); - $delayQueue->setArgument('x-message-ttl', $delay); - $delayQueue->setArgument('x-expires', $delay * 2); - $delayQueue->setArgument('x-dead-letter-exchange', ''); - $delayQueue->setArgument('x-dead-letter-routing-key', $dest->getQueueName()); - } else { - throw new InvalidDestinationException(sprintf('The destination must be an instance of %s but got %s.', - AmqpTopic::class.'|'.AmqpQueue::class, - get_class($dest) - )); - } - - $context->declareQueue($delayQueue); - - return $delayQueue; - } - - /** - * @param int $delay - * @param int $attempt - * @return int - */ - private function calculateDelay(int $delay, int $attempt): int - { - if ($this->backoffStrategy) { - $delay = $this->backoffStrategy->backoffDelayTime($delay, $attempt); - } - - return $delay; - } - - /** - * @param AmqpContext $context - * @param int $priority - * @return \Interop\Amqp\AmqpProducer - * @throws \Interop\Queue\Exception\PriorityNotSupportedException - */ - private function createProducer(AmqpContext $context, int $priority = null): \Interop\Amqp\AmqpProducer - { - $producer = $context->createProducer(); - - if ($this->prioritize && $priority) { - $producer->setPriority($priority); - } - - return $producer; - } - - /** - * @param AmqpContext $context - * @param AmqpMessage $message - * @return AmqpMessage - */ - private function createDelayMessage(AmqpContext $context, AmqpMessage $message): AmqpMessage - { - $properties = $message->getProperties(); - - // The x-death header must be removed because of the bug in RabbitMQ. - // It was reported that the bug is fixed since 3.5.4 but I tried with 3.6.1 and the bug still there. - // https://github.com/rabbitmq/rabbitmq-server/issues/216 - unset($properties['x-death']); - - $delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders()); - $delayMessage->setRoutingKey($message->getRoutingKey()); - - return $delayMessage; - } -} diff --git a/src/Queue/Tools/ExponentialBackoffStrategy.php b/src/Queue/Tools/ExponentialBackoffStrategy.php deleted file mode 100644 index 501f9033..00000000 --- a/src/Queue/Tools/ExponentialBackoffStrategy.php +++ /dev/null @@ -1,22 +0,0 @@ -options->get('factor', 2)) * $delay); - } -} diff --git a/src/Queue/Tools/PrioritizeAware.php b/src/Queue/Tools/PrioritizeAware.php deleted file mode 100644 index b92350d9..00000000 --- a/src/Queue/Tools/PrioritizeAware.php +++ /dev/null @@ -1,8 +0,0 @@ -prioritize = $prioritize; - - return $this; - } -} diff --git a/tests/Feature/TestCase.php b/tests/Feature/TestCase.php index 60f39b4c..3f7092ed 100644 --- a/tests/Feature/TestCase.php +++ b/tests/Feature/TestCase.php @@ -55,7 +55,7 @@ public function testPushRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); $this->assertInstanceOf(RabbitMQJob::class, $job); $this->assertSame($payload, $job->getRawBody()); @@ -74,7 +74,7 @@ public function testPush(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); $this->assertInstanceOf(RabbitMQJob::class, $job); $this->assertSame(TestJob::class, $job->resolveName()); $this->assertNotNull($job->getJobId()); @@ -173,9 +173,9 @@ public function testReleaseRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(); sleep(1); @@ -196,9 +196,9 @@ public function testRelease(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(); sleep(1); @@ -219,9 +219,9 @@ public function testReleaseWithDelayRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(4); sleep(1); @@ -265,7 +265,7 @@ public function testReleaseAndReleaseWithDelayAttempts(): void sleep(1); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(1, $job->attempts()); + $this->assertSame(2, $job->attempts()); $job->release(3); @@ -273,7 +273,7 @@ public function testReleaseAndReleaseWithDelayAttempts(): void $this->assertNotNull($job = Queue::pop()); - $this->assertSame(2, $job->attempts()); + $this->assertSame(3, $job->attempts()); } public function testDelete(): void @@ -289,4 +289,20 @@ public function testDelete(): void $this->assertSame(0, Queue::size()); $this->assertNull(Queue::pop()); } + + public function testFailed(): void + { + Queue::push(new TestJob()); + + $job = Queue::pop(); + + $job->fail(new \RuntimeException($job->resolveName().' has an exception.')); + + sleep(1); + + $this->assertSame(true, $job->hasFailed()); + $this->assertSame(true, $job->isDeleted()); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + } } diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php new file mode 100644 index 00000000..4781e19d --- /dev/null +++ b/tests/Functional/RabbitMQQueueTest.php @@ -0,0 +1,151 @@ +connection(); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + } + + public function testRerouteFailed(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertTrue($this->callMethod($queue, 'isRerouteFailed')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + } + + public function testPrioritizeDelayed(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertTrue($this->callMethod($queue, 'isPrioritizeDelayed')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + } + + public function testQueueMaxPriority(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertSame(20, $this->callMethod($queue, 'getQueueMaxPriority')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); + } + + public function testExchangeType(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', [''])); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', ['test'])); + $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType', ['topic'])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + } + + public function testExchange(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertNull($this->callMethod($queue, 'getExchange', [''])); + $this->assertNull($this->callMethod($queue, 'getExchange')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertNotNull($this->callMethod($queue, 'getExchange')); + $this->assertSame('application-x', $this->callMethod($queue, 'getExchange')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertNull($this->callMethod($queue, 'getExchange')); + } + + public function testFailedExchange(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertNull($this->callMethod($queue, 'getExchange', [''])); + $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertNotNull($this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + } + + public function testRoutingKey(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getRoutingKey', [''])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame('process.test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + } + + public function testFailedRoutingKey(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->assertSame('failed', $this->callMethod($queue, 'getFailedRoutingKey', [''])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame('application-x.test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + } + + public function testQueueArguments(): void + { + $this->assertTrue(true); + } + + public function testDelayQueueArguments(): void + { + $this->assertTrue(true); + } +} diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php new file mode 100644 index 00000000..b7f5b48b --- /dev/null +++ b/tests/Functional/TestCase.php @@ -0,0 +1,142 @@ +set('queue.default', 'rabbitmq'); + $app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => AMQPLazyConnection::class, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-options', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => AMQPLazyConnection::class, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + + 'queue' => [ + 'prioritize_delayed' => true, + 'queue_max_priority' => 20, + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => 'process.%s', + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s.failed', + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-options-empty', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => AMQPLazyConnection::class, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + + 'queue' => [ + 'prioritize_delayed' => '', + 'queue_max_priority' => '', + 'exchange' => '', + 'exchange_type' => '', + 'exchange_routing_key' => '', + 'reroute_failed' => '', + 'failed_exchange' => '', + 'failed_routing_key' => '', + ], + ], + + 'worker' => 'default', + + ]); + } + + /** + * @param $object + * @param string $method + * @param array $parameters + * @return mixed + * @throws \Exception + */ + protected function callMethod($object, string $method, array $parameters = []) + { + try { + $className = get_class($object); + $reflection = new \ReflectionClass($className); + } catch (\ReflectionException $e) { + throw new \Exception($e->getMessage()); + } + + $method = $reflection->getMethod($method); + $method->setAccessible(true); + + return $method->invokeArgs($object, $parameters); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 5eea441e..7d50fa67 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -50,8 +50,8 @@ protected function getEnvironmentSetUp($app): void ]); } - protected function connection(): RabbitMQQueue + protected function connection(string $name = null): RabbitMQQueue { - return Queue::connection(); + return Queue::connection($name); } }