Skip to content

Commit

Permalink
feature #36525 Improve SQS interoperability (jderusse)
Browse files Browse the repository at this point in the history
This PR was squashed before being merged into the 5.1-dev branch (closes #36525).

Discussion
----------

Improve SQS interoperability

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | NA
| License       | MIT
| Doc PR        | NA

The Symfony Messenger component provides a SerializerInterface to encode/decode the `Envelope`, this can be used to improve the Interoperability (see [article from jolicode](https://jolicode.com/blog/symfony-messenger-et-linteroperabilite) (french))

Sadly, the current implementation of SQS adapter json_encode the elements of the `Envelope` (`string body` + `string[] headers`) and store everything in the SQS message `Body`. That partially defect the interoperability: 3rd party have to also wrap (unwrap) message form json_encoded Body.

This PR leverage the AWS SQS `Body` and `MessageAttribute` properties to store message information:

```yaml
# before
SQS Message:
  Body: {"body": "hello world", "headers": {"foo": "bar"}}
  MessageAttributes: {}

# after
SQS Message:
  Body: hello world
  MessageAttributes:
    foor: bar
```

Commits
-------

00d84c1 Improve SQS interoperability
  • Loading branch information
chalasr committed Apr 23, 2020
2 parents f2f82d1 + 00d84c1 commit 83b37e8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
8 changes: 4 additions & 4 deletions .travis.yml
Expand Up @@ -21,8 +21,8 @@ env:
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_REDIS_DSN=redis://127.0.0.1:7006/messages
- MESSENGER_SQS_DSN=sqs://localhost:9494/messages?sslmode=disable
- MESSENGER_SQS_FIFO_QUEUE_DSN=sqs://localhost:9494/messages.fifo?sslmode=disable
- MESSENGER_SQS_DSN="sqs://localhost:9494/messages?sslmode=disable&poll_timeout=0.01"
- MESSENGER_SQS_FIFO_QUEUE_DSN="sqs://localhost:9494/messages.fifo?sslmode=disable&poll_timeout=0.01"
- SYMFONY_PHPUNIT_DISABLE_RESULT_CACHE=1

matrix:
Expand Down Expand Up @@ -73,8 +73,8 @@ before_install:
- |
# Start Sqs server
docker pull feathj/fake-sqs
docker run -d -p 9494:9494 --name sqs feathj/fake-sqs
docker pull asyncaws/testing-sqs
docker run -d -p 9494:9494 --name sqs asyncaws/testing-sqs
- |
# Start Kafka and install an up-to-date librdkafka
Expand Down
Expand Up @@ -109,7 +109,7 @@ public function testKeepGettingPendingMessages()
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);

$httpClient->expects($this->at(1))->method('request')
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]])
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
->willReturn($response);
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
<ReceiveMessageResult>
Expand Down
Expand Up @@ -176,6 +176,7 @@ private function getNewMessages(): \Generator
'Action' => 'ReceiveMessage',
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
'MessageAttributeName.1' => 'All',
'WaitTimeSeconds' => $this->configuration['wait_time'],
]);
}
Expand All @@ -186,9 +187,18 @@ private function getNewMessages(): \Generator

$xml = new \SimpleXMLElement($this->currentResponse->getContent());
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
$headers = [];
foreach ($xmlMessage->MessageAttribute as $item) {
if ('String' !== (string) $item->Value->DataType) {
continue;
}
$headers[(string) $item->Name] = (string) $item->Value->StringValue;
}
$this->buffer[] = [
'id' => (string) $xmlMessage->ReceiptHandle,
] + json_decode($xmlMessage->Body, true);
'body' => (string) $xmlMessage->Body,
'headers' => $headers,
];
}

$this->currentResponse = null;
Expand Down Expand Up @@ -246,17 +256,23 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess
$this->setup();
}

$messageBody = json_encode(['body' => $body, 'headers' => $headers]);

$parameters = [
'Action' => 'SendMessage',
'MessageBody' => $messageBody,
'MessageBody' => $body,
'DelaySeconds' => $delay,
];

$index = 0;
foreach ($headers as $name => $value) {
++$index;
$parameters["MessageAttribute.$index.Name"] = $name;
$parameters["MessageAttribute.$index.Value.DataType"] = 'String';
$parameters["MessageAttribute.$index.Value.StringValue"] = $value;
}

if ($this->isFifoQueue($this->configuration['queue_name'])) {
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody);
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
}

$this->call($this->getQueueUrl(), $parameters);
Expand Down

0 comments on commit 83b37e8

Please sign in to comment.