Skip to content

Commit

Permalink
feature #28190 [Messenger] Add a --bus option to the messenger:consum…
Browse files Browse the repository at this point in the history
…e-messages command (chalasr, sroze)

This PR was merged into the 4.2-dev branch.

Discussion
----------

[Messenger] Add a --bus option to the messenger:consume-messages command

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | n/a
| License       | MIT
| Doc PR        | todo

Making it compatible with the multi-bus feature.

Commits
-------

e3f1eec Bus argument is a required option when multiple buses are defined
539cb62 [Messenger] Add a --bus option to the messenger:consume-messages command
  • Loading branch information
fabpot committed Aug 29, 2018
2 parents bedd7aa + e3f1eec commit 6c539e1
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 21 deletions.
Expand Up @@ -70,10 +70,11 @@
</service>

<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
<argument type="service" id="message_bus" />
<argument /> <!-- Message bus locator -->
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" />
<argument>null</argument> <!-- Default receiver name -->
<argument type="collection" /> <!-- Message bus names -->

<tag name="console.command" command="messenger:consume-messages" />
</service>
Expand Down
3 changes: 2 additions & 1 deletion src/Symfony/Bundle/FrameworkBundle/composer.json
Expand Up @@ -41,7 +41,7 @@
"symfony/security": "~3.4|~4.0",
"symfony/form": "^4.1",
"symfony/expression-language": "~3.4|~4.0",
"symfony/messenger": "^4.1",
"symfony/messenger": "^4.2",
"symfony/process": "~3.4|~4.0",
"symfony/security-core": "~3.4|~4.0",
"symfony/security-csrf": "~3.4|~4.0",
Expand All @@ -67,6 +67,7 @@
"symfony/asset": "<3.4",
"symfony/console": "<3.4",
"symfony/form": "<4.1",
"symfony/messenger": "<4.2",
"symfony/property-info": "<3.4",
"symfony/serializer": "<4.1",
"symfony/stopwatch": "<3.4",
Expand Down
2 changes: 2 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -6,3 +6,5 @@ CHANGELOG

* `ValidationMiddleware::handle()` and `SendMessageMiddleware::handle()` now require an `Envelope` object
* `EnvelopeItemInterface` doesn't extend `Serializable` anymore
* [BC BREAK] The `ConsumeMessagesCommand` class now takes an instance of `Psr\Container\ContainerInterface`
as first constructor argument
55 changes: 40 additions & 15 deletions src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Expand Up @@ -20,7 +20,6 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
Expand All @@ -35,17 +34,19 @@ class ConsumeMessagesCommand extends Command
{
protected static $defaultName = 'messenger:consume-messages';

private $bus;
private $busLocator;
private $receiverLocator;
private $logger;
private $receiverNames;
private $busNames;

public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array())
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array(), array $busNames = array())
{
$this->bus = $bus;
$this->busLocator = $busLocator;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->receiverNames = $receiverNames;
$this->busNames = $busNames;

parent::__construct();
}
Expand All @@ -56,13 +57,15 @@ public function __construct(MessageBusInterface $bus, ContainerInterface $receiv
protected function configure(): void
{
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;

$this
->setDefinition(array(
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
))
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
Expand Down Expand Up @@ -91,18 +94,35 @@ protected function configure(): void
*/
protected function interact(InputInterface $input, OutputInterface $output)
{
if (!$this->receiverNames || $this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
return;
$style = new SymfonyStyle($input, $output);

if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
if (null === $receiverName) {
$style->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
}
}
}

$style = new SymfonyStyle($input, $output);
if (null === $receiverName) {
$style->block('Missing receiver argument.', null, 'error', ' ', true);
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
$input->setArgument('receiver', $alternatives[0]);
$busName = $input->getOption('bus');
if ($this->busNames && !$this->busLocator->has($busName)) {
if (null === $busName) {
$style->block('Missing bus argument.', null, 'error', ' ', true);
$input->setOption('bus', $style->choice('Select one of the available buses', $this->busNames));
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
$style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);

if (1 === \count($alternatives)) {
if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
$input->setOption('bus', $alternatives[0]);
}
} else {
$input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
}
}
}
}
Expand All @@ -116,7 +136,12 @@ protected function execute(InputInterface $input, OutputInterface $output): void
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}

if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
}

$receiver = $this->receiverLocator->get($receiverName);
$bus = $this->busLocator->get($busName);

if ($limit = $input->getOption('limit')) {
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
Expand All @@ -130,7 +155,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
}

$worker = new Worker($receiver, $this->bus);
$worker = new Worker($receiver, $bus);
$worker->run();
}

Expand Down
Expand Up @@ -207,6 +207,17 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
}
$container->getDefinition('console.command.messenger_debug')->replaceArgument(0, $debugCommandMapping);
}

if ($container->hasDefinition('console.command.messenger_consume_messages')) {
$buses = array();
foreach ($busIds as $busId) {
$buses[$busId] = new Reference($busId);
}
$container
->getDefinition('console.command.messenger_consume_messages')
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
->replaceArgument(4, $busIds);
}
}

private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId): iterable
Expand Down
Expand Up @@ -14,21 +14,20 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\MessageBus;

class ConsumeMessagesCommandTest extends TestCase
{
public function testConfigurationWithDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp'));
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp'));
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertFalse($inputArgument->isRequired());
$this->assertSame('amqp', $inputArgument->getDefault());
}

public function testConfigurationWithoutDefaultReceiver()
{
$command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy'));
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy'));
$inputArgument = $command->getDefinition()->getArgument('receiver');
$this->assertTrue($inputArgument->isRequired());
$this->assertNull($inputArgument->getDefault());
Expand Down
Expand Up @@ -241,10 +241,11 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
{
$container = $this->getContainerBuilder();
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments(array(
new Reference('message_bus'),
null,
new Reference('messenger.receiver_locator'),
null,
null,
null,
));

$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('alias' => 'amqp'));
Expand All @@ -253,6 +254,7 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
(new MessengerPass())->process($container);

$this->assertSame(array('amqp', 'dummy'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
$this->assertSame(array('message_bus'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
}

public function testItRegistersSenders()
Expand Down

0 comments on commit 6c539e1

Please sign in to comment.