Skip to content

Commit

Permalink
minor #27002 [Messenger] implement several senders using a ChainSende…
Browse files Browse the repository at this point in the history
…r (Tobion)

This PR was merged into the 4.1 branch.

Discussion
----------

[Messenger] implement several senders using a ChainSender

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets |
| License       | MIT
| Doc PR        |

Commits
-------

198925e [Messenger] implement several senders using a ChainSender
  • Loading branch information
sroze committed May 17, 2018
2 parents 68eda49 + 198925e commit f1967aa
Show file tree
Hide file tree
Showing 14 changed files with 204 additions and 90 deletions.
Expand Up @@ -986,14 +986,18 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
$newConfig = array();
foreach ($config as $k => $v) {
if (!\is_int($k)) {
$newConfig[$k] = array('senders' => \is_array($v) ? array_values($v) : array($v));
$newConfig[$k] = array(
'senders' => $v['senders'] ?? (\is_array($v) ? array_values($v) : array($v)),
'send_and_handle' => $v['send_and_handle'] ?? false,
);
} else {
$newConfig[$v['message-class']]['senders'] = array_map(
function ($a) {
return \is_string($a) ? $a : $a['service'];
},
array_values($v['sender'])
);
$newConfig[$v['message-class']]['send-and-handle'] = $v['send-and-handle'] ?? false;
}
}

Expand All @@ -1006,6 +1010,7 @@ function ($a) {
->requiresAtLeastOneElement()
->prototype('scalar')->end()
->end()
->booleanNode('send_and_handle')->defaultFalse()->end()
->end()
->end()
->end()
Expand Down
Expand Up @@ -63,6 +63,7 @@
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\ChainSender;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
Expand Down Expand Up @@ -1494,16 +1495,28 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
}

$messageToSenderIdsMapping = array();
$messageToSenderIdMapping = array();
$messageToSendAndHandleMapping = array();
foreach ($config['routing'] as $message => $messageConfiguration) {
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
}

$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
if (1 < \count($messageConfiguration['senders'])) {
$senders = array_map(function ($sender) { return new Reference($sender); }, $messageConfiguration['senders']);
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
$chainSenderId = '.messenger.chain_sender.'.$message;
$container->setDefinition($chainSenderId, $chainSenderDefinition);
$messageToSenderIdMapping[$message] = $chainSenderId;
} else {
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
}

$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
}

$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdMapping);
$container->getDefinition('messenger.middleware.route_messages')->replaceArgument(1, $messageToSendAndHandleMapping);

