Skip to content

Commit

Permalink
Merge branch '4.1'
Browse files Browse the repository at this point in the history
* 4.1:
  [Messenger][DX] Uses a default receiver when only one is defined
  fix deps
  [Profiler] Join using ';\n'
  Rename the command `DebugCommand`
  [Messenger] Add debug:messenger CLI command
  [Messenger] Fix default bus name
  Fix the transport factory after moving it
  [Messenger] Fix AMQP Transport factory & TransportFactoryInterface
  Fix AmqpTransport
  [Profiler] Fix dump makes toolbar disappear
  Improved exception on invalid message type
  bumped Symfony version to 4.1.0
  updated VERSION for 4.1.0-BETA1
  updated CHANGELOG for 4.1.0-BETA1
  [Messenger] Add TransportInterface as first class citizen sender+receiver
  • Loading branch information
sroze committed May 9, 2018
2 parents d264fce + e0f225b commit cb2a77b
Show file tree
Hide file tree
Showing 23 changed files with 518 additions and 96 deletions.
196 changes: 196 additions & 0 deletions CHANGELOG-4.1.md

Large diffs are not rendered by default.

Expand Up @@ -1047,7 +1047,7 @@ function ($a) {
->end()
->scalarNode('default_bus')->defaultValue(null)->end()
->arrayNode('buses')
->defaultValue(array('default' => array('default_middleware' => true, 'middleware' => array())))
->defaultValue(array('messenger.bus.default' => array('default_middleware' => true, 'middleware' => array())))
->useAttributeAsKey('name')
->prototype('array')
->addDefaultsIfNotSet()
Expand Down
Expand Up @@ -65,6 +65,7 @@
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
Expand Down Expand Up @@ -272,6 +273,7 @@ public function load(array $configs, ContainerBuilder $container)
$this->registerMessengerConfiguration($config['messenger'], $container, $loader, $config['serializer'], $config['validation']);
} else {
$container->removeDefinition('console.command.messenger_consume_messages');
$container->removeDefinition('console.command.messenger_debug');
}

if ($this->isConfigEnabled($container, $config['web_link'])) {
Expand Down Expand Up @@ -1506,19 +1508,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
}

$senderDefinition = (new Definition(SenderInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createSender'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.sender', array('name' => $name))
;
$container->setDefinition('messenger.sender.'.$name, $senderDefinition);

$receiverDefinition = (new Definition(ReceiverInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createReceiver'))
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.receiver', array('name' => $name))
->addTag('messenger.sender', array('name' => $name))
;
$container->setDefinition('messenger.receiver.'.$name, $receiverDefinition);
$container->setDefinition('messenger.transport.'.$name, $transportDefinition);
}
}

Expand Down
Expand Up @@ -73,10 +73,16 @@
<argument type="service" id="message_bus" />
<argument type="service" id="messenger.receiver_locator" />
<argument type="service" id="logger" on-invalid="null" />
<argument>null</argument> <!-- Default receiver name -->

<tag name="console.command" command="messenger:consume-messages" />
</service>

<service id="console.command.messenger_debug" class="Symfony\Component\Messenger\Command\DebugCommand">
<argument type="collection" /> <!-- Message to handlers mapping -->
<tag name="console.command" command="debug:messenger" />
</service>

<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
<argument type="service" id="router" />
<tag name="console.command" command="debug:router" />
Expand Down
Expand Up @@ -60,7 +60,7 @@
</service>

<!-- transports -->
<service id="messenger.transport_factory" class="Symfony\Component\Messenger\Transport\Factory\ChainTransportFactory">
<service id="messenger.transport_factory" class="Symfony\Component\Messenger\Transport\TransportFactory">
<argument type="tagged" tag="messenger.transport_factory" />
</service>

Expand Down
Expand Up @@ -263,7 +263,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
'encoder' => 'messenger.transport.serializer',
'decoder' => 'messenger.transport.serializer',
'default_bus' => null,
'buses' => array('default' => array('default_middleware' => true, 'middleware' => array())),
'buses' => array('messenger.bus.default' => array('default_middleware' => true, 'middleware' => array())),
),
);
}
Expand Down
Expand Up @@ -533,30 +533,20 @@ public function testMessenger()
public function testMessengerTransports()
{
$container = $this->createContainerFromFile('messenger_transports');
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.sender.default')->getTag('messenger.sender'));
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.receiver.default')->getTag('messenger.receiver'));

$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createSender'), $senderFactory);
$this->assertCount(2, $senderArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $senderArguments[1]);

$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createReceiver'), $receiverFactory);
$this->assertCount(2, $receiverArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $receiverArguments[1]);
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));

