Skip to content

Commit

Permalink
feature #29010 [Messenger] make senders and handlers subscribing to p…
Browse files Browse the repository at this point in the history
…arent interfaces receive *all* matching messages, wildcard included (nicolas-grekas)

This PR was merged into the 4.2-dev branch.

Discussion
----------

 [Messenger] make senders and handlers subscribing to parent interfaces receive *all* matching messages, wildcard included

| Q             | A
| ------------- | ---
| Branch?       | 4.2
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | yes
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | -
| License       | MIT
| Doc PR        | -

~Embeds #29006 for now.~

From the CHANGELOG:
 * senders and handlers subscribing to parent interfaces now receive *all* matching messages, wildcard included
 * `HandlerLocatorInterface::resolve()` has been removed, use `HandlersLocator::getHandlers()` instead
 * `SenderLocatorInterface::getSenderForMessage()` has been removed, use `SendersLocator::getSenders()` instead
 * The `ChainHandler` and `ChainSender` classes have been removed
 * The `ContainerHandlerLocator`, `AbstractHandlerLocator`, `SenderLocator` and `AbstractSenderLocator` classes have been removed

The new `HandlersLocatorInterface` and `SendersLocatorInterface` interfaces return **`iterable`** of corresponding handlers/senders. This allows simplifying a lot the DI configuration and standalone usage.

Inheritance-based configuration is now stable: when a sender or a handler is bound to `SomeMessageInterface`, it will always get all messages of that kind. This fixes the unstable nature of the previous logic, where only the first matching type bound to a message would match, making routing/handling unpredictable (note that the new behavior is also how Laravel does it.)

Some cleanups found meanwhile:
 * the `messenger.sender` tag was useless, it's removed
 * the reponsibility of the "send-and-handle" decision has been moved to the locator, where it belongs
 * thanks to type+argument autowiring aliases, we don't need to force defining a default bus - gaining nice errors when a named argument has a typo
 * some services have been renamed to make them more conventional

As far as priorities are concerned, the priority number applies in the scope of the type bound to it: senders/handlers that are bound to more specific types are always called before less specific ones, no matter the defined priority.

Commits
-------

1e7af4d [Messenger] make senders and handlers subscribing to parent interfaces receive *all* matching messages, wildcard included
  • Loading branch information
sroze committed Oct 31, 2018
2 parents 225746b + 1e7af4d commit d2b8014
Show file tree
Hide file tree
Showing 38 changed files with 419 additions and 977 deletions.
Expand Up @@ -391,14 +391,15 @@ protected function isCsrfTokenValid(string $id, ?string $token): bool
/**
* Dispatches a message to the bus.
*
* @param object $message The message to dispatch
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
*
* @final
*/
protected function dispatchMessage($message): Envelope
{
if (!$this->container->has('message_bus')) {
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/messenger".');
$message = class_exists(Envelope::class) ? 'You need to define the "messenger.default_bus" configuration option.' : 'Try running "composer require symfony/messenger".';
throw new \LogicException('The message bus is not enabled in your application. '.$message);
}

return $this->container->get('message_bus')->dispatch($message);
Expand Down
Expand Up @@ -40,7 +40,6 @@ class UnusedTagsPass implements CompilerPassInterface
'kernel.event_subscriber',
'kernel.fragment_renderer',
'messenger.bus',
'messenger.sender',
'messenger.receiver',
'messenger.message_handler',
'monolog.logger',
Expand Down
Expand Up @@ -1077,7 +1077,7 @@ function ($a) {
->end()
->end()
->end()
->scalarNode('default_bus')->defaultValue(null)->end()
->scalarNode('default_bus')->defaultNull()->end()
->arrayNode('buses')
->defaultValue(array('messenger.bus.default' => array('default_middleware' => true, 'middleware' => array())))
->useAttributeAsKey('name')
Expand Down
Expand Up @@ -36,6 +36,8 @@
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\DependencyInjection\Alias;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Argument\RewindableGenerator;
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand Down Expand Up @@ -68,7 +70,6 @@
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Sender\ChainSender;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
Expand Down Expand Up @@ -1491,7 +1492,7 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig, array $validationConfig)
{
if (!interface_exists(MessageBusInterface::class)) {
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.');
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed. Try running "composer require symfony/messenger".');
}

$loader->load('messenger.xml');
Expand All @@ -1502,7 +1503,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
} else {
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
if (!$this->isConfigEnabled($container, $serializerConfig)) {
throw new LogicException('The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enable it or install it by running "composer require symfony/serializer-pack".');
throw new LogicException('The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enabling it or running "composer require symfony/serializer-pack".');
}

$container->getDefinition('messenger.transport.symfony_serializer')
Expand All @@ -1517,17 +1518,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
}
}

