From 93c10013fa3370b38f73e2a0febd22d1b14aa385 Mon Sep 17 00:00:00 2001 From: Eric Masoero Date: Mon, 25 Feb 2019 10:32:52 +0100 Subject: [PATCH 1/5] [Messenger] Added new TransportException which is thrown if transport could not send a message --- UPGRADE-4.3.md | 5 +++++ src/Symfony/Component/Messenger/CHANGELOG.md | 5 +++++ .../Exception/TransportException.php | 21 +++++++++++++++++++ .../Transport/AmqpExt/AmqpSenderTest.php | 18 ++++++++++++++++ .../Transport/AmqpExt/AmqpSender.php | 7 ++++++- 5 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 src/Symfony/Component/Messenger/Exception/TransportException.php diff --git a/UPGRADE-4.3.md b/UPGRADE-4.3.md index 6c348a340363..d81e393c4407 100644 --- a/UPGRADE-4.3.md +++ b/UPGRADE-4.3.md @@ -50,6 +50,11 @@ HttpFoundation * The `FileinfoMimeTypeGuesser` class has been deprecated, use `Symfony\Component\Mime\FileinfoMimeTypeGuesser` instead. +Messenger +--------- + + * `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead. + Routing ------- diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index ff901be210f1..c7d0aad8ea73 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -11,6 +11,11 @@ CHANGELOG changed from `Serializer` to `PhpSerializer` inside `AmqpReceiver`, `AmqpSender`, `AmqpTransport` and `AmqpTransportFactory`. + * Added `TransportException` to mark an exception transport-related + + * [BC BREAK] If listening to exceptions while using `AmqpSender`, `\AMQPException` is + no longer thrown in favor of `TransportException`. + 4.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Exception/TransportException.php b/src/Symfony/Component/Messenger/Exception/TransportException.php new file mode 100644 index 000000000000..e94daba20993 --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/TransportException.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * @author Eric Masoero + * + * @experimental in 4.2 + */ +class TransportException extends RuntimeException +{ +} diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php index caef14c9e2b1..4ca0287d68a0 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php @@ -37,4 +37,22 @@ public function testItSendsTheEncodedMessage() $sender = new AmqpSender($connection, $serializer); $sender->send($envelope); } + + /** + * @expectedException Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotSendTheMessage() + { + $envelope = new Envelope(new DummyMessage('Oy')); + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(); + $serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('publish')->with($encoded['body'], $encoded['headers'])->willThrowException(new \AMQPException()); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } } diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index 74ab0dfaa92a..53ba10e4faab 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -41,7 +42,11 @@ public function send(Envelope $envelope): Envelope { $encodedMessage = $this->serializer->encode($envelope); - $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); + try { + $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); + } catch (\AMQPException $e) { + throw new TransportException('Current transport was not able to send given message, please try again'); + } return $envelope; } From 06c84040c4bbf3228351333b0a066fcf924871ee Mon Sep 17 00:00:00 2001 From: "nikos.sotiropoulos" Date: Wed, 20 Feb 2019 01:41:21 +0200 Subject: [PATCH 2/5] forgot one backslash, my bad --- .../Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php index 4ca0287d68a0..7fcb41c15970 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php @@ -39,7 +39,7 @@ public function testItSendsTheEncodedMessage() } /** - * @expectedException Symfony\Component\Messenger\Exception\TransportException + * @expectedException \Symfony\Component\Messenger\Exception\TransportException */ public function testItThrowsATransportExceptionIfItCannotSendTheMessage() { From b2b0640d807264973883121b55ff1cfbf3f63a29 Mon Sep 17 00:00:00 2001 From: Eric Masoero Date: Mon, 25 Feb 2019 11:24:06 +0100 Subject: [PATCH 3/5] Chain new exception with previous one --- .../Component/Messenger/Transport/AmqpExt/AmqpSender.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index 53ba10e4faab..72731c6312f3 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -45,7 +45,7 @@ public function send(Envelope $envelope): Envelope try { $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); } catch (\AMQPException $e) { - throw new TransportException('Current transport was not able to send given message, please try again'); + throw new TransportException('Current transport was not able to send given message, please try again', 0, $e); } return $envelope; From 62a08eeea07e4f926e6f9839fda844d3bc8c3bbe Mon Sep 17 00:00:00 2001 From: Eric Masoero Date: Wed, 27 Feb 2019 11:06:42 +0100 Subject: [PATCH 4/5] Updated exception message in AmqpSender, updated AmqpReceiver to throw new TransportException --- .../Transport/AmqpExt/AmqpReceiverTest.php | 77 +++++++++++++++++++ .../Transport/AmqpExt/AmqpReceiver.php | 15 +++- .../Transport/AmqpExt/AmqpSender.php | 2 +- 3 files changed, 91 insertions(+), 3 deletions(-) diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index 23f1c8b2defd..8e224e0653df 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -101,6 +101,83 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn throw new WillNeverWorkException('Well...'); }); } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + + $connection->method('ack')->with($envelope)->willThrowException(new \AMQPException()); + + $receiver = new AmqpReceiver($connection, $serializer); + $receiver->receive(function (?Envelope $envelope) use ($receiver) { + $receiver->stop(); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotRejectMessage() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + $connection->method('reject')->with($envelope)->willThrowException(new \AMQPException()); + + $receiver = new AmqpReceiver($connection, $serializer); + $receiver->receive(function () { + throw new WillNeverWorkException('Well...'); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Exception\TransportException + */ + public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage() + { + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn([ + 'type' => DummyMessage::class, + ]); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + + $connection->method('nack')->with($envelope)->willThrowException(new \AMQPException()); + + $receiver = new AmqpReceiver($connection, $serializer); + $receiver->receive(function () { + throw new InterruptException('Well...'); + }); + } } class InterruptException extends \Exception diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index fdd81cac543a..cb7a4db013fa 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; @@ -61,11 +62,21 @@ public function receive(callable $handler): void $this->connection->ack($AMQPEnvelope); } catch (RejectMessageExceptionInterface $e) { - $this->connection->reject($AMQPEnvelope); + try { + $this->connection->reject($AMQPEnvelope); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } throw $e; + } catch (\AMQPException $e) { + throw new TransportException($e->getMessage(), 0, $e); } catch (\Throwable $e) { - $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); + try { + $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } throw $e; } finally { diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php index 72731c6312f3..e9760ac2eb6e 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php @@ -45,7 +45,7 @@ public function send(Envelope $envelope): Envelope try { $this->connection->publish($encodedMessage['body'], $encodedMessage['headers']); } catch (\AMQPException $e) { - throw new TransportException('Current transport was not able to send given message, please try again', 0, $e); + throw new TransportException($e->getMessage(), 0, $e); } return $envelope; From 7d6a3fa487940afcaadf7f531d4a3e51fa596c41 Mon Sep 17 00:00:00 2001 From: Eric Masoero Date: Wed, 27 Feb 2019 11:20:43 +0100 Subject: [PATCH 5/5] Updated changelog to document changes in AmqpReceiver --- src/Symfony/Component/Messenger/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index c7d0aad8ea73..b7f604f41c00 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -13,7 +13,7 @@ CHANGELOG * Added `TransportException` to mark an exception transport-related - * [BC BREAK] If listening to exceptions while using `AmqpSender`, `\AMQPException` is + * [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is no longer thrown in favor of `TransportException`. 4.2.0