Skip to content

Commit

Permalink
[Messenger] fix AMQP delay queue to be per exchange
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobion authored and fabpot committed Jun 17, 2019
1 parent a9bcdcc commit 5bc3364
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 75 deletions.
Expand Up @@ -23,6 +23,8 @@
*/
class ConnectionTest extends TestCase
{
private const DEFAULT_EXCHANGE_NAME = 'messages';

/**
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
Expand All @@ -40,9 +42,9 @@ public function testItCanBeConstructedWithDefaults()
'port' => 5672,
'vhost' => '/',
], [
'name' => 'messages',
'name' => self::DEFAULT_EXCHANGE_NAME,
], [
'messages' => [],
self::DEFAULT_EXCHANGE_NAME => [],
]),
Connection::fromDsn('amqp://')
);
Expand Down Expand Up @@ -196,7 +198,7 @@ public function testItUsesANormalConnectionByDefault()
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('connect');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body');
}

Expand All @@ -213,7 +215,7 @@ public function testItAllowsToUseAPersistentConnection()
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
$amqpConnection->expects($this->once())->method('pconnect');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?persistent=true', [], $factory);
$connection->publish('body');
}

Expand All @@ -226,13 +228,12 @@ public function testItSetupsTheConnectionWithDefaults()
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
$amqpQueue->expects($this->once())->method('declareQueue');
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null);
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body');
}

Expand All @@ -250,21 +251,20 @@ public function testItSetupsTheConnection()
$factory->method('createExchange')->willReturn($amqpExchange);
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->once())->method('declareExchange');
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
$amqpQueue0->expects($this->once())->method('declareQueue');
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key0'],
['exchange_name', 'binding_key1']
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
[self::DEFAULT_EXCHANGE_NAME, 'binding_key1']
);
$amqpQueue1->expects($this->once())->method('declareQueue');
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
['exchange_name', 'binding_key2'],
['exchange_name', 'binding_key3']
[self::DEFAULT_EXCHANGE_NAME, 'binding_key2'],
[self::DEFAULT_EXCHANGE_NAME, 'binding_key3']
);

$dsn = 'amqp://localhost/%2f/messages?'.
$dsn = 'amqp://localhost?'.
'exchange[default_publish_routing_key]=routing_key&'.
'queues[queue0][binding_keys][0]=binding_key0&'.
'queues[queue0][binding_keys][1]=binding_key1&'.
Expand All @@ -284,18 +284,17 @@ public function testItCanDisableTheSetup()
$amqpExchange = $this->createMock(\AMQPExchange::class)
);

$amqpExchange->method('getName')->willReturn('exchange_name');
$amqpExchange->expects($this->never())->method('declareExchange');
$amqpQueue->expects($this->never())->method('declareQueue');
$amqpQueue->expects($this->never())->method('bind');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => 'false'], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => false], $factory);
$connection->publish('body');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?auto_setup=false', [], $factory);
$connection->publish('body');
}

Expand All @@ -312,9 +311,9 @@ public function testSetChannelPrefetchWhenSetup()
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);

$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
$connection->setup();
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], $factory);
$connection = Connection::fromDsn('amqp://localhost', ['prefetch_count' => 2], $factory);
$connection->setup();
}

Expand All @@ -329,29 +328,29 @@ public function testItDelaysTheMessage()
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
));

$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');

$delayQueue->expects($this->once())->method('setName')->with('delay_queue__5000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__5000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}

Expand All @@ -366,41 +365,41 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
));

$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
],
];

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue__120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__120000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__120000', AMQP_NOPARAM, ['headers' => []]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000);
}

/**
* @expectedException \AMQPException
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"delay":{"routing_key_pattern":"delay_%routing_key%_%delay%","exchange_name":"delay","queue_name_pattern":"delay_queue_%routing_key%_%delay%"},"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
*/
public function testObfuscatePasswordInDsn()
{
Expand All @@ -415,7 +414,7 @@ public function testObfuscatePasswordInDsn()
new \AMQPConnectionException('Oups.')
);

$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost', [], $factory);
$connection->channel();
}

Expand All @@ -430,7 +429,7 @@ public function testItCanPublishWithTheDefaultRoutingKey()

$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [], $factory);
$connection->publish('body');
}

Expand All @@ -445,7 +444,7 @@ public function testItCanPublishWithASuppliedRoutingKey()

$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');

$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
}

Expand All @@ -460,39 +459,35 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->willReturn($delayQueue);
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$delayExchange = $this->createMock(\AMQPExchange::class),
$amqpExchange = $this->createMock(\AMQPExchange::class)
$amqpExchange = $this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));

$amqpExchange->expects($this->once())->method('setName')->with('messages');
$amqpExchange->method('getName')->willReturn('messages');
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
$amqpExchange->expects($this->once())->method('declareExchange');

$delayExchange->expects($this->once())->method('setName')->with('delay');
$delayExchange->expects($this->once())->method('declareExchange');
$delayExchange->method('getName')->willReturn('delay');

$connectionOptions = [
'retry' => [
'dead_routing_key' => 'my_dead_routing_key',
],
];

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_queue_routing_key_120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-dead-letter-exchange' => 'messages',
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => 'routing_key',
]);
$delayQueue->expects($this->once())->method('setArgument')->with(
'x-dead-letter-routing-key',
'routing_key'
);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_routing_key_120000');
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
}

Expand All @@ -512,7 +507,7 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
);

$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
}
}
Expand Down

0 comments on commit 5bc3364

Please sign in to comment.