From 00d84c125ec32ef680a6754317511fded1065b5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=A9my=20Deruss=C3=A9?= Date: Wed, 22 Apr 2020 13:43:49 +0200 Subject: [PATCH] Improve SQS interoperability --- .travis.yml | 8 +++--- .../Tests/Transport/ConnectionTest.php | 2 +- .../Bridge/AmazonSqs/Transport/Connection.php | 26 +++++++++++++++---- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/.travis.yml b/.travis.yml index bce17066f71b..e4354aa72857 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,8 +21,8 @@ env: - SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php - MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages - MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages - - MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable - - MESSENGER_SQS_FIFO_QUEUE_DSN=sqs://localhost:9494/messages.fifo?sslmode=disable + - MESSENGER_SQS_DSN="sqs://localhost:9494/messages?sslmode=disable&poll_timeout=0.01" + - MESSENGER_SQS_FIFO_QUEUE_DSN="sqs://localhost:9494/messages.fifo?sslmode=disable&poll_timeout=0.01" - SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1 matrix: @@ -73,8 +73,8 @@ before_install: - | # Start Sqs server - docker pull feathj/fake-sqs - docker run -d -p 9494:9494 --name sqs feathj/fake-sqs + docker pull asyncaws/testing-sqs + docker run -d -p 9494:9494 --name sqs asyncaws/testing-sqs - | # Start Kafka and install an up-to-date librdkafka diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php index ef6ac5875a47..f6d295c27537 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php @@ -109,7 +109,7 @@ public function testKeepGettingPendingMessages() $queueUrl = $this->handleGetQueueUrl(0, $httpClient); $httpClient->expects($this->at(1))->method('request') - ->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]]) + ->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']]) ->willReturn($response); $response->expects($this->once())->method('getContent')->willReturn(' diff --git a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php index f65d28d1486d..de84c11184ac 100644 --- a/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php @@ -176,6 +176,7 @@ private function getNewMessages(): \Generator 'Action' => 'ReceiveMessage', 'VisibilityTimeout' => $this->configuration['visibility_timeout'], 'MaxNumberOfMessages' => $this->configuration['buffer_size'], + 'MessageAttributeName.1' => 'All', 'WaitTimeSeconds' => $this->configuration['wait_time'], ]); } @@ -186,9 +187,18 @@ private function getNewMessages(): \Generator $xml = new \SimpleXMLElement($this->currentResponse->getContent()); foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) { + $headers = []; + foreach ($xmlMessage->MessageAttribute as $item) { + if ('String' !== (string) $item->Value->DataType) { + continue; + } + $headers[(string) $item->Name] = (string) $item->Value->StringValue; + } $this->buffer[] = [ 'id' => (string) $xmlMessage->ReceiptHandle, - ] + json_decode($xmlMessage->Body, true); + 'body' => (string) $xmlMessage->Body, + 'headers' => $headers, + ]; } $this->currentResponse = null; @@ -246,17 +256,23 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess $this->setup(); } - $messageBody = json_encode(['body' => $body, 'headers' => $headers]); - $parameters = [ 'Action' => 'SendMessage', - 'MessageBody' => $messageBody, + 'MessageBody' => $body, 'DelaySeconds' => $delay, ]; + $index = 0; + foreach ($headers as $name => $value) { + ++$index; + $parameters["MessageAttribute.$index.Name"] = $name; + $parameters["MessageAttribute.$index.Value.DataType"] = 'String'; + $parameters["MessageAttribute.$index.Value.StringValue"] = $value; + } + if ($this->isFifoQueue($this->configuration['queue_name'])) { $parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__; - $parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody); + $parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers])); } $this->call($this->getQueueUrl(), $parameters);