Skip to content

Commit

Permalink
bug #32052 [Messenger] fix AMQP delay queue to be per exchange (Tobion)
Browse files Browse the repository at this point in the history
This PR was squashed before being merged into the 4.3 branch (closes #32052).

Discussion
----------

[Messenger] fix AMQP delay queue to be per exchange

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets | #32050
| License       | MIT
| Doc PR        |

this makes the delay/retry work when having several exchanges or renaming your exchange.

also the delay setup did not declare the target exchange. so if you only do delayed messages for a connection, auto-setup forgot to actually create the target exchange.

Commits
-------

5bc3364 [Messenger] fix AMQP delay queue to be per exchange
  • Loading branch information
fabpot committed Jun 17, 2019
2 parents 99c44a3 + 5bc3364 commit 12b852f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 75 deletions.
Expand Up @@ -23,6 +23,8 @@
*/
class ConnectionTest extends TestCase
{
private const DEFAULT_EXCHANGE_NAME = 'messages';

/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
Expand All @@ -40,9 +42,9 @@ public function testItCanBeConstructedWithDefaults()
'port' => 5672,
'vhost' => '/',
], [
'name' => 'messages',
'name' => self::DEFAULT_EXCHANGE_NAME,
], [
'messages' => [],
self::DEFAULT_EXCHANGE_NAME => [],
]),
Connection::fromDsn('amqp://')
);
Expand Down Expand Up @@ -196,7 +198,7 @@ public function testItUsesANormalConnectionByDefault()
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('connect');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body');
}

Expand All @@ -213,7 +215,7 @@ public function testItAllowsToUseAPersistentConnection()
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('pconnect');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?persistent=true', [], $factory);
$connection->publish('body');
}

Expand All @@ -226,13 +228,12 @@ public function testItSetupsTheConnectionWithDefaults()
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null);
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body');
}

Expand All @@ -250,21 +251,20 @@ public function testItSetupsTheConnection()
$factory->method('createExchange')->willReturn($amqpExchange);
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
$amqpQueue0->expects($this->once())->method('declareQueue');
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key0'],
['exchange_name', 'binding_key1']
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
[self::DEFAULT_EXCHANGE_NAME, 'binding_key1']
);
$amqpQueue1->expects($this->once())->method('declareQueue');
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key2'],
['exchange_name', 'binding_key3']
[self::DEFAULT_EXCHANGE_NAME, 'binding_key2'],
[self::DEFAULT_EXCHANGE_NAME, 'binding_key3']
);

$dsn = 'amqp://localhost/%2f/messages?'.
$dsn = 'amqp://localhost?'.
'exchange[default_publish_routing_key]=routing_key&'.
'queues[queue0][binding_keys][0]=binding_key0&'.
'queues[queue0][binding_keys][1]=binding_key1&'.
Expand All @@ -284,18 +284,17 @@ public function testItCanDisableTheSetup()
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->never())->method('declareExchange');
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => 'false'], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => false], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?auto_setup=false', [], $factory);
$connection->publish('body');
}

Expand All @@ -312,9 +311,9 @@ public function testSetChannelPrefetchWhenSetup()
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);

$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
$connection->setup();
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['prefetch_count' => 2], $factory);
$connection->setup();
}

Expand All @@ -329,29 +328,29 @@ public function testItDelaysTheMessage()
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
));

$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');

$delayQueue->expects($this->once())->method('setName')->with('delay_queue__5000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__5000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}

Expand All @@ -366,41 +365,41 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
));

$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
],
];

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue__120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__120000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__120000', AMQP_NOPARAM, ['headers' => []]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000);
}

/**
* @expectedException \AMQPException
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"delay":{"routing_key_pattern":"delay_%routing_key%_%delay%","exchange_name":"delay","queue_name_pattern":"delay_queue_%routing_key%_%delay%"},"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
*/
public function testObfuscatePasswordInDsn()
{
Expand All @@ -415,7 +414,7 @@ public function testObfuscatePasswordInDsn()
new \AMQPConnectionException('Oups.')
);

$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost', [], $factory);
$connection->channel();
}

Expand All @@ -430,7 +429,7 @@ public function testItCanPublishWithTheDefaultRoutingKey()

$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [], $factory);
$connection->publish('body');
}

Expand All @@ -445,7 +444,7 @@ public function testItCanPublishWithASuppliedRoutingKey()

$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
}

Expand All @@ -460,39 +459,35 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->createMock(\AMQPExchange::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
$amqpExchange = $this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
],
];

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_routing_key_120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => 'routing_key',
]);
$delayQueue->expects($this->once())->method('setArgument')->with(
'x-dead-letter-routing-key',
'routing_key'
);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_routing_key_120000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
}

Expand All @@ -512,7 +507,7 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
}
}
Expand Down

0 comments on commit 12b852f

Please sign in to comment.