foreach ($config['transports'] as $name => $transport) {
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
Expand Down
Expand Up @@ -15,10 +15,11 @@
<!-- Asynchronous -->
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
<argument type="service" id="messenger.sender_locator" />
<argument /> <!-- Message to sender ID mapping -->
<argument type="collection" /> <!-- Message to sender ID mapping -->
</service>
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />
<argument type="collection" /> <!-- Message to send and handle mapping -->
</service>

<!-- Message encoding/decoding -->
Expand Down
Expand Up @@ -375,6 +375,7 @@
<xsd:element name="sender" type="messenger_routing_sender" />
</xsd:choice>
<xsd:attribute name="message-class" type="xsd:string" use="required"/>
<xsd:attribute name="send-and-handle" type="xsd:boolean" default="false"/>
</xsd:complexType>

<xsd:complexType name="messenger_routing_sender">
Expand Down
Expand Up @@ -3,8 +3,11 @@
$container->loadFromExtension('framework', array(
'messenger' => array(
'routing' => array(
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp'),
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array('amqp', 'audit', null),
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp', 'audit'),
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array(
'senders' => array('amqp', 'audit'),
'send_and_handle' => true,
),
'*' => 'amqp',
),
),
Expand Down
Expand Up @@ -9,11 +9,11 @@
<framework:messenger>
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\DummyMessage">
<framework:sender service="amqp" />
<framework:sender service="audit" />
</framework:routing>
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage">
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage" send-and-handle="true">
<framework:sender service="amqp" />
<framework:sender service="audit" />
<framework:sender service="null" />
</framework:routing>
<framework:routing message-class="*">
<framework:sender service="amqp" />
Expand Down
@@ -1,6 +1,8 @@
framework:
messenger:
routing:
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': amqp
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage': [amqp, audit, ~]
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': [amqp, audit]
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage':
senders: [amqp, audit]
send_and_handle: true
'*': amqp
Expand Up @@ -558,14 +558,21 @@ public function testMessengerRouting()
{
$container = $this->createContainerFromFile('messenger_routing');
$senderLocatorDefinition = $container->getDefinition('messenger.asynchronous.routing.sender_locator');
$sendMessageMiddlewareDefinition = $container->getDefinition('messenger.middleware.route_messages');

$messageToSenderIdsMapping = array(
DummyMessage::class => array('amqp'),
SecondMessage::class => array('amqp', 'audit', null),
'*' => array('amqp'),
DummyMessage::class => '.messenger.chain_sender.'.DummyMessage::class,
SecondMessage::class => '.messenger.chain_sender.'.SecondMessage::class,
'*' => 'amqp',
);
$messageToSendAndHandleMapping = array(
DummyMessage::class => false,
SecondMessage::class => true,
'*' => false,
);

$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
$this->assertSame($messageToSendAndHandleMapping, $sendMessageMiddlewareDefinition->getArgument(1));
}

/**
Expand Down
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Asynchronous\Middleware;

use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator;
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
use Symfony\Component\Messenger\Envelope;
Expand All @@ -19,14 +20,17 @@

/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Tobias Schultze <http://tobion.de>
*/
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
{
private $senderLocator;
private $messagesToSendAndHandleMapping;

public function __construct(SenderLocatorInterface $senderLocator)
public function __construct(SenderLocatorInterface $senderLocator, array $messagesToSendAndHandleMapping = array())
{
$this->senderLocator = $senderLocator;
$this->messagesToSendAndHandleMapping = $messagesToSendAndHandleMapping;
}

/**
Expand All @@ -40,20 +44,21 @@ public function handle($message, callable $next)
return $next($message);
}

if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
foreach ($senders as $sender) {
if (null === $sender) {
continue;
}
$sender = $this->senderLocator->getSenderForMessage($envelope->getMessage());

$sender->send($envelope);
}
if ($sender) {
$sender->send($envelope);

if (!\in_array(null, $senders, true)) {
if (!$this->mustSendAndHandle($envelope->getMessage())) {
return;
}
}

return $next($message);
}

private function mustSendAndHandle($message): bool
{
return (bool) SenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $message);
}
}
Expand Up @@ -12,49 +12,55 @@
namespace Symfony\Component\Messenger\Asynchronous\Routing;

use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class SenderLocator implements SenderLocatorInterface
{
private $senderServiceLocator;
private $messageToSenderIdsMapping;
private $messageToSenderIdMapping;

public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdsMapping)
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdMapping)
{
$this->senderServiceLocator = $senderServiceLocator;
$this->messageToSenderIdsMapping = $messageToSenderIdsMapping;
$this->messageToSenderIdMapping = $messageToSenderIdMapping;
}

/**
* {@inheritdoc}
*/
public function getSendersForMessage($message): array
public function getSenderForMessage($message): ?SenderInterface
{
$senders = array();
foreach ($this->getSenderIds($message) as $senderId) {
$senders[] = $this->senderServiceLocator->get($senderId);
}
$senderId = $this->getSenderId($message);

return $senderId ? $this->senderServiceLocator->get($senderId) : null;
}

return $senders;
private function getSenderId($message): ?string
{
return self::getValueFromMessageRouting($this->messageToSenderIdMapping, $message);
}

private function getSenderIds($message): array
/**
* @internal
*/
public static function getValueFromMessageRouting(array $mapping, $message)
{
if (isset($this->messageToSenderIdsMapping[\get_class($message)])) {
return $this->messageToSenderIdsMapping[\get_class($message)];
if (isset($mapping[\get_class($message)])) {
return $mapping[\get_class($message)];
}
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_parents($message))) {
return current($messageToSenderIdsMapping);
if ($parentsMapping = array_intersect_key($mapping, class_parents($message))) {
return current($parentsMapping);
}
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_implements($message))) {
return current($messageToSenderIdsMapping);
if ($interfaceMapping = array_intersect_key($mapping, class_implements($message))) {
return current($interfaceMapping);
}
if (isset($this->messageToSenderIdsMapping['*'])) {
return $this->messageToSenderIdsMapping['*'];
if (isset($mapping['*'])) {
return $mapping['*'];
}

return array();
return null;
}
}
Expand Up @@ -15,6 +15,7 @@

/**
* @author Samuel Roze <samuel.roze@gmail.com>
* @author Tobias Schultze <http://tobion.de>
*
* @experimental in 4.1
*/
Expand All @@ -25,7 +26,7 @@ interface SenderLocatorInterface
*
* @param object $message
*
* @return SenderInterface[]
* @return SenderInterface|null
*/
public function getSendersForMessage($message): array;
public function getSenderForMessage($message): ?SenderInterface;
}

0 comments on commit f1967aa

Please sign in to comment.