Skip to content

Commit

Permalink
feature #36431 [Messenger] Add FIFO support to the SQS transport (cv6…
Browse files Browse the repository at this point in the history
…5kr)

This PR was squashed before being merged into the 5.1-dev branch.

Discussion
----------

[Messenger] Add FIFO support to the SQS transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | no
| License       | MIT
| Doc PR        | --

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html

Commits
-------

3760175 [Messenger] Add FIFO support to the SQS transport
  • Loading branch information
fabpot committed Apr 17, 2020
2 parents a85545f + 3760175 commit 67948a7
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 21 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Expand Up @@ -22,6 +22,7 @@ env:
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
- MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
- MESSENGER_SQS_FIFO_QUEUE_DSN=sqs://localhost:9494/messages.fifo?sslmode=disable
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1

matrix:
Expand Down
Expand Up @@ -5,4 +5,4 @@ CHANGELOG
-----

* Introduced the Amazon SQS bridge.

* Added FIFO support to the SQS transport
Expand Up @@ -17,42 +17,51 @@

class AmazonSqsIntegrationTest extends TestCase
{
private $connection;
public function testConnectionSendToFifoQueueAndGet(): void
{
if (!getenv('MESSENGER_SQS_FIFO_QUEUE_DSN')) {
$this->markTestSkipped('The "MESSENGER_SQS_FIFO_QUEUE_DSN" environment variable is required.');
}

protected function setUp(): void
$this->execute(getenv('MESSENGER_SQS_FIFO_QUEUE_DSN'));
}

public function testConnectionSendAndGet(): void
{
if (!getenv('MESSENGER_SQS_DSN')) {
$this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.');
}

$this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []);
$this->connection->setup();
$this->clearSqs();
$this->execute(getenv('MESSENGER_SQS_DSN'));
}

public function testConnectionSendAndGet()
private function execute(string $dsn): void
{
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $this->connection->getMessageCount());
$connection = Connection::fromDsn($dsn, []);
$connection->setup();
$this->clearSqs($connection);

$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
$this->assertSame(1, $connection->getMessageCount());

$wait = 0;
while ((null === $encoded = $this->connection->get()) && $wait++ < 200) {
while ((null === $encoded = $connection->get()) && $wait++ < 200) {
usleep(5000);
}

$this->assertEquals('{"message": "Hi"}', $encoded['body']);
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
}

private function clearSqs()
private function clearSqs(Connection $connection): void
{
$wait = 0;
while ($wait++ < 50) {
if (null === $message = $this->connection->get()) {
if (null === $message = $connection->get()) {
usleep(5000);
continue;
}
$this->connection->delete($message['id']);
$connection->delete($message['id']);
}
}
}
Expand Up @@ -13,14 +13,15 @@

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class AmazonSqsSenderTest extends TestCase
{
public function testSend()
public function testSend(): void
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
Expand All @@ -36,4 +37,24 @@ public function testSend()
$sender = new AmazonSqsSender($connection, $serializer);
$sender->send($envelope);
}

public function testSendWithAmazonSqsFifoStamp(): void
{
$envelope = (new Envelope(new DummyMessage('Oy')))
->with($stamp = new AmazonSqsFifoStamp('testGroup', 'testDeduplicationId'));

$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->getMock();
$connection->expects($this->once())->method('send')
->with($encoded['body'], $encoded['headers'], 0, $stamp->getMessageGroupId(), $stamp->getMessageDeduplicationId());

$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

$sender = new AmazonSqsSender($connection, $serializer);
$sender->send($envelope);
}
}
@@ -0,0 +1,37 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;

use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

final class AmazonSqsFifoStamp implements NonSendableStampInterface
{
private $messageGroupId;

private $messageDeduplicationId;

public function __construct(?string $messageGroupId = null, ?string $messageDeduplicationId = null)
{
$this->messageGroupId = $messageGroupId;
$this->messageDeduplicationId = $messageDeduplicationId;
}

public function getMessageGroupId(): ?string
{
return $this->messageGroupId;
}

public function getMessageDeduplicationId(): ?string
{
return $this->messageDeduplicationId;
}
}
Expand Up @@ -43,8 +43,24 @@ public function send(Envelope $envelope): Envelope
$delayStamp = $envelope->last(DelayStamp::class);
$delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0;

$messageGroupId = null;
$messageDeduplicationId = null;

/** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */
$amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class);
if (null !== $amazonSqsFifoStamp) {
$messageGroupId = $amazonSqsFifoStamp->getMessageGroupId();
$messageDeduplicationId = $amazonSqsFifoStamp->getMessageDeduplicationId();
}

try {
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
$this->connection->send(
$encodedMessage['body'],
$encodedMessage['headers'] ?? [],
$delay,
$messageGroupId,
$messageDeduplicationId
);
} catch (HttpExceptionInterface $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
Expand Down
Expand Up @@ -27,6 +27,8 @@
*/
class Connection
{
private const AWS_SQS_FIFO_SUFFIX = '.fifo';

private const DEFAULT_OPTIONS = [
'buffer_size' => 9,
'wait_time' => 20,
Expand Down Expand Up @@ -196,10 +198,16 @@ private function getNewMessages(): \Generator

public function setup(): void
{
$this->call($this->configuration['endpoint'], [
$parameters = [
'Action' => 'CreateQueue',
'QueueName' => $this->configuration['queue_name'],
]);
];

if ($this->isFifoQueue($this->configuration['queue_name'])) {
$parameters['FifoQueue'] = true;
}

$this->call($this->configuration['endpoint'], $parameters);
$this->queueUrl = null;

$this->configuration['auto_setup'] = false;
Expand Down Expand Up @@ -232,17 +240,26 @@ public function getMessageCount(): int
return 0;
}

public function send(string $body, array $headers, int $delay = 0): void
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
{
if ($this->configuration['auto_setup']) {
$this->setup();
}

$this->call($this->getQueueUrl(), [
$messageBody = json_encode(['body' => $body, 'headers' => $headers]);

$parameters = [
'Action' => 'SendMessage',
'MessageBody' => json_encode(['body' => $body, 'headers' => $headers]),
'MessageBody' => $messageBody,
'DelaySeconds' => $delay,
]);
];

if ($this->isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody);
}

$this->call($this->getQueueUrl(), $parameters);
}

public function reset(): void
Expand Down Expand Up @@ -362,4 +379,9 @@ private function checkResponse(ResponseInterface $response): void
throw new TransportException($error->Error->Message);
}
}

private function isFifoQueue(string $queueName): bool
{
return self::AWS_SQS_FIFO_SUFFIX === substr($queueName, -\strlen(self::AWS_SQS_FIFO_SUFFIX));
}
}

0 comments on commit 67948a7

Please sign in to comment.