Skip to content

Commit

Permalink
Using AMQP auto-setup in all cases, not just in debug
Browse files Browse the repository at this point in the history
  • Loading branch information
weaverryan committed Mar 17, 2019
1 parent b7e798e commit 503c209
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 27 deletions.
Expand Up @@ -61,7 +61,6 @@
<service id="messenger.transport.amqp.factory" class="Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransportFactory">
<tag name="messenger.transport_factory" />
<argument type="service" id="messenger.transport.serializer" />
<argument>%kernel.debug%</argument>
</service>
</services>
</container>
5 changes: 5 additions & 0 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Expand Up @@ -13,6 +13,11 @@ CHANGELOG
* [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is
no longer thrown in favor of `TransportException`.
* Deprecated `LoggingMiddleware`, pass a logger to `SendMessageMiddleware` instead.
* [BC BREAK] `Connection::__construct()` and `Connection::fromDsn()`
both no longer have `$isDebug` arguments.
* [BC BREAK] The Amqp Transport now automatically sets up the exchanges
and queues by default. Previously, this was done when in "debug" mode
only. Pass the `auto-setup` connection option to control this.

4.2.0
-----
Expand Down
Expand Up @@ -22,8 +22,7 @@ class AmqpTransportFactoryTest extends TestCase
public function testSupportsOnlyAmqpTransports()
{
$factory = new AmqpTransportFactory(
$this->getMockBuilder(SerializerInterface::class)->getMock(),
true
$this->getMockBuilder(SerializerInterface::class)->getMock()
);

$this->assertTrue($factory->supports('amqp://localhost', []));
Expand All @@ -34,11 +33,10 @@ public function testSupportsOnlyAmqpTransports()
public function testItCreatesTheTransport()
{
$factory = new AmqpTransportFactory(
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock(),
true
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock()
);

$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['foo' => 'bar'], true), $serializer);
$expectedTransport = new AmqpTransport(Connection::fromDsn('amqp://localhost', ['foo' => 'bar']), $serializer);

$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', ['foo' => 'bar']));
}
Expand Down
Expand Up @@ -128,7 +128,7 @@ public function testSetsParametersOnTheQueueAndExchange()
'alternate-exchange' => 'alternate',
],
],
], true, $factory);
], $factory);
$connection->publish('body');
}

Expand Down Expand Up @@ -171,9 +171,11 @@ public function testItUsesANormalConnectionByDefault()
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);

// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('connect');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], false, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection->publish('body');
}

Expand All @@ -186,13 +188,15 @@ public function testItAllowsToUseAPersistentConnection()
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
);

// makes sure the channel looks connected, so it's not re-created
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('pconnect');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], false, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory);
$connection->publish('body');
}

public function testItSetupsTheConnectionWhenDebug()
public function testItSetupsTheConnectionByDefault()
{
$factory = new TestAmqpFactory(
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(),
Expand All @@ -206,7 +210,7 @@ public function testItSetupsTheConnectionWhenDebug()
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', 'my_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', [], $factory);
$connection->publish('body');
}

Expand All @@ -224,13 +228,13 @@ public function testItCanDisableTheSetup()
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => 'false'], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => 'false'], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => false], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key', ['auto-setup' => false], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', [], $factory);
$connection->publish('body');
}

Expand All @@ -248,7 +252,7 @@ public function testPublishWithQueueOptions()
$amqpExchange->expects($this->once())->method('publish')
->with('body', null, 1, ['delivery_mode' => 2, 'headers' => ['token' => 'uuid', 'type' => '*']]);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], true, $factory);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[attributes][delivery_mode]=2&queue[attributes][headers][token]=uuid&queue[flags]=1', [], $factory);
$connection->publish('body', $headers);
}
}
Expand Down
Expand Up @@ -24,17 +24,15 @@
class AmqpTransportFactory implements TransportFactoryInterface
{
private $serializer;
private $debug;

public function __construct(SerializerInterface $serializer = null, bool $debug = false)
public function __construct(SerializerInterface $serializer = null)
{
$this->serializer = $serializer ?? new PhpSerializer();
$this->debug = $debug;
}

public function createTransport(string $dsn, array $options): TransportInterface
{
return new AmqpTransport(Connection::fromDsn($dsn, $options, $this->debug), $this->serializer);
return new AmqpTransport(Connection::fromDsn($dsn, $options), $this->serializer);
}

public function supports(string $dsn, array $options): bool
Expand Down
22 changes: 14 additions & 8 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Expand Up @@ -36,7 +36,6 @@ class Connection
private $connectionCredentials;
private $exchangeConfiguration;
private $queueConfiguration;
private $debug;
private $amqpFactory;

/**
Expand All @@ -54,16 +53,15 @@ class Connection
*/
private $amqpQueue;

public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null)
public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, AmqpFactory $amqpFactory = null)
{
$this->connectionCredentials = $connectionCredentials;
$this->debug = $debug;
$this->exchangeConfiguration = $exchangeConfiguration;
$this->queueConfiguration = $queueConfiguration;
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
}

public static function fromDsn(string $dsn, array $options = [], bool $debug = false, AmqpFactory $amqpFactory = null): self
public static function fromDsn(string $dsn, array $options = [], AmqpFactory $amqpFactory = null): self
{
if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException(sprintf('The given AMQP DSN "%s" is invalid.', $dsn));
Expand Down Expand Up @@ -104,7 +102,7 @@ public static function fromDsn(string $dsn, array $options = [], bool $debug = f
$queueOptions['arguments'] = self::normalizeQueueArguments($queueOptions['arguments']);
}

return new self($amqpOptions, $exchangeOptions, $queueOptions, $debug, $amqpFactory);
return new self($amqpOptions, $exchangeOptions, $queueOptions, $amqpFactory);
}

private static function normalizeQueueArguments(array $arguments): array
Expand All @@ -129,7 +127,7 @@ private static function normalizeQueueArguments(array $arguments): array
*/
public function publish(string $body, array $headers = []): void
{
if ($this->debug && $this->shouldSetup()) {
if ($this->shouldSetup()) {
$this->setup();
}

Expand All @@ -146,7 +144,7 @@ public function publish(string $body, array $headers = []): void
*/
public function get(): ?\AMQPEnvelope
{
if ($this->debug && $this->shouldSetup()) {
if ($this->shouldSetup()) {
$this->setup();
}

Expand Down Expand Up @@ -256,6 +254,14 @@ private function clear(): void

private function shouldSetup(): bool
{
return !\array_key_exists('auto-setup', $this->connectionCredentials) || !\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true);
if (!\array_key_exists('auto-setup', $this->connectionCredentials)) {
return true;
}

if (\in_array($this->connectionCredentials['auto-setup'], [false, 'false'], true)) {
return false;
}

return true;
}
}

0 comments on commit 503c209

Please sign in to comment.