Skip to content

Commit

Permalink
feature #30375 [Messenger] Added transport agnostic exception (nikoss…
Browse files Browse the repository at this point in the history
…vnk, lolmx)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Added transport agnostic exception

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | yes
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #30346
| License       | MIT
| Doc PR        | TODO

As described in #30346, client code shouldn't care about which transport is currently used by the message bus. This pr adds a new generic exception that is thrown by the `AmqpSender` if the message couldn't be delivered.

Commits
-------

7d6a3fa Updated changelog to document changes in AmqpReceiver
62a08ee Updated exception message in AmqpSender, updated AmqpReceiver to throw new TransportException
b2b0640 Chain new exception with previous one
06c8404 forgot one backslash, my bad
93c1001 [Messenger] Added new TransportException which is thrown if transport could not send a message
  • Loading branch information
fabpot committed Mar 4, 2019
2 parents a2aee03 + 7d6a3fa commit b727f59
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 3 deletions.
5 changes: 5 additions & 0 deletions UPGRADE-4.3.md
Expand Up @@ -50,6 +50,11 @@ HttpFoundation
* The `FileinfoMimeTypeGuesser` class has been deprecated,
use `Symfony\Component\Mime\FileinfoMimeTypeGuesser` instead.

Messenger
---------

* `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead.

Routing
-------

Expand Down
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -11,6 +11,11 @@ CHANGELOG
changed from `Serializer` to `PhpSerializer` inside `AmqpReceiver`,
`AmqpSender`, `AmqpTransport` and `AmqpTransportFactory`.

* Added `TransportException` to mark an exception transport-related

* [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is
no longer thrown in favor of `TransportException`.

4.2.0
-----

Expand Down
21 changes: 21 additions & 0 deletions src/Symfony/Component/Messenger/Exception/TransportException.php
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Exception;

/**
* @author Eric Masoero <em@studeal.fr>
*
* @experimental in 4.2
*/
class TransportException extends RuntimeException
{
}
Expand Up @@ -101,6 +101,83 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn
throw new WillNeverWorkException('Well...');
});
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);

$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
$receiver->stop();
});
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);
$connection->method('reject')->with($envelope)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
throw new WillNeverWorkException('Well...');
});
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
{
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
$envelope->method('getHeaders')->willReturn([
'type' => DummyMessage::class,
]);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('get')->willReturn($envelope);

$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->receive(function () {
throw new InterruptException('Well...');
});
}
}

class InterruptException extends \Exception
Expand Down
Expand Up @@ -37,4 +37,22 @@ public function testItSendsTheEncodedMessage()
$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}

/**
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
*/
public function testItThrowsATransportExceptionIfItCannotSendTheMessage()
{
$envelope = new Envelope(new DummyMessage('Oy'));
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
$connection->method('publish')->with($encoded['body'], $encoded['headers'])->willThrowException(new \AMQPException());

$sender = new AmqpSender($connection, $serializer);
$sender->send($envelope);
}
}
15 changes: 13 additions & 2 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
Expand Down Expand Up @@ -61,11 +62,21 @@ public function receive(callable $handler): void

$this->connection->ack($AMQPEnvelope);
} catch (RejectMessageExceptionInterface $e) {
$this->connection->reject($AMQPEnvelope);
try {
$this->connection->reject($AMQPEnvelope);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}

throw $e;
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
} catch (\Throwable $e) {
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
try {
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}

throw $e;
} finally {
Expand Down
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand Down Expand Up @@ -41,7 +42,11 @@ public function send(Envelope $envelope): Envelope
{
$encodedMessage = $this->serializer->encode($envelope);

$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
try {
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
} catch (\AMQPException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

return $envelope;
}
Expand Down

0 comments on commit b727f59

Please sign in to comment.