if (null === $config['default_bus']) {
if (\count($config['buses']) > 1) {
throw new LogicException(sprintf('You need to define a default bus with the "default_bus" configuration. Possible values: %s', implode(', ', array_keys($config['buses']))));
}

if (null === $config['default_bus'] && 1 === \count($config['buses'])) {
$config['default_bus'] = key($config['buses']);
}

$defaultMiddleware = array(
'before' => array(array('id' => 'logging')),
'after' => array(array('id' => 'route_messages'), array('id' => 'call_message_handler')),
'after' => array(array('id' => 'send_message'), array('id' => 'handle_message')),
);
foreach ($config['buses'] as $busId => $bus) {
$middleware = $bus['middleware'];
Expand Down Expand Up @@ -1562,51 +1559,44 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
}
}

if (!$container->hasAlias('message_bus')) {
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
}

$senderAliases = array();
foreach ($config['transports'] as $name => $transport) {
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
}

$transportDefinition = (new Definition(TransportInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.receiver', array('alias' => $name))
->addTag('messenger.sender', array('alias' => $name))
;
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
$senderAliases[$name] = $transportId;
}

$messageToSenderIdMapping = array();
$messageToSendAndHandleMapping = array();
$messageToSendersMapping = array();
$messagesToSendAndHandle = array();
foreach ($config['routing'] as $message => $messageConfiguration) {
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
}
$senders = array_map(function ($sender) use ($senderAliases) {
return new Reference($senderAliases[$sender] ?? $sender);
}, $messageConfiguration['senders']);

if (1 < \count($messageConfiguration['senders'])) {
$senders = array_map(function ($sender) use ($senderAliases) {
return new Reference($senderAliases[$sender] ?? $sender);
}, $messageConfiguration['senders']);
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
$chainSenderDefinition->addTag('messenger.sender');
$chainSenderId = '.messenger.chain_sender.'.$message;
$container->setDefinition($chainSenderId, $chainSenderDefinition);
$messageToSenderIdMapping[$message] = $chainSenderId;
} else {
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
}
$sendersId = 'messenger.senders.'.$message;
$sendersDefinition = $container->register($sendersId, RewindableGenerator::class)
->setFactory('current')
->addArgument(array(new IteratorArgument($senders)));
$messageToSendersMapping[$message] = new Reference($sendersId);

$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
}

$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdMapping);
$container->getDefinition('messenger.middleware.route_messages')->replaceArgument(1, $messageToSendAndHandleMapping);
$container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping)
->replaceArgument(1, $messagesToSendAndHandle)
;
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
Expand Down
17 changes: 6 additions & 11 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
Expand Up @@ -8,13 +8,12 @@
<defaults public="false" />

<!-- Asynchronous -->
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Transport\Sender\Locator\ContainerSenderLocator">
<argument type="service" id="messenger.sender_locator" />
<argument type="collection" /> <!-- Message to sender ID mapping -->
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
<argument type="collection" /> <!-- Per message sender iterators -->
<argument type="collection" /> <!-- Messages to send and handle -->
</service>
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />
<argument type="collection" /> <!-- Message to send and handle mapping -->
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
<argument type="service" id="messenger.senders_locator" />
</service>

<!-- Message encoding/decoding -->
Expand All @@ -25,7 +24,7 @@
</service>

<!-- Middleware -->
<service id="messenger.middleware.call_message_handler" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" abstract="true">
<service id="messenger.middleware.handle_message" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" abstract="true">
<argument /> <!-- Bus handler resolver -->
</service>

Expand All @@ -48,10 +47,6 @@
<tag name="container.service_locator" />
<argument type="collection" />
</service>
<service id="messenger.sender_locator">
<tag name="container.service_locator" />
<argument type="collection" />
</service>

