Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
473 lines (446 sloc) 12.2 KB
<?php
namespace Codeception\Module;
use Codeception\Exception\ModuleException;
use Codeception\Lib\Interfaces\RequiresPackage;
use Codeception\Module as CodeceptionModule;
use Codeception\TestInterface;
use Exception;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Message\AMQPMessage;
/**
* This module interacts with message broker software that implements
* the Advanced Message Queuing Protocol (AMQP) standard. For example, RabbitMQ (tested).
*
* <div class="alert alert-info">
* To use this module with Composer you need <em>"php-amqplib/php-amqplib": "~2.4"</em> package.
* </div>
*
* ## Config
*
* * host: localhost - host to connect
* * username: guest - username to connect
* * password: guest - password to connect
* * vhost: '/' - vhost to connect
* * cleanup: true - defined queues will be purged before running every test.
* * queues: [mail, twitter] - queues to cleanup
* * single_channel - create and use only one channel during test execution
*
* ### Example
*
* modules:
* enabled:
* - AMQP:
* host: 'localhost'
* port: '5672'
* username: 'guest'
* password: 'guest'
* vhost: '/'
* queues: [queue1, queue2]
* single_channel: false
*
* ## Public Properties
*
* * connection - AMQPStreamConnection - current connection
*/
class AMQP extends CodeceptionModule implements RequiresPackage
{
protected $config = [
'host' => 'localhost',
'username' => 'guest',
'password' => 'guest',
'port' => '5672',
'vhost' => '/',
'cleanup' => true,
'single_channel' => false,
'queues' => []
];
/**
* @var AMQPStreamConnection
*/
public $connection;
/**
* @var int
*/
protected $channelId;
protected $requiredFields = ['host', 'username', 'password', 'vhost'];
public function _requires()
{
return ['PhpAmqpLib\Connection\AMQPStreamConnection' => '"php-amqplib/php-amqplib": "~2.4"'];
}
public function _initialize()
{
$host = $this->config['host'];
$port = $this->config['port'];
$username = $this->config['username'];
$password = $this->config['password'];
$vhost = $this->config['vhost'];
try {
$this->connection = new AMQPStreamConnection($host, $port, $username, $password, $vhost);
} catch (Exception $e) {
throw new ModuleException(__CLASS__, $e->getMessage() . ' while establishing connection to MQ server');
}
}
public function _before(TestInterface $test)
{
if ($this->config['cleanup']) {
$this->cleanup();
}
}
/**
* Sends message to exchange by sending exchange name, message
* and (optionally) a routing key
*
* ``` php
* <?php
* $I->pushToExchange('exchange.emails', 'thanks');
* $I->pushToExchange('exchange.emails', new AMQPMessage('Thanks!'));
* $I->pushToExchange('exchange.emails', new AMQPMessage('Thanks!'), 'severity');
* ?>
* ```
*
* @param string $exchange
* @param string|\PhpAmqpLib\Message\AMQPMessage $message
* @param string $routing_key
*/
public function pushToExchange($exchange, $message, $routing_key = null)
{
$message = $message instanceof AMQPMessage
? $message
: new AMQPMessage($message);
$this->getChannel()->basic_publish($message, $exchange, $routing_key);
}
/**
* Sends message to queue
*
* ``` php
* <?php
* $I->pushToQueue('queue.jobs', 'create user');
* $I->pushToQueue('queue.jobs', new AMQPMessage('create'));
* ?>
* ```
*
* @param string $queue
* @param string|\PhpAmqpLib\Message\AMQPMessage $message
*/
public function pushToQueue($queue, $message)
{
$message = $message instanceof AMQPMessage
? $message
: new AMQPMessage($message);
$this->getChannel()->queue_declare($queue);
$this->getChannel()->basic_publish($message, '', $queue);
}
/**
* Declares an exchange
*
* This is an alias of method `exchange_declare` of `PhpAmqpLib\Channel\AMQPChannel`.
*
* ```php
* <?php
* $I->declareExchange(
* 'nameOfMyExchange', // exchange name
* 'topic' // exchange type
* )
* ```
*
* @param string $exchange
* @param string $type
* @param bool $passive
* @param bool $durable
* @param bool $auto_delete
* @param bool $internal
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
public function declareExchange(
$exchange,
$type,
$passive = false,
$durable = false,
$auto_delete = true,
$internal = false,
$nowait = false,
$arguments = null,
$ticket = null
) {
return $this->getChannel()->exchange_declare(
$exchange,
$type,
$passive,
$durable,
$auto_delete,
$internal,
$nowait,
$arguments,
$ticket
);
}
/**
* Declares queue, creates if needed
*
* This is an alias of method `queue_declare` of `PhpAmqpLib\Channel\AMQPChannel`.
*
* ```php
* <?php
* $I->declareQueue(
* 'nameOfMyQueue', // exchange name
* )
* ```
*
* @param string $queue
* @param bool $passive
* @param bool $durable
* @param bool $exclusive
* @param bool $auto_delete
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
public function declareQueue(
$queue = '',
$passive = false,
$durable = false,
$exclusive = false,
$auto_delete = true,
$nowait = false,
$arguments = null,
$ticket = null
) {
return $this->getChannel()->queue_declare(
$queue,
$passive,
$durable,
$exclusive,
$auto_delete,
$nowait,
$arguments,
$ticket
);
}
/**
* Binds a queue to an exchange
*
* This is an alias of method `queue_bind` of `PhpAmqpLib\Channel\AMQPChannel`.
*
* ```php
* <?php
* $I->bindQueueToExchange(
* 'nameOfMyQueueToBind', // name of the queue
* 'transactionTracking.transaction', // exchange name to bind to
* 'your.routing.key' // Optionally, provide a binding key
* )
* ```
*
* @param string $queue
* @param string $exchange
* @param string $routing_key
* @param bool $nowait
* @param array $arguments
* @param int $ticket
* @return mixed|null
*/
public function bindQueueToExchange(
$queue,
$exchange,
$routing_key = '',
$nowait = false,
$arguments = null,
$ticket = null
) {
return $this->getChannel()->queue_bind(
$queue,
$exchange,
$routing_key,
$nowait,
$arguments,
$ticket
);
}
/**
* Add a queue to purge list
*
* @param string $queue
*/
public function scheduleQueueCleanup($queue)
{
if (!in_array($queue, $this->config['queues'])) {
$this->config['queues'][] = $queue;
}
}
/**
* Checks if message containing text received.
*
* **This method drops message from queue**
* **This method will wait for message. If none is sent the script will stuck**.
*
* ``` php
* <?php
* $I->pushToQueue('queue.emails', 'Hello, davert');
* $I->seeMessageInQueueContainsText('queue.emails','davert');
* ?>
* ```
*
* @param string $queue
* @param string $text
*/
public function seeMessageInQueueContainsText($queue, $text)
{
$msg = $this->getChannel()->basic_get($queue);
if (!$msg) {
$this->fail("Message was not received");
}
if (!$msg instanceof AMQPMessage) {
$this->fail("Received message is not format of AMQPMessage");
}
$this->debugSection("Message", $msg->body);
$this->assertStringContainsString($text, $msg->body);
}
/**
* Count messages in queue.
*
* @param string $queue
*
* @return int
*/
public function _countMessage($queue)
{
list($queue, $messageCount) = $this->getChannel()->queue_declare($queue, true);
return $messageCount;
}
/**
* Checks that queue have expected number of message
*
* ``` php
* <?php
* $I->pushToQueue('queue.emails', 'Hello, davert');
* $I->seeNumberOfMessagesInQueue('queue.emails',1);
* ?>
* ```
*
* @param string $queue
* @param int $expected
*/
public function seeNumberOfMessagesInQueue($queue, $expected)
{
$messageCount = $this->_countMessage($queue);
$this->assertEquals($expected, $messageCount);
}
/**
* Checks that queue is empty
*
* ``` php
* <?php
* $I->pushToQueue('queue.emails', 'Hello, davert');
* $I->purgeQueue('queue.emails');
* $I->seeQueueIsEmpty('queue.emails');
* ?>
* ```
*
* @param string $queue
* @param int $expected
*/
public function seeQueueIsEmpty($queue)
{
$messageCount = $this->_countMessage($queue);
$this->assertEquals(0, $messageCount);
}
/**
* Checks if queue is not empty.
*
* ``` php
* <?php
* $I->pushToQueue('queue.emails', 'Hello, davert');
* $I->dontSeeQueueIsEmpty('queue.emails');
* ?>
* ```
*
* @param string $queue
*/
public function dontSeeQueueIsEmpty($queue)
{
$messageCount = $this->_countMessage($queue);
$this->assertNotEquals(0, $messageCount);
}
/**
* Takes last message from queue.
*
* ``` php
* <?php
* $message = $I->grabMessageFromQueue('queue.emails');
* ?>
* ```
*
* @param string $queue
* @return \PhpAmqpLib\Message\AMQPMessage
*/
public function grabMessageFromQueue($queue)
{
$message = $this->getChannel()->basic_get($queue);
return $message;
}
/**
* Purge a specific queue defined in config.
*
* ``` php
* <?php
* $I->purgeQueue('queue.emails');
* ?>
* ```
*
* @param string $queueName
*/
public function purgeQueue($queueName = '')
{
if (! in_array($queueName, $this->config['queues'])) {
throw new ModuleException(__CLASS__, "'$queueName' doesn't exist in queues config list");
}
$this->getChannel()->queue_purge($queueName, true);
}
/**
* Purge all queues defined in config.
*
* ``` php
* <?php
* $I->purgeAllQueues();
* ?>
* ```
*/
public function purgeAllQueues()
{
$this->cleanup();
}
/**
* @return \PhpAmqpLib\Channel\AMQPChannel
*/
protected function getChannel()
{
if ($this->config['single_channel'] && $this->channelId === null) {
$this->channelId = $this->connection->get_free_channel_id();
}
return $this->connection->channel($this->channelId);
}
protected function cleanup()
{
if (!isset($this->config['queues'])) {
throw new ModuleException(__CLASS__, "please set queues for cleanup");
}
if (!$this->connection) {
return;
}
foreach ($this->config['queues'] as $queue) {
try {
$this->getChannel()->queue_purge($queue);
} catch (AMQPProtocolChannelException $e) {
// ignore if exchange/queue doesn't exist and rethrow exception if it's something else
if ($e->getCode() !== 404) {
throw $e;
}
}
}
}
}
You can’t perform that action at this time.