Skip to content

Commit

Permalink
feature #26632 [Messenger] Add AMQP adapter (sroze)
Browse files Browse the repository at this point in the history
This PR was squashed before being merged into the 4.1-dev branch (closes #26632).

Discussion
----------

[Messenger] Add AMQP adapter

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | ø
| License       | MIT

- [x] Depends on the Messenger component #24411
- [x] Add tests once we are all happy about the structure

---

In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used.

Configuring the adapter is as simple as the following configuration:
```yaml
# config/packages/messenger_adapters.yaml
framework:
    messenger:
        adapter: "%env(MESSENGER_DSN)%"
```

With the given `.env` for example:
```
MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages
```

Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter.

```yaml
# config/packages/messenger_routes.yaml
framework:
    messenger:
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.default_sender
```

---

Additionally, multiple adapters can be created and messages routed to these ones.

```yaml
# config/packages/messenger_routes.yaml
framework:
    messenger:
        adapters:
            commands: "amqp://guest:guest@localhost:5672/%2f/commands"
            maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance"
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.commands_sender
            'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender
```

Commits
-------

798c230 [Messenger] Add AMQP adapter
  • Loading branch information
sroze committed Apr 12, 2018
2 parents 9a99955 + 798c230 commit aa04d06
Show file tree
Hide file tree
Showing 33 changed files with 1,322 additions and 86 deletions.
9 changes: 9 additions & 0 deletions .travis.yml
Expand Up @@ -12,11 +12,13 @@ addons:
- language-pack-fr-base
- ldap-utils
- slapd
- librabbitmq-dev

env:
global:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages

matrix:
include:
Expand All @@ -38,6 +40,7 @@ services:
- memcached
- mongodb
- redis-server
- rabbitmq

before_install:
- |
Expand Down Expand Up @@ -134,6 +137,11 @@ before_install:
- |
# Install extra PHP extensions
if [[ ! $skip ]]; then
# Install librabbitmq
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq-dev_0.5.2-2_amd64.deb
wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq1_0.5.2-2_amd64.deb
sudo dpkg -i librabbitmq1_0.5.2-2_amd64.deb librabbitmq-dev_0.5.2-2_amd64.deb
# install libsodium
sudo add-apt-repository ppa:ondrej/php -y
sudo apt-get update -q
Expand All @@ -142,6 +150,7 @@ before_install:
tfold ext.apcu tpecl apcu-5.1.6 apcu.so $INI
tfold ext.libsodium tpecl libsodium sodium.so $INI
tfold ext.mongodb tpecl mongodb-1.4.0RC1 mongodb.so $INI
tfold ext.amqp tpecl amqp-1.9.3 amqp.so $INI
fi
- |
Expand Down
Expand Up @@ -971,12 +971,17 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
->arrayNode('messenger')
->info('Messenger configuration')
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->fixXmlConfig('adapter')
->children()
->arrayNode('routing')
->useAttributeAsKey('message_class')
->beforeNormalization()
->always()
->then(function ($config) {
if (!is_array($config)) {
return array();
}

$newConfig = array();
foreach ($config as $k => $v) {
if (!is_int($k)) {
Expand Down Expand Up @@ -1011,6 +1016,28 @@ function ($a) {
->end()
->end()
->end()
->arrayNode('adapters')
->useAttributeAsKey('name')
->arrayPrototype()
->beforeNormalization()
->ifString()
->then(function (string $dsn) {
return array('dsn' => $dsn);
})
->end()
->fixXmlConfig('option')
->children()
->scalarNode('dsn')->end()
->arrayNode('options')
->normalizeKeys(false)
->useAttributeAsKey('name')
->defaultValue(array())
->prototype('variable')
->end()
->end()
->end()
->end()
->end()
->end()
->end()
->end()
Expand Down
Expand Up @@ -1468,6 +1468,24 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
} else {
$container->removeDefinition('messenger.middleware.validator');
}

foreach ($config['adapters'] as $name => $adapter) {
$container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array(
new Reference('messenger.adapter_factory'),
'createSender',
))->setArguments(array(
$adapter['dsn'],
$adapter['options'],
))->addTag('messenger.sender'));

$container->setDefinition('messenger.receiver.'.$name, (new Definition(ReceiverInterface::class))->setFactory(array(
new Reference('messenger.adapter_factory'),
'createReceiver',
))->setArguments(array(
$adapter['dsn'],
$adapter['options'],
))->addTag('messenger.receiver'));
}
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
Expand Down
13 changes: 13 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
Expand Up @@ -72,5 +72,18 @@
<tag name="container.service_locator" />
<argument type="collection" />
</service>

<!-- Adapters -->
<service id="messenger.adapter_factory" class="Symfony\Component\Messenger\Adapter\Factory\ChainAdapterFactory">
<argument type="tagged" tag="messenger.adapter_factory" />
</service>

<service id="messenger.adapter.amqp.factory" class="Symfony\Component\Messenger\Adapter\AmqpExt\AmqpAdapterFactory">
<argument type="service" id="messenger.transport.default_encoder" />
<argument type="service" id="messenger.transport.default_decoder" />
<argument>%kernel.debug%</argument>

<tag name="messenger.adapter_factory" />
</service>
</services>
</container>
Expand Up @@ -354,6 +354,7 @@
<xsd:sequence>
<xsd:element name="routing" type="messenger_routing" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="middlewares" type="messenger_middleware" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="adapter" type="messenger_adapter" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
</xsd:complexType>

Expand All @@ -368,6 +369,19 @@
<xsd:attribute name="service" type="xsd:string" use="required"/>
</xsd:complexType>

<xsd:complexType name="messenger_adapter">
<xsd:sequence>
<xsd:element name="option" type="messenger_adapter_option" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="dsn" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_adapter_option">
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="value" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_middleware">
<xsd:sequence>
<xsd:element name="validation" type="messenger_validation" minOccurs="0" maxOccurs="1" />
Expand Down
Expand Up @@ -258,6 +258,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
'enabled' => !class_exists(FullStack::class),
),
),
'adapters' => array(),
),
);
}
Expand Down
@@ -0,0 +1,13 @@
<?php

