diff --git a/.travis.yml b/.travis.yml
index 1fd075d0b0c2..c894464d541a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -21,6 +21,7 @@ env:
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
+ - MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1
matrix:
@@ -69,6 +70,11 @@ before_install:
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
+ - |
+ # Start Sqs server
+ docker pull feathj/fake-sqs
+ docker run -d -p 9494:9494 --name sqs feathj/fake-sqs
+
- |
# Start Kafka and install an up-to-date librdkafka
docker network create kafka_network
diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
index aa5c50e5cdc8..5c7c2f6f5fbc 100644
--- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
@@ -85,6 +85,10 @@
+
+
+
+
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitattributes b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitattributes
new file mode 100644
index 000000000000..ebb9287043dc
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitattributes
@@ -0,0 +1,3 @@
+/Tests export-ignore
+/phpunit.xml.dist export-ignore
+/.gitignore export-ignore
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitignore b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitignore
new file mode 100644
index 000000000000..c49a5d8df5c6
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/.gitignore
@@ -0,0 +1,3 @@
+vendor/
+composer.lock
+phpunit.xml
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md
new file mode 100644
index 000000000000..cf996bb4f6cd
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md
@@ -0,0 +1,8 @@
+CHANGELOG
+=========
+
+5.1.0
+-----
+
+ * Introduced the Amazon SQS bridge.
+
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/LICENSE b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/LICENSE
new file mode 100644
index 000000000000..5593b1d84f74
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2020 Fabien Potencier
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is furnished
+to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/README.md b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/README.md
new file mode 100644
index 000000000000..eabf1d6100b2
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/README.md
@@ -0,0 +1,12 @@
+Amazon SQS Messenger
+====================
+
+Provides Amazon SQS integration for Symfony Messenger.
+
+Resources
+---------
+
+ * [Contributing](https://symfony.com/doc/current/contributing/index.html)
+ * [Report issues](https://github.com/symfony/symfony/issues) and
+ [send Pull Requests](https://github.com/symfony/symfony/pulls)
+ in the [main Symfony repository](https://github.com/symfony/symfony)
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Fixtures/DummyMessage.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Fixtures/DummyMessage.php
new file mode 100644
index 000000000000..59d4a2ad35cb
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Fixtures/DummyMessage.php
@@ -0,0 +1,18 @@
+message = $message;
+ }
+
+ public function getMessage(): string
+ {
+ return $this->message;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php
new file mode 100644
index 000000000000..cd398edcbff7
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php
@@ -0,0 +1,58 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
+
+class AmazonSqsIntegrationTest extends TestCase
+{
+ private $connection;
+
+ protected function setUp(): void
+ {
+ if (!getenv('MESSENGER_SQS_DSN')) {
+ $this->markTestSkipped('The "MESSENGER_SQS_DSN" environment variable is required.');
+ }
+
+ $this->connection = Connection::fromDsn(getenv('MESSENGER_SQS_DSN'), []);
+ $this->connection->setup();
+ $this->clearSqs();
+ }
+
+ public function testConnectionSendAndGet()
+ {
+ $this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
+ $this->assertSame(1, $this->connection->getMessageCount());
+
+ $wait = 0;
+ while ((null === $encoded = $this->connection->get()) && $wait++ < 200) {
+ usleep(5000);
+ }
+
+ $this->assertEquals('{"message": "Hi"}', $encoded['body']);
+ $this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
+ }
+
+ private function clearSqs()
+ {
+ $wait = 0;
+ while ($wait++ < 50) {
+ if (null === $message = $this->connection->get()) {
+ usleep(5000);
+ continue;
+ }
+ $this->connection->delete($message['id']);
+ }
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php
new file mode 100644
index 000000000000..b624dcb4868e
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsReceiverTest.php
@@ -0,0 +1,76 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
+use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
+use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
+use Symfony\Component\Messenger\Transport\Serialization\Serializer;
+use Symfony\Component\Serializer as SerializerComponent;
+use Symfony\Component\Serializer\Encoder\JsonEncoder;
+use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
+
+class AmazonSqsReceiverTest extends TestCase
+{
+ public function testItReturnsTheDecodedMessageToTheHandler()
+ {
+ $serializer = $this->createSerializer();
+
+ $sqsEnvelop = $this->createSqsEnvelope();
+ $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
+ $connection->method('get')->willReturn($sqsEnvelop);
+
+ $receiver = new AmazonSqsReceiver($connection, $serializer);
+ $actualEnvelopes = iterator_to_array($receiver->get());
+ $this->assertCount(1, $actualEnvelopes);
+ $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
+ }
+
+ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
+ {
+ $this->expectException(MessageDecodingFailedException::class);
+
+ $serializer = $this->createMock(PhpSerializer::class);
+ $serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
+
+ $sqsEnvelop = $this->createSqsEnvelope();
+ $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
+ $connection->method('get')->willReturn($sqsEnvelop);
+ $connection->expects($this->once())->method('delete');
+
+ $receiver = new AmazonSqsReceiver($connection, $serializer);
+ iterator_to_array($receiver->get());
+ }
+
+ private function createSqsEnvelope()
+ {
+ return [
+ 'id' => 1,
+ 'body' => '{"message": "Hi"}',
+ 'headers' => [
+ 'type' => DummyMessage::class,
+ ],
+ ];
+ }
+
+ private function createSerializer(): Serializer
+ {
+ $serializer = new Serializer(
+ new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
+ );
+
+ return $serializer;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php
new file mode 100644
index 000000000000..f0a1178d4167
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php
@@ -0,0 +1,39 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+
+class AmazonSqsSenderTest extends TestCase
+{
+ public function testSend()
+ {
+ $envelope = new Envelope(new DummyMessage('Oy'));
+ $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
+
+ $connection = $this->getMockBuilder(Connection::class)
+ ->disableOriginalConstructor()
+ ->getMock();
+ $connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers']);
+
+ $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
+ $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
+
+ $sender = new AmazonSqsSender($connection, $serializer);
+ $sender->send($envelope);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportFactoryTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportFactoryTest.php
new file mode 100644
index 000000000000..ed8f085d670c
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportFactoryTest.php
@@ -0,0 +1,27 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory;
+
+class AmazonSqsTransportFactoryTest extends TestCase
+{
+ public function testSupportsOnlySqsTransports()
+ {
+ $factory = new AmazonSqsTransportFactory();
+
+ $this->assertTrue($factory->supports('sqs://localhost', []));
+ $this->assertFalse($factory->supports('redis://localhost', []));
+ $this->assertFalse($factory->supports('invalid-dsn', []));
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php
new file mode 100644
index 000000000000..9e6430506303
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsTransportTest.php
@@ -0,0 +1,60 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransport;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+class AmazonSqsTransportTest extends TestCase
+{
+ public function testItIsATransport()
+ {
+ $transport = $this->getTransport();
+
+ $this->assertInstanceOf(TransportInterface::class, $transport);
+ }
+
+ public function testReceivesMessages()
+ {
+ $transport = $this->getTransport(
+ $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
+ $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock()
+ );
+
+ $decodedMessage = new DummyMessage('Decoded.');
+
+ $sqsEnvelope = [
+ 'id' => '5',
+ 'body' => 'body',
+ 'headers' => ['my' => 'header'],
+ ];
+
+ $serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
+ $connection->method('get')->willReturn($sqsEnvelope);
+
+ $envelopes = iterator_to_array($transport->get());
+ $this->assertSame($decodedMessage, $envelopes[0]->getMessage());
+ }
+
+ private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)
+ {
+ $serializer = $serializer ?: $this->getMockBuilder(SerializerInterface::class)->getMock();
+ $connection = $connection ?: $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
+
+ return new AmazonSqsTransport($connection, $serializer);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php
new file mode 100644
index 000000000000..ef6ac5875a47
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php
@@ -0,0 +1,228 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
+
+use PHPUnit\Framework\TestCase;
+use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
+use Symfony\Component\Messenger\Exception\TransportException;
+use Symfony\Contracts\HttpClient\HttpClientInterface;
+use Symfony\Contracts\HttpClient\ResponseInterface;
+
+class ConnectionTest extends TestCase
+{
+ public function testFromInvalidDsn()
+ {
+ $this->expectException(\InvalidArgumentException::class);
+ $this->expectExceptionMessage('The given Amazon SQS DSN "sqs://" is invalid.');
+
+ Connection::fromDsn('sqs://');
+ }
+
+ public function testFromDsn()
+ {
+ $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
+ $this->assertEquals(
+ new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
+ Connection::fromDsn('sqs://default/queue', [], $httpClient)
+ );
+ }
+
+ public function testFromDsnWithRegion()
+ {
+ $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
+ $this->assertEquals(
+ new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
+ Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
+ );
+ }
+
+ public function testFromDsnWithCustomEndpoint()
+ {
+ $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
+ $this->assertEquals(
+ new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
+ Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
+ );
+ }
+
+ public function testFromDsnWithCustomEndpointAndPort()
+ {
+ $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
+ $this->assertEquals(
+ new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
+ Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
+ );
+ }
+
+ public function testFromDsnWithOptions()
+ {
+ $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
+ $this->assertEquals(
+ new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
+ Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
+ );
+ }
+
+ public function testFromDsnWithQueryOptions()
+ {
+ $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
+ $this->assertEquals(
+ new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
+ Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
+ );
+ }
+
+ private function handleGetQueueUrl(int $index, $mock): string
+ {
+ $response = $this->getMockBuilder(ResponseInterface::class)->getMock();
+
+ $mock->expects($this->at($index))->method('request')
+ ->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
+ ->willReturn($response);
+ $response->expects($this->once())->method('getStatusCode')->willReturn(200);
+ $response->expects($this->once())->method('getContent')->willReturn('
+
+ https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue
+
+
+ 470a6f13-2ed9-4181-ad8a-2fdea142988e
+
+ ');
+
+ return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
+ }
+
+ public function testKeepGettingPendingMessages()
+ {
+ $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
+ $response = $this->getMockBuilder(ResponseInterface::class)->getMock();
+
+ $queueUrl = $this->handleGetQueueUrl(0, $httpClient);
+
+ $httpClient->expects($this->at(1))->method('request')
+ ->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]])
+ ->willReturn($response);
+ $response->expects($this->once())->method('getContent')->willReturn('
+
+
+ 5fea7756-0ea4-451a-a703-a558b933e274
+
+ MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
+ Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
+ auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
+
+ fafb00f5732ab283681e124bf8747ed1
+ {"body":"this is a test","headers":{}}
+
+ SenderId
+ 195004372649
+
+
+ SentTimestamp
+ 1238099229000
+
+
+ ApproximateReceiveCount
+ 5
+
+
+ ApproximateFirstReceiveTimestamp
+ 1250700979248
+
+
+
+ 5fea7756-0ea4-451a-a703-a558b933e274
+
+ MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
+ Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
+ auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
+
+ fafb00f5732ab283681e124bf8747ed1
+ {"body":"this is a test","headers":{}}
+
+ SenderId
+ 195004372649
+
+
+ SentTimestamp
+ 1238099229000
+
+
+ ApproximateReceiveCount
+ 5
+
+
+ ApproximateFirstReceiveTimestamp
+ 1250700979248
+
+
+
+ 5fea7756-0ea4-451a-a703-a558b933e274
+
+ MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
+ Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
+ auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
+
+ fafb00f5732ab283681e124bf8747ed1
+ {"body":"this is a test","headers":{}}
+
+ SenderId
+ 195004372649
+
+
+ SentTimestamp
+ 1238099229000
+
+
+ ApproximateReceiveCount
+ 5
+
+
+ ApproximateFirstReceiveTimestamp
+ 1250700979248
+
+
+
+
+ b6633655-283d-45b4-aee4-4e84e0ae6afa
+
+ ');
+
+ $connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
+ $this->assertNotNull($connection->get());
+ $this->assertNotNull($connection->get());
+ $this->assertNotNull($connection->get());
+ }
+
+ public function testUnexpectedSqsError()
+ {
+ $this->expectException(TransportException::class);
+ $this->expectExceptionMessage('SQS error happens');
+
+ $httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
+ $response = $this->getMockBuilder(ResponseInterface::class)->getMock();
+
+ $httpClient->expects($this->once())->method('request')->willReturn($response);
+ $response->expects($this->once())->method('getStatusCode')->willReturn(400);
+ $response->expects($this->once())->method('getContent')->willReturn('
+
+ Sender
+ boom
+ SQS error happens
+
+
+ 30441e49-5246-5231-9c87-4bd704b81ce9
+ ');
+ $connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
+ $connection->get();
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceivedStamp.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceivedStamp.php
new file mode 100644
index 000000000000..363f4d4f7868
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceivedStamp.php
@@ -0,0 +1,32 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
+
+use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
+
+/**
+ * @author Jérémy Derussé
+ */
+class AmazonSqsReceivedStamp implements NonSendableStampInterface
+{
+ private $id;
+
+ public function __construct(string $id)
+ {
+ $this->id = $id;
+ }
+
+ public function getId(): string
+ {
+ return $this->id;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php
new file mode 100644
index 000000000000..3da773fa4d9b
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php
@@ -0,0 +1,113 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\LogicException;
+use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
+use Symfony\Component\Messenger\Exception\TransportException;
+use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
+use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
+use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
+
+/**
+ * @author Jérémy Derussé
+ */
+class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface
+{
+ private $connection;
+ private $serializer;
+
+ public function __construct(Connection $connection, SerializerInterface $serializer = null)
+ {
+ $this->connection = $connection;
+ $this->serializer = $serializer ?? new PhpSerializer();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function get(): iterable
+ {
+ try {
+ $sqsEnvelope = $this->connection->get();
+ } catch (HttpExceptionInterface $e) {
+ throw new TransportException($e->getMessage(), 0, $e);
+ }
+ if (null === $sqsEnvelope) {
+ return;
+ }
+
+ try {
+ $envelope = $this->serializer->decode([
+ 'body' => $sqsEnvelope['body'],
+ 'headers' => $sqsEnvelope['headers'],
+ ]);
+ } catch (MessageDecodingFailedException $exception) {
+ $this->connection->delete($sqsEnvelope['id']);
+
+ throw $exception;
+ }
+
+ yield $envelope->with(new AmazonSqsReceivedStamp($sqsEnvelope['id']));
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function ack(Envelope $envelope): void
+ {
+ try {
+ $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
+ } catch (HttpExceptionInterface $e) {
+ throw new TransportException($e->getMessage(), 0, $e);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reject(Envelope $envelope): void
+ {
+ try {
+ $this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
+ } catch (HttpExceptionInterface $e) {
+ throw new TransportException($e->getMessage(), 0, $e);
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function getMessageCount(): int
+ {
+ try {
+ $this->connection->getMessageCount();
+ } catch (HttpExceptionInterface $e) {
+ throw new TransportException($e->getMessage(), 0, $e);
+ }
+ }
+
+ private function findSqsReceivedStamp(Envelope $envelope): AmazonSqsReceivedStamp
+ {
+ /** @var AmazonSqsReceivedStamp|null $sqsReceivedStamp */
+ $sqsReceivedStamp = $envelope->last(AmazonSqsReceivedStamp::class);
+
+ if (null === $sqsReceivedStamp) {
+ throw new LogicException('No AmazonSqsReceivedStamp found on the Envelope.');
+ }
+
+ return $sqsReceivedStamp;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php
new file mode 100644
index 000000000000..146cacf6d027
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php
@@ -0,0 +1,54 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Exception\TransportException;
+use Symfony\Component\Messenger\Stamp\DelayStamp;
+use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
+
+/**
+ * @author Jérémy Derussé
+ */
+class AmazonSqsSender implements SenderInterface
+{
+ private $connection;
+ private $serializer;
+
+ public function __construct(Connection $connection, SerializerInterface $serializer)
+ {
+ $this->connection = $connection;
+ $this->serializer = $serializer;
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function send(Envelope $envelope): Envelope
+ {
+ $encodedMessage = $this->serializer->encode($envelope);
+
+ /** @var DelayStamp|null $delayStamp */
+ $delayStamp = $envelope->last(DelayStamp::class);
+ $delay = null !== $delayStamp ? (int) ceil($delayStamp->getDelay() / 1000) : 0;
+
+ try {
+ $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay);
+ } catch (HttpExceptionInterface $e) {
+ throw new TransportException($e->getMessage(), 0, $e);
+ }
+
+ return $envelope;
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php
new file mode 100644
index 000000000000..6560b937f12d
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransport.php
@@ -0,0 +1,91 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
+
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+use Symfony\Contracts\Service\ResetInterface;
+
+/**
+ * @author Jérémy Derussé
+ */
+class AmazonSqsTransport implements TransportInterface, SetupableTransportInterface, ResetInterface
+{
+ private $serializer;
+ private $connection;
+ private $receiver;
+ private $sender;
+
+ public function __construct(Connection $connection, SerializerInterface $serializer = null)
+ {
+ $this->connection = $connection;
+ $this->serializer = $serializer ?? new PhpSerializer();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function get(): iterable
+ {
+ return ($this->receiver ?? $this->getReceiver())->get();
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function ack(Envelope $envelope): void
+ {
+ ($this->receiver ?? $this->getReceiver())->ack($envelope);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function reject(Envelope $envelope): void
+ {
+ ($this->receiver ?? $this->getReceiver())->reject($envelope);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function send(Envelope $envelope): Envelope
+ {
+ return ($this->sender ?? $this->getSender())->send($envelope);
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function setup(): void
+ {
+ $this->connection->setup();
+ }
+
+ public function reset()
+ {
+ $this->connection->reset();
+ }
+
+ private function getReceiver(): AmazonSqsReceiver
+ {
+ return $this->receiver = new AmazonSqsReceiver($this->connection, $this->serializer);
+ }
+
+ private function getSender(): AmazonSqsSender
+ {
+ return $this->sender = new AmazonSqsSender($this->connection, $this->serializer);
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransportFactory.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransportFactory.php
new file mode 100644
index 000000000000..aecde4d5df9e
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsTransportFactory.php
@@ -0,0 +1,34 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
+
+use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
+use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
+use Symfony\Component\Messenger\Transport\TransportInterface;
+
+/**
+ * @author Jérémy Derussé
+ */
+class AmazonSqsTransportFactory implements TransportFactoryInterface
+{
+ public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
+ {
+ unset($options['transport_name']);
+
+ return new AmazonSqsTransport(Connection::fromDsn($dsn, $options), $serializer);
+ }
+
+ public function supports(string $dsn, array $options): bool
+ {
+ return 0 === strpos($dsn, 'sqs://');
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php
new file mode 100644
index 000000000000..13931dd0c00d
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php
@@ -0,0 +1,362 @@
+
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
+
+use Symfony\Component\HttpClient\HttpClient;
+use Symfony\Component\Messenger\Exception\InvalidArgumentException;
+use Symfony\Component\Messenger\Exception\TransportException;
+use Symfony\Contracts\HttpClient\HttpClientInterface;
+use Symfony\Contracts\HttpClient\ResponseInterface;
+
+/**
+ * A SQS connection.
+ *
+ * @author Jérémy Derussé
+ *
+ * @internal
+ * @final
+ */
+class Connection
+{
+ private const DEFAULT_OPTIONS = [
+ 'buffer_size' => 9,
+ 'wait_time' => 20,
+ 'poll_timeout' => 0.1,
+ 'visibility_timeout' => null,
+ 'auto_setup' => true,
+ 'access_key' => null,
+ 'secret_key' => null,
+ 'endpoint' => 'https://sqs.eu-west-1.amazonaws.com',
+ 'region' => 'eu-west-1',
+ 'queue_name' => 'messages',
+ 'account' => null,
+ ];
+
+ private $configuration;
+ private $client;
+
+ /** @var ResponseInterface */
+ private $currentResponse;
+ /** @var array[] */
+ private $buffer = [];
+ /** @var string|null */
+ private $queueUrl;
+
+ public function __construct(array $configuration, HttpClientInterface $client = null)
+ {
+ $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
+ $this->client = $client ?? HttpClient::create();
+ }
+
+ public function __destruct()
+ {
+ $this->reset();
+ }
+
+ /**
+ * Creates a connection based on the DSN and options.
+ *
+ * Available options:
+ *
+ * * endpoint: absolute URL to the SQS service (Default: https://sqs.eu-west-1.amazonaws.com)
+ * * region: name of the AWS region (Default: eu-west-1)
+ * * queue_name: name of the queue (Default: messages)
+ * * account: identifier of the AWS account
+ * * access_key: AWS access key
+ * * secret_key: AWS secret key
+ * * buffer_size: number of messages to prefetch (Default: 9)
+ * * wait_time: long polling duration in seconds (Default: 20)
+ * * poll_timeout: amount of seconds the transport should wait for new message
+ * * visibility_timeout: amount of seconds the message won't be visible
+ * * auto_setup: Whether the queue should be created automatically during send / get (Default: true)
+ */
+ public static function fromDsn(string $dsn, array $options = [], HttpClientInterface $client = null): self
+ {
+ if (false === $parsedUrl = parse_url($dsn)) {
+ throw new InvalidArgumentException(sprintf('The given Amazon SQS DSN "%s" is invalid.', $dsn));
+ }
+
+ $query = [];
+ if (isset($parsedUrl['query'])) {
+ parse_str($parsedUrl['query'], $query);
+ }
+
+ $configuration = [
+ 'region' => $options['region'] ?? ($query['region'] ?? self::DEFAULT_OPTIONS['region']),
+ 'buffer_size' => $options['buffer_size'] ?? (int) ($query['buffer_size'] ?? self::DEFAULT_OPTIONS['buffer_size']),
+ 'wait_time' => $options['wait_time'] ?? (int) ($query['wait_time'] ?? self::DEFAULT_OPTIONS['wait_time']),
+ 'poll_timeout' => $options['poll_timeout'] ?? ($query['poll_timeout'] ?? self::DEFAULT_OPTIONS['poll_timeout']),
+ 'visibility_timeout' => $options['visibility_timeout'] ?? ($query['visibility_timeout'] ?? self::DEFAULT_OPTIONS['visibility_timeout']),
+ 'auto_setup' => $options['auto_setup'] ?? (bool) ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
+ 'access_key' => $options['access_key'] ?? (urldecode($parsedUrl['user'] ?? '') ?: self::DEFAULT_OPTIONS['access_key']),
+ 'secret_key' => $options['secret_key'] ?? (urldecode($parsedUrl['pass'] ?? '') ?: self::DEFAULT_OPTIONS['secret_key']),
+ ];
+
+ if ('default' === ($parsedUrl['host'] ?? 'default')) {
+ $configuration['endpoint'] = sprintf('https://sqs.%s.amazonaws.com', $configuration['region']);
+ } else {
+ $configuration['endpoint'] = sprintf('%s://%s%s', ($query['sslmode'] ?? null) === 'disable' ? 'http' : 'https', $parsedUrl['host'], ($parsedUrl['port'] ?? null) ? ':'.$parsedUrl['port'] : '');
+ unset($query['sslmode']);
+ }
+
+ $parsedPath = explode('/', ltrim($parsedUrl['path'] ?? '/', '/'));
+ if (\count($parsedPath) > 0) {
+ $configuration['queue_name'] = end($parsedPath);
+ }
+ $configuration['account'] = 2 === \count($parsedPath) ? $parsedPath[0] : null;
+
+ // check for extra keys in options
+ $optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration));
+ if (0 < \count($optionsExtraKeys)) {
+ throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s]', implode(', ', $optionsExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
+ }
+
+ // check for extra keys in options
+ $queryExtraKeys = array_diff(array_keys($query), array_keys($configuration));
+ if (0 < \count($queryExtraKeys)) {
+ throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s]', implode(', ', $queryExtraKeys), implode(', ', array_keys(self::DEFAULT_OPTIONS))));
+ }
+
+ return new self($configuration, $client);
+ }
+
+ public function get(): ?array
+ {
+ if ($this->configuration['auto_setup']) {
+ $this->setup();
+ }
+
+ foreach ($this->getNextMessages() as $message) {
+ return $message;
+ }
+
+ return null;
+ }
+
+ /**
+ * @return array[]
+ */
+ private function getNextMessages(): \Generator
+ {
+ yield from $this->getPendingMessages();
+ yield from $this->getNewMessages();
+ }
+
+ /**
+ * @return array[]
+ */
+ private function getPendingMessages(): \Generator
+ {
+ while (!empty($this->buffer)) {
+ yield array_shift($this->buffer);
+ }
+ }
+
+ /**
+ * @return array[]
+ */
+ private function getNewMessages(): \Generator
+ {
+ if (null === $this->currentResponse) {
+ $this->currentResponse = $this->request($this->getQueueUrl(), [
+ 'Action' => 'ReceiveMessage',
+ 'VisibilityTimeout' => $this->configuration['visibility_timeout'],
+ 'MaxNumberOfMessages' => $this->configuration['buffer_size'],
+ 'WaitTimeSeconds' => $this->configuration['wait_time'],
+ ]);
+ }
+
+ if ($this->client->stream($this->currentResponse, $this->configuration['poll_timeout'])->current()->isTimeout()) {
+ return;
+ }
+
+ $xml = new \SimpleXMLElement($this->currentResponse->getContent());
+ foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
+ $this->buffer[] = [
+ 'id' => (string) $xmlMessage->ReceiptHandle,
+ ] + json_decode($xmlMessage->Body, true);
+ }
+
+ $this->currentResponse = null;
+
+ yield from $this->getPendingMessages();
+ }
+
+ public function setup(): void
+ {
+ $this->call($this->configuration['endpoint'], [
+ 'Action' => 'CreateQueue',
+ 'QueueName' => $this->configuration['queue_name'],
+ ]);
+ $this->queueUrl = null;
+
+ $this->configuration['auto_setup'] = false;
+ }
+
+ public function delete(string $id): void
+ {
+ $this->call($this->getQueueUrl(), [
+ 'Action' => 'DeleteMessage',
+ 'ReceiptHandle' => $id,
+ ]);
+ }
+
+ public function getMessageCount(): int
+ {
+ $response = $this->request($this->getQueueUrl(), [
+ 'Action' => 'GetQueueAttributes',
+ 'AttributeNames' => ['ApproximateNumberOfMessages'],
+ ]);
+ $this->checkResponse($response);
+ $xml = new \SimpleXMLElement($response->getContent());
+ foreach ($xml->GetQueueAttributesResult->Attribute as $attribute) {
+ if ('ApproximateNumberOfMessages' !== (string) $attribute->Name) {
+ continue;
+ }
+
+ return (int) $attribute->Value;
+ }
+
+ return 0;
+ }
+
+ public function send(string $body, array $headers, int $delay = 0): void
+ {
+ if ($this->configuration['auto_setup']) {
+ $this->setup();
+ }
+
+ $this->call($this->getQueueUrl(), [
+ 'Action' => 'SendMessage',
+ 'MessageBody' => json_encode(['body' => $body, 'headers' => $headers]),
+ 'DelaySeconds' => $delay,
+ ]);
+ }
+
+ public function reset(): void
+ {
+ if (null !== $this->currentResponse) {
+ $this->currentResponse->cancel();
+ }
+
+ foreach ($this->getPendingMessages() as $message) {
+ $this->call($this->getQueueUrl(), [
+ 'Action' => 'ChangeMessageVisibility',
+ 'ReceiptHandle' => $message['id'],
+ 'VisibilityTimeout' => 0,
+ ]);
+ }
+ }
+
+ private function getQueueUrl(): string
+ {
+ if (null === $this->queueUrl) {
+ $parameters = [
+ 'Action' => 'GetQueueUrl',
+ 'QueueName' => $this->configuration['queue_name'],
+ ];
+ if (isset($this->configuration['account'])) {
+ $parameters['QueueOwnerAWSAccountId'] = $this->configuration['account'];
+ }
+
+ $response = $this->request($this->configuration['endpoint'], $parameters);
+ $this->checkResponse($response);
+ $xml = new \SimpleXMLElement($response->getContent());
+
+ $this->queueUrl = (string) $xml->GetQueueUrlResult->QueueUrl;
+ }
+
+ return $this->queueUrl;
+ }
+
+ private function call(string $endpoint, array $body): void
+ {
+ $this->checkResponse($this->request($endpoint, $body));
+ }
+
+ private function request(string $endpoint, array $body): ResponseInterface
+ {
+ if (!$this->configuration['access_key']) {
+ return $this->client->request('POST', $endpoint, ['body' => $body]);
+ }
+
+ $region = $this->configuration['region'];
+ $service = 'sqs';
+
+ $method = 'POST';
+ $requestParameters = http_build_query($body, '', '&', PHP_QUERY_RFC1738);
+ $amzDate = gmdate('Ymd\THis\Z');
+ $parsedUrl = parse_url($endpoint);
+
+ $headers = [
+ 'host' => $parsedUrl['host'],
+ 'x-amz-date' => $amzDate,
+ 'content-type' => 'application/x-www-form-urlencoded',
+ ];
+
+ $signedHeaders = ['host', 'x-amz-date'];
+ $canonicalHeaders = implode("\n", array_map(function ($headerName) use ($headers): string {
+ return sprintf('%s:%s', $headerName, $headers[$headerName]);
+ }, $signedHeaders))."\n";
+
+ $canonicalRequest = implode("\n", [
+ $method,
+ $parsedUrl['path'] ?? '/',
+ '',
+ $canonicalHeaders,
+ implode(';', $signedHeaders),
+ hash('sha256', $requestParameters),
+ ]);
+
+ $algorithm = 'AWS4-HMAC-SHA256';
+ $credentialScope = [gmdate('Ymd'), $region, $service, 'aws4_request'];
+
+ $signingKey = 'AWS4'.$this->configuration['secret_key'];
+ foreach ($credentialScope as $credential) {
+ $signingKey = hash_hmac('sha256', $credential, $signingKey, true);
+ }
+
+ $stringToSign = implode("\n", [
+ $algorithm,
+ $amzDate,
+ implode('/', $credentialScope),
+ hash('sha256', $canonicalRequest),
+ ]);
+
+ $authorizationHeader = sprintf(
+ '%s Credential=%s/%s, SignedHeaders=%s, Signature=%s',
+ $algorithm,
+ $this->configuration['access_key'],
+ implode('/', $credentialScope),
+ implode(';', $signedHeaders),
+ hash_hmac('sha256', $stringToSign, $signingKey)
+ );
+
+ $options = [
+ 'headers' => $headers + [
+ 'authorization' => $authorizationHeader,
+ ],
+ 'body' => $requestParameters,
+ ];
+
+ return $this->client->request($method, $endpoint, $options);
+ }
+
+ private function checkResponse(ResponseInterface $response): void
+ {
+ if (200 !== $response->getStatusCode()) {
+ $error = new \SimpleXMLElement($response->getContent(false));
+
+ throw new TransportException($error->Error->Message);
+ }
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json
new file mode 100644
index 000000000000..8515a1b77577
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/composer.json
@@ -0,0 +1,40 @@
+{
+ "name": "symfony/amazon-sqs-messenger",
+ "type": "symfony-bridge",
+ "description": "Symfony Amazon SQS extension Messenger Bridge",
+ "keywords": [],
+ "homepage": "https://symfony.com",
+ "license": "MIT",
+ "authors": [
+ {
+ "name": "Fabien Potencier",
+ "email": "fabien@symfony.com"
+ },
+ {
+ "name": "Symfony Community",
+ "homepage": "https://symfony.com/contributors"
+ }
+ ],
+ "require": {
+ "php": "^7.2.5",
+ "symfony/http-client": "^4.3|5.0",
+ "symfony/messenger": "^4.3|^5.0"
+ },
+ "require-dev": {
+ "symfony/http-client-contracts": "^1.0|^2.0",
+ "symfony/property-access": "^4.4|^5.0",
+ "symfony/serializer": "^4.4|^5.0"
+ },
+ "autoload": {
+ "psr-4": { "Symfony\\Component\\Messenger\\Bridge\\AmazonSqs\\": "" },
+ "exclude-from-classmap": [
+ "/Tests/"
+ ]
+ },
+ "minimum-stability": "dev",
+ "extra": {
+ "branch-alias": {
+ "dev-master": "5.1-dev"
+ }
+ }
+}
diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/phpunit.xml.dist b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/phpunit.xml.dist
new file mode 100644
index 000000000000..b1d8e9608a3e
--- /dev/null
+++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/phpunit.xml.dist
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+
+
+ ./Tests/
+
+
+
+
+
+ ./
+
+ ./Tests
+ ./vendor
+
+
+
+