$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
$transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createTransport'), $transportFactory);
$this->assertCount(2, $transportArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $transportArguments[1]);

$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));
}
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Bundle/FrameworkBundle/composer.json
Expand Up @@ -41,7 +41,7 @@
"symfony/security": "~3.4|~4.0",
"symfony/form": "^4.1",
"symfony/expression-language": "~3.4|~4.0",
"symfony/messenger": "^4.1",
"symfony/messenger": "^4.1-beta2",
"symfony/process": "~3.4|~4.0",
"symfony/security-core": "~3.4|~4.0",
"symfony/security-csrf": "~3.4|~4.0",
Expand Down
Expand Up @@ -418,12 +418,10 @@
'{{ path("_wdt", { "token": "xxxxxx" }) }}'.replace(/xxxxxx/, newToken),
function(xhr, el) {
/* Evaluate embedded scripts inside the toolbar */
var i, scripts = [].slice.call(el.querySelectorAll('script'));
for (i = 0; i < scripts.length; ++i) {
eval(scripts[i].firstChild.nodeValue);
}
/* Evaluate in global scope scripts embedded inside the toolbar */
eval.call({}, ([].slice.call(el.querySelectorAll('script')).map(function (script) {
return script.firstChild.nodeValue;
}).join(';\n')));
el.style.display = -1 !== xhr.responseText.indexOf('sf-toolbarreset') ? 'block' : 'none';
Expand All @@ -442,7 +440,7 @@
}
/* Handle toolbar-info position */
var toolbarBlocks = [].slice.call(el.querySelectorAll('.sf-toolbar-block'));
var i, toolbarBlocks = [].slice.call(el.querySelectorAll('.sf-toolbar-block'));
for (i = 0; i < toolbarBlocks.length; ++i) {
toolbarBlocks[i].onmouseover = function () {
var toolbarInfo = this.querySelectorAll('.sf-toolbar-info')[0];
Expand Down
Expand Up @@ -37,14 +37,16 @@ class ConsumeMessagesCommand extends Command
private $bus;
private $receiverLocator;
private $logger;
private $defaultReceiverName;

public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null)
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, string $defaultReceiverName = null)
{
parent::__construct();

$this->bus = $bus;
$this->receiverLocator = $receiverLocator;
$this->logger = $logger;
$this->defaultReceiverName = $defaultReceiverName;
}

/**
Expand All @@ -54,7 +56,7 @@ protected function configure(): void
{
$this
->setDefinition(array(
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
new InputArgument('receiver', $this->defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $this->defaultReceiverName),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
Expand Down
81 changes: 81 additions & 0 deletions src/Symfony/Component/Messenger/Command/DebugCommand.php
@@ -0,0 +1,81 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

/**
* A console command to debug Messenger information.
*
* @author Roland Franssen <franssen.roland@gmail.com>
*
* @experimental in 4.1
*/
class DebugCommand extends Command
{
protected static $defaultName = 'debug:messenger';

private $mapping;

public function __construct(array $mapping)
{
parent::__construct();

$this->mapping = $mapping;
}

/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setDescription('Lists messages you can dispatch using the message bus')
->setHelp(<<<'EOF'
The <info>%command.name%</info> command displays all messages that can be
dispatched using the message bus:
<info>php %command.full_name%</info>
EOF
)
;
}

/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output);
$io->title('Messenger');
$io->text('The following messages can be dispatched:');
$io->newLine();

