Skip to content

Commit

Permalink
feature #26864 [Messenger] Define multiple buses from the `framework.…
Browse files Browse the repository at this point in the history
…messenger.buses` configuration (sroze)

This PR was squashed before being merged into the 4.1-dev branch (closes #26864).

Discussion
----------

[Messenger] Define multiple buses from the `framework.messenger.buses` configuration

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #26652
| License       | MIT
| Doc PR        | symfony/symfony-docs#9617

Not everybody will benefit from having only one bus, especially with the CQRS-like usages. While keeping the extremely use of use of the single bus, this PR has the following:

- Create multiple buses from the YAML configuration
- Tag middleware only a specific buses
- Register middlewares from the YAML configuration

Even if it's visible in the PR's tests, here's how it will look like, for a completely full-customised version:
```yaml
framework:
    messenger:
        default_bus: commands
        buses:
            commands: ~
            events:
                middlewares:
                    - validation
                    - route_messages
                    - "Your\\Middleware\\Service"
                    - call_message_handler
```

A few things to note:
1. The YAML configuration creates `messenger.bus.[name]` services for the buses.
2. The YAML configuration for middleware just adds tags to the corresponding middlewares.
3. If the middleware definition does not exists, it creates it. (without any magic on the arguments though, if it isn't auto-wirable, well... "your problem")
4. In the PR, there is this "TolerateNoHandler" middleware that is a great example for event buses

Commits
-------

e5deb84 [Messenger] Define multiple buses from the `framework.messenger.buses` configuration
  • Loading branch information
sroze committed Apr 25, 2018
2 parents 2232d99 + e5deb84 commit da4fccd
Show file tree
Hide file tree
Showing 25 changed files with 518 additions and 184 deletions.
Expand Up @@ -972,6 +972,7 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
->info('Messenger configuration')
->{!class_exists(FullStack::class) && interface_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->fixXmlConfig('adapter')
->fixXmlConfig('bus', 'buses')
->children()
->arrayNode('routing')
->useAttributeAsKey('message_class')
Expand Down Expand Up @@ -1023,14 +1024,6 @@ function ($a) {
->end()
->scalarNode('encoder')->defaultValue('messenger.transport.serializer')->end()
->scalarNode('decoder')->defaultValue('messenger.transport.serializer')->end()
->arrayNode('middlewares')
->addDefaultsIfNotSet()
->children()
->arrayNode('validation')
->{!class_exists(FullStack::class) && class_exists(Validation::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->end()
->end()
->end()
->arrayNode('adapters')
->useAttributeAsKey('name')
->arrayPrototype()
Expand All @@ -1052,6 +1045,22 @@ function ($a) {
->end()
->end()
->end()
->scalarNode('default_bus')->defaultValue(null)->end()
->arrayNode('buses')
->defaultValue(array('default' => array('default_middlewares' => true, 'middlewares' => array())))
->useAttributeAsKey('name')
->prototype('array')
->fixXmlConfig('middleware')
->addDefaultsIfNotSet()
->children()
->booleanNode('default_middlewares')->defaultTrue()->end()
->arrayNode('middlewares')
->defaultValue(array())
->prototype('scalar')->end()
->end()
->end()
->end()
->end()
->end()
->end()
->end()
Expand Down
Expand Up @@ -61,6 +61,7 @@
use Symfony\Component\Lock\Store\StoreFactory;
use Symfony\Component\Lock\StoreInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
Expand Down Expand Up @@ -273,7 +274,7 @@ public function load(array $configs, ContainerBuilder $container)
}

if ($this->isConfigEnabled($container, $config['messenger'])) {
$this->registerMessengerConfiguration($config['messenger'], $container, $loader, $config['serializer']);
$this->registerMessengerConfiguration($config['messenger'], $container, $loader, $config['serializer'], $config['validation']);
} else {
$container->removeDefinition('console.command.messenger_consume_messages');
}
Expand Down Expand Up @@ -1439,7 +1440,7 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
}
}

private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig)
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig, array $validationConfig)
{
if (!interface_exists(MessageBusInterface::class)) {
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.');
Expand All @@ -1462,21 +1463,44 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$container->setAlias('messenger.transport.encoder', $config['encoder']);
$container->setAlias('messenger.transport.decoder', $config['decoder']);

$messageToSenderIdsMapping = array();
foreach ($config['routing'] as $message => $messageConfiguration) {
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
if (null === $config['default_bus']) {
if (count($config['buses']) > 1) {
throw new LogicException(sprintf('You need to define a default bus with the "default_bus" configuration. Possible values: %s', implode(', ', array_keys($config['buses']))));
}

$config['default_bus'] = array_keys($config['buses'])[0];
}

$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
$defaultMiddlewares = array('before' => array('logging'), 'after' => array('route_messages', 'call_message_handler'));
foreach ($config['buses'] as $name => $bus) {
$busId = 'messenger.bus.'.$name;

$middlewares = $bus['default_middlewares'] ? array_merge($defaultMiddlewares['before'], $bus['middlewares'], $defaultMiddlewares['after']) : $bus['middlewares'];

if ($config['middlewares']['validation']['enabled']) {
if (!$container->has('validator')) {
if (in_array('messenger.middleware.validation', $middlewares) && !$validationConfig['enabled']) {
throw new LogicException('The Validation middleware is only available when the Validator component is installed and enabled. Try running "composer require symfony/validator".');
}
} else {
$container->removeDefinition('messenger.middleware.validator');

$container->setParameter($busId.'.middlewares', $middlewares);
$container->setDefinition($busId, (new Definition(MessageBus::class, array(array())))->addTag('messenger.bus', array('name' => $name)));

if ($name === $config['default_bus']) {
$container->setAlias('message_bus', $busId);
$container->setAlias(MessageBusInterface::class, $busId);
}
}

if (!$container->hasAlias('message_bus')) {
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
}

$messageToSenderIdsMapping = array();
foreach ($config['routing'] as $message => $messageConfiguration) {
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
}

$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);

foreach ($config['adapters'] as $name => $adapter) {
$container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array(
new Reference('messenger.adapter_factory'),
Expand Down
39 changes: 13 additions & 26 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
Expand Up @@ -7,39 +7,18 @@
<services>
<defaults public="false" />

<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Messenger\MessageBus" public="true">
<argument type="tagged" tag="messenger.bus_middleware" /> <!-- Middlewares -->
</service>

<service id="Symfony\Component\Messenger\MessageBusInterface" alias="message_bus" />

<!-- Handlers -->
<service id="messenger.handler_resolver" class="Symfony\Component\Messenger\ContainerHandlerLocator">
<argument type="service" id="service_container"/>
</service>

<service id="messenger.middleware.call_message_handler" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware">
<argument type="service" id="messenger.handler_resolver" />

<tag name="messenger.bus_middleware" priority="-10" />
</service>

<service id="messenger.middleware.validator" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">
<argument type="service" id="validator" />

<tag name="messenger.bus_middleware" priority="100" />
</service>

<!-- Asynchronous -->
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
<argument type="service" id="messenger.sender_locator" />
<argument /> <!-- Message to sender ID mapping -->
</service>
<service id="messenger.asynchronous.middleware.send_message_to_producer" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />

<tag name="messenger.bus_middleware" priority="-5" />
</service>

<!-- Message encoding/decoding -->
Expand All @@ -49,17 +28,25 @@
<argument type="collection" /> <!-- Context -->
</service>

<!-- Middlewares -->
<service id="messenger.middleware.tolerate_no_handler" class="Symfony\Component\Messenger\Middleware\TolerateNoHandler" abstract="true" />
<service id="messenger.middleware.call_message_handler" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" abstract="true">
<argument type="service" id="messenger.handler_resolver" />
</service>

<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware" abstract="true">
<argument type="service" id="validator" />
</service>

<!-- Logging & Debug -->
<service id="messenger.middleware.debug.logging" class="Symfony\Component\Messenger\Debug\LoggingMiddleware">
<service id="messenger.middleware.logging" class="Symfony\Component\Messenger\Middleware\LoggingMiddleware" abstract="true">
<argument type="service" id="logger" />

<tag name="messenger.bus_middleware" priority="10" />
<tag name="monolog.logger" channel="messenger" />
</service>

<service id="data_collector.messenger" class="Symfony\Component\Messenger\DataCollector\MessengerDataCollector">
<service id="messenger.data_collector" class="Symfony\Component\Messenger\DataCollector\MessengerDataCollector">
<tag name="data_collector" template="@WebProfiler/Collector/messenger.html.twig" id="messenger" priority="100" />
<tag name="messenger.bus_middleware" />
</service>

<!-- Discovery -->
Expand Down
Expand Up @@ -356,9 +356,10 @@
<xsd:element name="encoder" type="xsd:string" minOccurs="0" />
<xsd:element name="decoder" type="xsd:string" minOccurs="0" />
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="bus" type="messenger_bus" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="default-bus" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_serializer">
Expand Down Expand Up @@ -388,13 +389,16 @@
<xsd:attribute name="dsn" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_middleware">
<xsd:sequence>
<xsd:element name="validation" type="messenger_validation" minOccurs="0" maxOccurs="1" />
</xsd:sequence>
<xsd:complexType name="messenger_adapter_option">
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="value" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_validation">
<xsd:attribute name="enabled" type="xsd:boolean" />
<xsd:complexType name="messenger_bus">
<xsd:sequence>
<xsd:element name="middleware" type="xsd:string" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="default-middlewares" type="xsd:boolean"/>
</xsd:complexType>
</xsd:schema>
Expand Up @@ -254,11 +254,6 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
'messenger' => array(
'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class),
'routing' => array(),
'middlewares' => array(
'validation' => array(
'enabled' => !class_exists(FullStack::class),
),
),
'adapters' => array(),
'serializer' => array(
'enabled' => true,
Expand All @@ -267,6 +262,8 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
),
'encoder' => 'messenger.transport.serializer',
'decoder' => 'messenger.transport.serializer',
'default_bus' => null,
'buses' => array('default' => array('default_middlewares' => true, 'middlewares' => array())),
),
);
}
Expand Down
@@ -0,0 +1,23 @@
<?php

$container->loadFromExtension('framework', array(
'messenger' => array(
'default_bus' => 'commands',
'buses' => array(
'commands' => null,
'events' => array(
'middlewares' => array(
'tolerate_no_handler',
),
),
'queries' => array(
'default_middlewares' => false,
'middlewares' => array(
'route_messages',
'tolerate_no_handler',
'call_message_handler',
),
),
),
),
));

This file was deleted.

This file was deleted.

@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger default-bus="commands">
<framework:bus name="commands" />
<framework:bus name="events">
<framework:middleware>tolerate_no_handler</framework:middleware>
</framework:bus>
<framework:bus name="queries" default-middlewares="false">
<framework:middleware>route_messages</framework:middleware>
<framework:middleware>tolerate_no_handler</framework:middleware>
<framework:middleware>call_message_handler</framework:middleware>
</framework:bus>
</framework:messenger>
</framework:config>
</container>

This file was deleted.

This file was deleted.

@@ -0,0 +1,14 @@
framework:
messenger:
default_bus: commands
buses:
commands: ~
events:
middlewares:
- "tolerate_no_handler"
queries:
default_middlewares: false
middlewares:
- "route_messages"
- "tolerate_no_handler"
- "call_message_handler"

This file was deleted.

This file was deleted.

0 comments on commit da4fccd

Please sign in to comment.