Skip to content

Commit

Permalink
bug #31748 [Messenger] Inject RoutableMessageBus instead of bus locat…
Browse files Browse the repository at this point in the history
…or (chalasr)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] Inject RoutableMessageBus instead of bus locator

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #31741
| License       | MIT
| Doc PR        | n/a

Commits
-------

91817e4 [Messenger] Inject RoutableMessageBus instead of bus locator
  • Loading branch information
fabpot committed May 31, 2019
2 parents 1318d3b + 91817e4 commit 6c93002
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 17 deletions.
Expand Up @@ -82,7 +82,7 @@
</service>

<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument /> <!-- Message bus locator -->
<argument type="service" id="messenger.routable_message_bus" />
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" />
<argument type="collection" /> <!-- Receiver names -->
Expand Down
20 changes: 12 additions & 8 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Expand Up @@ -40,7 +40,7 @@ class ConsumeMessagesCommand extends Command
{
protected static $defaultName = 'messenger:consume';

private $busLocator;
private $routableBus;
private $receiverLocator;
private $logger;
private $receiverNames;
Expand All @@ -49,15 +49,23 @@ class ConsumeMessagesCommand extends Command
/** @var CacheItemPoolInterface|null */
private $restartSignalCachePool;

public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
/**
* @param RoutableMessageBus $routableBus
*/
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
{
// to be deprecated in 4.4
if ($routableBus instanceof ContainerInterface) {
$routableBus = new RoutableMessageBus($routableBus);
}

if (\is_array($retryStrategyLocator)) {
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);

$retryStrategyLocator = null;
}

$this->busLocator = $busLocator;
$this->routableBus = $routableBus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
Expand Down Expand Up @@ -177,11 +185,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
}

if (null !== $input->getOption('bus')) {
$bus = $this->busLocator->get($input->getOption('bus'));
} else {
$bus = new RoutableMessageBus($this->busLocator);
}
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
$stopsWhen = [];
Expand Down
Expand Up @@ -260,7 +260,6 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)

if ($container->hasDefinition('console.command.messenger_consume_messages')) {
$container->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
->replaceArgument(3, array_values($receiverNames));
}

Expand Down
15 changes: 8 additions & 7 deletions src/Symfony/Component/Messenger/RoutableMessageBus.php
Expand Up @@ -45,11 +45,6 @@ public function dispatch($envelope, array $stamps = []): Envelope
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
}

return $this->getMessageBus($envelope)->dispatch($envelope, $stamps);
}

private function getMessageBus(Envelope $envelope): MessageBusInterface
{
/** @var BusNameStamp|null $busNameStamp */
$busNameStamp = $envelope->last(BusNameStamp::class);

Expand All @@ -58,11 +53,17 @@ private function getMessageBus(Envelope $envelope): MessageBusInterface
throw new InvalidArgumentException(sprintf('Envelope is missing a BusNameStamp and no fallback message bus is configured on RoutableMessageBus.'));
}

return $this->fallbackBus;
return $this->fallbackBus->dispatch($envelope, $stamps);
}

$busName = $busNameStamp->getBusName();
return $this->getMessageBus($busNameStamp->getBusName())->dispatch($envelope, $stamps);
}

/**
* @internal
*/
public function getMessageBus(string $busName): MessageBusInterface
{
if (!$this->busLocator->has($busName)) {
throw new InvalidArgumentException(sprintf('Bus named "%s" does not exist.', $busName));
}
Expand Down
Expand Up @@ -12,8 +12,16 @@
namespace Symfony\Component\Messenger\Tests\Command;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

class ConsumeMessagesCommandTest extends TestCase
{
Expand All @@ -24,4 +32,134 @@ public function testConfigurationWithDefaultReceiver()
$this->assertFalse($inputArgument->isRequired());
$this->assertSame(['amqp'], $inputArgument->getDefault());
}

public function testBasicRun()
{
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);

$receiver = $this->createMock(ReceiverInterface::class);
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);

$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);

$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');

$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);

$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);

$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--limit' => 1,
]);

$this->assertSame(0, $tester->getStatusCode());
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}

public function testRunWithBusOption()
{
$envelope = new Envelope(new \stdClass());

$receiver = $this->createMock(ReceiverInterface::class);
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);

$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);

$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');

$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);

$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);

$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--bus' => 'dummy-bus',
'--limit' => 1,
]);

$this->assertSame(0, $tester->getStatusCode());
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}

public function testBasicRunWithBusLocator()
{
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);

$receiver = $this->createMock(ReceiverInterface::class);
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);

$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);

$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');

$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);

$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);

$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--limit' => 1,
]);

$this->assertSame(0, $tester->getStatusCode());
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}

public function testRunWithBusOptionAndBusLocator()
{
$envelope = new Envelope(new \stdClass());

$receiver = $this->createMock(ReceiverInterface::class);
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);

$receiverLocator = $this->createMock(ContainerInterface::class);
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);

$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch');

$busLocator = $this->createMock(ContainerInterface::class);
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);

$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);

$application = new Application();
$application->add($command);
$tester = new CommandTester($application->get('messenger:consume'));
$tester->execute([
'receivers' => ['dummy-receiver'],
'--bus' => 'dummy-bus',
'--limit' => 1,
]);

$this->assertSame(0, $tester->getStatusCode());
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
}
}

0 comments on commit 6c93002

Please sign in to comment.