Skip to content

Commit

Permalink
bug #31425 [Messenger] On failure retry, make message appear received…
Browse files Browse the repository at this point in the history
… from original sender (weaverryan)

This PR was squashed before being merged into the 4.3 branch (closes #31425).

Discussion
----------

[Messenger] On failure retry, make message appear received from original sender

| Q             | A
| ------------- | ---
| Branch?       | master (4.3)
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #31413
| License       | MIT
| Doc PR        | symfony/symfony-docs#11236

Fixes a bug when using the transport-based handler config from #30958. This also adds a pretty robust integration test that dispatches a complex message with transport-based handler config, failures, failure transport, etc - and verifies the correct behavior.

Cheers!

Commits
-------

80b5df2 [Messenger] On failure retry, make message appear received from original sender
  • Loading branch information
fabpot committed May 11, 2019
2 parents fe0f324 + 80b5df2 commit 7800396
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 70 deletions.
Expand Up @@ -1659,6 +1659,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
'before' => [
['id' => 'add_bus_name_stamp_middleware'],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
],
'after' => [
['id' => 'send_message'],
Expand Down
Expand Up @@ -48,6 +48,8 @@
<argument type="service" id="validator" />
</service>

<service id="messenger.middleware.failed_message_processing_middleware" class="Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware" />

<service id="messenger.middleware.traceable" class="Symfony\Component\Messenger\Middleware\TraceableMiddleware" abstract="true">
<argument type="service" id="debug.stopwatch" />
</service>
Expand Down
Expand Up @@ -753,6 +753,7 @@ public function testMessengerWithMultipleBuses()
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'send_message'],
['id' => 'handle_message'],
], $container->getParameter('messenger.bus.commands.middleware'));
Expand All @@ -761,6 +762,7 @@ public function testMessengerWithMultipleBuses()
$this->assertEquals([
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
['id' => 'dispatch_after_current_bus'],
['id' => 'failed_message_processing_middleware'],
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
['id' => 'send_message'],
['id' => 'handle_message'],
Expand Down
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\Console\Helper\Dumper;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
Expand Down Expand Up @@ -59,8 +60,10 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
{
$io->title('Failed Message Details');

/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
/** @var SentToFailureTransportStamp|null $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);

$rows = [
['Class', \get_class($envelope->getMessage())],
Expand All @@ -70,25 +73,34 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
$rows[] = ['Message Id', $id];
}

$flattenException = null === $lastRedeliveryStamp ? null : $lastRedeliveryStamp->getFlattenException();
if (null === $sentToFailureTransportStamp) {
$io->warning('Message does not appear to have been sent to this transport after failing');
} else {
$rows = array_merge($rows, [
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
['Failed at', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')],
['Error', null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage()],
['Error Class', null === $flattenException ? '(unknown)' : $flattenException->getClass()],
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
]);
}

$io->table([], $rows);

/** @var RedeliveryStamp[] $redeliveryStamps */
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
$io->writeln(' Message history:');
foreach ($redeliveryStamps as $redeliveryStamp) {
$io->writeln(sprintf(' * Message failed and redelivered to the <info>%s</info> transport at <info>%s</info>', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
}
$io->newLine();

if ($io->isVeryVerbose()) {
$io->title('Message:');
$dump = new Dumper($io);
$io->writeln($dump($envelope->getMessage()));
$io->title('Exception:');
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
$io->writeln(null === $flattenException ? '(no data)' : $flattenException->getTraceAsString());
} else {
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
}
Expand Down
Expand Up @@ -154,7 +154,7 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce)
}

// avoid success message if nothing was processed
if (1 < $count) {
if (1 <= $count) {
$io->success('All failed messages have been handled or removed!');
}
}
Expand Down
Expand Up @@ -18,7 +18,7 @@
use Symfony\Component\Console\Output\ConsoleOutputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;

/**
Expand Down Expand Up @@ -83,14 +83,14 @@ private function listMessages(SymfonyStyle $io, int $max)

$rows = [];
foreach ($envelopes as $envelope) {
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
/** @var RedeliveryStamp|null $lastRedeliveryStamp */
$lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);

$rows[] = [
$this->getMessageId($envelope),
\get_class($envelope->getMessage()),
null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s'),
null === $sentToFailureTransportStamp ? '' : $sentToFailureTransportStamp->getExceptionMessage(),
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s'),
null === $lastRedeliveryStamp ? '' : $lastRedeliveryStamp->getExceptionMessage(),
];
}

Expand Down
Expand Up @@ -16,9 +16,9 @@
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;

Expand Down Expand Up @@ -51,11 +51,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
$envelope = $event->getEnvelope();

// avoid re-sending to the failed sender
foreach ($envelope->all(SentStamp::class) as $sentStamp) {
/** @var SentStamp $sentStamp */
if ($sentStamp->getSenderAlias() === $this->failureSenderAlias) {
return;
}
if (null !== $envelope->last(SentToFailureTransportStamp::class)) {
return;
}

// remove the received stamp so it's redelivered
Expand All @@ -67,8 +64,9 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
$flattenedException = \class_exists(FlattenException::class) ? FlattenException::createFromThrowable($throwable) : null;
$envelope = $envelope->withoutAll(ReceivedStamp::class)
->withoutAll(TransportMessageIdStamp::class)
->with(new SentToFailureTransportStamp($throwable->getMessage(), $event->getReceiverName(), $flattenedException))
->with(new RedeliveryStamp(0, $this->failureSenderAlias));
->with(new SentToFailureTransportStamp($event->getReceiverName()))
->with(new DelayStamp(0))
->with(new RedeliveryStamp(0, $this->failureSenderAlias, $throwable->getMessage(), $flattenedException));

if (null !== $this->logger) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
Expand Down
@@ -0,0 +1,38 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;

/**
* @author Ryan Weaver <ryan@symfonycasts.com>
*
* @experimental in 4.3
*/
class FailedMessageProcessingMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
// look for "received" messages decorated with the SentToFailureTransportStamp
/** @var SentToFailureTransportStamp|null $sentToFailureStamp */
$sentToFailureStamp = $envelope->last(SentToFailureTransportStamp::class);
if (null !== $sentToFailureStamp && null !== $envelope->last(ReceivedStamp::class)) {
// mark the message as "received" from the original transport
// this guarantees the same behavior as when originally received
$envelope = $envelope->with(new ReceivedStamp($sentToFailureStamp->getOriginalReceiverName()));
}

return $stack->next()->handle($envelope, $stack);
}
}
27 changes: 25 additions & 2 deletions src/Symfony/Component/Messenger/Stamp/RedeliveryStamp.php
Expand Up @@ -11,6 +11,8 @@

