Skip to content

Commit

Permalink
bug #34107 [Messenger] prevent infinite redelivery loops and blocked …
Browse files Browse the repository at this point in the history
…queues (Tobion)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] prevent infinite redelivery loops and blocked queues

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tickets       | Fix #32055
| License       | MIT
| Doc PR        |

This PR solves a very common fitfall of amqp redeliveries. It's for example explained in https://blog.forma-pro.com/rabbitmq-redelivery-pitfalls-440e0347f4e0
Newer RabbitMQ versions provide a solution for this by itself but only for quorum queues and not the classic ones, see rabbitmq/rabbitmq-server#1889

This PR adds a middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.

The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly. The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.

AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry limit and potential delay.

Commits
-------

d211904 [Messenger] prevent infinite redelivery loops and blocked queues
  • Loading branch information
Tobion committed Oct 25, 2019
2 parents b9f6944 + d211904 commit 13853d5
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 7 deletions.
Expand Up @@ -1649,6 +1649,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$defaultMiddleware = [
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],
Expand Down
Expand Up @@ -48,6 +48,8 @@
<argument type="service" id="validator" />
</service>

<service id="messenger.middleware.reject_redelivered_message_middleware" class="Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware" />

<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />

<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
Expand Down
Expand Up @@ -739,6 +739,7 @@ public function testMessengerWithMultipleBuses()
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'send_message'],
Expand All @@ -748,6 +749,7 @@ public function testMessengerWithMultipleBuses()
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'reject_redelivered_message_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
Expand Down
4 changes: 2 additions & 2 deletions src/Symfony/Bundle/FrameworkBundle/composer.json
Expand Up @@ -42,7 +42,7 @@
"symfony/expression-language": "~3.4|~4.0",
"symfony/http-client": "^4.3",
"symfony/mailer": "^4.3",
"symfony/messenger": "^4.3",
"symfony/messenger": "^4.3.6",
"symfony/mime": "^4.3",
"symfony/process": "~3.4|~4.0",
"symfony/security-csrf": "~3.4|~4.0",
Expand Down Expand Up @@ -73,7 +73,7 @@
"symfony/dotenv": "<4.2",
"symfony/dom-crawler": "<4.3",
"symfony/form": "<4.3",
"symfony/messenger": "<4.3",
"symfony/messenger": "<4.3.6",
"symfony/property-info": "<3.4",
"symfony/serializer": "<4.2",
"symfony/stopwatch": "<3.4",
Expand Down
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Exception;

/**
* @author Tobias Schultze <http://tobion.de>
*
* @experimental in 4.3
*/
class RejectRedeliveredMessageException extends RuntimeException
{
}
@@ -0,0 +1,50 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;

/**
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
*
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
*
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
* limit and potential delay.
*
* @experimental in 4.3
*
* @author Tobias Schultze <http://tobion.de>
*/
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
// ignore the dispatched messages for retry
if (null !== $envelope->last(ReceivedStamp::class)) {
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);

if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}
}

return $stack->next()->handle($envelope, $stack);
}
}
4 changes: 2 additions & 2 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Expand Up @@ -118,8 +118,8 @@ public function testDispatchCausesRetry()
}
});

// old message acknowledged
$this->assertSame(1, $receiver->getAcknowledgeCount());
// old message rejected
$this->assertSame(1, $receiver->getRejectCount());
}

public function testUnrecoverableMessageHandlingExceptionPreventsRetries()
Expand Down
14 changes: 11 additions & 3 deletions src/Symfony/Component/Messenger/Worker.php
Expand Up @@ -18,6 +18,7 @@
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
Expand Down Expand Up @@ -135,6 +136,13 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
try {
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
} catch (\Throwable $throwable) {
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
if ($rejectFirst) {
// redelivered messages are rejected first so that continuous failures in an event listener or while
// publishing for retry does not cause infinite redelivery loops
$receiver->reject($envelope);
}

if ($throwable instanceof HandlerFailedException) {
$envelope = $throwable->getEnvelope();
}
Expand All @@ -156,15 +164,15 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
->with(new RedeliveryStamp($retryCount, $transportName))
->withoutAll(ReceivedStamp::class);

// re-send the message
// re-send the message for retry
$this->bus->dispatch($retryEnvelope);
// acknowledge the previous message has received
$receiver->ack($envelope);
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}
}

if (!$rejectFirst) {
$receiver->reject($envelope);
}

Expand Down

0 comments on commit 13853d5

Please sign in to comment.