Skip to content

Commit

Permalink
bug #34134 [Messenger] fix retry of messages losing the routing key a…
Browse files Browse the repository at this point in the history
…nd properties (Tobion)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] fix retry of messages losing the routing key and properties

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tickets       | Fix #32994 <!-- prefix each issue number with "Fix #", if any -->
| License       | MIT
| Doc PR        |

Messages sent for retry in rabbitmq lost the routing key and properties like the priority. Now we read those original properties and sent the retry message with the same properties (unless those properties have already been set manually before).

Commits
-------

75c674d [Messenger] fix retry of messages losing the routing key and properties
  • Loading branch information
fabpot committed Nov 4, 2019
2 parents 142bddd + 75c674d commit e057a9c
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 7 deletions.
Expand Up @@ -34,4 +34,40 @@ public function testFlagsAndAttributes()
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
}

public function testCreateFromAmqpEnvelope()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
$amqpEnvelope->method('getPriority')->willReturn(5);
$amqpEnvelope->method('getAppId')->willReturn('appid');

$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope);

$this->assertSame($amqpEnvelope->getRoutingKey(), $stamp->getRoutingKey());
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
$this->assertSame($amqpEnvelope->getPriority(), $stamp->getAttributes()['priority']);
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
}

public function testCreateFromAmqpEnvelopeWithPreviousStamp()
{
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getRoutingKey')->willReturn('routingkey');
$amqpEnvelope->method('getDeliveryMode')->willReturn(2);
$amqpEnvelope->method('getPriority')->willReturn(5);
$amqpEnvelope->method('getAppId')->willReturn('appid');

$previousStamp = new AmqpStamp('otherroutingkey', AMQP_MANDATORY, ['priority' => 8]);

$stamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, $previousStamp);

$this->assertSame('otherroutingkey', $stamp->getRoutingKey());
$this->assertSame($amqpEnvelope->getDeliveryMode(), $stamp->getAttributes()['delivery_mode']);
$this->assertSame(8, $stamp->getAttributes()['priority']);
$this->assertSame($amqpEnvelope->getAppId(), $stamp->getAttributes()['app_id']);
$this->assertSame(AMQP_MANDATORY, $stamp->getFlags());
}
}
14 changes: 8 additions & 6 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php
Expand Up @@ -47,20 +47,22 @@ public function send(Envelope $envelope): Envelope
$delayStamp = $envelope->last(DelayStamp::class);
$delay = $delayStamp ? $delayStamp->getDelay() : 0;

/** @var AmqpStamp|null $amqpStamp */
$amqpStamp = $envelope->last(AmqpStamp::class);
if (isset($encodedMessage['headers']['Content-Type'])) {
$contentType = $encodedMessage['headers']['Content-Type'];
unset($encodedMessage['headers']['Content-Type']);

$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];

if (!isset($attributes['content_type'])) {
$attributes['content_type'] = $contentType;

$amqpStamp = new AmqpStamp($amqpStamp ? $amqpStamp->getRoutingKey() : null, $amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM, $attributes);
if (!$amqpStamp || !isset($amqpStamp->getAttributes()['content_type'])) {
$amqpStamp = AmqpStamp::createWithAttributes(['content_type' => $contentType], $amqpStamp);
}
}

$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
}

try {
$this->connection->publish(
$encodedMessage['body'],
Expand Down
29 changes: 29 additions & 0 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php
Expand Up @@ -46,4 +46,33 @@ public function getAttributes(): array
{
return $this->attributes;
}

public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
{
$attr = $previousStamp->attributes ?? [];

$attr['headers'] = $attr['headers'] ?? $amqpEnvelope->getHeaders();
$attr['content_type'] = $attr['content_type'] ?? $amqpEnvelope->getContentType();
$attr['content_encoding'] = $attr['content_encoding'] ?? $amqpEnvelope->getContentEncoding();
$attr['delivery_mode'] = $attr['delivery_mode'] ?? $amqpEnvelope->getDeliveryMode();
$attr['priority'] = $attr['priority'] ?? $amqpEnvelope->getPriority();
$attr['timestamp'] = $attr['timestamp'] ?? $amqpEnvelope->getTimestamp();
$attr['app_id'] = $attr['app_id'] ?? $amqpEnvelope->getAppId();
$attr['message_id'] = $attr['message_id'] ?? $amqpEnvelope->getMessageId();
$attr['user_id'] = $attr['user_id'] ?? $amqpEnvelope->getUserId();
$attr['expiration'] = $attr['expiration'] ?? $amqpEnvelope->getExpiration();
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();

return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
}

public static function createWithAttributes(array $attributes, self $previousStamp = null): self
{
return new self(
$previousStamp->routingKey ?? null,
$previousStamp->flags ?? AMQP_NOPARAM,
array_merge($previousStamp->attributes ?? [], $attributes)
);
}
}
Expand Up @@ -225,7 +225,7 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
{
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);

$exchange->publish(
$body,
Expand Down

0 comments on commit e057a9c

Please sign in to comment.