namespace Symfony\Component\Messenger\Stamp;

use Symfony\Component\Debug\Exception\FlattenException;

/**
* Stamp applied when a messages needs to be redelivered.
*
Expand All @@ -20,14 +22,20 @@ class RedeliveryStamp implements StampInterface
{
private $retryCount;
private $senderClassOrAlias;
private $redeliveredAt;
private $exceptionMessage;
private $flattenException;

/**
* @param string $senderClassOrAlias Alias from SendersLocator or just the class name
*/
public function __construct(int $retryCount, string $senderClassOrAlias)
public function __construct(int $retryCount, string $senderClassOrAlias, string $exceptionMessage = null, FlattenException $flattenException = null)
{
$this->retryCount = $retryCount;
$this->senderClassOrAlias = $senderClassOrAlias;
$this->exceptionMessage = $exceptionMessage;
$this->flattenException = $flattenException;
$this->redeliveredAt = new \DateTimeImmutable();
}

public function getRetryCount(): int
Expand All @@ -36,12 +44,27 @@ public function getRetryCount(): int
}

/**
* Needed for this class to serialize through Symfony's serializer.
* The target sender this should be redelivered to.
*
* @internal
*/
public function getSenderClassOrAlias(): string
{
return $this->senderClassOrAlias;
}

public function getExceptionMessage(): ?string
{
return $this->exceptionMessage;
}

public function getFlattenException(): ?FlattenException
{
return $this->flattenException;
}

public function getRedeliveredAt(): \DateTimeInterface
{
return $this->redeliveredAt;
}
}
Expand Up @@ -11,8 +11,6 @@

