From c226479d5fb7e9fc82d5a12de306af12a17a288f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Deruss=C3=A9?= Date: Mon, 8 Jul 2019 23:33:42 +0200 Subject: [PATCH] =?UTF-8?q?[Messenger]=C2=A0Add=20SQS=20transport?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .travis.yml | 6 + .../Resources/config/messenger.xml | 4 + .../Messenger/Bridge/AmazonSqs/.gitattributes | 3 + .../Messenger/Bridge/AmazonSqs/.gitignore | 3 + .../Messenger/Bridge/AmazonSqs/CHANGELOG.md | 8 + .../Messenger/Bridge/AmazonSqs/LICENSE | 19 + .../Messenger/Bridge/AmazonSqs/README.md | 12 + .../AmazonSqs/Tests/Fixtures/DummyMessage.php | 18 + .../Transport/AmazonSqsIntegrationTest.php | 58 +++ .../Tests/Transport/AmazonSqsReceiverTest.php | 76 ++++ .../Tests/Transport/AmazonSqsSenderTest.php | 39 ++ .../AmazonSqsTransportFactoryTest.php | 27 ++ .../Transport/AmazonSqsTransportTest.php | 60 +++ .../Tests/Transport/ConnectionTest.php | 228 +++++++++++ .../Transport/AmazonSqsReceivedStamp.php | 32 ++ .../AmazonSqs/Transport/AmazonSqsReceiver.php | 113 ++++++ .../AmazonSqs/Transport/AmazonSqsSender.php | 54 +++ .../Transport/AmazonSqsTransport.php | 91 +++++ .../Transport/AmazonSqsTransportFactory.php | 34 ++ .../Bridge/AmazonSqs/Transport/Connection.php | 362 ++++++++++++++++++ .../Messenger/Bridge/AmazonSqs/composer.json | 40 ++ .../Bridge/AmazonSqs/phpunit.xml.dist | 30 ++ 22 files changed, 1317 insertions(+) create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitattributes create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitignore create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/LICENSE create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/README.md create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Fixtures/DummyMessage.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportFactoryTest.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceivedStamp.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransportFactory.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json create mode 100644 src/Symfony/Component/Messenger/Bridge/AmazonSqs/phpunit.xml.dist diff --git a/.travis.yml b/.travis.yml index d1a316e09d03..a522f33b68b1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,7 @@ env: - SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php - MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages - MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages + - MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable - SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1 matrix: @@ -69,6 +70,11 @@ before_install: docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4 export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005' + - | + # Start Sqs server + docker pull feathj/fake-sqs + docker run -d -p 9494:9494 --name sqs feathj/fake-sqs + - | # Start Kafka and install an up-to-date librdkafka docker network create kafka_network diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index aa5c50e5cdc8..5c7c2f6f5fbc 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -85,6 +85,10 @@ + + + + diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitattributes b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitattributes new file mode 100644 index 000000000000..ebb9287043dc --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitattributes @@ -0,0 +1,3 @@ +/Tests export-ignore +/phpunit.xml.dist export-ignore +/.gitignore export-ignore diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitignore b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitignore new file mode 100644 index 000000000000..c49a5d8df5c6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitignore @@ -0,0 +1,3 @@ +vendor/ +composer.lock +phpunit.xml diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md new file mode 100644 index 000000000000..cf996bb4f6cd --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md @@ -0,0 +1,8 @@ +CHANGELOG +========= + +5.1.0 +----- + + * Introduced the Amazon SQS bridge. + diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/LICENSE b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/LICENSE new file mode 100644 index 000000000000..5593b1d84f74 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2020 Fabien Potencier + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/README.md b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/README.md new file mode 100644 index 000000000000..eabf1d6100b2 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/README.md @@ -0,0 +1,12 @@ +Amazon SQS Messenger +==================== + +Provides Amazon SQS integration for Symfony Messenger. + +Resources +--------- + + * [Contributing](https://symfony.com/doc/current/contributing/index.html) + * [Report issues](https://github.com/symfony/symfony/issues) and + [send Pull Requests](https://github.com/symfony/symfony/pulls) + in the [main Symfony repository](https://github.com/symfony/symfony) diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Fixtures/DummyMessage.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Fixtures/DummyMessage.php new file mode 100644 index 000000000000..59d4a2ad35cb --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Fixtures/DummyMessage.php @@ -0,0 +1,18 @@ +message = $message; + } + + public function getMessage(): string + { + return $this->message; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php new file mode 100644 index 000000000000..cd398edcbff7 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php @@ -0,0 +1,58 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; + +class AmazonSqsIntegrationTest extends TestCase +{ + private $connection; + + protected function setUp(): void + { + if (!getenv('MESSENGER_SQS_DSN')) { + $this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.'); + } + + $this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []); + $this->connection->setup(); + $this->clearSqs(); + } + + public function testConnectionSendAndGet() + { + $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]); + $this->assertSame(1, $this->connection->getMessageCount()); + + $wait = 0; + while ((null === $encoded = $this->connection->get()) && $wait++ < 200) { + usleep(5000); + } + + $this->assertEquals('{"message": "Hi"}', $encoded['body']); + $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']); + } + + private function clearSqs() + { + $wait = 0; + while ($wait++ < 50) { + if (null === $message = $this->connection->get()) { + usleep(5000); + continue; + } + $this->connection->delete($message['id']); + } + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php new file mode 100644 index 000000000000..b624dcb4868e --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php @@ -0,0 +1,76 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\Serializer; +use Symfony\Component\Serializer as SerializerComponent; +use Symfony\Component\Serializer\Encoder\JsonEncoder; +use Symfony\Component\Serializer\Normalizer\ObjectNormalizer; + +class AmazonSqsReceiverTest extends TestCase +{ + public function testItReturnsTheDecodedMessageToTheHandler() + { + $serializer = $this->createSerializer(); + + $sqsEnvelop = $this->createSqsEnvelope(); + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($sqsEnvelop); + + $receiver = new AmazonSqsReceiver($connection, $serializer); + $actualEnvelopes = iterator_to_array($receiver->get()); + $this->assertCount(1, $actualEnvelopes); + $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); + } + + public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() + { + $this->expectException(MessageDecodingFailedException::class); + + $serializer = $this->createMock(PhpSerializer::class); + $serializer->method('decode')->willThrowException(new MessageDecodingFailedException()); + + $sqsEnvelop = $this->createSqsEnvelope(); + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($sqsEnvelop); + $connection->expects($this->once())->method('delete'); + + $receiver = new AmazonSqsReceiver($connection, $serializer); + iterator_to_array($receiver->get()); + } + + private function createSqsEnvelope() + { + return [ + 'id' => 1, + 'body' => '{"message": "Hi"}', + 'headers' => [ + 'type' => DummyMessage::class, + ], + ]; + } + + private function createSerializer(): Serializer + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + return $serializer; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php new file mode 100644 index 000000000000..f0a1178d4167 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php @@ -0,0 +1,39 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; + +class AmazonSqsSenderTest extends TestCase +{ + public function testSend() + { + $envelope = new Envelope(new DummyMessage('Oy')); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->getMock(); + $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']); + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $sender = new AmazonSqsSender($connection, $serializer); + $sender->send($envelope); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportFactoryTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportFactoryTest.php new file mode 100644 index 000000000000..ed8f085d670c --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportFactoryTest.php @@ -0,0 +1,27 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory; + +class AmazonSqsTransportFactoryTest extends TestCase +{ + public function testSupportsOnlySqsTransports() + { + $factory = new AmazonSqsTransportFactory(); + + $this->assertTrue($factory->supports('sqs://localhost', [])); + $this->assertFalse($factory->supports('redis://localhost', [])); + $this->assertFalse($factory->supports('invalid-dsn', [])); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php new file mode 100644 index 000000000000..9e6430506303 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php @@ -0,0 +1,60 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransport; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +class AmazonSqsTransportTest extends TestCase +{ + public function testItIsATransport() + { + $transport = $this->getTransport(); + + $this->assertInstanceOf(TransportInterface::class, $transport); + } + + public function testReceivesMessages() + { + $transport = $this->getTransport( + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(), + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock() + ); + + $decodedMessage = new DummyMessage('Decoded.'); + + $sqsEnvelope = [ + 'id' => '5', + 'body' => 'body', + 'headers' => ['my' => 'header'], + ]; + + $serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); + $connection->method('get')->willReturn($sqsEnvelope); + + $envelopes = iterator_to_array($transport->get()); + $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); + } + + private function getTransport(SerializerInterface $serializer = null, Connection $connection = null) + { + $serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock(); + $connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + + return new AmazonSqsTransport($connection, $serializer); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php new file mode 100644 index 000000000000..ef6ac5875a47 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php @@ -0,0 +1,228 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport; + +use PHPUnit\Framework\TestCase; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; +use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; + +class ConnectionTest extends TestCase +{ + public function testFromInvalidDsn() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The given Amazon SQS DSN "sqs://" is invalid.'); + + Connection::fromDsn('sqs://'); + } + + public function testFromDsn() + { + $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); + $this->assertEquals( + new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient), + Connection::fromDsn('sqs://default/queue', [], $httpClient) + ); + } + + public function testFromDsnWithRegion() + { + $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); + $this->assertEquals( + new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient), + Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient) + ); + } + + public function testFromDsnWithCustomEndpoint() + { + $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); + $this->assertEquals( + new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient), + Connection::fromDsn('sqs://localhost/queue', [], $httpClient) + ); + } + + public function testFromDsnWithCustomEndpointAndPort() + { + $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); + $this->assertEquals( + new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient), + Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient) + ); + } + + public function testFromDsnWithOptions() + { + $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); + $this->assertEquals( + new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient), + Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient) + ); + } + + public function testFromDsnWithQueryOptions() + { + $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); + $this->assertEquals( + new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient), + Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient) + ); + } + + private function handleGetQueueUrl(int $index, $mock): string + { + $response = $this->getMockBuilder(ResponseInterface::class)->getMock(); + + $mock->expects($this->at($index))->method('request') + ->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']]) + ->willReturn($response); + $response->expects($this->once())->method('getStatusCode')->willReturn(200); + $response->expects($this->once())->method('getContent')->willReturn(' + + https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue + + + 470a6f13-2ed9-4181-ad8a-2fdea142988e + + '); + + return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue'; + } + + public function testKeepGettingPendingMessages() + { + $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); + $response = $this->getMockBuilder(ResponseInterface::class)->getMock(); + + $queueUrl = $this->handleGetQueueUrl(0, $httpClient); + + $httpClient->expects($this->at(1))->method('request') + ->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]]) + ->willReturn($response); + $response->expects($this->once())->method('getContent')->willReturn(' + + + 5fea7756-0ea4-451a-a703-a558b933e274 + + MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw + Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE + auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= + + fafb00f5732ab283681e124bf8747ed1 + {"body":"this is a test","headers":{}} + + SenderId + 195004372649 + + + SentTimestamp + 1238099229000 + + + ApproximateReceiveCount + 5 + + + ApproximateFirstReceiveTimestamp + 1250700979248 + + + + 5fea7756-0ea4-451a-a703-a558b933e274 + + MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw + Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE + auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= + + fafb00f5732ab283681e124bf8747ed1 + {"body":"this is a test","headers":{}} + + SenderId + 195004372649 + + + SentTimestamp + 1238099229000 + + + ApproximateReceiveCount + 5 + + + ApproximateFirstReceiveTimestamp + 1250700979248 + + + + 5fea7756-0ea4-451a-a703-a558b933e274 + + MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw + Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE + auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0= + + fafb00f5732ab283681e124bf8747ed1 + {"body":"this is a test","headers":{}} + + SenderId + 195004372649 + + + SentTimestamp + 1238099229000 + + + ApproximateReceiveCount + 5 + + + ApproximateFirstReceiveTimestamp + 1250700979248 + + + + + b6633655-283d-45b4-aee4-4e84e0ae6afa + + '); + + $connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient); + $this->assertNotNull($connection->get()); + $this->assertNotNull($connection->get()); + $this->assertNotNull($connection->get()); + } + + public function testUnexpectedSqsError() + { + $this->expectException(TransportException::class); + $this->expectExceptionMessage('SQS error happens'); + + $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock(); + $response = $this->getMockBuilder(ResponseInterface::class)->getMock(); + + $httpClient->expects($this->once())->method('request')->willReturn($response); + $response->expects($this->once())->method('getStatusCode')->willReturn(400); + $response->expects($this->once())->method('getContent')->willReturn(' + + Sender + boom + SQS error happens + + + 30441e49-5246-5231-9c87-4bd704b81ce9 + '); + $connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient); + $connection->get(); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceivedStamp.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceivedStamp.php new file mode 100644 index 000000000000..363f4d4f7868 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceivedStamp.php @@ -0,0 +1,32 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; + +use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; + +/** + * @author Jérémy Derussé + */ +class AmazonSqsReceivedStamp implements NonSendableStampInterface +{ + private $id; + + public function __construct(string $id) + { + $this->id = $id; + } + + public function getId(): string + { + return $this->id; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php new file mode 100644 index 000000000000..3da773fa4d9b --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php @@ -0,0 +1,113 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\LogicException; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; +use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface; + +/** + * @author Jérémy Derussé + */ +class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface +{ + private $connection; + private $serializer; + + public function __construct(Connection $connection, SerializerInterface $serializer = null) + { + $this->connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + try { + $sqsEnvelope = $this->connection->get(); + } catch (HttpExceptionInterface $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + if (null === $sqsEnvelope) { + return; + } + + try { + $envelope = $this->serializer->decode([ + 'body' => $sqsEnvelope['body'], + 'headers' => $sqsEnvelope['headers'], + ]); + } catch (MessageDecodingFailedException $exception) { + $this->connection->delete($sqsEnvelope['id']); + + throw $exception; + } + + yield $envelope->with(new AmazonSqsReceivedStamp($sqsEnvelope['id'])); + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + try { + $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId()); + } catch (HttpExceptionInterface $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + try { + $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId()); + } catch (HttpExceptionInterface $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + } + + /** + * {@inheritdoc} + */ + public function getMessageCount(): int + { + try { + $this->connection->getMessageCount(); + } catch (HttpExceptionInterface $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + } + + private function findSqsReceivedStamp(Envelope $envelope): AmazonSqsReceivedStamp + { + /** @var AmazonSqsReceivedStamp|null $sqsReceivedStamp */ + $sqsReceivedStamp = $envelope->last(AmazonSqsReceivedStamp::class); + + if (null === $sqsReceivedStamp) { + throw new LogicException('No AmazonSqsReceivedStamp found on the Envelope.'); + } + + return $sqsReceivedStamp; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php new file mode 100644 index 000000000000..146cacf6d027 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php @@ -0,0 +1,54 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface; + +/** + * @author Jérémy Derussé + */ +class AmazonSqsSender implements SenderInterface +{ + private $connection; + private $serializer; + + public function __construct(Connection $connection, SerializerInterface $serializer) + { + $this->connection = $connection; + $this->serializer = $serializer; + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + $encodedMessage = $this->serializer->encode($envelope); + + /** @var DelayStamp|null $delayStamp */ + $delayStamp = $envelope->last(DelayStamp::class); + $delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0; + + try { + $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay); + } catch (HttpExceptionInterface $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + + return $envelope; + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php new file mode 100644 index 000000000000..6560b937f12d --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php @@ -0,0 +1,91 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; + +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\SetupableTransportInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; +use Symfony\Contracts\Service\ResetInterface; + +/** + * @author Jérémy Derussé + */ +class AmazonSqsTransport implements TransportInterface, SetupableTransportInterface, ResetInterface +{ + private $serializer; + private $connection; + private $receiver; + private $sender; + + public function __construct(Connection $connection, SerializerInterface $serializer = null) + { + $this->connection = $connection; + $this->serializer = $serializer ?? new PhpSerializer(); + } + + /** + * {@inheritdoc} + */ + public function get(): iterable + { + return ($this->receiver ?? $this->getReceiver())->get(); + } + + /** + * {@inheritdoc} + */ + public function ack(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->ack($envelope); + } + + /** + * {@inheritdoc} + */ + public function reject(Envelope $envelope): void + { + ($this->receiver ?? $this->getReceiver())->reject($envelope); + } + + /** + * {@inheritdoc} + */ + public function send(Envelope $envelope): Envelope + { + return ($this->sender ?? $this->getSender())->send($envelope); + } + + /** + * {@inheritdoc} + */ + public function setup(): void + { + $this->connection->setup(); + } + + public function reset() + { + $this->connection->reset(); + } + + private function getReceiver(): AmazonSqsReceiver + { + return $this->receiver = new AmazonSqsReceiver($this->connection, $this->serializer); + } + + private function getSender(): AmazonSqsSender + { + return $this->sender = new AmazonSqsSender($this->connection, $this->serializer); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransportFactory.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransportFactory.php new file mode 100644 index 000000000000..aecde4d5df9e --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransportFactory.php @@ -0,0 +1,34 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; + +use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; +use Symfony\Component\Messenger\Transport\TransportFactoryInterface; +use Symfony\Component\Messenger\Transport\TransportInterface; + +/** + * @author Jérémy Derussé + */ +class AmazonSqsTransportFactory implements TransportFactoryInterface +{ + public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface + { + unset($options['transport_name']); + + return new AmazonSqsTransport(Connection::fromDsn($dsn, $options), $serializer); + } + + public function supports(string $dsn, array $options): bool + { + return 0 === strpos($dsn, 'sqs://'); + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php new file mode 100644 index 000000000000..13931dd0c00d --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php @@ -0,0 +1,362 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport; + +use Symfony\Component\HttpClient\HttpClient; +use Symfony\Component\Messenger\Exception\InvalidArgumentException; +use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Contracts\HttpClient\HttpClientInterface; +use Symfony\Contracts\HttpClient\ResponseInterface; + +/** + * A SQS connection. + * + * @author Jérémy Derussé + * + * @internal + * @final + */ +class Connection +{ + private const DEFAULT_OPTIONS = [ + 'buffer_size' => 9, + 'wait_time' => 20, + 'poll_timeout' => 0.1, + 'visibility_timeout' => null, + 'auto_setup' => true, + 'access_key' => null, + 'secret_key' => null, + 'endpoint' => 'https://sqs.eu-west-1.amazonaws.com', + 'region' => 'eu-west-1', + 'queue_name' => 'messages', + 'account' => null, + ]; + + private $configuration; + private $client; + + /** @var ResponseInterface */ + private $currentResponse; + /** @var array[] */ + private $buffer = []; + /** @var string|null */ + private $queueUrl; + + public function __construct(array $configuration, HttpClientInterface $client = null) + { + $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration); + $this->client = $client ?? HttpClient::create(); + } + + public function __destruct() + { + $this->reset(); + } + + /** + * Creates a connection based on the DSN and options. + * + * Available options: + * + * * endpoint: absolute URL to the SQS service (Default: https://sqs.eu-west-1.amazonaws.com) + * * region: name of the AWS region (Default: eu-west-1) + * * queue_name: name of the queue (Default: messages) + * * account: identifier of the AWS account + * * access_key: AWS access key + * * secret_key: AWS secret key + * * buffer_size: number of messages to prefetch (Default: 9) + * * wait_time: long polling duration in seconds (Default: 20) + * * poll_timeout: amount of seconds the transport should wait for new message + * * visibility_timeout: amount of seconds the message won't be visible + * * auto_setup: Whether the queue should be created automatically during send / get (Default: true) + */ + public static function fromDsn(string $dsn, array $options = [], HttpClientInterface $client = null): self + { + if (false === $parsedUrl = parse_url($dsn)) { + throw new InvalidArgumentException(sprintf('The given Amazon SQS DSN "%s" is invalid.', $dsn)); + } + + $query = []; + if (isset($parsedUrl['query'])) { + parse_str($parsedUrl['query'], $query); + } + + $configuration = [ + 'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']), + 'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']), + 'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']), + 'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']), + 'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']), + 'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']), + 'access_key' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']), + 'secret_key' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']), + ]; + + if ('default' === ($parsedUrl['host'] ?? 'default')) { + $configuration['endpoint'] = sprintf('https://sqs.%s.amazonaws.com', $configuration['region']); + } else { + $configuration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : ''); + unset($query['sslmode']); + } + + $parsedPath = explode('/', ltrim($parsedUrl['path'] ?? '/', '/')); + if (\count($parsedPath) > 0) { + $configuration['queue_name'] = end($parsedPath); + } + $configuration['account'] = 2 === \count($parsedPath) ? $parsedPath[0] : null; + + // check for extra keys in options + $optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration)); + if (0 < \count($optionsExtraKeys)) { + throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS)))); + } + + // check for extra keys in options + $queryExtraKeys = array_diff(array_keys($query), array_keys($configuration)); + if (0 < \count($queryExtraKeys)) { + throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS)))); + } + + return new self($configuration, $client); + } + + public function get(): ?array + { + if ($this->configuration['auto_setup']) { + $this->setup(); + } + + foreach ($this->getNextMessages() as $message) { + return $message; + } + + return null; + } + + /** + * @return array[] + */ + private function getNextMessages(): \Generator + { + yield from $this->getPendingMessages(); + yield from $this->getNewMessages(); + } + + /** + * @return array[] + */ + private function getPendingMessages(): \Generator + { + while (!empty($this->buffer)) { + yield array_shift($this->buffer); + } + } + + /** + * @return array[] + */ + private function getNewMessages(): \Generator + { + if (null === $this->currentResponse) { + $this->currentResponse = $this->request($this->getQueueUrl(), [ + 'Action' => 'ReceiveMessage', + 'VisibilityTimeout' => $this->configuration['visibility_timeout'], + 'MaxNumberOfMessages' => $this->configuration['buffer_size'], + 'WaitTimeSeconds' => $this->configuration['wait_time'], + ]); + } + + if ($this->client->stream($this->currentResponse, $this->configuration['poll_timeout'])->current()->isTimeout()) { + return; + } + + $xml = new \SimpleXMLElement($this->currentResponse->getContent()); + foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) { + $this->buffer[] = [ + 'id' => (string) $xmlMessage->ReceiptHandle, + ] + json_decode($xmlMessage->Body, true); + } + + $this->currentResponse = null; + + yield from $this->getPendingMessages(); + } + + public function setup(): void + { + $this->call($this->configuration['endpoint'], [ + 'Action' => 'CreateQueue', + 'QueueName' => $this->configuration['queue_name'], + ]); + $this->queueUrl = null; + + $this->configuration['auto_setup'] = false; + } + + public function delete(string $id): void + { + $this->call($this->getQueueUrl(), [ + 'Action' => 'DeleteMessage', + 'ReceiptHandle' => $id, + ]); + } + + public function getMessageCount(): int + { + $response = $this->request($this->getQueueUrl(), [ + 'Action' => 'GetQueueAttributes', + 'AttributeNames' => ['ApproximateNumberOfMessages'], + ]); + $this->checkResponse($response); + $xml = new \SimpleXMLElement($response->getContent()); + foreach ($xml->GetQueueAttributesResult->Attribute as $attribute) { + if ('ApproximateNumberOfMessages' !== (string) $attribute->Name) { + continue; + } + + return (int) $attribute->Value; + } + + return 0; + } + + public function send(string $body, array $headers, int $delay = 0): void + { + if ($this->configuration['auto_setup']) { + $this->setup(); + } + + $this->call($this->getQueueUrl(), [ + 'Action' => 'SendMessage', + 'MessageBody' => json_encode(['body' => $body, 'headers' => $headers]), + 'DelaySeconds' => $delay, + ]); + } + + public function reset(): void + { + if (null !== $this->currentResponse) { + $this->currentResponse->cancel(); + } + + foreach ($this->getPendingMessages() as $message) { + $this->call($this->getQueueUrl(), [ + 'Action' => 'ChangeMessageVisibility', + 'ReceiptHandle' => $message['id'], + 'VisibilityTimeout' => 0, + ]); + } + } + + private function getQueueUrl(): string + { + if (null === $this->queueUrl) { + $parameters = [ + 'Action' => 'GetQueueUrl', + 'QueueName' => $this->configuration['queue_name'], + ]; + if (isset($this->configuration['account'])) { + $parameters['QueueOwnerAWSAccountId'] = $this->configuration['account']; + } + + $response = $this->request($this->configuration['endpoint'], $parameters); + $this->checkResponse($response); + $xml = new \SimpleXMLElement($response->getContent()); + + $this->queueUrl = (string) $xml->GetQueueUrlResult->QueueUrl; + } + + return $this->queueUrl; + } + + private function call(string $endpoint, array $body): void + { + $this->checkResponse($this->request($endpoint, $body)); + } + + private function request(string $endpoint, array $body): ResponseInterface + { + if (!$this->configuration['access_key']) { + return $this->client->request('POST', $endpoint, ['body' => $body]); + } + + $region = $this->configuration['region']; + $service = 'sqs'; + + $method = 'POST'; + $requestParameters = http_build_query($body, '', '&', PHP_QUERY_RFC1738); + $amzDate = gmdate('Ymd\THis\Z'); + $parsedUrl = parse_url($endpoint); + + $headers = [ + 'host' => $parsedUrl['host'], + 'x-amz-date' => $amzDate, + 'content-type' => 'application/x-www-form-urlencoded', + ]; + + $signedHeaders = ['host', 'x-amz-date']; + $canonicalHeaders = implode("\n", array_map(function ($headerName) use ($headers): string { + return sprintf('%s:%s', $headerName, $headers[$headerName]); + }, $signedHeaders))."\n"; + + $canonicalRequest = implode("\n", [ + $method, + $parsedUrl['path'] ?? '/', + '', + $canonicalHeaders, + implode(';', $signedHeaders), + hash('sha256', $requestParameters), + ]); + + $algorithm = 'AWS4-HMAC-SHA256'; + $credentialScope = [gmdate('Ymd'), $region, $service, 'aws4_request']; + + $signingKey = 'AWS4'.$this->configuration['secret_key']; + foreach ($credentialScope as $credential) { + $signingKey = hash_hmac('sha256', $credential, $signingKey, true); + } + + $stringToSign = implode("\n", [ + $algorithm, + $amzDate, + implode('/', $credentialScope), + hash('sha256', $canonicalRequest), + ]); + + $authorizationHeader = sprintf( + '%s Credential=%s/%s, SignedHeaders=%s, Signature=%s', + $algorithm, + $this->configuration['access_key'], + implode('/', $credentialScope), + implode(';', $signedHeaders), + hash_hmac('sha256', $stringToSign, $signingKey) + ); + + $options = [ + 'headers' => $headers + [ + 'authorization' => $authorizationHeader, + ], + 'body' => $requestParameters, + ]; + + return $this->client->request($method, $endpoint, $options); + } + + private function checkResponse(ResponseInterface $response): void + { + if (200 !== $response->getStatusCode()) { + $error = new \SimpleXMLElement($response->getContent(false)); + + throw new TransportException($error->Error->Message); + } + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json new file mode 100644 index 000000000000..8515a1b77577 --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json @@ -0,0 +1,40 @@ +{ + "name": "symfony/amazon-sqs-messenger", + "type": "symfony-bridge", + "description": "Symfony Amazon SQS extension Messenger Bridge", + "keywords": [], + "homepage": "https://symfony.com", + "license": "MIT", + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "require": { + "php": "^7.2.5", + "symfony/http-client": "^4.3|5.0", + "symfony/messenger": "^4.3|^5.0" + }, + "require-dev": { + "symfony/http-client-contracts": "^1.0|^2.0", + "symfony/property-access": "^4.4|^5.0", + "symfony/serializer": "^4.4|^5.0" + }, + "autoload": { + "psr-4": { "Symfony\\Component\\Messenger\\Bridge\\AmazonSqs\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "5.1-dev" + } + } +} diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/phpunit.xml.dist b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/phpunit.xml.dist new file mode 100644 index 000000000000..b1d8e9608a3e --- /dev/null +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + + + + ./Tests/ + + + + + + ./ + + ./Tests + ./vendor + + + +