Skip to content

Commit

Permalink
Update RabbitMQ Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
byjg committed May 13, 2023
1 parent 9374d21 commit 63d0e08
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 30 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ If you use an existing Queue you might get the error:
PHP Fatal error: Uncaught PhpAmqpLib\Exception\AMQPProtocolChannelException: PRECONDITION_FAILED - Existing queue 'test' declared with other arguments in AMQPChannel.php:224
```

You can change the behavior of the connection by using the `Pipe::withProperty()` and `Message::withHeader()` methods.
You can change the behavior of the connection by using the `Pipe::withProperty()` and `Message::withProperty()` methods.
Some of them are used by the RabbitMQConnector by setting some default values:

* `Pipe::withProperty(RabbitMQConnector::EXCHANGE)` - Set the exchange name. Default is the queue name.
* `Pipe::withProperty(RabbitMQConnector::ROUTING_KEY)` - Set the routing key. Default is the queue name.
* `Pipe::withProperty('x-message-ttl')` - Only affects dead letter queues. Set the time to live of the message in milliseconds. Default 3 days.
* `Pipe::withProperty('x-expires')` - Only affects dead letter queues. Set the time to live of the queue in milliseconds. Default 3 days.
* `Message::withHeader('content_type')` - Set the content type of the message. Default is text/plain.
* `Message::withHeader('delivery_mode')` - Set the delivery mode of the message. Default is 2 (persistent).
* `Message::withProperty('content_type')` - Set the content type of the message. Default is text/plain.
* `Message::withProperty('delivery_mode')` - Set the delivery mode of the message. Default is 2 (persistent).

Protocols:

Expand Down
40 changes: 21 additions & 19 deletions src/RabbitMQConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public function getConnection()
* @param Pipe $pipe
* @return AMQPChannel
*/
protected function createQueue($connection, Pipe &$pipe)
protected function createQueue($connection, Pipe &$pipe, $withExchange = true)
{
$pipe->setPropertyIfNull('exchange_type', AMQPExchangeType::DIRECT);
$pipe->setPropertyIfNull(self::EXCHANGE, $pipe->getName());
Expand All @@ -101,8 +101,8 @@ protected function createQueue($connection, Pipe &$pipe)
$dlqProperties = $dlq->getProperties();
$dlqProperties['x-dead-letter-exchange'] = $dlq->getProperty(self::EXCHANGE, $dlq->getName());
// $dlqProperties['x-dead-letter-routing-key'] = $routingKey;
$dlqProperties['x-message-ttl'] = $dlq->getProperty('x-message-ttl', 3600 * 72*1000);
$dlqProperties['x-expires'] = $dlq->getProperty('x-expires', 3600 * 72*1000 + 1000);
// $dlqProperties['x-message-ttl'] = $dlq->getProperty('x-message-ttl', 3600 * 72*1000);
// $dlqProperties['x-expires'] = $dlq->getProperty('x-expires', 3600 * 72*1000 + 1000);
$amqpTable = new AMQPTable($dlqProperties);
}

Expand All @@ -124,35 +124,37 @@ protected function createQueue($connection, Pipe &$pipe)
durable: true // the exchange will survive server restarts
auto_delete: false //the exchange won't be deleted once the channel is closed.
*/
$channel->exchange_declare($pipe->getProperty(self::EXCHANGE, $pipe->getName()), $pipe->getProperty('exchange_type'), false, true, false);
if ($withExchange) {
$channel->exchange_declare($pipe->getProperty(self::EXCHANGE, $pipe->getName()), $pipe->getProperty('exchange_type'), false, true, false);
}

$channel->queue_bind($pipe->getName(), $pipe->getProperty(self::EXCHANGE, $pipe->getName()), $pipe->getProperty(self::ROUTING_KEY, $pipe->getName()));

return $channel;
}

protected function lazyConnect(Pipe &$pipe)
protected function lazyConnect(Pipe &$pipe, $withExchange = true)
{
$connection = $this->getConnection();
$channel = $this->createQueue($connection, $pipe);
$channel = $this->createQueue($connection, $pipe, $withExchange);

return [$connection, $channel];
}


public function publish(Envelope $envelope)
{
$headers = $envelope->getMessage()->getHeaders();
$headers['content_type'] = $headers['content_type'] ?? 'text/plain';
$headers['delivery_mode'] = $headers['delivery_mode'] ?? AMQPMessage::DELIVERY_MODE_PERSISTENT;
$properties = $envelope->getMessage()->getProperties();
$properties['content_type'] = $properties['content_type'] ?? 'text/plain';
$properties['delivery_mode'] = $properties['delivery_mode'] ?? AMQPMessage::DELIVERY_MODE_PERSISTENT;

$pipe = clone $envelope->getPipe();

list($connection, $channel) = $this->lazyConnect($pipe);

$rabbitMQMessageBody = $envelope->getMessage()->getBody();

$rabbitMQMessage = new AMQPMessage($rabbitMQMessageBody, $headers);
$rabbitMQMessage = new AMQPMessage($rabbitMQMessageBody, $properties);

$channel->basic_publish($rabbitMQMessage, $pipe->getProperty(self::EXCHANGE, $pipe->getName()), $pipe->getName());

Expand All @@ -164,21 +166,21 @@ public function consume(Pipe $pipe, \Closure $onReceive, \Closure $onError, $ide
{
$pipe = clone $pipe;

list($connection, $channel) = $this->lazyConnect($pipe);
list($connection, $channel) = $this->lazyConnect($pipe, false);

/**
* @param \PhpAmqpLib\Message\AMQPMessage $rabbitMQMessage
*/
$closure = function ($rabbitMQMessage) use ($onReceive, $onError, $pipe) {
$message = new Message($rabbitMQMessage->body);
$message->withHeaders($rabbitMQMessage->get_properties());
$message->withHeader('consumer_tag', $rabbitMQMessage->getConsumerTag());
$message->withHeader('delivery_tag', $rabbitMQMessage->getDeliveryTag());
$message->withHeader('redelivered', $rabbitMQMessage->isRedelivered());
$message->withHeader('exchange', $rabbitMQMessage->getExchange());
$message->withHeader('routing_key', $rabbitMQMessage->getRoutingKey());
$message->withHeader('body_size', $rabbitMQMessage->getBodySize());
$message->withHeader('message_count', $rabbitMQMessage->getMessageCount());
$message->withProperties($rabbitMQMessage->get_properties());
$message->withProperty('consumer_tag', $rabbitMQMessage->getConsumerTag());
$message->withProperty('delivery_tag', $rabbitMQMessage->getDeliveryTag());
$message->withProperty('redelivered', $rabbitMQMessage->isRedelivered());
$message->withProperty('exchange', $rabbitMQMessage->getExchange());
$message->withProperty('routing_key', $rabbitMQMessage->getRoutingKey());
$message->withProperty('body_size', $rabbitMQMessage->getBodySize());
$message->withProperty('message_count', $rabbitMQMessage->getMessageCount());

$envelope = new Envelope($pipe, $message);

Expand Down
16 changes: 8 additions & 8 deletions tests/RabbitMQConnectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function testPublishConsume()
'routing_key' => 'test',
'body_size' => 4,
'message_count' => null,
], $envelope->getMessage()->getHeaders());
], $envelope->getMessage()->getProperties());
$this->assertEquals([
"exchange_type" => "direct",
'_x_exchange' => 'test',
Expand Down Expand Up @@ -91,7 +91,7 @@ public function testPublishConsumeRequeue()
'routing_key' => 'test',
'body_size' => 12,
'message_count' => null,
], $envelope->getMessage()->getHeaders());
], $envelope->getMessage()->getProperties());
$this->assertEquals([
"exchange_type" => "direct",
'_x_exchange' => 'test',
Expand Down Expand Up @@ -121,7 +121,7 @@ public function testConsumeMessageRequeued()
'routing_key' => 'test',
'body_size' => 12,
'message_count' => null,
], $envelope->getMessage()->getHeaders());
], $envelope->getMessage()->getProperties());
$this->assertEquals([
"exchange_type" => "direct",
'_x_exchange' => 'test',
Expand Down Expand Up @@ -157,7 +157,7 @@ public function testPublishConsumeWithDlq()
'routing_key' => 'test2',
'body_size' => 7,
'message_count' => null,
], $envelope->getMessage()->getHeaders());
], $envelope->getMessage()->getProperties());
$this->assertEquals([
"exchange_type" => "direct",
'_x_exchange' => 'test2',
Expand Down Expand Up @@ -186,7 +186,7 @@ public function testPublishConsumeWithDlq()
'routing_key' => 'test2',
'body_size' => 9,
'message_count' => null,
], $envelope->getMessage()->getHeaders());
], $envelope->getMessage()->getProperties());
$this->assertEquals([
"exchange_type" => "direct",
'_x_exchange' => 'test2',
Expand All @@ -202,8 +202,8 @@ public function testPublishConsumeWithDlq()
$this->assertEquals("bodydlq_2", $envelope->getMessage()->getBody());
$this->assertEquals("dlq_test2", $envelope->getPipe()->getName());
$this->assertEquals("dlq_test2", $envelope->getPipe()->getProperty(RabbitMQConnector::EXCHANGE));
$headers = $envelope->getMessage()->getHeaders();
unset($headers['application_headers']);
$properties = $envelope->getMessage()->getProperties();
unset($properties['application_headers']);
$this->assertEquals([
'content_type' => 'text/plain',
'delivery_mode' => 2,
Expand All @@ -214,7 +214,7 @@ public function testPublishConsumeWithDlq()
'routing_key' => 'test2',
'body_size' => 9,
'message_count' => null,
], $headers);
], $properties);
$this->assertEquals([
"exchange_type" => "fanout",
'_x_exchange' => 'dlq_test2',
Expand Down

0 comments on commit 63d0e08

Please sign in to comment.