namespace Symfony\Component\Messenger\Stamp;

use Symfony\Component\Debug\Exception\FlattenException;

/**
* Stamp applied when a message is sent to the failure transport.
*
Expand All @@ -22,36 +20,15 @@
*/
class SentToFailureTransportStamp implements StampInterface
{
private $exceptionMessage;
private $originalReceiverName;
private $flattenException;
private $sentAt;

public function __construct(string $exceptionMessage, string $originalReceiverName, FlattenException $flattenException = null)
public function __construct(string $originalReceiverName)
{
$this->exceptionMessage = $exceptionMessage;
$this->originalReceiverName = $originalReceiverName;
$this->flattenException = $flattenException;
$this->sentAt = new \DateTimeImmutable();
}

public function getExceptionMessage(): string
{
return $this->exceptionMessage;
}

public function getOriginalReceiverName(): string
{
return $this->originalReceiverName;
}

public function getFlattenException(): ?FlattenException
{
return $this->flattenException;
}

public function getSentAt(): \DateTimeInterface
{
return $this->sentAt;
}
}
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\Console\Tester\CommandTester;
use Symfony\Component\Messenger\Command\FailedMessagesShowCommand;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
Expand All @@ -26,10 +27,12 @@ class FailedMessagesShowCommandTest extends TestCase
{
public function testBasicRun()
{
$sentToFailureStamp = new SentToFailureTransportStamp('Things are bad!', 'async');
$sentToFailureStamp = new SentToFailureTransportStamp('async');
$redeliveryStamp = new RedeliveryStamp(0, 'failure_receiver', 'Things are bad!');
$envelope = new Envelope(new \stdClass(), [
new TransportMessageIdStamp(15),
$sentToFailureStamp,
$redeliveryStamp,
]);
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
Expand All @@ -48,10 +51,11 @@ public function testBasicRun()
Message Id 15
Failed at %s
Error Things are bad!
Error Class (unknown)
Error Class (unknown)
Transport async
EOF
,
$sentToFailureStamp->getSentAt()->format('Y-m-d H:i:s')),
$redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')),
$tester->getDisplay(true));
}
}
Expand Up @@ -19,7 +19,6 @@
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;

Expand All @@ -35,13 +34,13 @@ public function testItDispatchesToTheFailureTransport()
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
$this->assertSame('no!', $sentToFailureTransportStamp->getExceptionMessage());
$this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName());
$this->assertSame('no!', $sentToFailureTransportStamp->getFlattenException()->getMessage());

/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertSame('failure_sender', $redeliveryStamp->getSenderClassOrAlias());
$this->assertSame('no!', $redeliveryStamp->getExceptionMessage());
$this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage());

$this->assertNull($envelope->last(ReceivedStamp::class));
$this->assertNull($envelope->last(TransportMessageIdStamp::class));
Expand All @@ -65,11 +64,11 @@ public function testItGetsNestedHandlerFailedException()
$bus = $this->createMock(MessageBusInterface::class);
$bus->expects($this->once())->method('dispatch')->with($this->callback(function ($envelope) {
/** @var Envelope $envelope */
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$this->assertNotNull($sentToFailureTransportStamp);
$this->assertSame('I am inside!', $sentToFailureTransportStamp->getExceptionMessage());
$this->assertSame('Exception', $sentToFailureTransportStamp->getFlattenException()->getClass());
/** @var RedeliveryStamp $redeliveryStamp */
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
$this->assertNotNull($redeliveryStamp);
$this->assertSame('I am inside!', $redeliveryStamp->getExceptionMessage());
$this->assertSame('Exception', $redeliveryStamp->getFlattenException()->getClass());

return true;
}))->willReturn(new Envelope(new \stdClass()));
Expand Down Expand Up @@ -112,7 +111,7 @@ public function testDoNotRedeliverToFailed()
);

$envelope = new Envelope(new \stdClass(), [
new SentStamp('MySender', 'failure_sender'),
new SentToFailureTransportStamp('my_receiver'),
]);
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception(''), false);

Expand Down

0 comments on commit 7800396

Please sign in to comment.