Skip to content

Commit

Permalink
[Messenger] return empty envelopes when RetryableException occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
surikman authored and Tobion committed Sep 27, 2019
1 parent ccb3a4c commit 9add32a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Driver\PDOException;
use Doctrine\DBAL\Exception\DeadlockException;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Doctrine\Connection;
Expand Down Expand Up @@ -68,6 +71,26 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
$receiver->get();
}

public function testOccursRetryableExceptionFromConnection()
{
$serializer = $this->createSerializer();
$connection = $this->createMock(Connection::class);
$driverException = new PDOException(new \PDOException('Deadlock', 40001));
$connection->method('get')->willThrowException(new DeadlockException('Deadlock', $driverException));
$receiver = new DoctrineReceiver($connection, $serializer);
$this->assertSame([], $receiver->get());
$this->assertSame([], $receiver->get());
try {
$receiver->get();
} catch (TransportException $exception) {
// skip, and retry
}
$this->assertSame([], $receiver->get());
$this->assertSame([], $receiver->get());
$this->expectException(TransportException::class);
$receiver->get();
}

public function testAll()
{
$serializer = $this->createSerializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\Doctrine;

use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Exception\RetryableException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
Expand All @@ -30,6 +31,8 @@
*/
class DoctrineReceiver implements ReceiverInterface, MessageCountAwareInterface, ListableReceiverInterface
{
private const MAX_RETRIES = 3;
private $retryingSafetyCounter = 0;
private $connection;
private $serializer;

Expand All @@ -46,6 +49,17 @@ public function get(): iterable
{
try {
$doctrineEnvelope = $this->connection->get();
$this->retryingSafetyCounter = 0; // reset counter
} catch (RetryableException $exception) {
// Do nothing when RetryableException occurs less than "MAX_RETRIES"
// as it will likely be resolved on the next call to get()
// Problem with concurrent consumers and database deadlocks
if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
$this->retryingSafetyCounter = 0; // reset counter
throw new TransportException($exception->getMessage(), 0, $exception);
}

return [];
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"psr/log": "~1.0"
},
"require-dev": {
"doctrine/dbal": "^2.5",
"doctrine/dbal": "^2.6",
"psr/cache": "~1.0",
"symfony/console": "~3.4|~4.0",
"symfony/debug": "~4.1",
Expand Down

0 comments on commit 9add32a

Please sign in to comment.