<!-- transports -->
<service id="messenger.transport_factory" class="Symfony\Component\Messenger\Transport\TransportFactory">
Expand Down
Expand Up @@ -13,8 +13,8 @@
'messenger.bus.queries' => array(
'default_middleware' => false,
'middleware' => array(
'route_messages',
'call_message_handler',
'send_message',
'handle_message',
),
),
),
Expand Down
Expand Up @@ -18,8 +18,8 @@
</framework:middleware>
</framework:bus>
<framework:bus name="messenger.bus.queries" default-middleware="false">
<framework:middleware id="route_messages" />
<framework:middleware id="call_message_handler" />
<framework:middleware id="send_message" />
<framework:middleware id="handle_message" />
</framework:bus>
</framework:messenger>
</framework:config>
Expand Down
Expand Up @@ -9,5 +9,5 @@ framework:
messenger.bus.queries:
default_middleware: false
middleware:
- "route_messages"
- "call_message_handler"
- "send_message"
- "handle_message"
Expand Up @@ -544,9 +544,7 @@ public function testMessengerTransports()
$container = $this->createContainerFromFile('messenger_transports');
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('alias' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$this->assertEquals(array(array('alias' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));

$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
Expand All @@ -563,28 +561,21 @@ public function testMessengerTransports()
public function testMessengerRouting()
{
$container = $this->createContainerFromFile('messenger_routing');
$senderLocatorDefinition = $container->getDefinition('messenger.asynchronous.routing.sender_locator');
$sendMessageMiddlewareDefinition = $container->getDefinition('messenger.middleware.route_messages');
$senderLocatorDefinition = $container->getDefinition('messenger.senders_locator');

$messageToSenderIdsMapping = array(
DummyMessage::class => '.messenger.chain_sender.'.DummyMessage::class,
SecondMessage::class => '.messenger.chain_sender.'.SecondMessage::class,
'*' => 'amqp',
);
$messageToSendAndHandleMapping = array(
DummyMessage::class => false,
SecondMessage::class => true,
'*' => false,
);

$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
$this->assertSame($messageToSendAndHandleMapping, $sendMessageMiddlewareDefinition->getArgument(1));
$this->assertEquals(array(new Reference('messenger.transport.amqp'), new Reference('audit')), $container->getDefinition('.messenger.chain_sender.'.DummyMessage::class)->getArgument(0));
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
$this->assertEquals(array(new Reference('messenger.transport.amqp'), new Reference('audit')), $container->getDefinition('messenger.senders.'.DummyMessage::class)->getArgument(0)[0]->getValues());
}

/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\LogicException
* @expectedExceptionMessage The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enable it or install it by running "composer require symfony/serializer-pack".
* @expectedExceptionMessage The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enabling it or running "composer require symfony/serializer-pack".
*/
public function testMessengerTransportConfigurationWithoutSerializer()
{
Expand All @@ -593,7 +584,7 @@ public function testMessengerTransportConfigurationWithoutSerializer()

/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\LogicException
* @expectedExceptionMessage The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".
* @expectedExceptionMessage The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".
*/
public function testMessengerAMQPTransportConfigurationWithoutSerializer()
{
Expand All @@ -619,22 +610,22 @@ public function testMessengerWithMultipleBuses()
$this->assertSame(array(), $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals(array(
array('id' => 'logging'),
array('id' => 'route_messages'),
array('id' => 'call_message_handler'),
array('id' => 'send_message'),
array('id' => 'handle_message'),
), $container->getParameter('messenger.bus.commands.middleware'));
$this->assertTrue($container->has('messenger.bus.events'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals(array(
array('id' => 'logging'),
array('id' => 'with_factory', 'arguments' => array('foo', true, array('bar' => 'baz'))),
array('id' => 'route_messages'),
array('id' => 'call_message_handler'),
array('id' => 'send_message'),
array('id' => 'handle_message'),
), $container->getParameter('messenger.bus.events.middleware'));
$this->assertTrue($container->has('messenger.bus.queries'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.queries')->getArgument(0));
$this->assertEquals(array(
array('id' => 'route_messages', 'arguments' => array()),
array('id' => 'call_message_handler', 'arguments' => array()),
array('id' => 'send_message', 'arguments' => array()),
array('id' => 'handle_message', 'arguments' => array()),
), $container->getParameter('messenger.bus.queries.middleware'));

$this->assertTrue($container->hasAlias('message_bus'));
Expand Down

0 comments on commit d2b8014

Please sign in to comment.