From 0f15306d615d3215366fc43713b3c1822e464cd3 Mon Sep 17 00:00:00 2001 From: Tobias Schultze Date: Fri, 14 Jun 2019 02:13:58 +0200 Subject: [PATCH] [Messenger] fix delay delivery for non-fanout exchanges also fix dsn parsing of plain amqp:// uri --- .../Transport/AmqpExt/ConnectionTest.php | 38 ++++++++----- .../Transport/AmqpExt/Connection.php | 53 ++++++++++--------- 2 files changed, 54 insertions(+), 37 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index 549ff1f8ec45..9462529b9431 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -25,14 +25,14 @@ class ConnectionTest extends TestCase { /** * @expectedException \InvalidArgumentException - * @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid. + * @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid. */ public function testItCannotBeConstructedWithAWrongDsn() { - Connection::fromDsn('amqp://'); + Connection::fromDsn('amqp://:'); } - public function testItGetsParametersFromTheDsn() + public function testItCanBeConstructedWithDefaults() { $this->assertEquals( new Connection([ @@ -44,7 +44,23 @@ public function testItGetsParametersFromTheDsn() ], [ 'messages' => [], ]), - Connection::fromDsn('amqp://localhost/%2f/messages') + Connection::fromDsn('amqp://') + ); + } + + public function testItGetsParametersFromTheDsn() + { + $this->assertEquals( + new Connection([ + 'host' => 'host', + 'port' => 5672, + 'vhost' => '/', + ], [ + 'name' => 'custom', + ], [ + 'custom' => [], + ]), + Connection::fromDsn('amqp://host/%2f/custom') ); } @@ -52,9 +68,9 @@ public function testOverrideOptionsViaQueryParameters() { $this->assertEquals( new Connection([ - 'host' => 'redis', + 'host' => 'localhost', 'port' => 1234, - 'vhost' => '/', + 'vhost' => 'vhost', 'login' => 'guest', 'password' => 'password', ], [ @@ -62,7 +78,7 @@ public function testOverrideOptionsViaQueryParameters() ], [ 'queueName' => [], ]), - Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]') + Connection::fromDsn('amqp://guest:password@localhost:1234/vhost/queue?exchange[name]=exchangeName&queues[queueName]') ); } @@ -70,18 +86,16 @@ public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn() { $this->assertEquals( new Connection([ - 'host' => 'redis', - 'port' => 1234, + 'host' => 'localhost', + 'port' => 5672, 'vhost' => '/', - 'login' => 'guest', - 'password' => 'password', 'persistent' => 'true', ], [ 'name' => 'exchangeName', ], [ 'queueName' => [], ]), - Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [ + Connection::fromDsn('amqp://localhost/%2f/queue?exchange[name]=exchangeName&queues[queueName]', [ 'persistent' => 'true', 'exchange' => ['name' => 'toBeOverwritten'], ]) diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 721949c06953..e35cbaa1e396 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -58,8 +58,22 @@ class Connection */ private $amqpDelayExchange; + public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null) + { + $this->connectionOptions = array_replace_recursive([ + 'delay' => [ + 'routing_key_pattern' => 'delay_%routing_key%_%delay%', + 'exchange_name' => 'delay', + 'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%', + ], + ], $connectionOptions); + $this->exchangeOptions = $exchangeOptions; + $this->queuesOptions = $queuesOptions; + $this->amqpFactory = $amqpFactory ?: new AmqpFactory(); + } + /** - * Constructor. + * Creates a connection based on the DSN and options. * * Available options: * @@ -81,29 +95,19 @@ class Connection * * delay: * * routing_key_pattern: The pattern of the routing key (Default: "delay_%routing_key%_%delay%") * * queue_name_pattern: Pattern to use to create the queues (Default: "delay_queue_%routing_key%_%delay%") - * * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry") + * * exchange_name: Name of the exchange to be used for the retried messages (Default: "delay") * * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true) - * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000) * * prefetch_count: set channel prefetch count */ - public function __construct(array $connectionOptions, array $exchangeOptions, array $queuesOptions, AmqpFactory $amqpFactory = null) - { - $this->connectionOptions = array_replace_recursive([ - 'delay' => [ - 'routing_key_pattern' => 'delay_%routing_key%_%delay%', - 'exchange_name' => 'delay', - 'queue_name_pattern' => 'delay_queue_%routing_key%_%delay%', - ], - ], $connectionOptions); - $this->exchangeOptions = $exchangeOptions; - $this->queuesOptions = $queuesOptions; - $this->amqpFactory = $amqpFactory ?: new AmqpFactory(); - } - public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self { if (false === $parsedUrl = parse_url($dsn)) { - throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn)); + // this is a valid URI that parse_url cannot handle when you want to pass all parameters as options + if ('amqp://' !== $dsn) { + throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn)); + } + + $parsedUrl = []; } $pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : []; @@ -275,18 +279,17 @@ private function createDelayQueue(int $delay, ?string $routingKey) $queue = $this->amqpFactory->createQueue($this->channel()); $queue->setName(str_replace( ['%delay%', '%routing_key%'], - [$delay, $routingKey ?: ''], + [$delay, $routingKey ?? ''], $this->connectionOptions['delay']['queue_name_pattern'] - )); + )); $queue->setArguments([ 'x-message-ttl' => $delay, 'x-dead-letter-exchange' => $this->exchange()->getName(), ]); - if (null !== $routingKey) { - // after being released from to DLX, this routing key will be used - $queue->setArgument('x-dead-letter-routing-key', $routingKey); - } + // after being released from to DLX, make sure the original routing key will be used + // we must use an empty string instead of null for the argument to be picked up + $queue->setArgument('x-dead-letter-routing-key', $routingKey ?? ''); return $queue; } @@ -295,7 +298,7 @@ private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): st { return str_replace( ['%delay%', '%routing_key%'], - [$delay, $finalRoutingKey ?: ''], + [$delay, $finalRoutingKey ?? ''], $this->connectionOptions['delay']['routing_key_pattern'] ); }