From 36bbaaa4ac6c6c64817bc4159729c4d77a0b4daa Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 01:17:10 +0100 Subject: [PATCH 01/18] Fixed the updating of attempts when pushing the job back to the queue --- src/Queue/Jobs/RabbitMQJob.php | 10 +++++----- src/Queue/RabbitMQQueue.php | 26 +++++++++++--------------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 410098c6..3967a936 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -74,16 +74,16 @@ public function attempts(): int /** @var AMQPTable|null $headers */ $headers = Arr::get($this->message->get_properties(), 'application_headers'); - if (! $headers) { - return 0; + if (!$headers) { + return 1; } $data = $headers->getNativeData(); - $laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0); - $xDeathCount = (int) Arr::get($headers->getNativeData(), 'x-death.0.count', 0); + $laravelAttempts = (int)Arr::get($data, 'laravel.attempts', 0); + $xDeathCount = (int)Arr::get($headers->getNativeData(), 'x-death.0.count', 0); - return $laravelAttempts + $xDeathCount; + return ($laravelAttempts + $xDeathCount) + 1; } /** diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 07996dbb..742cadaf 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -8,6 +8,7 @@ use Exception; use Illuminate\Contracts\Queue\Queue as QueueContract; use Illuminate\Queue\Queue; +use Illuminate\Support\Arr; use Illuminate\Support\Str; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; @@ -114,13 +115,13 @@ public function pushRaw($payload, $queue = null, array $options = []) $queue = $this->getQueue($queue); $this->declareExchange($queue); - $this->declareQueue($queue, true, false, [ - 'x-dead-letter-exchange' => $queue, - 'x-dead-letter-routing-key' => $queue, - ]); + $this->declareQueue($queue, true, false); $this->bindQueue($queue, $queue, $queue); - [$message, $correlationId] = $this->createMessage($payload); + [$message, $correlationId] = ($attempts = Arr::get($options, 'attempts')) + ? $this->createMessage($payload, $attempts) + : $this->createMessage($payload) + ; $this->channel->basic_publish($message, $queue, $queue, true, false); @@ -143,22 +144,20 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) { $ttl = $this->secondsUntil($delay) * 1000; - if ($ttl < 0) { - return $this->pushRaw($payload, $queue, []); + if ($ttl <= 0) { + return $this->pushRaw($payload, $queue, ['attempts' => $attempts]); } $destinationQueue = $this->getQueue($queue); $delayedQueue = $this->getQueue($queue).'.delay.'.$ttl; $this->declareExchange($destinationQueue); - $this->declareQueue($destinationQueue, true, false, [ - 'x-dead-letter-exchange' => $destinationQueue, - 'x-dead-letter-routing-key' => $destinationQueue, - ]); + $this->declareQueue($destinationQueue, true, false); $this->declareQueue($delayedQueue, true, false, [ 'x-dead-letter-exchange' => $destinationQueue, 'x-dead-letter-routing-key' => $destinationQueue, 'x-message-ttl' => $ttl, + 'x-expires' => $ttl*2 ]); $this->bindQueue($destinationQueue, $destinationQueue, $destinationQueue); @@ -182,10 +181,7 @@ public function bulk($jobs, $data = '', $queue = null): void ); $this->declareExchange($queue); - $this->declareQueue($queue, true, false, [ - 'x-dead-letter-exchange' => $queue, - 'x-dead-letter-routing-key' => $queue, - ]); + $this->declareQueue($queue, true, false); $this->bindQueue($queue, $queue, $queue); $this->channel->batch_basic_publish($message, $queue, $queue); From 528855a151f5b8d2a5e904fb5bffc50418b5eabf Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 01:17:58 +0100 Subject: [PATCH 02/18] Fixed losing messages when no delay was set --- src/Queue/Jobs/RabbitMQJob.php | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 3967a936..0727d5c0 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -108,15 +108,9 @@ public function delete(): void */ public function release($delay = 0): void { - parent::release($delay); + parent::release(); - if ($delay > 0) { - $this->rabbitmq->ack($this); - - $this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts()); - - return; - } + $this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts()); $this->rabbitmq->reject($this); } From c2e990ab367ebc8dd596d52213d0a3233a9a1b0f Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 01:22:40 +0100 Subject: [PATCH 03/18] Cleanup fork --- config/rabbitmq.php | 45 ------ src/Queue/Tools/AbstractBackoffStrategy.php | 15 -- src/Queue/Tools/BackoffStrategy.php | 15 -- src/Queue/Tools/BackoffStrategyAware.php | 8 -- src/Queue/Tools/BackoffStrategyAwareTrait.php | 22 --- src/Queue/Tools/ConstantBackoffStrategy.php | 18 --- src/Queue/Tools/DlxDelayStrategy.php | 133 ------------------ .../Tools/ExponentialBackoffStrategy.php | 22 --- src/Queue/Tools/LinearBackoffStrategy.php | 18 --- src/Queue/Tools/PolynomialBackoffStrategy.php | 18 --- src/Queue/Tools/PrioritizeAware.php | 8 -- src/Queue/Tools/PrioritizeAwareTrait.php | 22 --- 12 files changed, 344 deletions(-) delete mode 100644 src/Queue/Tools/AbstractBackoffStrategy.php delete mode 100644 src/Queue/Tools/BackoffStrategy.php delete mode 100644 src/Queue/Tools/BackoffStrategyAware.php delete mode 100644 src/Queue/Tools/BackoffStrategyAwareTrait.php delete mode 100644 src/Queue/Tools/ConstantBackoffStrategy.php delete mode 100644 src/Queue/Tools/DlxDelayStrategy.php delete mode 100644 src/Queue/Tools/ExponentialBackoffStrategy.php delete mode 100644 src/Queue/Tools/LinearBackoffStrategy.php delete mode 100644 src/Queue/Tools/PolynomialBackoffStrategy.php delete mode 100644 src/Queue/Tools/PrioritizeAware.php delete mode 100644 src/Queue/Tools/PrioritizeAwareTrait.php diff --git a/config/rabbitmq.php b/config/rabbitmq.php index afe6f2aa..d4e4a17b 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -31,49 +31,4 @@ ], ], - /* - * Set to "horizon" if you wish to use Laravel Horizon. - */ - 'worker' => env('RABBITMQ_WORKER', 'default'), - - /* - * ## Manage the delay strategy from the config. - * - * The delay strategy can be set to: - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\DlxDelayStrategy::class - * - * ### Backoff Strategy - * - * The `DlxDelayStrategy` is BackoffAware and by default a ConstantBackoffStrategy is assigned. - * This ensures the same behavior as if the `RabbitMqDlxDelayStrategy` was assigned. - * - * You can assign different backoffStrategies with extra options, for example: - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ConstantBackoffStrategy::class - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\LinearBackoffStrategy::class - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ExponentialBackoffStrategy::class - * - \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\PolynomialBackoffStrategy::class - * - * The options must be an array of key -> value. - * - * For reference about RabbitMQ backoff strategy, see the following article: - * https://m.alphasights.com/exponential-backoff-with-rabbitmq-78386b9bec81 - * - * ### First-in First-out concept - * - * U can easily prioritize delayed messages. When set to `true` a message will be set with a higher priority. - * This means that delayed messages are handled first when returning to the queue. - * - * This is useful when your queue has allot of jobs, and you want to make sure, a job will be handled - * as soon as possible. This way RabbitMq handles the jobs and the way they are consumed by workers. - * - */ - 'delay' => [ - 'strategy' => env('RABBITMQ_DELAY_STRATEGY', \Enqueue\AmqpTools\RabbitMqDlxDelayStrategy::class), - 'backoff' => [ - 'strategy' => env('RABBITMQ_DELAY_BACKOFF_STRATEGY', \VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Tools\ConstantBackoffStrategy::class), - 'options' => [], - ], - 'prioritize'=> env('RABBITMQ_DELAY_PRIORITIZE'), - ], - ]; diff --git a/src/Queue/Tools/AbstractBackoffStrategy.php b/src/Queue/Tools/AbstractBackoffStrategy.php deleted file mode 100644 index c29bcc47..00000000 --- a/src/Queue/Tools/AbstractBackoffStrategy.php +++ /dev/null @@ -1,15 +0,0 @@ -options = new ParameterBag($options); - } -} diff --git a/src/Queue/Tools/BackoffStrategy.php b/src/Queue/Tools/BackoffStrategy.php deleted file mode 100644 index d9de5d8f..00000000 --- a/src/Queue/Tools/BackoffStrategy.php +++ /dev/null @@ -1,15 +0,0 @@ -backoffStrategy = $backoffStrategy; - - return $this; - } -} diff --git a/src/Queue/Tools/ConstantBackoffStrategy.php b/src/Queue/Tools/ConstantBackoffStrategy.php deleted file mode 100644 index caf86c04..00000000 --- a/src/Queue/Tools/ConstantBackoffStrategy.php +++ /dev/null @@ -1,18 +0,0 @@ -getProperty(RabbitMQJob::ATTEMPT_COUNT_HEADERS_KEY, 2)); - $previousAttempt = $delayedAttempt - 1; - - $delay = $this->calculateDelay($delay, $previousAttempt); - - $delayMessage = $this->createDelayMessage($context, $message); - $delayQueue = $this->createDelayQueue($context, $dest, $message, $delayMessage, $delay); - $producer = $this->createProducer($context, $previousAttempt); - - $producer->send($delayQueue, $delayMessage); - } - - /** - * @param AmqpContext $context - * @param AmqpDestination $dest - * @param AmqpMessage $message - * @param AmqpMessage $delayMessage - * @param int $delay - * @return AmqpQueue - * @throws InvalidDestinationException - */ - protected function createDelayQueue(AmqpContext $context, AmqpDestination $dest, AmqpMessage $message, AmqpMessage $delayMessage, int $delay): AmqpQueue - { - if ($dest instanceof AmqpTopic) { - $routingKey = $message->getRoutingKey() ? '.'.$message->getRoutingKey() : ''; - $name = sprintf('enqueue.%s%s.%s.x.delay', $dest->getTopicName(), $routingKey, $delay); - - $delayQueue = $context->createQueue($name); - $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); - $delayQueue->setArgument('x-message-ttl', $delay); - $delayQueue->setArgument('x-expires', $delay * 2); - $delayQueue->setArgument('x-dead-letter-exchange', $dest->getTopicName()); - $delayQueue->setArgument('x-dead-letter-routing-key', (string) $delayMessage->getRoutingKey()); - } elseif ($dest instanceof AmqpQueue) { - $delayQueue = $context->createQueue('enqueue.'.$dest->getQueueName().'.'.$delay.'.delayed'); - $delayQueue->addFlag(AmqpTopic::FLAG_DURABLE); - $delayQueue->setArgument('x-message-ttl', $delay); - $delayQueue->setArgument('x-expires', $delay * 2); - $delayQueue->setArgument('x-dead-letter-exchange', ''); - $delayQueue->setArgument('x-dead-letter-routing-key', $dest->getQueueName()); - } else { - throw new InvalidDestinationException(sprintf('The destination must be an instance of %s but got %s.', - AmqpTopic::class.'|'.AmqpQueue::class, - get_class($dest) - )); - } - - $context->declareQueue($delayQueue); - - return $delayQueue; - } - - /** - * @param int $delay - * @param int $attempt - * @return int - */ - private function calculateDelay(int $delay, int $attempt): int - { - if ($this->backoffStrategy) { - $delay = $this->backoffStrategy->backoffDelayTime($delay, $attempt); - } - - return $delay; - } - - /** - * @param AmqpContext $context - * @param int $priority - * @return \Interop\Amqp\AmqpProducer - * @throws \Interop\Queue\Exception\PriorityNotSupportedException - */ - private function createProducer(AmqpContext $context, int $priority = null): \Interop\Amqp\AmqpProducer - { - $producer = $context->createProducer(); - - if ($this->prioritize && $priority) { - $producer->setPriority($priority); - } - - return $producer; - } - - /** - * @param AmqpContext $context - * @param AmqpMessage $message - * @return AmqpMessage - */ - private function createDelayMessage(AmqpContext $context, AmqpMessage $message): AmqpMessage - { - $properties = $message->getProperties(); - - // The x-death header must be removed because of the bug in RabbitMQ. - // It was reported that the bug is fixed since 3.5.4 but I tried with 3.6.1 and the bug still there. - // https://github.com/rabbitmq/rabbitmq-server/issues/216 - unset($properties['x-death']); - - $delayMessage = $context->createMessage($message->getBody(), $properties, $message->getHeaders()); - $delayMessage->setRoutingKey($message->getRoutingKey()); - - return $delayMessage; - } -} diff --git a/src/Queue/Tools/ExponentialBackoffStrategy.php b/src/Queue/Tools/ExponentialBackoffStrategy.php deleted file mode 100644 index 501f9033..00000000 --- a/src/Queue/Tools/ExponentialBackoffStrategy.php +++ /dev/null @@ -1,22 +0,0 @@ -options->get('factor', 2)) * $delay); - } -} diff --git a/src/Queue/Tools/PrioritizeAware.php b/src/Queue/Tools/PrioritizeAware.php deleted file mode 100644 index b92350d9..00000000 --- a/src/Queue/Tools/PrioritizeAware.php +++ /dev/null @@ -1,8 +0,0 @@ -prioritize = $prioritize; - - return $this; - } -} From d6ad8a2e737b65d3e5102f9d38cf35e09421b1ba Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 01:24:40 +0100 Subject: [PATCH 04/18] Cleanup fork --- config/rabbitmq.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/config/rabbitmq.php b/config/rabbitmq.php index d4e4a17b..765d73b8 100644 --- a/config/rabbitmq.php +++ b/config/rabbitmq.php @@ -31,4 +31,9 @@ ], ], + /* + * Set to "horizon" if you wish to use Laravel Horizon. + */ + 'worker' => env('RABBITMQ_WORKER', 'default'), + ]; From 60471a0953594c4aacb72021a168b6af49948a7c Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 01:41:40 +0100 Subject: [PATCH 05/18] Code style cleanup --- src/Queue/Jobs/RabbitMQJob.php | 6 +++--- src/Queue/RabbitMQQueue.php | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index 0727d5c0..e13aca09 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -74,14 +74,14 @@ public function attempts(): int /** @var AMQPTable|null $headers */ $headers = Arr::get($this->message->get_properties(), 'application_headers'); - if (!$headers) { + if (! $headers) { return 1; } $data = $headers->getNativeData(); - $laravelAttempts = (int)Arr::get($data, 'laravel.attempts', 0); - $xDeathCount = (int)Arr::get($headers->getNativeData(), 'x-death.0.count', 0); + $laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0); + $xDeathCount = (int) Arr::get($headers->getNativeData(), 'x-death.0.count', 0); return ($laravelAttempts + $xDeathCount) + 1; } diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 742cadaf..6982aba1 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -120,8 +120,7 @@ public function pushRaw($payload, $queue = null, array $options = []) [$message, $correlationId] = ($attempts = Arr::get($options, 'attempts')) ? $this->createMessage($payload, $attempts) - : $this->createMessage($payload) - ; + : $this->createMessage($payload); $this->channel->basic_publish($message, $queue, $queue, true, false); @@ -157,7 +156,7 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) 'x-dead-letter-exchange' => $destinationQueue, 'x-dead-letter-routing-key' => $destinationQueue, 'x-message-ttl' => $ttl, - 'x-expires' => $ttl*2 + 'x-expires' => $ttl * 2 ]); $this->bindQueue($destinationQueue, $destinationQueue, $destinationQueue); From 4e5860ad0a0275535039f0a8d5b6093dcd445034 Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 01:43:09 +0100 Subject: [PATCH 06/18] Code style cleanup --- src/Queue/RabbitMQQueue.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 6982aba1..e845b4c4 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -156,7 +156,7 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) 'x-dead-letter-exchange' => $destinationQueue, 'x-dead-letter-routing-key' => $destinationQueue, 'x-message-ttl' => $ttl, - 'x-expires' => $ttl * 2 + 'x-expires' => $ttl * 2, ]); $this->bindQueue($destinationQueue, $destinationQueue, $destinationQueue); From 8941ad34a066e7ea96818fe4d0bbce367a76fe6a Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 18:00:09 +0100 Subject: [PATCH 07/18] Ditched the xDeathCount because the value is not a controlled value. The value can mess things up. The attempts() method was designed to return the current attempt value, not the value of the previous run. Else the worker will try 1 try to long. --- src/Queue/Jobs/RabbitMQJob.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index e13aca09..d5a314d2 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -81,9 +81,8 @@ public function attempts(): int $data = $headers->getNativeData(); $laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0); - $xDeathCount = (int) Arr::get($headers->getNativeData(), 'x-death.0.count', 0); - return ($laravelAttempts + $xDeathCount) + 1; + return $laravelAttempts + 1; } /** From af9b1c422a4a86acd462b762f2500e796d95b69e Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 18:04:57 +0100 Subject: [PATCH 08/18] Fixed Tests The attempts() method on the Job is/was interpreted wrong. the method mus return the previous_attempt + 1 = current attempt. --- tests/Feature/TestCase.php | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/Feature/TestCase.php b/tests/Feature/TestCase.php index 60f39b4c..ed688207 100644 --- a/tests/Feature/TestCase.php +++ b/tests/Feature/TestCase.php @@ -55,7 +55,7 @@ public function testPushRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); $this->assertInstanceOf(RabbitMQJob::class, $job); $this->assertSame($payload, $job->getRawBody()); @@ -74,7 +74,7 @@ public function testPush(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); $this->assertInstanceOf(RabbitMQJob::class, $job); $this->assertSame(TestJob::class, $job->resolveName()); $this->assertNotNull($job->getJobId()); @@ -173,9 +173,9 @@ public function testReleaseRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(); sleep(1); @@ -196,9 +196,9 @@ public function testRelease(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(); sleep(1); @@ -219,9 +219,9 @@ public function testReleaseWithDelayRaw(): void $this->assertSame(1, Queue::size()); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(0, $job->attempts()); + $this->assertSame(1, $job->attempts()); - for ($attempt = 1; $attempt <= 3; $attempt++) { + for ($attempt = 2; $attempt <= 4; $attempt++) { $job->release(4); sleep(1); @@ -265,7 +265,7 @@ public function testReleaseAndReleaseWithDelayAttempts(): void sleep(1); $this->assertNotNull($job = Queue::pop()); - $this->assertSame(1, $job->attempts()); + $this->assertSame(2, $job->attempts()); $job->release(3); @@ -273,7 +273,7 @@ public function testReleaseAndReleaseWithDelayAttempts(): void $this->assertNotNull($job = Queue::pop()); - $this->assertSame(2, $job->attempts()); + $this->assertSame(3, $job->attempts()); } public function testDelete(): void From 86ab7023b2ff4c1634f86e8187b3a30d87fd3ed9 Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 20:16:12 +0100 Subject: [PATCH 09/18] Added prioritization when messages put back onto the queue when relaesed. Previously this was handled by rabbitmq with a DLX. [removed] --- src/Queue/RabbitMQQueue.php | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index e845b4c4..af8d8af6 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -115,7 +115,7 @@ public function pushRaw($payload, $queue = null, array $options = []) $queue = $this->getQueue($queue); $this->declareExchange($queue); - $this->declareQueue($queue, true, false); + $this->declareQueue($queue, true, false, ['x-max-priority'=> 100]); $this->bindQueue($queue, $queue, $queue); [$message, $correlationId] = ($attempts = Arr::get($options, 'attempts')) @@ -151,7 +151,7 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) $delayedQueue = $this->getQueue($queue).'.delay.'.$ttl; $this->declareExchange($destinationQueue); - $this->declareQueue($destinationQueue, true, false); + $this->declareQueue($destinationQueue, true, false, ['x-max-priority'=> 100]); $this->declareQueue($delayedQueue, true, false, [ 'x-dead-letter-exchange' => $destinationQueue, 'x-dead-letter-routing-key' => $destinationQueue, @@ -180,7 +180,7 @@ public function bulk($jobs, $data = '', $queue = null): void ); $this->declareExchange($queue); - $this->declareQueue($queue, true, false); + $this->declareQueue($queue, true, false, ['x-max-priority'=> 100]); $this->bindQueue($queue, $queue, $queue); $this->channel->batch_basic_publish($message, $queue, $queue); @@ -363,6 +363,7 @@ protected function createMessage($payload, int $attempts = 0): array if ($correlationId = json_decode($payload, true)['id'] ?? null) { $properties['correlation_id'] = $correlationId; + $properties['priority'] = $attempts; } $message = new AMQPMessage($payload, $properties); From 849d608be1ddfe81f25dbe2f49f238d38cda6054 Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 2 Feb 2020 20:21:25 +0100 Subject: [PATCH 10/18] Fixed Style issues --- src/Queue/RabbitMQQueue.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index af8d8af6..9213717e 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -115,7 +115,7 @@ public function pushRaw($payload, $queue = null, array $options = []) $queue = $this->getQueue($queue); $this->declareExchange($queue); - $this->declareQueue($queue, true, false, ['x-max-priority'=> 100]); + $this->declareQueue($queue, true, false, ['x-max-priority' => 100]); $this->bindQueue($queue, $queue, $queue); [$message, $correlationId] = ($attempts = Arr::get($options, 'attempts')) @@ -151,7 +151,7 @@ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) $delayedQueue = $this->getQueue($queue).'.delay.'.$ttl; $this->declareExchange($destinationQueue); - $this->declareQueue($destinationQueue, true, false, ['x-max-priority'=> 100]); + $this->declareQueue($destinationQueue, true, false, ['x-max-priority' => 100]); $this->declareQueue($delayedQueue, true, false, [ 'x-dead-letter-exchange' => $destinationQueue, 'x-dead-letter-routing-key' => $destinationQueue, @@ -180,7 +180,7 @@ public function bulk($jobs, $data = '', $queue = null): void ); $this->declareExchange($queue); - $this->declareQueue($queue, true, false, ['x-max-priority'=> 100]); + $this->declareQueue($queue, true, false, ['x-max-priority' => 100]); $this->bindQueue($queue, $queue, $queue); $this->channel->batch_basic_publish($message, $queue, $queue); From ef6bb55682fbd6a61cbd63b7f9daeedbad00906c Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Thu, 6 Feb 2020 14:13:40 +0100 Subject: [PATCH 11/18] - Added exchange publishing. - Added delayed messages with a higher priority when put back to the queue. - Added ability to reroute failed messages - Fixed when to acknowledge or decline messages. - Fixed attempts() when multiple delay values are used. - Fixed death worker loop, when queues are missing. - Fixed issue #305 - Fixed issue #302 - Fixed issue #301 - Fixed issue #299 --- README.md | 84 ++++++ src/Console/ExchangeDeclareCommand.php | 2 +- src/Queue/Connectors/RabbitMQConnector.php | 26 +- src/Queue/Jobs/RabbitMQJob.php | 25 +- src/Queue/RabbitMQQueue.php | 298 +++++++++++++++++++-- tests/Feature/TestCase.php | 16 ++ tests/Functional/RabbitMQQueueTest.php | 156 +++++++++++ tests/Functional/TestCase.php | 146 ++++++++++ tests/TestCase.php | 4 +- 9 files changed, 717 insertions(+), 40 deletions(-) create mode 100644 tests/Functional/RabbitMQQueueTest.php create mode 100644 tests/Functional/TestCase.php diff --git a/README.md b/README.md index 3e0318c9..b99ce790 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,90 @@ Add connection to `config/queue.php`: ], ``` +### Optional Config + +Optionally add queue options to the config of a connection. +Every queue created with this connection, get's these properties. + +When you want to prioritize messages when they were delayed, then this is possible by adding extra options. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'queue_options' => [ + // ... + + 'prioritize_delayed_messages' => false, + 'queue_max_priority' => 100, + + ], + ], + + // ... +], +``` + +When you want to publish messages against an exchange with routing-key's, then this is possible by adding extra options. +- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key +- When routing-key is omitted the routing-key by default is the `queue` name. +- When using `%s` in the routing-key the queue_name will be substituted. + +> Note: when using exchange with routing-key, u probably create your queues with bindings yourself. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'queue_options' => [ + // ... + + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => '', + + ], + ], + + // ... +], +``` + +In Laravel failed jobs are stored into the database. But maybe you want to instruct some other process to also do something with the message. +When you want to instruct RabbitMQ to reroute failed messages to a exchange or a specific queue, then this is possible by adding extra options. +- When the exchange is omitted, RabbitMQ will use the `amq.direct` exchange for the routing-key +- When routing-key is omitted, the routing-key by default the `queue` name is substituted with `'.failed'`. +- When using `%s` in the routing-key the queue_name will be substituted. + +> Note: When using failed_job exchange with routing-key, u probably need to create your exchange/queue with bindings yourself. + +```php +'connections' => [ + // ... + + 'rabbitmq' => [ + // ... + + 'queue_options' => [ + // ... + + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s', + + ], + ], + + // ... +], +``` + ## Laravel Usage Once you completed the configuration you can use Laravel Queue API. If you used other queue drivers you do not need to change anything else. If you do not know how to use Queue API, please refer to the official Laravel documentation: http://laravel.com/docs/queues diff --git a/src/Console/ExchangeDeclareCommand.php b/src/Console/ExchangeDeclareCommand.php index 88bbed71..955da0a9 100644 --- a/src/Console/ExchangeDeclareCommand.php +++ b/src/Console/ExchangeDeclareCommand.php @@ -40,6 +40,6 @@ public function handle(RabbitMQConnector $connector): void (bool) $this->option('auto-delete') ); - $this->warn('Exchange declared successfully.'); + $this->info('Exchange declared successfully.'); } } diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index c18cb980..2a17b703 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -43,7 +43,8 @@ public function connect(array $config): Queue $queue = $this->createQueue( Arr::get($config, 'worker', 'default'), $connection, - $config['queue'] + $config['queue'], + Arr::get($config, 'queue_options', []) ); if (! $queue instanceof RabbitMQQueue) { @@ -82,18 +83,33 @@ protected function createConnection(array $config): AbstractConnection ); } - protected function createQueue(string $worker, AbstractConnection $connection, string $queue) + /** + * Create a queue for the worker + * + * @param string $worker + * @param AbstractConnection $connection + * @param string $queue + * @param array $options + * @return HorizonRabbitMQQueue|RabbitMQQueue|Queue + */ + protected function createQueue(string $worker, AbstractConnection $connection, string $queue, array $options = []) { switch ($worker) { case 'default': - return new RabbitMQQueue($connection, $queue); + return new RabbitMQQueue($connection, $queue, $options); case 'horizon': - return new HorizonRabbitMQQueue($connection, $queue); + return new HorizonRabbitMQQueue($connection, $queue, $options); default: - return new $worker($connection, $queue); + return new $worker($connection, $queue, $options); } } + /** + * Recursively filter only null values + * + * @param array $array + * @return array + */ private function filter(array $array): array { foreach ($array as $index => &$value) { diff --git a/src/Queue/Jobs/RabbitMQJob.php b/src/Queue/Jobs/RabbitMQJob.php index d5a314d2..7f7af1d7 100644 --- a/src/Queue/Jobs/RabbitMQJob.php +++ b/src/Queue/Jobs/RabbitMQJob.php @@ -81,8 +81,19 @@ public function attempts(): int $data = $headers->getNativeData(); $laravelAttempts = (int) Arr::get($data, 'laravel.attempts', 0); + $xDeathCount = (int) Arr::get($headers->getNativeData(), 'x-death.0.count', 0); - return $laravelAttempts + 1; + return ($laravelAttempts) + 1; + } + + public function fail($e = null): void + { + parent::fail($e); + + // We must tel rabbitMQ this Job is failed + // The message must be rejected when the Job marked as failed, in case rabbitMQ wants to do some extra magic. + // like: Death lettering the message to an other exchange/routing-key. + $this->rabbitmq->reject($this); } /** @@ -94,7 +105,11 @@ public function delete(): void { parent::delete(); - $this->rabbitmq->ack($this); + // When delete is called and the Job was not failed, the message must be acknowledged. + // This is because this is a controlled call by a developer. So the message was handled correct. + if (! $this->failed) { + $this->rabbitmq->ack($this); + } // required for Laravel Horizon if ($this->rabbitmq instanceof HorizonRabbitMQQueue) { @@ -109,9 +124,13 @@ public function release($delay = 0): void { parent::release(); + // Always create a new message when this Job is released $this->rabbitmq->laterRaw($delay, $this->message->body, $this->queue, $this->attempts()); - $this->rabbitmq->reject($this); + // Releasing a Job means the message was failed to process. + // Because this Job is always recreated and pushed as new message, this Job is correctly handled. + // We must tell rabbitMQ this fact. + $this->rabbitmq->ack($this); } /** diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index 9213717e..b119ca44 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -69,13 +69,27 @@ class RabbitMQQueue extends Queue implements QueueContract */ protected $currentJob; + /** + * @var array + */ + protected $options; + + /** + * RabbitMQQueue constructor. + * + * @param AbstractConnection $connection + * @param string $default + * @param array $options + */ public function __construct( AbstractConnection $connection, - string $default + string $default, + array $options = [] ) { $this->connection = $connection; $this->channel = $connection->channel(); $this->default = $default; + $this->options = $options; } /** @@ -101,6 +115,8 @@ public function size($queue = null): int /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function push($job, $data = '', $queue = null) { @@ -109,26 +125,38 @@ public function push($job, $data = '', $queue = null) /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function pushRaw($payload, $queue = null, array $options = []) { - $queue = $this->getQueue($queue); + $destination = $this->getQueue($queue); + $exchange = $this->getExchange(); - $this->declareExchange($queue); - $this->declareQueue($queue, true, false, ['x-max-priority' => 100]); - $this->bindQueue($queue, $queue, $queue); + // When a exchange is defined and no exchange is present in RabbitMQ, create an exchange. + if ($exchange && ! $this->isExchangeExists($exchange)) { + $this->declareExchange($exchange, $this->getExchangeType()); + } + + // When no exchange is defined, create a queue for amq.direct publishing, but only if it not already present. + if (! $exchange && ! $this->isQueueExists($destination)) { + $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + } [$message, $correlationId] = ($attempts = Arr::get($options, 'attempts')) ? $this->createMessage($payload, $attempts) : $this->createMessage($payload); - $this->channel->basic_publish($message, $queue, $queue, true, false); + // Publish the message + $this->channel->basic_publish($message, $exchange, $this->getRoutingKey($destination), true, false); return $correlationId; } /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function later($delay, $job, $data = '', $queue = null) { @@ -139,51 +167,62 @@ public function later($delay, $job, $data = '', $queue = null) ); } + /** + * @param $delay + * @param $payload + * @param null $queue + * @param int $attempts + * @return mixed + * + * @throws AMQPProtocolChannelException + */ public function laterRaw($delay, $payload, $queue = null, $attempts = 0) { $ttl = $this->secondsUntil($delay) * 1000; + // When no ttl just publish a new message to the exchange or queue if ($ttl <= 0) { - return $this->pushRaw($payload, $queue, ['attempts' => $attempts]); + return $this->pushRaw($payload, $queue, ['delay' => $delay, 'attempts' => $attempts]); } - $destinationQueue = $this->getQueue($queue); - $delayedQueue = $this->getQueue($queue).'.delay.'.$ttl; + $destination = $this->getQueue($queue).'.delay.'.$ttl; - $this->declareExchange($destinationQueue); - $this->declareQueue($destinationQueue, true, false, ['x-max-priority' => 100]); - $this->declareQueue($delayedQueue, true, false, [ - 'x-dead-letter-exchange' => $destinationQueue, - 'x-dead-letter-routing-key' => $destinationQueue, - 'x-message-ttl' => $ttl, - 'x-expires' => $ttl * 2, - ]); - $this->bindQueue($destinationQueue, $destinationQueue, $destinationQueue); + $this->declareQueue($destination, true, false, $this->getDelayQueueArguments($this->getQueue($queue), $ttl)); [$message, $correlationId] = $this->createMessage($payload, $attempts); - $this->channel->basic_publish($message, null, $delayedQueue, true, false); + // Publish directly on the delayQueue, no need to publish trough an exchange. + $this->channel->basic_publish($message, null, $destination, true, false); return $correlationId; } /** * {@inheritdoc} + * + * @throws AMQPProtocolChannelException */ public function bulk($jobs, $data = '', $queue = null): void { - $queue = $this->getQueue($queue); + $destination = $this->getQueue($queue); + $exchange = $this->getExchange(); + + // When a exchange is defined and no exchange is present in RabbitMQ, create an exchange. + if ($exchange && ! $this->isExchangeExists($exchange)) { + $this->declareExchange($exchange, $this->getExchangeType()); + } + + // When no exchange is defined, create a queue for amq.direct publishing, but only if it not already present. + if (! $exchange && ! $this->isQueueExists($destination)) { + $this->declareQueue($destination, true, false, $this->getQueueArguments($destination)); + } foreach ((array) $jobs as $job) { [$message] = $this->createMessage( $this->createPayload($job, $queue, $data) ); - $this->declareExchange($queue); - $this->declareQueue($queue, true, false, ['x-max-priority' => 100]); - $this->bindQueue($queue, $queue, $queue); - - $this->channel->batch_basic_publish($message, $queue, $queue); + $this->channel->batch_basic_publish($message, $exchange, $destination); } $this->channel->publish_batch(); @@ -210,9 +249,14 @@ public function pop($queue = null) ); } } catch (AMQPProtocolChannelException $exception) { - // if there is not exchange or queue AMQP will throw exception with code 404 - // we need to catch it and return null + // If there is not exchange or queue AMQP will throw exception with code 404 + // We need to catch it and return null if ($exception->amqp_reply_code === 404) { + + // Because of the channel exception the channel was closed and removed. + // We have to open a new channel. Because else the worker(s) are stuck in a loop, without processing. + $this->channel = $this->connection->channel(); + return null; } @@ -222,22 +266,36 @@ public function pop($queue = null) return null; } + /** + * @return AbstractConnection + */ public function getConnection(): AbstractConnection { return $this->connection; } + /** + * @return AMQPChannel + */ public function getChannel(): AMQPChannel { return $this->channel; } + /** + * Gets a queue/destination, by default the queue option set on the connection. + * @param null $queue + * @return string + */ public function getQueue($queue = null) { return $queue ?: $this->default; } /** + * Checks if the given exchange already present/defined in RabbitMQ. + * Returns false when when the exchange is missing. + * * @param string $exchange * @return bool * @throws AMQPProtocolChannelException @@ -260,11 +318,21 @@ public function isExchangeExists(string $exchange): bool } } + /** + * Declare a exchange in rabbitMQ + * + * @param string $name + * @param string $type + * @param bool $durable + * @param bool $autoDelete + * @param array $arguments + */ public function declareExchange( string $name, string $type = AMQPExchangeType::DIRECT, bool $durable = true, - bool $autoDelete = false + bool $autoDelete = false, + array $arguments = [] ): void { if (in_array($name, $this->exchanges, true)) { return; @@ -277,11 +345,15 @@ public function declareExchange( $durable, $autoDelete, false, - true + true, + new AMQPTable($arguments) ); } /** + * Checks if the given queue already present/defined in RabbitMQ. + * Returns false when when the queue is missing. + * * @param string $name * @return bool * @throws AMQPProtocolChannelException @@ -306,6 +378,14 @@ public function isQueueExists(?string $name = null): bool } } + /** + * Declare a queue in rabbitMQ + * + * @param string $name + * @param bool $durable + * @param bool $autoDelete + * @param array $arguments + */ public function declareQueue(string $name, bool $durable = true, bool $autoDelete = false, array $arguments = []): void { if (in_array($name, $this->queues, true)) { @@ -323,6 +403,11 @@ public function declareQueue(string $name, bool $durable = true, bool $autoDelet ); } + /** + * @param string $queue + * @param string $exchange + * @param string $routingKey + */ public function bindQueue(string $queue, string $exchange, string $routingKey = ''): void { if (in_array( @@ -336,6 +421,9 @@ public function bindQueue(string $queue, string $exchange, string $routingKey = $this->channel->queue_bind($queue, $exchange, $routingKey); } + /** + * @param null $queue + */ public function purge($queue = null): void { // create a temporary channel, so the main channel will not be closed on exception @@ -344,16 +432,32 @@ public function purge($queue = null): void $channel->close(); } + /** + * @param RabbitMQJob $job + */ public function ack(RabbitMQJob $job): void { $this->channel->basic_ack($job->getRabbitMQMessage()->getDeliveryTag()); } + /** + * Reject current Job + * + * @param RabbitMQJob $job + * @param bool $requeue + */ public function reject(RabbitMQJob $job, bool $requeue = false): void { $this->channel->basic_reject($job->getRabbitMQMessage()->getDeliveryTag(), $requeue); } + /** + * Create a AMQP message + * + * @param $payload + * @param int $attempts + * @return array + */ protected function createMessage($payload, int $attempts = 0): array { $properties = [ @@ -363,6 +467,9 @@ protected function createMessage($payload, int $attempts = 0): array if ($correlationId = json_decode($payload, true)['id'] ?? null) { $properties['correlation_id'] = $correlationId; + } + + if ($this->isPrioritizeDelayed()) { $properties['priority'] = $attempts; } @@ -380,6 +487,14 @@ protected function createMessage($payload, int $attempts = 0): array ]; } + /** + * Create a payload array from the given job and data. + * + * @param object|string $job + * @param string $queue + * @param string $data + * @return array + */ protected function createPayloadArray($job, $queue, $data = '') { return array_merge(parent::createPayloadArray($job, $queue, $data), [ @@ -412,4 +527,129 @@ public function close(): void // Ignore the exception } } + + /** + * Get the Queue arguments + * + * @return array + */ + protected function getQueueArguments(string $destination): array + { + $arguments = []; + + if ($this->isPrioritizeDelayed()) { + $arguments['x-max-priority'] = $this->getQueueMaxPriority(); + } + + if ($this->isRerouteFailed()) { + $arguments['x-dead-letter-exchange'] = $this->getFailedExchange() ?? ''; + $arguments['x-dead-letter-routing-key'] = $this->getFailedRoutingKey($destination); + } + + return $arguments; + } + + /** + * Get the Delay queue arguments. + * + * @param string $destination + * @param int $ttl + * @return array + */ + protected function getDelayQueueArguments(string $destination, int $ttl): array + { + return [ + 'x-dead-letter-exchange' => $this->getExchange() ?? '', + 'x-dead-letter-routing-key' => $this->getRoutingKey($destination), + 'x-message-ttl' => $ttl, + 'x-expires' => $ttl * 2, + ]; + } + + /** + * Returns &true;, if delayed messages should be prioritized. + * + * @return bool + */ + protected function isPrioritizeDelayed(): bool + { + return boolval(Arr::get($this->options, 'prioritize_delayed') ?: false); + } + + /** + * Returns a integer with a default of '100' for when using prioritization on delayed messages + * + * @return int + */ + protected function getQueueMaxPriority(): int + { + return intval(Arr::get($this->options, 'queue_max_priority') ?: 100); + } + + /** + * Get the exchange name, or &null; as default value. + * + * @param string $exchange + * @return string|null + */ + protected function getExchange(string $exchange = null): ?string + { + return $exchange ?: Arr::get($this->options, 'exchange') ?: null; + } + + /** + * Get the routing-key for when you use exchanges + * The default routing-key is the given destination + * + * @param string $destination + * @return string + */ + protected function getRoutingKey(string $destination): string + { + return ltrim(sprintf(Arr::get($this->options, 'exchange_routing_key') ?: '%s', $destination), '.'); + } + + /** + * Get the exchangeType, or AMQPExchangeType::DIRECT as default + * + * @param string|null $type + * @return string + */ + protected function getExchangeType(string $type = null): string + { + return @constant(AMQPExchangeType::class.'::'.Str::upper($type ?: Arr::get($this->options, 'exchange_type') ?: 'direct')) ?: AMQPExchangeType::DIRECT; + } + + /** + * Returns &true;, if failed messages should be rerouted. + * + * @return bool + */ + protected function isRerouteFailed(): bool + { + return boolval(Arr::get($this->options, 'reroute_failed') ?: false); + } + + /** + * Get the exchange for failed messages + * + * @param string|null $exchange + * @return string|null + */ + protected function getFailedExchange(string $exchange = null): ?string + { + return $exchange ?: Arr::get($this->options, 'failed_exchange') ?: null; + } + + /** + * Get the routing-key for failed messages + * The default routing-key is the given destination substituted by '.failed' + * + * @param string $destination + * @return string + */ + protected function getFailedRoutingKey(string $destination): string + { + return ltrim(sprintf(Arr::get($this->options, 'failed_routing_key') ?: '%s.failed', $destination), '.'); + } } diff --git a/tests/Feature/TestCase.php b/tests/Feature/TestCase.php index ed688207..3f7092ed 100644 --- a/tests/Feature/TestCase.php +++ b/tests/Feature/TestCase.php @@ -289,4 +289,20 @@ public function testDelete(): void $this->assertSame(0, Queue::size()); $this->assertNull(Queue::pop()); } + + public function testFailed(): void + { + Queue::push(new TestJob()); + + $job = Queue::pop(); + + $job->fail(new \RuntimeException($job->resolveName().' has an exception.')); + + sleep(1); + + $this->assertSame(true, $job->hasFailed()); + $this->assertSame(true, $job->isDeleted()); + $this->assertSame(0, Queue::size()); + $this->assertNull(Queue::pop()); + } } diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php new file mode 100644 index 00000000..e3f1ae75 --- /dev/null +++ b/tests/Functional/RabbitMQQueueTest.php @@ -0,0 +1,156 @@ +connection(); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertInstanceOf(RabbitMQQueue::class, $queue); + } + + public function testRerouteFailed(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertTrue($this->callMethod($queue, 'isRerouteFailed')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertFalse($this->callMethod($queue, 'isRerouteFailed')); + } + + public function testPrioritizeDelayed(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertTrue($this->callMethod($queue, 'isPrioritizeDelayed')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertFalse($this->callMethod($queue, 'isPrioritizeDelayed')); + } + + public function testQueueMaxPriority(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertSame(100, $this->callMethod($queue, 'getQueueMaxPriority')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertSame(20, $this->callMethod($queue, 'getQueueMaxPriority')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertSame(100, $this->callMethod($queue, 'getQueueMaxPriority')); + } + + public function testExchangeType(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', [''])); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType', ['test'])); + $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType', ['topic'])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame(AMQPExchangeType::TOPIC, $this->callMethod($queue, 'getExchangeType')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame(AMQPExchangeType::DIRECT, $this->callMethod($queue, 'getExchangeType')); + } + + public function testExchange(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getExchange', ['test'])); + $this->assertNull($this->callMethod($queue, 'getExchange', [''])); + $this->assertNull($this->callMethod($queue, 'getExchange')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertNotNull($this->callMethod($queue, 'getExchange')); + $this->assertSame('application-x', $this->callMethod($queue, 'getExchange')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertNull($this->callMethod($queue, 'getExchange')); + } + + public function testFailedExchange(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getFailedExchange', ['test'])); + $this->assertNull($this->callMethod($queue, 'getExchange', [''])); + $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertNotNull($this->callMethod($queue, 'getFailedExchange')); + $this->assertSame('failed-exchange', $this->callMethod($queue, 'getFailedExchange')); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertNull($this->callMethod($queue, 'getFailedExchange')); + } + + public function testRoutingKey(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + $this->assertSame('', $this->callMethod($queue, 'getRoutingKey', [''])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame('process.test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame('test', $this->callMethod($queue, 'getRoutingKey', ['test'])); + } + + public function testFailedRoutingKey(): void + { + /** @var $queue RabbitMQQueue */ + $queue = $this->connection(); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + $this->assertSame('failed', $this->callMethod($queue, 'getFailedRoutingKey', [''])); + + $queue = $this->connection('rabbitmq-with-options'); + $this->assertSame('application-x.test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + + $queue = $this->connection('rabbitmq-with-options-empty'); + $this->assertSame('test.failed', $this->callMethod($queue, 'getFailedRoutingKey', ['test'])); + } + + public function testQueueArguments(): void + { + $this->assertTrue(true); + } + + public function testDelayQueueArguments(): void + { + $this->assertTrue(true); + } +} diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php new file mode 100644 index 00000000..10dd71b1 --- /dev/null +++ b/tests/Functional/TestCase.php @@ -0,0 +1,146 @@ +set('queue.default', 'rabbitmq'); + $app['config']->set('queue.connections.rabbitmq', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => AMQPLazyConnection::class, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + ], + + 'worker' => 'default', + + ]); + $app['config']->set('queue.connections.rabbitmq-with-options', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => AMQPLazyConnection::class, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + ], + + 'worker' => 'default', + + 'queue_options' => [ + 'prioritize_delayed' => true, + 'queue_max_priority' => 20, + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => 'process.%s', + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s.failed', + ], + ]); + $app['config']->set('queue.connections.rabbitmq-with-options-empty', [ + 'driver' => 'rabbitmq', + 'queue' => 'order', + 'connection' => AMQPLazyConnection::class, + + 'hosts' => [ + [ + 'host' => getenv('HOST'), + 'port' => getenv('PORT'), + 'vhost' => '/', + 'user' => 'guest', + 'password' => 'guest', + ], + ], + + 'options' => [ + 'ssl_options' => [ + 'cafile' => null, + 'local_cert' => null, + 'local_key' => null, + 'verify_peer' => true, + 'passphrase' => null, + ], + ], + + 'worker' => 'default', + + 'queue_options' => [ + 'prioritize_delayed' => '', + 'queue_max_priority' => '', + 'exchange' => '', + 'exchange_type' => '', + 'exchange_routing_key' => '', + 'reroute_failed' => '', + 'failed_exchange' => '', + 'failed_routing_key' => '', + ], + ]); + } + + /** + * @param $object + * @param string $method + * @param array $parameters + * @return mixed + * @throws \Exception + */ + protected function callMethod($object, string $method, array $parameters = []) + { + try { + $className = get_class($object); + $reflection = new \ReflectionClass($className); + } catch (\ReflectionException $e) { + throw new \Exception($e->getMessage()); + } + + $method = $reflection->getMethod($method); + $method->setAccessible(true); + + return $method->invokeArgs($object, $parameters); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index 5eea441e..7d50fa67 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -50,8 +50,8 @@ protected function getEnvironmentSetUp($app): void ]); } - protected function connection(): RabbitMQQueue + protected function connection(string $name = null): RabbitMQQueue { - return Queue::connection(); + return Queue::connection($name); } } From 8a21c99bb7ba54da3a48f7196e3f27279a7eae79 Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Thu, 6 Feb 2020 14:31:54 +0100 Subject: [PATCH 12/18] - Fixed style issues (not showingup in test:style) --- src/Queue/Connectors/RabbitMQConnector.php | 4 ++-- src/Queue/RabbitMQQueue.php | 20 ++++++++++---------- tests/Functional/RabbitMQQueueTest.php | 5 ----- tests/Functional/TestCase.php | 6 ------ 4 files changed, 12 insertions(+), 23 deletions(-) diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index 2a17b703..17570354 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -84,7 +84,7 @@ protected function createConnection(array $config): AbstractConnection } /** - * Create a queue for the worker + * Create a queue for the worker. * * @param string $worker * @param AbstractConnection $connection @@ -105,7 +105,7 @@ protected function createQueue(string $worker, AbstractConnection $connection, s } /** - * Recursively filter only null values + * Recursively filter only null values. * * @param array $array * @return array diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index b119ca44..f699488d 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -319,7 +319,7 @@ public function isExchangeExists(string $exchange): bool } /** - * Declare a exchange in rabbitMQ + * Declare a exchange in rabbitMQ. * * @param string $name * @param string $type @@ -379,7 +379,7 @@ public function isQueueExists(?string $name = null): bool } /** - * Declare a queue in rabbitMQ + * Declare a queue in rabbitMQ. * * @param string $name * @param bool $durable @@ -441,7 +441,7 @@ public function ack(RabbitMQJob $job): void } /** - * Reject current Job + * Reject current Job. * * @param RabbitMQJob $job * @param bool $requeue @@ -452,7 +452,7 @@ public function reject(RabbitMQJob $job, bool $requeue = false): void } /** - * Create a AMQP message + * Create a AMQP message. * * @param $payload * @param int $attempts @@ -529,7 +529,7 @@ public function close(): void } /** - * Get the Queue arguments + * Get the Queue arguments. * * @return array */ @@ -577,7 +577,7 @@ protected function isPrioritizeDelayed(): bool } /** - * Returns a integer with a default of '100' for when using prioritization on delayed messages + * Returns a integer with a default of '100' for when using prioritization on delayed messages. * * @return int */ @@ -599,7 +599,7 @@ protected function getExchange(string $exchange = null): ?string /** * Get the routing-key for when you use exchanges - * The default routing-key is the given destination + * The default routing-key is the given destination. * * @param string $destination * @return string @@ -610,7 +610,7 @@ protected function getRoutingKey(string $destination): string } /** - * Get the exchangeType, or AMQPExchangeType::DIRECT as default + * Get the exchangeType, or AMQPExchangeType::DIRECT as default. * * @param string|null $type * @return string @@ -631,7 +631,7 @@ protected function isRerouteFailed(): bool } /** - * Get the exchange for failed messages + * Get the exchange for failed messages. * * @param string|null $exchange * @return string|null @@ -643,7 +643,7 @@ protected function getFailedExchange(string $exchange = null): ?string /** * Get the routing-key for failed messages - * The default routing-key is the given destination substituted by '.failed' + * The default routing-key is the given destination substituted by '.failed'. * * @param string $destination * @return string diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php index e3f1ae75..d3f340e2 100644 --- a/tests/Functional/RabbitMQQueueTest.php +++ b/tests/Functional/RabbitMQQueueTest.php @@ -2,13 +2,8 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional; -use Illuminate\Support\Facades\Queue; -use Illuminate\Support\Str; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Exchange\AMQPExchangeType; -use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; -use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional\TestCase as BaseTestCase; class RabbitMQQueueTest extends BaseTestCase diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php index 10dd71b1..5d5a663b 100644 --- a/tests/Functional/TestCase.php +++ b/tests/Functional/TestCase.php @@ -2,13 +2,7 @@ namespace VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Functional; -use Illuminate\Support\Facades\Queue; -use Illuminate\Support\Str; use PhpAmqpLib\Connection\AMQPLazyConnection; -use PhpAmqpLib\Exception\AMQPProtocolChannelException; -use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob; -use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue; -use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\Mocks\TestJob; use VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase as BaseTestCase; abstract class TestCase extends BaseTestCase From b9263c74354c446776fde18e463804a07dd2ca59 Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sat, 8 Feb 2020 17:57:53 +0100 Subject: [PATCH 13/18] - Altered the config structure --- src/Queue/Connectors/RabbitMQConnector.php | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index 17570354..b6d76766 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -38,13 +38,13 @@ public function __construct(Dispatcher $dispatcher) */ public function connect(array $config): Queue { - $connection = $this->createConnection($config); + $connection = $this->createConnection(Arr::except($config, 'options.queue')); $queue = $this->createQueue( Arr::get($config, 'worker', 'default'), $connection, $config['queue'], - Arr::get($config, 'queue_options', []) + Arr::get($config, 'options.queue', []) ); if (! $queue instanceof RabbitMQQueue) { @@ -72,13 +72,11 @@ protected function createConnection(array $config): AbstractConnection /** @var AbstractConnection $connection */ $connection = Arr::get($config, 'connection', AMQPLazyConnection::class); - $hosts = Arr::shuffle(Arr::get($config, 'hosts', [])); - // manually disable heartbeat so long-running tasks will not fail - $config['options']['heartbeat'] = 0; + Arr::set($config,'options.heartbeat', 0); return $connection::create_connection( - $hosts, + Arr::shuffle(Arr::get($config, 'hosts', [])), $this->filter(Arr::get($config, 'options', [])) ); } From 8cb75367d4378540c9eb72edc714f9d8651facbf Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sat, 8 Feb 2020 17:58:53 +0100 Subject: [PATCH 14/18] - Updated the readMe --- README.md | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index b99ce790..6d516b94 100644 --- a/README.md +++ b/README.md @@ -76,9 +76,10 @@ Add connection to `config/queue.php`: ### Optional Config Optionally add queue options to the config of a connection. -Every queue created with this connection, get's these properties. +Every queue created for this connection, get's the properties. When you want to prioritize messages when they were delayed, then this is possible by adding extra options. +- When max-priority is omitted, the max priority is set with 100. ```php 'connections' => [ @@ -87,12 +88,13 @@ When you want to prioritize messages when they were delayed, then this is possib 'rabbitmq' => [ // ... - 'queue_options' => [ - // ... - - 'prioritize_delayed_messages' => false, - 'queue_max_priority' => 100, + 'options' => [ + 'queue' => [ + // ... + 'prioritize_delayed_messages' => false, + 'queue_max_priority' => 100, + ], ], ], @@ -114,13 +116,14 @@ When you want to publish messages against an exchange with routing-key's, then t 'rabbitmq' => [ // ... - 'queue_options' => [ - // ... - - 'exchange' => 'application-x', - 'exchange_type' => 'topic', - 'exchange_routing_key' => '', + 'options' => [ + 'queue' => [ + // ... + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => '', + ], ], ], @@ -143,13 +146,14 @@ When you want to instruct RabbitMQ to reroute failed messages to a exchange or a 'rabbitmq' => [ // ... - 'queue_options' => [ - // ... - - 'reroute_failed' => true, - 'failed_exchange' => 'failed-exchange', - 'failed_routing_key' => 'application-x.%s', + 'options' => [ + 'queue' => [ + // ... + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s', + ], ], ], From 0ddfd3ead2c5914f6450d7672c317ba49438248d Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sat, 8 Feb 2020 18:01:30 +0100 Subject: [PATCH 15/18] - Fixed style --- src/Queue/Connectors/RabbitMQConnector.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queue/Connectors/RabbitMQConnector.php b/src/Queue/Connectors/RabbitMQConnector.php index b6d76766..a3b44b98 100644 --- a/src/Queue/Connectors/RabbitMQConnector.php +++ b/src/Queue/Connectors/RabbitMQConnector.php @@ -73,7 +73,7 @@ protected function createConnection(array $config): AbstractConnection $connection = Arr::get($config, 'connection', AMQPLazyConnection::class); // manually disable heartbeat so long-running tasks will not fail - Arr::set($config,'options.heartbeat', 0); + Arr::set($config, 'options.heartbeat', 0); return $connection::create_connection( Arr::shuffle(Arr::get($config, 'hosts', [])), From 8703cae99e6e3a1573b76e1cded9e2b39a48122c Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sat, 8 Feb 2020 18:12:13 +0100 Subject: [PATCH 16/18] - Fixed Tests (bump) --- tests/Functional/TestCase.php | 42 ++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/tests/Functional/TestCase.php b/tests/Functional/TestCase.php index 5d5a663b..b7f5b48b 100644 --- a/tests/Functional/TestCase.php +++ b/tests/Functional/TestCase.php @@ -61,20 +61,21 @@ protected function getEnvironmentSetUp($app): void 'verify_peer' => true, 'passphrase' => null, ], + + 'queue' => [ + 'prioritize_delayed' => true, + 'queue_max_priority' => 20, + 'exchange' => 'application-x', + 'exchange_type' => 'topic', + 'exchange_routing_key' => 'process.%s', + 'reroute_failed' => true, + 'failed_exchange' => 'failed-exchange', + 'failed_routing_key' => 'application-x.%s.failed', + ], ], 'worker' => 'default', - 'queue_options' => [ - 'prioritize_delayed' => true, - 'queue_max_priority' => 20, - 'exchange' => 'application-x', - 'exchange_type' => 'topic', - 'exchange_routing_key' => 'process.%s', - 'reroute_failed' => true, - 'failed_exchange' => 'failed-exchange', - 'failed_routing_key' => 'application-x.%s.failed', - ], ]); $app['config']->set('queue.connections.rabbitmq-with-options-empty', [ 'driver' => 'rabbitmq', @@ -99,20 +100,21 @@ protected function getEnvironmentSetUp($app): void 'verify_peer' => true, 'passphrase' => null, ], + + 'queue' => [ + 'prioritize_delayed' => '', + 'queue_max_priority' => '', + 'exchange' => '', + 'exchange_type' => '', + 'exchange_routing_key' => '', + 'reroute_failed' => '', + 'failed_exchange' => '', + 'failed_routing_key' => '', + ], ], 'worker' => 'default', - 'queue_options' => [ - 'prioritize_delayed' => '', - 'queue_max_priority' => '', - 'exchange' => '', - 'exchange_type' => '', - 'exchange_routing_key' => '', - 'reroute_failed' => '', - 'failed_exchange' => '', - 'failed_routing_key' => '', - ], ]); } From 4e129f69f4e20ce47264fa26583ca11401427ab4 Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 9 Feb 2020 17:43:51 +0100 Subject: [PATCH 17/18] - Altered the default priority levels when using this feature --- README.md | 4 ++-- src/Queue/RabbitMQQueue.php | 10 ++++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 6d516b94..7db1d753 100644 --- a/README.md +++ b/README.md @@ -79,7 +79,7 @@ Optionally add queue options to the config of a connection. Every queue created for this connection, get's the properties. When you want to prioritize messages when they were delayed, then this is possible by adding extra options. -- When max-priority is omitted, the max priority is set with 100. +- When max-priority is omitted, the max priority is set with 2 when used. ```php 'connections' => [ @@ -93,7 +93,7 @@ When you want to prioritize messages when they were delayed, then this is possib // ... 'prioritize_delayed_messages' => false, - 'queue_max_priority' => 100, + 'queue_max_priority' => 10, ], ], ], diff --git a/src/Queue/RabbitMQQueue.php b/src/Queue/RabbitMQQueue.php index f699488d..5092ac61 100644 --- a/src/Queue/RabbitMQQueue.php +++ b/src/Queue/RabbitMQQueue.php @@ -537,6 +537,9 @@ protected function getQueueArguments(string $destination): array { $arguments = []; + // Messages without a priority property are treated as if their priority were 0. + // Messages with a priority which is higher than the queue's maximum, are treated as if they were + // published with the maximum priority. if ($this->isPrioritizeDelayed()) { $arguments['x-max-priority'] = $this->getQueueMaxPriority(); } @@ -577,13 +580,16 @@ protected function isPrioritizeDelayed(): bool } /** - * Returns a integer with a default of '100' for when using prioritization on delayed messages. + * Returns a integer with a default of '2' for when using prioritization on delayed messages. + * If priority queues are desired, we recommend using between 1 and 10. + * Using more priority layers, will consume more CPU resources and would affect runtimes. * + * @see https://www.rabbitmq.com/priority.html * @return int */ protected function getQueueMaxPriority(): int { - return intval(Arr::get($this->options, 'queue_max_priority') ?: 100); + return intval(Arr::get($this->options, 'queue_max_priority') ?: 2); } /** From 10087038106e25e5caaf7cff65994450e49c2a25 Mon Sep 17 00:00:00 2001 From: Emiel Bom Date: Sun, 9 Feb 2020 18:16:34 +0100 Subject: [PATCH 18/18] - Fixed test for getQueueMaxPriority() --- tests/Functional/RabbitMQQueueTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/Functional/RabbitMQQueueTest.php b/tests/Functional/RabbitMQQueueTest.php index d3f340e2..4781e19d 100644 --- a/tests/Functional/RabbitMQQueueTest.php +++ b/tests/Functional/RabbitMQQueueTest.php @@ -52,7 +52,7 @@ public function testQueueMaxPriority(): void /** @var $queue RabbitMQQueue */ $queue = $this->connection(); $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(100, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); $queue = $this->connection('rabbitmq-with-options'); $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); @@ -60,7 +60,7 @@ public function testQueueMaxPriority(): void $queue = $this->connection('rabbitmq-with-options-empty'); $this->assertIsInt($this->callMethod($queue, 'getQueueMaxPriority')); - $this->assertSame(100, $this->callMethod($queue, 'getQueueMaxPriority')); + $this->assertSame(2, $this->callMethod($queue, 'getQueueMaxPriority')); } public function testExchangeType(): void