$tableRows = array();
foreach ($this->mapping as $message => $handlers) {
$tableRows[] = array(sprintf('<fg=cyan>%s</fg=cyan>', $message));
foreach ($handlers as $handler) {
$tableRows[] = array(sprintf(' handled by %s', $handler));
}
}

if ($tableRows) {
$io->table(array(), $tableRows);
} else {
$io->text('No messages were found that have valid handlers.');
}
}
}
Expand Up @@ -104,26 +104,29 @@ private function registerHandlers(ContainerBuilder $container)
}

$definitions = array();
$handlersLocatorMapping = array();
foreach ($handlersByMessage as $message => $handlers) {
if (1 === \count($handlers)) {
$handlersByMessage[$message] = current($handlers);
$handlersLocatorMapping['handler.'.$message] = current($handlers);
} else {
$d = new Definition(ChainHandler::class, array($handlers));
$d->setPrivate(true);
$serviceId = hash('sha1', $message);
$definitions[$serviceId] = $d;
$handlersByMessage[$message] = new Reference($serviceId);
$handlersLocatorMapping['handler.'.$message] = new Reference($serviceId);
}
}
$container->addDefinitions($definitions);

$handlersLocatorMapping = array();
foreach ($handlersByMessage as $message => $handler) {
$handlersLocatorMapping['handler.'.$message] = $handler;
}

$handlerResolver = $container->getDefinition('messenger.handler_resolver');
$handlerResolver->replaceArgument(0, ServiceLocatorTagPass::register($container, $handlersLocatorMapping));

if ($container->hasDefinition('console.command.messenger_debug')) {
$container->getDefinition('console.command.messenger_debug')
->replaceArgument(0, array_map(function (array $handlers): array {
return array_map('strval', $handlers);
}, $handlersByMessage));
}
}

private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId): array
Expand Down Expand Up @@ -161,26 +164,32 @@ private function guessHandledClasses(\ReflectionClass $handlerClass, string $ser
private function registerReceivers(ContainerBuilder $container)
{
$receiverMapping = array();
foreach ($container->findTaggedServiceIds($this->receiverTag) as $id => $tags) {
foreach ($tags as $tag) {
$receiverMapping[$id] = new Reference($id);
$taggedReceivers = $container->findTaggedServiceIds($this->receiverTag);

foreach ($taggedReceivers as $id => $tags) {
$receiverMapping[$id] = new Reference($id);

foreach ($tags as $tag) {
if (isset($tag['name'])) {
$receiverMapping[$tag['name']] = $receiverMapping[$id];
}
}
}

if (1 === \count($taggedReceivers) && $container->hasDefinition('console.command.messenger_consume_messages')) {
$container->getDefinition('console.command.messenger_consume_messages')->replaceArgument(3, (string) current($receiverMapping));
}

$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
}

private function registerSenders(ContainerBuilder $container)
{
$senderLocatorMapping = array();
foreach ($container->findTaggedServiceIds($this->senderTag) as $id => $tags) {
foreach ($tags as $tag) {
$senderLocatorMapping[$id] = new Reference($id);
$senderLocatorMapping[$id] = new Reference($id);

foreach ($tags as $tag) {
if (isset($tag['name'])) {
$senderLocatorMapping[$tag['name']] = $senderLocatorMapping[$id];
}
Expand Down
@@ -0,0 +1,19 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Exception;

/**
* @author Yonel Ceruto <yonelceruto@gmail.com>
*/
class InvalidArgumentException extends \InvalidArgumentException implements ExceptionInterface
{
}
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/MessageBus.php
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger;

use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;

/**
Expand Down Expand Up @@ -39,6 +40,10 @@ public function __construct(iterable $middlewareHandlers = array())
*/
public function dispatch($message)
{
if (!\is_object($message)) {
throw new InvalidArgumentException(sprintf('Invalid type for message argument. Expected object, but got "%s".', \gettype($message)));
}

return \call_user_func($this->callableForNextMiddleware(0), $message);
}

Expand Down

0 comments on commit cb2a77b

Please sign in to comment.