$container->loadFromExtension('framework', array(
'messenger' => array(
'adapters' => array(
'default' => 'amqp://localhost/%2f/messages',
'customised' => array(
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
'options' => array('queue_name' => 'Queue'),
),
),
),
));
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:adapter name="default" dsn="amqp://localhost/%2f/messages" />
<framework:adapter name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name">
<framework:option name="queue_name" value="Queue" />
</framework:adapter>
</framework:messenger>
</framework:config>
</container>
@@ -0,0 +1,8 @@
framework:
messenger:
adapters:
default: 'amqp://localhost/%2f/messages'
customised:
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
options:
queue_name: Queue
Expand Up @@ -523,6 +523,7 @@ public function testWebLink()
public function testMessenger()
{
$container = $this->createContainerFromFile('messenger');
$this->assertTrue($container->hasDefinition('message_bus'));
$this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction'));
}

Expand All @@ -538,6 +539,33 @@ public function testMessengerValidationDisabled()
$this->assertFalse($container->hasDefinition('messenger.middleware.validator'));
}

public function testMessengerAdapter()
{
$container = $this->createContainerFromFile('messenger_adapter');
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));

$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createSender'), $senderFactory);
$this->assertCount(2, $senderArguments);
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
$this->assertEquals(array('queue_name' => 'Queue'), $senderArguments[1]);

$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createReceiver'), $receiverFactory);
$this->assertCount(2, $receiverArguments);
$this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
$this->assertEquals(array('queue_name' => 'Queue'), $receiverArguments[1]);
}

public function testTranslator()
{
$container = $this->createContainerFromFile('full');
Expand Down
@@ -0,0 +1,50 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Adapter\AmqpExt;

use Symfony\Component\Messenger\Adapter\Factory\AdapterFactoryInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpAdapterFactory implements AdapterFactoryInterface
{
private $encoder;
private $decoder;
private $debug;

public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug)
{
$this->encoder = $encoder;
$this->decoder = $decoder;
$this->debug = $debug;
}

public function createReceiver(string $dsn, array $options): ReceiverInterface
{
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
}

public function createSender(string $dsn, array $options): SenderInterface
{
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
}

public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'amqp://');
}
}
35 changes: 35 additions & 0 deletions src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpFactory.php
@@ -0,0 +1,35 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Adapter\AmqpExt;

class AmqpFactory
{
public function createConnection(array $credentials): \AMQPConnection
{
return new \AMQPConnection($credentials);
}

public function createChannel(\AMQPConnection $connection): \AMQPChannel
{
return new \AMQPChannel($connection);
}

public function createQueue(\AMQPChannel $channel): \AMQPQueue
{
return new \AMQPQueue($channel);
}

public function createExchange(\AMQPChannel $channel): \AMQPExchange
{
return new \AMQPExchange($channel);
}
}

0 comments on commit aa04d06

Please sign in to comment.