Skip to content

Commit

Permalink
bug #32979 [Messenger] return empty envelopes when RetryableException…
Browse files Browse the repository at this point in the history
… occurs (surikman)

This PR was squashed before being merged into the 4.3 branch (closes #32979).

Discussion
----------

[Messenger] return empty envelopes when RetryableException occurs

| Q             | A
| ------------- | ---
| Branch?       |  3.4 or 4.3 for bug fixes <!-- see below -->
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| License       | MIT
| ~~Doc PR~~ | ~~symfony/symfony-docs#12109~~

<!--
Replace this notice by a short README for your feature/bugfix. This will help people
understand your PR and can be used as a start for the documentation.

Additionally (see https://symfony.com/roadmap):
 - Bug fixes must be submitted against the lowest maintained branch where they apply
   (lowest branches are regularly merged to upper ones so they get the fixes too).
 - Features and deprecations must be submitted against branch 4.4.
 - Legacy code removals go to the master branch.
-->

Problem occurs when you are using more than 1 worker with Doctrine Transport.

`Symfony\Component\Messenger\Transport\Doctrine\Connection::get` does a query `SELECT ... FOR UPDATE` and this locking query could lock table and workers stops. But using locks can result in dead locks or lock timeouts. Doctrine renders these SQL errors as RetryableExceptions. These exceptions are often normal if you are in a high concurrency environment. They can happen very often and your application should handle them properly.

Commits
-------

9add32a [Messenger] return empty envelopes when RetryableException occurs
  • Loading branch information
Tobion committed Sep 27, 2019
2 parents dff71ce + 9add32a commit abf11d8
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
Expand Up @@ -11,9 +11,12 @@

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Driver\PDOException;
use Doctrine\DBAL\Exception\DeadlockException;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
Expand Down Expand Up @@ -68,6 +71,26 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
$receiver->get();
}

public function testOccursRetryableExceptionFromConnection()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
$receiver = new DoctrineReceiver($connection, $serializer);
$this->assertSame([], $receiver->get());
$this->assertSame([], $receiver->get());
try {
$receiver->get();
} catch (TransportException $exception) {
// skip, and retry
}
$this->assertSame([], $receiver->get());
$this->assertSame([], $receiver->get());
$this->expectException(TransportException::class);
$receiver->get();
}

public function testAll()
{
$serializer = $this->createSerializer();
Expand Down
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Doctrine;

use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\RetryableException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
Expand All @@ -30,6 +31,8 @@
*/
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private const MAX_RETRIES = 3;
private $retryingSafetyCounter = 0;
private $connection;
private $serializer;

Expand All @@ -46,6 +49,17 @@ public function get(): iterable
{
try {
$doctrineEnvelope = $this->connection->get();
$this->retryingSafetyCounter = 0; // reset counter
} catch (RetryableException $exception) {
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
// as it will likely be resolved on the next call to get()
// Problem with concurrent consumers and database deadlocks
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
$this->retryingSafetyCounter = 0; // reset counter
throw new TransportException($exception->getMessage(), 0, $exception);
}

return [];
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/composer.json
Expand Up @@ -20,7 +20,7 @@
"psr/log": "~1.0"
},
"require-dev": {
"doctrine/dbal": "^2.5",
"doctrine/dbal": "^2.6",
"psr/cache": "~1.0",
"symfony/console": "~3.4|~4.0",
"symfony/debug": "~4.1",
Expand Down

0 comments on commit abf11d8

Please sign in to comment.