Skip to content

Commit

Permalink
feature #32454 [Messenger] Add SQS transport (jderusse)
Browse files Browse the repository at this point in the history
This PR was merged into the 5.1-dev branch.

Discussion
----------

[Messenger] Add SQS transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | N/A
| License       | MIT
| Doc PR        | TODO

This PR add the AWS SQS transport in messenger.

It also add a `DisconnectedReceiverInterface` that allows the worker to release not-proceeded message (which are not automatically released in SQS and have to wait a TTL). Tell me if you prefer to move it in a dedicated PR.

accepted DNS:
- `sqs://default/accountId/queueName`
- `sqs://default/queueName`
- `sqs://default/queueName?region=us-east-2`
- `sqs://my_custome_endpoint:12345/queueName?sslmode=disabled`

To reduce AWS costs, the implementation performs a long polling call and prefetch several messages.
TO get ~real time worker, one could use `./bin/console messenger:consume --sleep 0.001`

Commits
-------

c226479 [Messenger] Add SQS transport
  • Loading branch information
fabpot committed Feb 10, 2020
2 parents 11f1312 + c226479 commit 4003700
Show file tree
Hide file tree
Showing 22 changed files with 1,317 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Expand Up @@ -21,6 +21,7 @@ env:
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
- MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1

matrix:
Expand Down Expand Up @@ -69,6 +70,11 @@ before_install:
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
- |
# Start Sqs server
docker pull feathj/fake-sqs
docker run -d -p 9494:9494 --name sqs feathj/fake-sqs
- |
# Start Kafka and install an up-to-date librdkafka
docker network create kafka_network
Expand Down
Expand Up @@ -85,6 +85,10 @@
<tag name="kernel.reset" method="reset" />
</service>

<service id="messenger.transport.sqs.factory" class="Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory">
<tag name="messenger.transport_factory" />
</service>

<!-- retry -->
<service id="messenger.retry_strategy_locator">
<tag name="container.service_locator" />
Expand Down
@@ -0,0 +1,3 @@
/Tests export-ignore
/phpunit.xml.dist export-ignore
/.gitignore export-ignore
3 changes: 3 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitignore
@@ -0,0 +1,3 @@
vendor/
composer.lock
phpunit.xml
8 changes: 8 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md
@@ -0,0 +1,8 @@
CHANGELOG
=========

5.1.0
-----

* Introduced the Amazon SQS bridge.

19 changes: 19 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/AmazonSqs/LICENSE
@@ -0,0 +1,19 @@
Copyright (c) 2020 Fabien Potencier

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
12 changes: 12 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/AmazonSqs/README.md
@@ -0,0 +1,12 @@
Amazon SQS Messenger
====================

Provides Amazon SQS integration for Symfony Messenger.

Resources
---------

* [Contributing](https://symfony.com/doc/current/contributing/index.html)
* [Report issues](https://github.com/symfony/symfony/issues) and
[send Pull Requests](https://github.com/symfony/symfony/pulls)
in the [main Symfony repository](https://github.com/symfony/symfony)
@@ -0,0 +1,18 @@
<?php

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures;

class DummyMessage
{
private $message;

public function __construct(string $message)
{
$this->message = $message;
}

public function getMessage(): string
{
return $this->message;
}
}
@@ -0,0 +1,58 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;

class AmazonSqsIntegrationTest extends TestCase
{
private $connection;

protected function setUp(): void
{
if (!getenv('MESSENGER_SQS_DSN')) {
$this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.');
}

$this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []);
$this->connection->setup();
$this->clearSqs();
}

public function testConnectionSendAndGet()
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $this->connection->getMessageCount());

$wait = 0;
while ((null === $encoded = $this->connection->get()) && $wait++ < 200) {
usleep(5000);
}

$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}

private function clearSqs()
{
$wait = 0;
while ($wait++ < 50) {
if (null === $message = $this->connection->get()) {
usleep(5000);
continue;
}
$this->connection->delete($message['id']);
}
}
}
@@ -0,0 +1,76 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
use Symfony\Component\Serializer\Encoder\JsonEncoder;
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;

class AmazonSqsReceiverTest extends TestCase
{
public function testItReturnsTheDecodedMessageToTheHandler()
{
$serializer = $this->createSerializer();

$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($sqsEnvelop);

$receiver = new AmazonSqsReceiver($connection, $serializer);
$actualEnvelopes = iterator_to_array($receiver->get());
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}

public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
{
$this->expectException(MessageDecodingFailedException::class);

$serializer = $this->createMock(PhpSerializer::class);
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());

$sqsEnvelop = $this->createSqsEnvelope();
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($sqsEnvelop);
$connection->expects($this->once())->method('delete');

$receiver = new AmazonSqsReceiver($connection, $serializer);
iterator_to_array($receiver->get());
}

private function createSqsEnvelope()
{
return [
'id' => 1,
'body' => '{"message": "Hi"}',
'headers' => [
'type' => DummyMessage::class,
],
];
}

private function createSerializer(): Serializer
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

return $serializer;
}
}
@@ -0,0 +1,39 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class AmazonSqsSenderTest extends TestCase
{
public function testSend()
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']);

$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

$sender = new AmazonSqsSender($connection, $serializer);
$sender->send($envelope);
}
}
@@ -0,0 +1,27 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory;

class AmazonSqsTransportFactoryTest extends TestCase
{
public function testSupportsOnlySqsTransports()
{
$factory = new AmazonSqsTransportFactory();

$this->assertTrue($factory->supports('sqs://localhost', []));
$this->assertFalse($factory->supports('redis://localhost', []));
$this->assertFalse($factory->supports('invalid-dsn', []));
}
}
@@ -0,0 +1,60 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransport;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class AmazonSqsTransportTest extends TestCase
{
public function testItIsATransport()
{
$transport = $this->getTransport();

$this->assertInstanceOf(TransportInterface::class, $transport);
}

public function testReceivesMessages()
{
$transport = $this->getTransport(
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
);

$decodedMessage = new DummyMessage('Decoded.');

$sqsEnvelope = [
'id' => '5',
'body' => 'body',
'headers' => ['my' => 'header'],
];

$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('get')->willReturn($sqsEnvelope);

$envelopes = iterator_to_array($transport->get());
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}

private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
{
$serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
$connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();

return new AmazonSqsTransport($connection, $serializer);
}
}

0 comments on commit 4003700

Please sign in to comment.