Skip to content

Commit

Permalink
do not automatically redeliver messages on failures, throw a transpor…
Browse files Browse the repository at this point in the history
…t exception instead
  • Loading branch information
alekitto committed Jul 7, 2021
1 parent b259f69 commit 0739c23
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 25 deletions.
16 changes: 9 additions & 7 deletions lib/Transport/Dbal/DbalReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Driver\ResultStatement;
use Doctrine\DBAL\Exception\RetryableException;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Types\Types;
Expand All @@ -15,6 +16,7 @@
use Ramsey\Uuid\UuidFactory;
use Safe\DateTimeImmutable;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
Expand Down Expand Up @@ -48,6 +50,7 @@ class DbalReceiver implements ReceiverInterface, MessageCountAwareInterface, Lis
private StringCodec $codec;
private QueryBuilder $select;
private QueryBuilder $update;
private int $retryingSafetyCounter = 0;

public function __construct(Connection $connection, string $tableName, ?SerializerInterface $serializer = null)
{
Expand Down Expand Up @@ -91,14 +94,13 @@ public function get(): iterable
try {
yield $envelope;

$this->ack($envelope);
$this->retryingSafetyCounter = 0; // reset counter
} catch (RetryableException $e) {
if (++$this->retryingSafetyCounter > 3) {
throw new TransportException($e->getMessage(), 0, $e);
}
} catch (Throwable $e) {
$stamp = $envelope->last(TransportMessageIdStamp::class);
assert($stamp instanceof TransportMessageIdStamp);

$this->redeliver(hex2bin($stamp->getId()));

throw $e;
throw new TransportException($e->getMessage(), 0, $e);
}
}

Expand Down
16 changes: 9 additions & 7 deletions lib/Transport/Mongo/MongoReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

namespace Kcs\MessengerExtra\Transport\Mongo;

use Doctrine\DBAL\Exception\RetryableException;
use MongoDB\BSON\ObjectId;
use MongoDB\Collection;
use Ramsey\Uuid\Uuid;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
Expand All @@ -28,6 +30,7 @@ class MongoReceiver implements ReceiverInterface, ListableReceiverInterface, Mes
private SerializerInterface $serializer;
private Collection $collection;
private float $removeExpiredMessagesLastExecutedAt;
private int $retryingSafetyCounter = 0;

public function __construct(Collection $collection, ?SerializerInterface $serializer = null)
{
Expand All @@ -50,14 +53,13 @@ public function get(): iterable
try {
yield $envelope;

$this->ack($envelope);
$this->retryingSafetyCounter = 0; // reset counter
} catch (RetryableException $e) {
if (++$this->retryingSafetyCounter > 3) {
throw new TransportException($e->getMessage(), 0, $e);
}
} catch (Throwable $e) {
$stamp = $envelope->last(TransportMessageIdStamp::class);
assert($stamp instanceof TransportMessageIdStamp);

$this->redeliver($stamp->getId());

throw $e;
throw new TransportException($e->getMessage(), 0, $e);
}
}

Expand Down
4 changes: 2 additions & 2 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
enforceTimeLimit="true"
failOnRisky="true"
timeoutForSmallTests="5"
timeoutForMediumTests="5"
timeoutForLargeTests="10"
timeoutForMediumTests="10"
timeoutForLargeTests="30"
>
<coverage>
<include>
Expand Down
19 changes: 19 additions & 0 deletions tests/Fixtures/DummyMessageHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php declare(strict_types=1);

namespace Kcs\MessengerExtra\Tests\Fixtures;

use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class DummyMessageHandler implements MessageHandlerInterface
{
public static int $count = 0;

public function __invoke(DummyMessage $message)
{
if (self::$count++ === 1) {
return;
}

throw new \Exception();
}
}
102 changes: 101 additions & 1 deletion tests/Transport/Dbal/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,29 @@

namespace Kcs\MessengerExtra\Tests\Transport\Dbal;

use Composer\InstalledVersions;
use Doctrine\DBAL\DriverManager;
use Kcs\MessengerExtra\Tests\Fixtures\DummyMessage;
use Kcs\MessengerExtra\Tests\Fixtures\DummyMessageHandler;
use Kcs\MessengerExtra\Tests\Fixtures\UniqueDummyMessage;
use Kcs\MessengerExtra\Transport\Dbal\DbalTransport;
use Kcs\MessengerExtra\Transport\Dbal\DbalTransportFactory;
use PHPUnit\Framework\TestCase;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
use Symfony\Component\Messenger\Transport\Serialization\Normalizer\FlattenExceptionNormalizer;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
Expand All @@ -27,6 +40,9 @@ class IntegrationTest extends TestCase
private DbalTransport $transport;
private string $dsn;

