From 91817e4de0e4f046c7162df3bd2d9cfe277965f0 Mon Sep 17 00:00:00 2001 From: Robin Chalas Date: Thu, 30 May 2019 20:39:48 +0200 Subject: [PATCH] [Messenger] Inject RoutableMessageBus instead of bus locator --- .../Resources/config/console.xml | 2 +- .../Command/ConsumeMessagesCommand.php | 20 ++- .../DependencyInjection/MessengerPass.php | 1 - .../Messenger/RoutableMessageBus.php | 15 +- .../Command/ConsumeMessagesCommandTest.php | 138 ++++++++++++++++++ 5 files changed, 159 insertions(+), 17 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml index 46c103cee467..ebd7d6ce46a6 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml @@ -82,7 +82,7 @@ - + diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 0e91cfc79555..be6f4c1733b2 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -40,7 +40,7 @@ class ConsumeMessagesCommand extends Command { protected static $defaultName = 'messenger:consume'; - private $busLocator; + private $routableBus; private $receiverLocator; private $logger; private $receiverNames; @@ -49,15 +49,23 @@ class ConsumeMessagesCommand extends Command /** @var CacheItemPoolInterface|null */ private $restartSignalCachePool; - public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null) + /** + * @param RoutableMessageBus $routableBus + */ + public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null) { + // to be deprecated in 4.4 + if ($routableBus instanceof ContainerInterface) { + $routableBus = new RoutableMessageBus($routableBus); + } + if (\is_array($retryStrategyLocator)) { @trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED); $retryStrategyLocator = null; } - $this->busLocator = $busLocator; + $this->routableBus = $routableBus; $this->receiverLocator = $receiverLocator; $this->logger = $logger; $this->receiverNames = $receiverNames; @@ -177,11 +185,7 @@ protected function execute(InputInterface $input, OutputInterface $output) $retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null; } - if (null !== $input->getOption('bus')) { - $bus = $this->busLocator->get($input->getOption('bus')); - } else { - $bus = new RoutableMessageBus($this->busLocator); - } + $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus; $worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger); $stopsWhen = []; diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index bc5f0290c337..ce7abe1d25fa 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -260,7 +260,6 @@ private function registerReceivers(ContainerBuilder $container, array $busIds) if ($container->hasDefinition('console.command.messenger_consume_messages')) { $container->getDefinition('console.command.messenger_consume_messages') - ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses)) ->replaceArgument(3, array_values($receiverNames)); } diff --git a/src/Symfony/Component/Messenger/RoutableMessageBus.php b/src/Symfony/Component/Messenger/RoutableMessageBus.php index c0f7eeca1b9a..3af52308904f 100644 --- a/src/Symfony/Component/Messenger/RoutableMessageBus.php +++ b/src/Symfony/Component/Messenger/RoutableMessageBus.php @@ -45,11 +45,6 @@ public function dispatch($envelope, array $stamps = []): Envelope throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope'); } - return $this->getMessageBus($envelope)->dispatch($envelope, $stamps); - } - - private function getMessageBus(Envelope $envelope): MessageBusInterface - { /** @var BusNameStamp|null $busNameStamp */ $busNameStamp = $envelope->last(BusNameStamp::class); @@ -58,11 +53,17 @@ private function getMessageBus(Envelope $envelope): MessageBusInterface throw new InvalidArgumentException(sprintf('Envelope is missing a BusNameStamp and no fallback message bus is configured on RoutableMessageBus.')); } - return $this->fallbackBus; + return $this->fallbackBus->dispatch($envelope, $stamps); } - $busName = $busNameStamp->getBusName(); + return $this->getMessageBus($busNameStamp->getBusName())->dispatch($envelope, $stamps); + } + /** + * @internal + */ + public function getMessageBus(string $busName): MessageBusInterface + { if (!$this->busLocator->has($busName)) { throw new InvalidArgumentException(sprintf('Bus named "%s" does not exist.', $busName)); } diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php index e7ce90b85c0b..3191c65b644a 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php @@ -12,8 +12,16 @@ namespace Symfony\Component\Messenger\Tests\Command; use PHPUnit\Framework\TestCase; +use Symfony\Component\Console\Application; +use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\DependencyInjection\ContainerInterface; use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Command\ConsumeMessagesCommand; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\RoutableMessageBus; +use Symfony\Component\Messenger\Stamp\BusNameStamp; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; class ConsumeMessagesCommandTest extends TestCase { @@ -24,4 +32,134 @@ public function testConfigurationWithDefaultReceiver() $this->assertFalse($inputArgument->isRequired()); $this->assertSame(['amqp'], $inputArgument->getDefault()); } + + public function testBasicRun() + { + $envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]); + + $receiver = $this->createMock(ReceiverInterface::class); + $receiver->expects($this->once())->method('get')->willReturn([$envelope]); + + $receiverLocator = $this->createMock(ContainerInterface::class); + $receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true); + $receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver); + + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once())->method('dispatch'); + + $busLocator = $this->createMock(ContainerInterface::class); + $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); + $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); + + $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator); + + $application = new Application(); + $application->add($command); + $tester = new CommandTester($application->get('messenger:consume')); + $tester->execute([ + 'receivers' => ['dummy-receiver'], + '--limit' => 1, + ]); + + $this->assertSame(0, $tester->getStatusCode()); + $this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay()); + } + + public function testRunWithBusOption() + { + $envelope = new Envelope(new \stdClass()); + + $receiver = $this->createMock(ReceiverInterface::class); + $receiver->expects($this->once())->method('get')->willReturn([$envelope]); + + $receiverLocator = $this->createMock(ContainerInterface::class); + $receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true); + $receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver); + + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once())->method('dispatch'); + + $busLocator = $this->createMock(ContainerInterface::class); + $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); + $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); + + $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator); + + $application = new Application(); + $application->add($command); + $tester = new CommandTester($application->get('messenger:consume')); + $tester->execute([ + 'receivers' => ['dummy-receiver'], + '--bus' => 'dummy-bus', + '--limit' => 1, + ]); + + $this->assertSame(0, $tester->getStatusCode()); + $this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay()); + } + + public function testBasicRunWithBusLocator() + { + $envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]); + + $receiver = $this->createMock(ReceiverInterface::class); + $receiver->expects($this->once())->method('get')->willReturn([$envelope]); + + $receiverLocator = $this->createMock(ContainerInterface::class); + $receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true); + $receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver); + + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once())->method('dispatch'); + + $busLocator = $this->createMock(ContainerInterface::class); + $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); + $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); + + $command = new ConsumeMessagesCommand($busLocator, $receiverLocator); + + $application = new Application(); + $application->add($command); + $tester = new CommandTester($application->get('messenger:consume')); + $tester->execute([ + 'receivers' => ['dummy-receiver'], + '--limit' => 1, + ]); + + $this->assertSame(0, $tester->getStatusCode()); + $this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay()); + } + + public function testRunWithBusOptionAndBusLocator() + { + $envelope = new Envelope(new \stdClass()); + + $receiver = $this->createMock(ReceiverInterface::class); + $receiver->expects($this->once())->method('get')->willReturn([$envelope]); + + $receiverLocator = $this->createMock(ContainerInterface::class); + $receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true); + $receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver); + + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once())->method('dispatch'); + + $busLocator = $this->createMock(ContainerInterface::class); + $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); + $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); + + $command = new ConsumeMessagesCommand($busLocator, $receiverLocator); + + $application = new Application(); + $application->add($command); + $tester = new CommandTester($application->get('messenger:consume')); + $tester->execute([ + 'receivers' => ['dummy-receiver'], + '--bus' => 'dummy-bus', + '--limit' => 1, + ]); + + $this->assertSame(0, $tester->getStatusCode()); + $this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay()); + } }