diff --git a/.travis.yml b/.travis.yml index 1718bd601bfc..c23169a9b4d9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,11 +12,13 @@ addons: - language-pack-fr-base - ldap-utils - slapd + - librabbitmq-dev env: global: - MIN_PHP=7.1.3 - SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php + - MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages matrix: include: @@ -38,6 +40,7 @@ services: - memcached - mongodb - redis-server + - rabbitmq before_install: - | @@ -134,6 +137,11 @@ before_install: - | # Install extra PHP extensions if [[ ! $skip ]]; then + # Install librabbitmq + wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq-dev_0.5.2-2_amd64.deb + wget http://ftp.debian.org/debian/pool/main/libr/librabbitmq/librabbitmq1_0.5.2-2_amd64.deb + sudo dpkg -i librabbitmq1_0.5.2-2_amd64.deb librabbitmq-dev_0.5.2-2_amd64.deb + # install libsodium sudo add-apt-repository ppa:ondrej/php -y sudo apt-get update -q @@ -142,6 +150,7 @@ before_install: tfold ext.apcu tpecl apcu-5.1.6 apcu.so $INI tfold ext.libsodium tpecl libsodium sodium.so $INI tfold ext.mongodb tpecl mongodb-1.4.0RC1 mongodb.so $INI + tfold ext.amqp tpecl amqp-1.9.3 amqp.so $INI fi - | diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php index 73e7470d9f23..b16f03852b0f 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php @@ -971,12 +971,17 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode) ->arrayNode('messenger') ->info('Messenger configuration') ->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}() + ->fixXmlConfig('adapter') ->children() ->arrayNode('routing') ->useAttributeAsKey('message_class') ->beforeNormalization() ->always() ->then(function ($config) { + if (!is_array($config)) { + return array(); + } + $newConfig = array(); foreach ($config as $k => $v) { if (!is_int($k)) { @@ -1011,6 +1016,28 @@ function ($a) { ->end() ->end() ->end() + ->arrayNode('adapters') + ->useAttributeAsKey('name') + ->arrayPrototype() + ->beforeNormalization() + ->ifString() + ->then(function (string $dsn) { + return array('dsn' => $dsn); + }) + ->end() + ->fixXmlConfig('option') + ->children() + ->scalarNode('dsn')->end() + ->arrayNode('options') + ->normalizeKeys(false) + ->useAttributeAsKey('name') + ->defaultValue(array()) + ->prototype('variable') + ->end() + ->end() + ->end() + ->end() + ->end() ->end() ->end() ->end() diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 028abd99f200..51aee716f43f 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1468,6 +1468,24 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder } else { $container->removeDefinition('messenger.middleware.validator'); } + + foreach ($config['adapters'] as $name => $adapter) { + $container->setDefinition('messenger.sender.'.$name, (new Definition(SenderInterface::class))->setFactory(array( + new Reference('messenger.adapter_factory'), + 'createSender', + ))->setArguments(array( + $adapter['dsn'], + $adapter['options'], + ))->addTag('messenger.sender')); + + $container->setDefinition('messenger.receiver.'.$name, (new Definition(ReceiverInterface::class))->setFactory(array( + new Reference('messenger.adapter_factory'), + 'createReceiver', + ))->setArguments(array( + $adapter['dsn'], + $adapter['options'], + ))->addTag('messenger.receiver')); + } } private function registerCacheConfiguration(array $config, ContainerBuilder $container) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 4d571ce2f484..ab685fe5a544 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -72,5 +72,18 @@ + + + + + + + + + + %kernel.debug% + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd b/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd index 00b61e30a0d0..3fbfaa5d9a6a 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd @@ -354,6 +354,7 @@ + @@ -368,6 +369,19 @@ + + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php index 6216babe8151..0ba24a688656 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php @@ -258,6 +258,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor 'enabled' => !class_exists(FullStack::class), ), ), + 'adapters' => array(), ), ); } diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_adapter.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_adapter.php new file mode 100644 index 000000000000..5e8608e4e894 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_adapter.php @@ -0,0 +1,13 @@ +loadFromExtension('framework', array( + 'messenger' => array( + 'adapters' => array( + 'default' => 'amqp://localhost/%2f/messages', + 'customised' => array( + 'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name', + 'options' => array('queue_name' => 'Queue'), + ), + ), + ), +)); diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_adapter.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_adapter.xml new file mode 100644 index 000000000000..830ba48a9cc2 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_adapter.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_adapter.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_adapter.yml new file mode 100644 index 000000000000..2ec24e9aa15d --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_adapter.yml @@ -0,0 +1,8 @@ +framework: + messenger: + adapters: + default: 'amqp://localhost/%2f/messages' + customised: + dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name' + options: + queue_name: Queue diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 5157944c7a58..87824e829352 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -523,6 +523,7 @@ public function testWebLink() public function testMessenger() { $container = $this->createContainerFromFile('messenger'); + $this->assertTrue($container->hasDefinition('message_bus')); $this->assertFalse($container->hasDefinition('messenger.middleware.doctrine_transaction')); } @@ -538,6 +539,33 @@ public function testMessengerValidationDisabled() $this->assertFalse($container->hasDefinition('messenger.middleware.validator')); } + public function testMessengerAdapter() + { + $container = $this->createContainerFromFile('messenger_adapter'); + $this->assertTrue($container->hasDefinition('messenger.sender.default')); + $this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender')); + $this->assertTrue($container->hasDefinition('messenger.receiver.default')); + $this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver')); + + $this->assertTrue($container->hasDefinition('messenger.sender.customised')); + $senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory(); + $senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments(); + + $this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createSender'), $senderFactory); + $this->assertCount(2, $senderArguments); + $this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]); + $this->assertEquals(array('queue_name' => 'Queue'), $senderArguments[1]); + + $this->assertTrue($container->hasDefinition('messenger.receiver.customised')); + $receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory(); + $receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments(); + + $this->assertEquals(array(new Reference('messenger.adapter_factory'), 'createReceiver'), $receiverFactory); + $this->assertCount(2, $receiverArguments); + $this->assertEquals('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]); + $this->assertEquals(array('queue_name' => 'Queue'), $receiverArguments[1]); + } + public function testTranslator() { $container = $this->createContainerFromFile('full'); diff --git a/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpAdapterFactory.php b/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpAdapterFactory.php new file mode 100644 index 000000000000..1085355bd2bd --- /dev/null +++ b/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpAdapterFactory.php @@ -0,0 +1,50 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Adapter\AmqpExt; + +use Symfony\Component\Messenger\Adapter\Factory\AdapterFactoryInterface; +use Symfony\Component\Messenger\Transport\ReceiverInterface; +use Symfony\Component\Messenger\Transport\SenderInterface; +use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; +use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; + +/** + * @author Samuel Roze + */ +class AmqpAdapterFactory implements AdapterFactoryInterface +{ + private $encoder; + private $decoder; + private $debug; + + public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug) + { + $this->encoder = $encoder; + $this->decoder = $decoder; + $this->debug = $debug; + } + + public function createReceiver(string $dsn, array $options): ReceiverInterface + { + return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug)); + } + + public function createSender(string $dsn, array $options): SenderInterface + { + return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug)); + } + + public function supports(string $dsn, array $options): bool + { + return 0 === strpos($dsn, 'amqp://'); + } +} diff --git a/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpFactory.php b/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpFactory.php new file mode 100644 index 000000000000..8ff8270c201b --- /dev/null +++ b/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpFactory.php @@ -0,0 +1,35 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Adapter\AmqpExt; + +class AmqpFactory +{ + public function createConnection(array $credentials): \AMQPConnection + { + return new \AMQPConnection($credentials); + } + + public function createChannel(\AMQPConnection $connection): \AMQPChannel + { + return new \AMQPChannel($connection); + } + + public function createQueue(\AMQPChannel $channel): \AMQPQueue + { + return new \AMQPQueue($channel); + } + + public function createExchange(\AMQPChannel $channel): \AMQPExchange + { + return new \AMQPExchange($channel); + } +} diff --git a/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpReceiver.php new file mode 100644 index 000000000000..fcecfe95ac99 --- /dev/null +++ b/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpReceiver.php @@ -0,0 +1,80 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Adapter\AmqpExt; + +use Symfony\Component\Messenger\Adapter\AmqpExt\Exception\RejectMessageExceptionInterface; +use Symfony\Component\Messenger\Transport\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; + +/** + * Symfony Messenger receiver to get messages from AMQP brokers using PHP's AMQP extension. + * + * @author Samuel Roze + */ +class AmqpReceiver implements ReceiverInterface +{ + private $messageDecoder; + private $connection; + private $shouldStop; + + public function __construct(DecoderInterface $messageDecoder, Connection $connection) + { + $this->messageDecoder = $messageDecoder; + $this->connection = $connection; + } + + /** + * {@inheritdoc} + */ + public function receive(callable $handler): void + { + while (!$this->shouldStop) { + $message = $this->connection->get(); + if (null === $message) { + $handler(null); + + usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000); + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + + continue; + } + + try { + $handler($this->messageDecoder->decode(array( + 'body' => $message->getBody(), + 'headers' => $message->getHeaders(), + ))); + + $this->connection->ack($message); + } catch (RejectMessageExceptionInterface $e) { + $this->connection->reject($message); + + throw $e; + } catch (\Throwable $e) { + $this->connection->nack($message, AMQP_REQUEUE); + + throw $e; + } finally { + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + } + } + } + + public function stop(): void + { + $this->shouldStop = true; + } +} diff --git a/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpSender.php new file mode 100644 index 000000000000..d7711607fcf2 --- /dev/null +++ b/src/Symfony/Component/Messenger/Adapter/AmqpExt/AmqpSender.php @@ -0,0 +1,42 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Adapter\AmqpExt; + +use Symfony\Component\Messenger\Transport\SenderInterface; +use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; + +/** + * Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension. + * + * @author Samuel Roze + */ +class AmqpSender implements SenderInterface +{ + private $messageEncoder; + private $connection; + + public function __construct(EncoderInterface $messageEncoder, Connection $connection) + { + $this->messageEncoder = $messageEncoder; + $this->connection = $connection; + } + + /** + * {@inheritdoc} + */ + public function send($message) + { + $encodedMessage = $this->messageEncoder->encode($message); + + $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); + } +} diff --git a/src/Symfony/Component/Messenger/Adapter/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Adapter/AmqpExt/Connection.php new file mode 100644 index 000000000000..e7bfcf88b89c --- /dev/null +++ b/src/Symfony/Component/Messenger/Adapter/AmqpExt/Connection.php @@ -0,0 +1,219 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Adapter\AmqpExt; + +/** + * An AMQP connection. + * + * @author Samuel Roze + */ +class Connection +{ + private $connectionCredentials; + private $exchangeConfiguration; + private $queueConfiguration; + private $debug; + private $amqpFactory; + + /** + * @var \AMQPChannel|null + */ + private $amqpChannel; + + /** + * @var \AMQPExchange|null + */ + private $amqpExchange; + + /** + * @var \AMQPQueue|null + */ + private $amqpQueue; + + public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null) + { + $this->connectionCredentials = $connectionCredentials; + $this->debug = $debug; + $this->exchangeConfiguration = $exchangeConfiguration; + $this->queueConfiguration = $queueConfiguration; + $this->amqpFactory = $amqpFactory ?: new AmqpFactory(); + } + + public static function fromDsn(string $dsn, array $options = array(), bool $debug = false, AmqpFactory $amqpFactory = null): self + { + if (false === $parsedUrl = parse_url($dsn)) { + throw new \InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn)); + } + + $pathParts = isset($parsedUrl['path']) ? explode('/', trim($parsedUrl['path'], '/')) : array(); + $amqpOptions = array_replace_recursive(array( + 'host' => $parsedUrl['host'] ?? 'localhost', + 'port' => $parsedUrl['port'] ?? 5672, + 'vhost' => isset($pathParts[0]) ? urldecode($pathParts[0]) : '/', + 'queue' => array( + 'name' => $queueName = $pathParts[1] ?? 'messages', + ), + 'exchange' => array( + 'name' => $queueName, + ), + ), $options); + + if (isset($parsedUrl['user'])) { + $amqpOptions['login'] = $parsedUrl['user']; + } + + if (isset($parsedUrl['pass'])) { + $amqpOptions['password'] = $parsedUrl['pass']; + } + + if (isset($parsedUrl['query'])) { + parse_str($parsedUrl['query'], $parsedQuery); + + $amqpOptions = array_replace_recursive($amqpOptions, $parsedQuery); + } + + $exchangeOptions = $amqpOptions['exchange']; + $queueOptions = $amqpOptions['queue']; + + unset($amqpOptions['queue']); + unset($amqpOptions['exchange']); + + return new self($amqpOptions, $exchangeOptions, $queueOptions, $debug, $amqpFactory); + } + + /** + * @throws \AMQPException + */ + public function publish(string $body, array $headers = array()) + { + if ($this->debug) { + $this->setup(); + } + + $this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers)); + } + + /** + * Waits and gets a message from the configured queue. + * + * @throws \AMQPException + */ + public function get(): ?\AMQPEnvelope + { + if ($this->debug) { + $this->setup(); + } + + try { + if (false !== $message = $this->queue()->get()) { + return $message; + } + } catch (\AMQPQueueException $e) { + if (404 === $e->getCode()) { + // If we get a 404 for the queue, it means we need to setup the exchange & queue. + $this->setup(); + + return $this->get(); + } else { + throw $e; + } + } + + return null; + } + + public function ack(\AMQPEnvelope $message) + { + return $this->queue()->ack($message->getDeliveryTag()); + } + + public function reject(\AMQPEnvelope $message) + { + return $this->queue()->reject($message->getDeliveryTag()); + } + + public function nack(\AMQPEnvelope $message, int $flags = AMQP_NOPARAM) + { + return $this->queue()->nack($message->getDeliveryTag(), $flags); + } + + public function setup() + { + if (!$this->channel()->isConnected()) { + $this->clear(); + } + + $this->exchange()->declareExchange(); + + $this->queue()->declareQueue(); + $this->queue()->bind($this->exchange()->getName(), $this->queueConfiguration['routing_key'] ?? null); + } + + public function channel(): \AMQPChannel + { + if (null === $this->amqpChannel) { + $connection = $this->amqpFactory->createConnection($this->connectionCredentials); + $connectMethod = 'true' === ($this->connectionCredentials['persistent'] ?? 'false') ? 'pconnect' : 'connect'; + + if (false === $connection->{$connectMethod}()) { + throw new \AMQPException('Could not connect to the AMQP server. Please verify the provided DSN.'); + } + + $this->amqpChannel = $this->amqpFactory->createChannel($connection); + } + + return $this->amqpChannel; + } + + public function queue(): \AMQPQueue + { + if (null === $this->amqpQueue) { + $this->amqpQueue = $this->amqpFactory->createQueue($this->channel()); + $this->amqpQueue->setName($this->queueConfiguration['name']); + $this->amqpQueue->setFlags($this->queueConfiguration['flags'] ?? AMQP_DURABLE); + + if (isset($this->queueConfiguration['arguments'])) { + $this->amqpQueue->setArguments($this->queueConfiguration['arguments']); + } + } + + return $this->amqpQueue; + } + + public function exchange(): \AMQPExchange + { + if (null === $this->amqpExchange) { + $this->amqpExchange = $this->amqpFactory->createExchange($this->channel()); + $this->amqpExchange->setName($this->exchangeConfiguration['name']); + $this->amqpExchange->setType($this->exchangeConfiguration['type'] ?? AMQP_EX_TYPE_FANOUT); + $this->amqpExchange->setFlags($this->exchangeConfiguration['flags'] ?? AMQP_DURABLE); + + if (isset($this->exchangeConfiguration['arguments'])) { + $this->amqpExchange->setArguments($this->exchangeConfiguration['arguments']); + } + } + + return $this->amqpExchange; + } + + public function getConnectionCredentials(): array + { + return $this->connectionCredentials; + } + + private function clear() + { + $this->amqpChannel = null; + $this->amqpQueue = null; + $this->amqpExchange = null; + } +} diff --git a/src/Symfony/Component/Messenger/Adapter/AmqpExt/Exception/RejectMessageExceptionInterface.php b/src/Symfony/Component/Messenger/Adapter/AmqpExt/Exception/RejectMessageExceptionInterface.php new file mode 100644 index 000000000000..d353eda24aa1 --- /dev/null +++ b/src/Symfony/Component/Messenger/Adapter/AmqpExt/Exception/RejectMessageExceptionInterface.php @@ -0,0 +1,25 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Adapter\AmqpExt\Exception; + +/** + * If something goes wrong while consuming and handling a message from the AMQP broker, there are two choices: rejecting + * or re-queuing the message. + * + * If the exception that is thrown by the bus while dispatching the message implements this interface, the message will + * be rejected. Otherwise, it will be re-queued. + * + * @author Samuel Roze + */ +interface RejectMessageExceptionInterface extends \Throwable +{ +} diff --git a/src/Symfony/Component/Messenger/Adapter/Factory/AdapterFactoryInterface.php b/src/Symfony/Component/Messenger/Adapter/Factory/AdapterFactoryInterface.php new file mode 100644 index 000000000000..766e86088d17 --- /dev/null +++ b/src/Symfony/Component/Messenger/Adapter/Factory/AdapterFactoryInterface.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Adapter\Factory; + +use Symfony\Component\Messenger\Transport\ReceiverInterface; +use Symfony\Component\Messenger\Transport\SenderInterface; + +/** + * Creates a Messenger adapter. + * + * @author Samuel Roze + */ +interface AdapterFactoryInterface +{ + public function createReceiver(string $dsn, array $options): ReceiverInterface; + + public function createSender(string $dsn, array $options): SenderInterface; + + public function supports(string $dsn, array $options): bool; +} diff --git a/src/Symfony/Component/Messenger/Adapter/Factory/ChainAdapterFactory.php b/src/Symfony/Component/Messenger/Adapter/Factory/ChainAdapterFactory.php new file mode 100644 index 000000000000..92e2e101b060 --- /dev/null +++ b/src/Symfony/Component/Messenger/Adapter/Factory/ChainAdapterFactory.php @@ -0,0 +1,64 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Adapter\Factory; + +use Symfony\Component\Messenger\Transport\ReceiverInterface; +use Symfony\Component\Messenger\Transport\SenderInterface; + +/** + * @author Samuel Roze + */ +class ChainAdapterFactory implements AdapterFactoryInterface +{ + private $factories; + + /** + * @param iterable|AdapterFactoryInterface[] $factories + */ + public function __construct(iterable $factories) + { + $this->factories = $factories; + } + + public function createReceiver(string $dsn, array $options): ReceiverInterface + { + foreach ($this->factories as $factory) { + if ($factory->supports($dsn, $options)) { + return $factory->createReceiver($dsn, $options); + } + } + + throw new \InvalidArgumentException(sprintf('No adapter supports the given DSN "%s".', $dsn)); + } + + public function createSender(string $dsn, array $options): SenderInterface + { + foreach ($this->factories as $factory) { + if ($factory->supports($dsn, $options)) { + return $factory->createSender($dsn, $options); + } + } + + throw new \InvalidArgumentException(sprintf('No adapter supports the given DSN "%s".', $dsn)); + } + + public function supports(string $dsn, array $options): bool + { + foreach ($this->factories as $factory) { + if ($factory->supports($dsn, $options)) { + return true; + } + } + + return false; + } +} diff --git a/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php b/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php index dd17a94b13be..8af87a2a4514 100644 --- a/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php +++ b/src/Symfony/Component/Messenger/Asynchronous/Transport/WrapIntoReceivedMessage.php @@ -25,20 +25,19 @@ public function __construct(ReceiverInterface $decoratedConsumer) $this->decoratedReceiver = $decoratedConsumer; } - public function receive(): iterable + public function receive(callable $handler): void { - $iterator = $this->decoratedReceiver->receive(); + $this->decoratedReceiver->receive(function ($message) use ($handler) { + if (null !== $message) { + $message = new ReceivedMessage($message); + } - foreach ($iterator as $message) { - try { - yield new ReceivedMessage($message); - } catch (\Throwable $e) { - if (!$iterator instanceof \Generator) { - throw $e; - } + $handler($message); + }); + } - $iterator->throw($e); - } - } + public function stop(): void + { + $this->decoratedReceiver->stop(); } } diff --git a/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpExtIntegrationTest.php new file mode 100644 index 000000000000..be1529168022 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpExtIntegrationTest.php @@ -0,0 +1,139 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Adapter\AmqpExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver; +use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender; +use Symfony\Component\Messenger\Adapter\AmqpExt\Connection; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Process\PhpProcess; +use Symfony\Component\Process\Process; +use Symfony\Component\Serializer as SerializerComponent; +use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +/** + * @requires extension amqp + */ +class AmqpExtIntegrationTest extends TestCase +{ + protected function setUp() + { + parent::setUp(); + + if (!getenv('MESSENGER_AMQP_DSN')) { + $this->markTestSkipped('The "MESSENGER_AMQP_DSN" environment variable is required.'); + } + } + + public function testItSendsAndReceivesMessages() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN')); + $connection->setup(); + $connection->queue()->purge(); + + $sender = new AmqpSender($serializer, $connection); + $receiver = new AmqpReceiver($serializer, $connection); + + $sender->send($firstMessage = new DummyMessage('First')); + $sender->send($secondMessage = new DummyMessage('Second')); + + $receivedMessages = 0; + $generator = $receiver->receive(function ($message) use ($receiver, &$receivedMessages, $firstMessage, $secondMessage) { + $this->assertEquals(0 == $receivedMessages ? $firstMessage : $secondMessage, $message); + + if (2 == ++$receivedMessages) { + $receiver->stop(); + } + }); + } + + public function testItReceivesSignals() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN')); + $connection->setup(); + $connection->queue()->purge(); + + $sender = new AmqpSender($serializer, $connection); + $sender->send(new DummyMessage('Hello')); + + $amqpReadTimeout = 30; + $dsn = getenv('MESSENGER_AMQP_DSN').'?read_timeout='.$amqpReadTimeout; + $process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array( + 'COMPONENT_ROOT' => __DIR__.'/../../../', + 'DSN' => $dsn, + )); + + $process->start(); + + $this->waitForOutput($process, $expectedOutput = "Receiving messages...\n"); + + $signalTime = microtime(true); + $timedOutTime = time() + 10; + + $process->signal(15); + + while ($process->isRunning() && time() < $timedOutTime) { + usleep(100 * 1000); // 100ms + } + + $this->assertFalse($process->isRunning()); + $this->assertLessThan($amqpReadTimeout, microtime(true) - $signalTime); + $this->assertEquals($expectedOutput."Get message: Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage\nDone.\n", $process->getOutput()); + } + + /** + * @runInSeparateProcess + */ + public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), array('read_timeout' => '1')); + $connection->setup(); + $connection->queue()->purge(); + + $sender = new AmqpSender($serializer, $connection); + $receiver = new MaximumCountReceiver(new AmqpReceiver($serializer, $connection), 2); + $receiver->receive(function ($message) { + $this->assertNull($message); + }); + } + + private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10) + { + $timedOutTime = time() + $timeoutInSeconds; + + while (time() < $timedOutTime) { + if (0 === strpos($process->getOutput(), $output)) { + return; + } + + usleep(100 * 1000); // 100ms + } + + throw new \RuntimeException('Expected output never arrived. Got "'.$process->getOutput().'" instead.'); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpReceiverTest.php new file mode 100644 index 000000000000..18df3ac41fe5 --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpReceiverTest.php @@ -0,0 +1,111 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Adapter\AmqpExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpReceiver; +use Symfony\Component\Messenger\Adapter\AmqpExt\Connection; +use Symfony\Component\Messenger\Adapter\AmqpExt\Exception\RejectMessageExceptionInterface; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Serializer as SerializerComponent; +use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +/** + * @requires extension amqp + */ +class AmqpReceiverTest extends TestCase +{ + public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn(array( + 'type' => DummyMessage::class, + )); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + + $connection->expects($this->once())->method('ack')->with($envelope); + + $receiver = new AmqpReceiver($serializer, $connection); + $receiver->receive(function ($message) use ($receiver) { + $this->assertEquals(new DummyMessage('Hi'), $message); + $receiver->stop(); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Tests\Adapter\AmqpExt\InterruptException + */ + public function testItNonAcknowledgeTheMessageIfAnExceptionHappened() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn(array( + 'type' => DummyMessage::class, + )); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + + $connection->expects($this->once())->method('nack')->with($envelope); + + $receiver = new AmqpReceiver($serializer, $connection); + $receiver->receive(function () { + throw new InterruptException('Well...'); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Tests\Adapter\AmqpExt\WillNeverWorkException + */ + public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn(array( + 'type' => DummyMessage::class, + )); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + $connection->expects($this->once())->method('reject')->with($envelope); + + $receiver = new AmqpReceiver($serializer, $connection); + $receiver->receive(function () { + throw new WillNeverWorkException('Well...'); + }); + } +} + +class InterruptException extends \Exception +{ +} + +class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpSenderTest.php new file mode 100644 index 000000000000..169ac97f9ccd --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/AmqpSenderTest.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Adapter\AmqpExt; + +use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpSender; +use Symfony\Component\Messenger\Adapter\AmqpExt\Connection; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; + +/** + * @requires extension amqp + */ +class AmqpSenderTest extends TestCase +{ + public function testItSendsTheEncodedMessage() + { + $message = new DummyMessage('Oy'); + $encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class)); + + $encoder = $this->getMockBuilder(EncoderInterface::class)->getMock(); + $encoder->method('encode')->with($message)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers']); + + $sender = new AmqpSender($encoder, $connection); + $sender->send($message); + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/ConnectionTest.php new file mode 100644 index 000000000000..510397e298ca --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/ConnectionTest.php @@ -0,0 +1,186 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Adapter\AmqpExt; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Adapter\AmqpExt\AmqpFactory; +use Symfony\Component\Messenger\Adapter\AmqpExt\Connection; + +/** + * @requires extension amqp + */ +class ConnectionTest extends TestCase +{ + /** + * @expectedException \InvalidArgumentException + * @expectedExceptionMessage The given AMQP DSN "amqp://" is invalid. + */ + public function testItCannotBeConstructedWithAWrongDsn() + { + Connection::fromDsn('amqp://'); + } + + public function testItGetsParametersFromTheDsn() + { + $this->assertEquals( + new Connection(array( + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + ), array( + 'name' => 'messages', + ), array( + 'name' => 'messages', + )), + Connection::fromDsn('amqp://localhost/%2f/messages') + ); + } + + public function testOverrideOptionsViaQueryParameters() + { + $this->assertEquals( + new Connection(array( + 'host' => 'redis', + 'port' => 1234, + 'vhost' => '/', + 'login' => 'guest', + 'password' => 'password', + ), array( + 'name' => 'exchangeName', + ), array( + 'name' => 'queue', + )), + Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName') + ); + } + + public function testOptionsAreTakenIntoAccountAndOverwrittenByDsn() + { + $this->assertEquals( + new Connection(array( + 'host' => 'redis', + 'port' => 1234, + 'vhost' => '/', + 'login' => 'guest', + 'password' => 'password', + 'persistent' => 'true', + ), array( + 'name' => 'exchangeName', + ), array( + 'name' => 'queueName', + )), + Connection::fromDsn('amqp://guest:password@redis:1234/%2f/queue?exchange[name]=exchangeName&queue[name]=queueName', array( + 'persistent' => 'true', + 'exchange' => array('name' => 'toBeOverwritten'), + )) + ); + } + + public function testSetsParametersOnTheQueueAndExchange() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(), + $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(), + $amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(), + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + ); + + $amqpQueue->expects($this->once())->method('setArguments')->with(array( + 'x-dead-letter-exchange' => 'dead-exchange', + 'x-message-ttl' => '1200', + )); + + $amqpExchange->expects($this->once())->method('setArguments')->with(array( + 'alternate-exchange' => 'alternate', + )); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[arguments][x-dead-letter-exchange]=dead-exchange', array( + 'queue' => array( + 'arguments' => array( + 'x-message-ttl' => '1200', + ), + ), + 'exchange' => array( + 'arguments' => array( + 'alternate-exchange' => 'alternate', + ), + ), + ), true, $factory); + $connection->publish('body'); + } + + public function testItUsesANormalConnectionByDefault() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(), + $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(), + $amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(), + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + ); + + $amqpConnection->expects($this->once())->method('connect'); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages', array(), false, $factory); + $connection->publish('body'); + } + + public function testItAllowsToUseAPersistentConnection() + { + $factory = new TestAmqpFactory( + $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(), + $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(), + $amqpQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(), + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + ); + + $amqpConnection->expects($this->once())->method('pconnect'); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', array(), false, $factory); + $connection->publish('body'); + } +} + +class TestAmqpFactory extends AmqpFactory +{ + private $connection; + private $channel; + private $queue; + private $exchange; + + public function __construct(\AMQPConnection $connection, \AMQPChannel $channel, \AMQPQueue $queue, \AMQPExchange $exchange) + { + $this->connection = $connection; + $this->channel = $channel; + $this->queue = $queue; + $this->exchange = $exchange; + } + + public function createConnection(array $credentials): \AMQPConnection + { + return $this->connection; + } + + public function createChannel(\AMQPConnection $connection): \AMQPChannel + { + return $this->channel; + } + + public function createQueue(\AMQPChannel $channel): \AMQPQueue + { + return $this->queue; + } + + public function createExchange(\AMQPChannel $channel): \AMQPExchange + { + return $this->exchange; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/Fixtures/long_receiver.php b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/Fixtures/long_receiver.php new file mode 100644 index 000000000000..ac973e4a6cfe --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Adapter/AmqpExt/Fixtures/long_receiver.php @@ -0,0 +1,43 @@ + new JsonEncoder())) +); + +$connection = Connection::fromDsn(getenv('DSN')); +$sender = new AmqpSender($serializer, $connection); +$receiver = new AmqpReceiver($serializer, $connection); + +$worker = new Worker($receiver, new class() implements MessageBusInterface { + public function dispatch($message) + { + echo 'Get message: '.get_class($message)."\n"; + sleep(30); + echo "Done.\n"; + } +}); + +echo "Receiving messages...\n"; +$worker->run(); diff --git a/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php index 436a6df71a0e..6398aff36168 100644 --- a/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php +++ b/src/Symfony/Component/Messenger/Tests/Asynchronous/Middleware/SendMessageMiddlewareTest.php @@ -23,8 +23,8 @@ class SendMessageMiddlewareTest extends TestCase public function testItSendsTheMessageToAssignedSender() { $message = new DummyMessage('Hey'); - $sender = $this->createMock(SenderInterface::class); - $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $next = $this->createPartialMock(\stdClass::class, array('__invoke')); $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array( $sender, @@ -39,8 +39,8 @@ public function testItSendsTheMessageToAssignedSender() public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull() { $message = new DummyMessage('Hey'); - $sender = $this->createMock(SenderInterface::class); - $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $next = $this->createPartialMock(\stdClass::class, array('__invoke')); $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array( $sender, @@ -56,7 +56,7 @@ public function testItAlsoCallsTheNextMiddlewareIfASenderIsNull() public function testItCallsTheNextMiddlewareWhenNoSenderForThisMessage() { $message = new DummyMessage('Hey'); - $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + $next = $this->createPartialMock(\stdClass::class, array('__invoke')); $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array())); @@ -70,8 +70,8 @@ public function testItSkipsReceivedMessages() $innerMessage = new DummyMessage('Hey'); $message = new ReceivedMessage($innerMessage); - $sender = $this->createMock(SenderInterface::class); - $next = $this->createPartialMock(\stdClass::class, ['__invoke']); + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); + $next = $this->createPartialMock(\stdClass::class, array('__invoke')); $middleware = new SendMessageMiddleware(new InMemorySenderLocator(array( $sender, diff --git a/src/Symfony/Component/Messenger/Tests/Asynchronous/Routing/SenderLocatorTest.php b/src/Symfony/Component/Messenger/Tests/Asynchronous/Routing/SenderLocatorTest.php index e92882da39ab..caf952526439 100644 --- a/src/Symfony/Component/Messenger/Tests/Asynchronous/Routing/SenderLocatorTest.php +++ b/src/Symfony/Component/Messenger/Tests/Asynchronous/Routing/SenderLocatorTest.php @@ -22,40 +22,40 @@ class SenderLocatorTest extends TestCase { public function testItReturnsTheSenderBasedOnTheMessageClass() { - $sender = $this->createMock(SenderInterface::class); + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $container = new Container(); $container->set('my_amqp_sender', $sender); - $locator = new SenderLocator($container, [ - DummyMessage::class => [ + $locator = new SenderLocator($container, array( + DummyMessage::class => array( 'my_amqp_sender', - ] - ]); + ), + )); - $this->assertEquals([$sender], $locator->getSendersForMessage(new DummyMessage('Hello'))); - $this->assertEquals([], $locator->getSendersForMessage(new SecondMessage())); + $this->assertEquals(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello'))); + $this->assertEquals(array(), $locator->getSendersForMessage(new SecondMessage())); } public function testItSupportsAWildcardInsteadOfTheMessageClass() { $container = new Container(); - $sender = $this->createMock(SenderInterface::class); + $sender = $this->getMockBuilder(SenderInterface::class)->getMock(); $container->set('my_amqp_sender', $sender); - $apiSender = $this->createMock(SenderInterface::class); + $apiSender = $this->getMockBuilder(SenderInterface::class)->getMock(); $container->set('my_api_sender', $apiSender); - $locator = new SenderLocator($container, [ - DummyMessage::class => [ + $locator = new SenderLocator($container, array( + DummyMessage::class => array( 'my_amqp_sender', - ], - '*' => [ - 'my_api_sender' - ] - ]); - - $this->assertEquals([$sender], $locator->getSendersForMessage(new DummyMessage('Hello'))); - $this->assertEquals([$apiSender], $locator->getSendersForMessage(new SecondMessage())); + ), + '*' => array( + 'my_api_sender', + ), + )); + + $this->assertEquals(array($sender), $locator->getSendersForMessage(new DummyMessage('Hello'))); + $this->assertEquals(array($apiSender), $locator->getSendersForMessage(new SecondMessage())); } } diff --git a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php index 9189fd7cdd77..b9e73ecd3d70 100644 --- a/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php +++ b/src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php @@ -221,12 +221,16 @@ public function __invoke(DummyMessage $message): void class DummyReceiver implements ReceiverInterface { - public function receive(): iterable + public function receive(callable $handler): void { for ($i = 0; $i < 3; ++$i) { - yield new DummyMessage("Dummy $i"); + $handler(new DummyMessage("Dummy $i")); } } + + public function stop(): void + { + } } class UndefinedMessageHandler diff --git a/src/Symfony/Component/Messenger/Tests/MessageBusTest.php b/src/Symfony/Component/Messenger/Tests/MessageBusTest.php index 86ff30aed089..786def636a3a 100644 --- a/src/Symfony/Component/Messenger/Tests/MessageBusTest.php +++ b/src/Symfony/Component/Messenger/Tests/MessageBusTest.php @@ -31,24 +31,24 @@ public function testItCallsTheMiddlewaresAndChainTheReturnValue() $message = new DummyMessage('Hello'); $responseFromDepthMiddleware = 1234; - $firstMiddleware = $this->createMock(MiddlewareInterface::class); + $firstMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock(); $firstMiddleware->expects($this->once()) ->method('handle') ->with($message, $this->anything()) - ->will($this->returnCallback(function($message, $next) { + ->will($this->returnCallback(function ($message, $next) { return $next($message); })); - $secondMiddleware = $this->createMock(MiddlewareInterface::class); + $secondMiddleware = $this->getMockBuilder(MiddlewareInterface::class)->getMock(); $secondMiddleware->expects($this->once()) ->method('handle') ->with($message, $this->anything()) ->willReturn($responseFromDepthMiddleware); - $bus = new MessageBus([ + $bus = new MessageBus(array( $firstMiddleware, $secondMiddleware, - ]); + )); $this->assertEquals($responseFromDepthMiddleware, $bus->dispatch($message)); } diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 0b0411e412f8..3c9aeb12e9f0 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -22,12 +22,12 @@ class WorkerTest extends TestCase { public function testWorkerDispatchTheReceivedMessage() { - $receiver = new CallbackReceiver(function() { - yield new DummyMessage('API'); - yield new DummyMessage('IPA'); + $receiver = new CallbackReceiver(function ($handler) { + $handler(new DummyMessage('API')); + $handler(new DummyMessage('IPA')); }); - $bus = $this->createMock(MessageBusInterface::class); + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API'))); $bus->expects($this->at(1))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('IPA'))); @@ -38,11 +38,11 @@ public function testWorkerDispatchTheReceivedMessage() public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages() { - $receiver = new CallbackReceiver(function() { - yield new ReceivedMessage(new DummyMessage('API')); + $receiver = new CallbackReceiver(function ($handler) { + $handler(new ReceivedMessage(new DummyMessage('API'))); }); - $bus = $this->createMock(MessageBusInterface::class); + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->expects($this->at(0))->method('dispatch')->with(new ReceivedMessage(new DummyMessage('API'))); @@ -52,9 +52,9 @@ public function testWorkerDoesNotWrapMessagesAlreadyWrappedInReceivedMessages() public function testWorkerIsThrowingExceptionsBackToGenerators() { - $receiver = new CallbackReceiver(function() { + $receiver = new CallbackReceiver(function ($handler) { try { - yield new DummyMessage('Hello'); + $handler(new DummyMessage('Hello')); $this->assertTrue(false, 'This should not be called because the exception is sent back to the generator.'); } catch (\InvalidArgumentException $e) { @@ -63,13 +63,25 @@ public function testWorkerIsThrowingExceptionsBackToGenerators() } }); - $bus = $this->createMock(MessageBusInterface::class); - + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); $bus->method('dispatch')->willThrowException(new \InvalidArgumentException('Why not')); $worker = new Worker($receiver, $bus); $worker->run(); } + + public function testWorkerDoesNotSendNullMessagesToTheBus() + { + $receiver = new CallbackReceiver(function ($handler) { + $handler(null); + }); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + $bus->expects($this->never())->method('dispatch'); + + $worker = new Worker($receiver, $bus); + $worker->run(); + } } class CallbackReceiver implements ReceiverInterface @@ -81,10 +93,13 @@ public function __construct(callable $callable) $this->callable = $callable; } - public function receive(): iterable + public function receive(callable $handler): void { $callable = $this->callable; + $callable($handler); + } - return $callable(); + public function stop(): void + { } } diff --git a/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php b/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php index 489d52cd5482..37fa2b649a29 100644 --- a/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/Enhancers/MaximumCountReceiver.php @@ -27,25 +27,21 @@ public function __construct(ReceiverInterface $decoratedReceiver, int $maximumNu $this->maximumNumberOfMessages = $maximumNumberOfMessages; } - public function receive(): iterable + public function receive(callable $handler): void { - $iterator = $this->decoratedReceiver->receive(); $receivedMessages = 0; - foreach ($iterator as $message) { - try { - yield $message; - } catch (\Throwable $e) { - if (!$iterator instanceof \Generator) { - throw $e; - } - - $iterator->throw($e); - } + $this->decoratedReceiver->receive(function ($message) use ($handler, &$receivedMessages) { + $handler($message); if (++$receivedMessages >= $this->maximumNumberOfMessages) { - break; + $this->stop(); } - } + }); + } + + public function stop(): void + { + $this->decoratedReceiver->stop(); } } diff --git a/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php index d7fa8673eabd..1c29fbe43abe 100644 --- a/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/ReceiverInterface.php @@ -18,5 +18,16 @@ */ interface ReceiverInterface { - public function receive(): iterable; + /** + * Receive some messages to the given handler. + * + * The handler will have, as argument, the received message. Note that this message + * can be `null` if the timeout to receive something has expired. + */ + public function receive(callable $handler) : void; + + /** + * Stop receiving some messages. + */ + public function stop(): void; } diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 25c2897fe60c..3a4a0433183f 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -35,22 +35,22 @@ public function __construct(ReceiverInterface $receiver, MessageBusInterface $bu */ public function run() { - $iterator = $this->receiver->receive(); + if (function_exists('pcntl_signal')) { + pcntl_signal(SIGTERM, function () { + $this->receiver->stop(); + }); + } + + $this->receiver->receive(function($message) { + if (null === $message) { + return; + } - foreach ($iterator as $message) { if (!$message instanceof ReceivedMessage) { $message = new ReceivedMessage($message); } - try { - $this->bus->dispatch($message); - } catch (\Throwable $e) { - if (!$iterator instanceof \Generator) { - throw $e; - } - - $iterator->throw($e); - } - } + $this->bus->dispatch($message); + }); } } diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index 720edc8c912a..00f6b3779466 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -23,7 +23,9 @@ "symfony/dependency-injection": "~3.4.6|~4.0", "symfony/http-kernel": "~3.4|~4.0", "symfony/property-access": "~3.4|~4.0", - "symfony/var-dumper": "~3.4|~4.0" + "symfony/var-dumper": "~3.4|~4.0", + "symfony/property-access": "~3.4|~4.0", + "symfony/process": "~4.0" }, "suggest": { "sroze/enqueue-bridge": "For using the php-enqueue library as an adapter."