private DbalTransport $failureTransport;
private string $failureDsn;

protected function setUp(): void
{
@\unlink(__DIR__.'/messenger.db');
Expand Down Expand Up @@ -55,16 +71,19 @@ protected function setUp(): void
case 'mariadb':
$connection = DriverManager::getConnection(['url' => 'mysql://root@127.0.0.1/messenger']);
$this->transport = $factory->createTransport($this->dsn = 'mysql://root@127.0.0.1/messenger', [], $serializer);
$this->failureTransport = $factory->createTransport($this->failureDsn = 'mysql://root@127.0.0.1/messenger/failed', [], $serializer);
break;

case 'postgresql':
$connection = DriverManager::getConnection(['url' => 'pgsql://postgres@localhost/messenger']);
$this->transport = $factory->createTransport($this->dsn = 'pgsql://postgres@localhost/messenger', [], $serializer);
$this->failureTransport = $factory->createTransport($this->failureDsn = 'pgsql://postgres@localhost/messenger/failed', [], $serializer);
break;

case 'sqlite':
default:
$this->transport = $factory->createTransport($this->dsn = 'sqlite:///'.__DIR__.'/messenger.db', [], $serializer);
$this->transport = $factory->createTransport($this->dsn = 'sqlite:///'.__DIR__.'/messenger.db/messenger', [], $serializer);
$this->failureTransport = $factory->createTransport($this->failureDsn = 'sqlite:///'.__DIR__.'/messenger.db/failed', [], $serializer);
break;
}

Expand All @@ -73,13 +92,94 @@ protected function setUp(): void
}

$this->transport->createTable();
$this->failureTransport->createTable();
}

protected function tearDown(): void
{
@\unlink(__DIR__.'/messenger.db');
}

/**
* @medium
*/
public function testCorrectlyHandlesRejections(): void
{
DummyMessageHandler::$count = 0;
$container = new ServiceLocator([
'dummy_transport' => fn () => $this->transport,
]);

$messageBus = new MessageBus([
new AddBusNameStampMiddleware('dummy'),
new DispatchAfterCurrentBusMiddleware(),
new FailedMessageProcessingMiddleware(),
new SendMessageMiddleware(new SendersLocator([
DummyMessage::class => ['dummy_transport'],
], $container)),
new HandleMessageMiddleware(new HandlersLocator([
DummyMessage::class => [new DummyMessageHandler()],
]))
]);

$messageBus->dispatch(new DummyMessage('First'));
$messageBus->dispatch(new DummyMessage('Second'));

self::assertCount(2, $this->transport->all());
self::assertEquals(2, $this->transport->getMessageCount());

$receivedMessages = 0;
$workerClass = new \ReflectionClass(Worker::class);
$thirdArgument = $workerClass->getConstructor()->getParameters()[2];

$type = $thirdArgument->getType();
if ($type instanceof \ReflectionNamedType && EventDispatcherInterface::class === $type->getName()) {
$worker = new Worker(['dummy_transport' => $this->transport], $messageBus, $eventDispatcher = new EventDispatcher());
} else {
$worker = new Worker(['dummy_transport' => $this->transport], $messageBus, [], $eventDispatcher = new EventDispatcher());
}

$retryStrategy = new class implements RetryStrategyInterface {
private int $retry = 0;

public function isRetryable(Envelope $message): bool
{
return $this->retry++ < 2;
}

public function getWaitingTime(Envelope $message): int
{
return 1;
}
};

if (version_compare(InstalledVersions::getVersion('symfony/messenger'), '5.3.0', '<')) {
$eventDispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener($this->failureTransport));
} else {
$eventDispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener(new ServiceLocator([
'dummy_transport' => fn() => $this->failureTransport,
])));
}

$retryStrategyLocator = new ServiceLocator([
'dummy_transport' => fn () => $retryStrategy,
]);

$eventDispatcher->addSubscriber(new SendFailedMessageForRetryListener($container, $retryStrategyLocator));
$eventDispatcher->addListener(WorkerMessageReceivedEvent::class,
static function () use (&$receivedMessages, $worker) {
if (4 === ++$receivedMessages) {
$worker->stop();
}
});

$worker->run();

self::assertCount(0, $this->transport->all());
self::assertCount(1, $this->failureTransport->all());
self::assertEquals(4, $receivedMessages);
}

/**
* @medium
*/
Expand Down

0 comments on commit 0739c23

Please sign in to comment.