diff --git a/README.md b/README.md index c2bf70a..3c0ce87 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ A unified front-end for different queuing backends. Includes a REST server, CLI ## Why PHP-Queue? ## -The pains of implementing a queueing system (eg. Beanstalk, Amazon SQS, RabbitMQ) for your application: +Implementing a queueing system (eg. Beanstalk, Amazon SQS, RabbitMQ) for your application can be painful: * Which one is most efficient? Performant? * Learning curve to effectively implement the queue backend & the libraries. @@ -170,6 +170,27 @@ You can extend the `PHPQueue\Cli` class to customize your own CLI batch jobs (eg You can read more about the [Runners here](https://github.com/CoderKungfu/php-queue/blob/master/demo/runners/README.md). +## Interfaces ## + +The queue backends will support one or more of these interfaces: + +* AtomicReadBuffer + +This is the recommended way to consume messages. AtomicReadBuffer provides the +popAtomic($callback) interface, which rolls back the popped record if the +callback returns by exception. For example: + $queue = new PHPQueue\Backend\PDO($options); + + $queue->popAtomic(function ($message) use ($processor) { + $processor->churn($message); + }); + +The message will only be popped if churn() returns successfully. + +* FifoQueueStore + +A first in first out queue accessed by push and pop. + --- ## License ## diff --git a/composer.json b/composer.json index ce6d682..085c069 100644 --- a/composer.json +++ b/composer.json @@ -20,7 +20,7 @@ "require": { "php": ">=5.3.0", "monolog/monolog": "~1.3", - "clio/clio": "@stable" + "clio/clio": "0.1.*" }, "require-dev": { "mrpoundsign/pheanstalk-5.3": "dev-master", diff --git a/src/PHPQueue/Backend/Beanstalkd.php b/src/PHPQueue/Backend/Beanstalkd.php index 1556409..f763e08 100644 --- a/src/PHPQueue/Backend/Beanstalkd.php +++ b/src/PHPQueue/Backend/Beanstalkd.php @@ -3,11 +3,11 @@ use PHPQueue\Exception\BackendException; use PHPQueue\Exception\JobNotFoundException; -use PHPQueue\Interfaces\IndexedFifoQueueStore; +use PHPQueue\Interfaces\FifoQueueStore; class Beanstalkd extends Base - implements IndexedFifoQueueStore + implements FifoQueueStore { public $server_uri; public $tube; diff --git a/src/PHPQueue/Backend/IronMQ.php b/src/PHPQueue/Backend/IronMQ.php index eb195fd..0cbefd9 100644 --- a/src/PHPQueue/Backend/IronMQ.php +++ b/src/PHPQueue/Backend/IronMQ.php @@ -2,11 +2,11 @@ namespace PHPQueue\Backend; use PHPQueue\Exception\BackendException; -use PHPQueue\Interfaces\IndexedFifoQueueStore; +use PHPQueue\Interfaces\FifoQueueStore; class IronMQ extends Base - implements IndexedFifoQueueStore + implements FifoQueueStore { public $token = null; public $project_id = null; diff --git a/src/PHPQueue/Backend/Memcache.php b/src/PHPQueue/Backend/Memcache.php index 7c4165e..b7c03f1 100644 --- a/src/PHPQueue/Backend/Memcache.php +++ b/src/PHPQueue/Backend/Memcache.php @@ -2,11 +2,9 @@ namespace PHPQueue\Backend; use PHPQueue\Exception\BackendException; -use PHPQueue\Interfaces\KeyValueStore; class Memcache extends Base - implements KeyValueStore { public $servers; public $is_persistent = false; @@ -65,11 +63,18 @@ public function add($key, $data, $expiry=null) /** * @param string $key * @param mixed $data - * @param int $expiry Deprecated param + * @param array|int $properties array is preferred, "expiry" is the only key used here. Deprecated int argument will also be used as expiry. * @throws \PHPQueue\Exception */ - public function set($key, $data, $expiry=null) + public function set($key, $data, $properties=array()) { + if (is_array($properties) && isset($properties["expiry"])) { + $expiry = $properties["expiry"]; + } else if (is_numeric($properties)) { + $expiry = $properties; + } else { + $expiry = $this->expiry; + } if (empty($key) && !is_string($key)) { throw new BackendException("Key is invalid."); } @@ -77,9 +82,6 @@ public function set($key, $data, $expiry=null) throw new BackendException("No data."); } $this->beforeAdd(); - if (empty($expiry)) { - $expiry = $this->expiry; - } $status = $this->getConnection()->replace($key, json_encode($data), $this->use_compression, $expiry); if ($status == false) { $status = $this->getConnection()->set($key, json_encode($data), $this->use_compression, $expiry); diff --git a/src/PHPQueue/Backend/MongoDB.php b/src/PHPQueue/Backend/MongoDB.php index 776f555..6a4440f 100644 --- a/src/PHPQueue/Backend/MongoDB.php +++ b/src/PHPQueue/Backend/MongoDB.php @@ -5,11 +5,9 @@ use PHPQueue\Exception\BackendException; use PHPQueue\Exception\JobNotFoundException; -use PHPQueue\Interfaces\KeyValueStore; class MongoDB extends Base - implements KeyValueStore { public $server_uri; public $db_name; @@ -75,7 +73,7 @@ public function add($data=null, $key=null) * @throws \PHPQueue\Exception\BackendException * @return boolean Deprecated (always true) */ - public function set($key, $data) + public function set($key, $data, $properties=array()) { if (empty($data) || !is_array($data)) { throw new BackendException("No data."); diff --git a/src/PHPQueue/Backend/PDO.php b/src/PHPQueue/Backend/PDO.php index c9a5610..86593b0 100644 --- a/src/PHPQueue/Backend/PDO.php +++ b/src/PHPQueue/Backend/PDO.php @@ -2,12 +2,13 @@ namespace PHPQueue\Backend; use PHPQueue\Exception\BackendException; -use PHPQueue\Interfaces\IndexedFifoQueueStore; -use PHPQueue\Interfaces\KeyValueStore; +use PHPQueue\Interfaces\AtomicReadBuffer; +use PHPQueue\Interfaces\FifoQueueStore; class PDO extends Base - implements IndexedFifoQueueStore, KeyValueStore + implements AtomicReadBuffer, + FifoQueueStore { private $connection_string; private $db_user; @@ -27,11 +28,15 @@ public function __construct($options=array()) if (!empty($options['db_password'])) { $this->db_password = $options['db_password']; } + if (!empty($options['queue'])) { + $this->db_table = $options['queue']; + } if (!empty($options['db_table'])) { $this->db_table = $options['db_table']; } if (!empty($options['pdo_options']) && is_array($options['pdo_options'])) { - $this->pdo_options = array_merge($this->pdo_options, $options['pdo_options']); + // Use + operator instead of array_merge to preserve integer keys + $this->pdo_options = $options['pdo_options'] + $this->pdo_options; } } @@ -64,26 +69,59 @@ public function add($data = null) public function push($data) { - $sql = sprintf('INSERT INTO `%s` (`data`) VALUES (?)', $this->db_table); - $sth = $this->getConnection()->prepare($sql); - $_tmp = json_encode($data); - $sth->bindParam(1, $_tmp, \PDO::PARAM_STR); - $sth->execute(); - $this->last_job_id = $this->getConnection()->lastInsertId(); + try { + $success = $this->insert($data); + if (!$success) { + throw new BackendException( + 'Statement failed: ' . + implode(' - ', $this->getConnection()->errorInfo()) + ); + } + } catch (\Exception $ex) { + // TODO: Log original error and table creation attempt. + $this->createTable($this->db_table); + + // Try again. + if (!$this->insert($data)) { + throw new BackendException('Statement failed: ' . + implode(' - ', $this->getConnection()->errorInfo()) + ); + } + } + $this->last_job_id = $this->getConnection()->lastInsertId(); return $this->last_job_id; } - public function set($id, $data) + protected function insert($data) + { + $sql = sprintf('INSERT INTO `%s` (`data`, `timestamp`) VALUES (?, ?)', $this->db_table); + $sth = $this->getConnection()->prepare($sql); + if ($sth === false) { + throw new \Exception('Could not prepare statement'); + } + $_tmp = json_encode($data); + $sth->bindValue(1, $_tmp, \PDO::PARAM_STR); + $sth->bindValue(2, self::getTimeStamp(), \PDO::PARAM_STR); + return $sth->execute(); + } + + public function set($id, $data, $properties=array()) { - $sql = sprintf('REPLACE INTO `%s` (`id`, `data`) VALUES (?, ?)', $this->db_table); + $sql = sprintf('REPLACE INTO `%s` (`id`, `data`, `timestamp`) VALUES (?, ?, ?)', $this->db_table); $sth = $this->getConnection()->prepare($sql); $_tmp = json_encode($data); - $sth->bindParam(1, $id, \PDO::PARAM_INT); - $sth->bindParam(2, $_tmp, \PDO::PARAM_STR); + $sth->bindValue(1, $id, \PDO::PARAM_INT); + $sth->bindValue(2, $_tmp, \PDO::PARAM_STR); + $sth->bindValue(3, self::getTimeStamp(), \PDO::PARAM_STR); $sth->execute(); } + protected static function getTimeStamp() + { + $now = new \DateTime('now', new \DateTimeZone('UTC')); + return $now->format('Y-m-d\TH:i:s.u'); + } /** * @return array|null The retrieved record, or null if nothing was found. */ @@ -96,7 +134,7 @@ public function get($id=null) $sql = sprintf('SELECT `id`, `data` FROM `%s` WHERE `id` = ?', $this->db_table); $sth = $this->getConnection()->prepare($sql); - $sth->bindParam(1, $id, \PDO::PARAM_INT); + $sth->bindValue(1, $id, \PDO::PARAM_INT); $sth->execute(); $result = $sth->fetch(\PDO::FETCH_ASSOC); @@ -110,19 +148,46 @@ public function get($id=null) public function pop() { - // Where $id is null, get oldest message + // Get oldest message. $sql = sprintf('SELECT `id`, `data` FROM `%s` WHERE 1 ORDER BY id ASC LIMIT 1', $this->db_table); $sth = $this->getConnection()->prepare($sql); + + // This will be false if the table or collection does not exist + if ( ! $sth ) { + return null; + } + $sth->execute(); $result = $sth->fetch(\PDO::FETCH_ASSOC); if ($result) { $this->last_job_id = $result['id']; + $this->clear($result['id']); return json_decode($result['data'], true); } return null; } + public function popAtomic($callback) { + try { + $this->getConnection()->beginTransaction(); + $data = $this->pop(); + + if ($data !== null) { + if (!is_callable($callback)) { + throw new \RuntimeException("Bad callback passed to " . __METHOD__); + } + call_user_func($callback, $data); + } + + $this->getConnection()->commit(); + return $data; + } catch (\Exception $ex) { + $this->getConnection()->rollBack(); + throw $ex; + } + } + public function clear($id = null) { if (empty($id)) { @@ -131,7 +196,7 @@ public function clear($id = null) try { $sql = sprintf('DELETE FROM `%s` WHERE `id` = ?', $this->db_table); $sth = $this->getConnection()->prepare($sql); - $sth->bindParam(1, $id, \PDO::PARAM_INT); + $sth->bindValue(1, $id, \PDO::PARAM_INT); $sth->execute(); } catch (\Exception $ex) { throw new BackendException('Invalid ID.'); @@ -156,16 +221,36 @@ public function createTable($table_name) if (empty($table_name)) { throw new BackendException('Invalid table name.'); } - $sql = sprintf("CREATE TABLE IF NOT EXISTS `%s` ( - `id` mediumint(20) NOT NULL AUTO_INCREMENT, - `data` mediumtext NULL DEFAULT '', - PRIMARY KEY (`id`) - ) ENGINE=InnoDB DEFAULT CHARSET=latin1 AUTO_INCREMENT=1 ;", $table_name); + switch ($this->getDriverName()) { + case 'mysql': + $sql = sprintf("CREATE TABLE IF NOT EXISTS `%s` ( + `id` mediumint(20) NOT NULL AUTO_INCREMENT, + `data` mediumtext NULL, + `timestamp` datetime NOT NULL, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1 ;", $table_name); + break; + case 'sqlite': + $sql = sprintf("CREATE TABLE IF NOT EXISTS `%s` ( + `id` INTEGER PRIMARY KEY, + `data` text NULL, + `timestamp` datetime NOT NULL + );", $table_name); + break; + default: + throw new BackendException('Unknown database driver: ' . $this->getDriverName()); + } $this->getConnection()->exec($sql); + // FIXME: we already signal failure using exceptions, should return void. return true; } + protected function getDriverName() + { + return $this->getConnection()->getAttribute(\PDO::ATTR_DRIVER_NAME); + } + public function deleteTable($table_name) { if (empty($table_name)) { diff --git a/src/PHPQueue/Backend/Predis.php b/src/PHPQueue/Backend/Predis.php index 1778f31..26e1306 100644 --- a/src/PHPQueue/Backend/Predis.php +++ b/src/PHPQueue/Backend/Predis.php @@ -1,31 +1,25 @@ expiry = $options['expiry']; } - if (!empty($options['order_key'])) { - $this->order_key = $options['order_key']; - $this->redis_options['prefix'] = $this->queue_name . ':'; - } - if (!empty($options['correlation_key'])) { - $this->correlation_key = $options['correlation_key']; - } } public function connect() @@ -92,23 +74,10 @@ public function push($data) throw new BackendException("No queue specified."); } $encoded_data = json_encode($data); - if ($this->order_key) { - if (!$this->correlation_key) { - throw new BackendException("Cannot push to indexed fifo queue without a correlation key."); - } - $key = $data[$this->correlation_key]; - if (!$key) { - throw new BackendException("Cannot push to indexed fifo queue without correlation data."); - } - $status = $this->addToIndexedFifoQueue($key, $data); - if (!$status) { - throw new BackendException("Couldn't push to indexed fifo queue."); - } - } else { - // Note that we're ignoring the "new length" return value, cos I don't - // see how to make it useful. - $this->getConnection()->rpush($this->queue_name, $encoded_data); - } + + // Note that we're ignoring the "new length" return value, cos I don't + // see how to make it useful. + $this->getConnection()->rpush($this->queue_name, $encoded_data); } /** @@ -121,34 +90,63 @@ public function pop() if (!$this->hasQueue()) { throw new BackendException("No queue specified."); } - if ($this->order_key) { - // Pop the first element. - // - // Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php - $options = array( - 'cas' => true, - 'watch' => self::FIFO_INDEX, - 'retry' => 3, - ); - $order_key = $this->order_key; - $this->getConnection()->transaction($options, function ($tx) use ($order_key, &$data) { - // Look up the first element in the FIFO ordering. - $values = $tx->zrange(self::FIFO_INDEX, 0, 0); - if ($values) { - // Use that value as a key into the key-value block, to get the data. - $key = $values[0]; - $data = $tx->get($key); - - // Begin transaction. - $tx->multi(); - - // Remove from both indexes. - $tx->zrem(self::FIFO_INDEX, $key); - $tx->del($key); - } - }); + $data = $this->getConnection()->lpop($this->queue_name); + if (!$data) { + return null; + } + $this->last_job = $data; + $this->last_job_id = time(); + $this->afterGet(); + + return Json::safe_decode($data); + } + + public function popAtomic($callback) { + if (!$this->hasQueue()) { + throw new BackendException("No queue specified."); + } + + // Pop and process the first element, erring on the side of + // at-least-once processing where the callback might get the same + // element before it's popped in the case of a race. + $options = array( + 'cas' => true, + 'watch' => $this->queue_name, + 'retry' => 3, + ); + $data = null; + $self = $this; + $this->getConnection()->transaction($options, function ($tx) use (&$data, $callback, $self) { + // Begin transaction. + $tx->multi(); + + $data = $tx->lpop($self->queue_name); + $data = Json::safe_decode($data); + if ($data !== null) { + call_user_func($callback, $data); + } + }); + return $data; + } + + /** + * Return the top element in the queue. + * + * @return array|null + */ + public function peek() + { + $data = null; + $this->beforeGet(); + if (!$this->hasQueue()) { + throw new BackendException("No queue specified."); + } + $data_range = $this->getConnection()->lrange($this->queue_name, 0, 0); + if (!$data_range) { + return null; } else { - $data = $this->getConnection()->lpop($this->queue_name); + // Unpack list. + $data = $data_range[0]; } if (!$data) { return null; @@ -157,7 +155,7 @@ public function pop() $this->last_job_id = time(); $this->afterGet(); - return json_decode($data, true); + return Json::safe_decode($data); } public function release($jobId=null) @@ -168,8 +166,8 @@ public function release($jobId=null) } $job_data = $this->open_items[$jobId]; $status = $this->getConnection()->rpush($this->queue_name, $job_data); - if (!$status) { - throw new BackendException("Unable to save data."); + if (!is_int($status)) { + throw new BackendException('Unable to save data: ' . $status->getMessage()); } $this->last_job_id = $jobId; $this->afterClearRelease(); @@ -185,9 +183,10 @@ public function setKey($key=null, $data=null) /** * @param string $key * @param array|string $data + * @param array $properties * @throws \PHPQueue\Exception */ - public function set($key, $data) + public function set($key, $data, $properties=array()) { if (!$key || !is_string($key)) { throw new BackendException("Key is invalid."); @@ -198,9 +197,7 @@ public function set($key, $data) $this->beforeAdd(); try { $status = false; - if ($this->order_key) { - $status = $this->addToIndexedFifoQueue($key, $data); - } elseif (is_array($data)) { + if (is_array($data)) { // FIXME: Assert $status = $this->getConnection()->hmset($key, $data); } elseif (is_string($data) || is_numeric($data)) { @@ -210,43 +207,14 @@ public function set($key, $data) $status = $this->getConnection()->set($key, $data); } } - if (!$status) { - throw new BackendException("Unable to save data."); + if (!self::boolStatus($status)) { + throw new BackendException('Unable to save data.: ' . $status->getMessage()); } } catch (\Exception $ex) { throw new BackendException($ex->getMessage(), $ex->getCode()); } } - /** - * Store the data under its order and correlation keys - * - * @param string $key - * @param array $data - */ - protected function addToIndexedFifoQueue($key, $data) - { - $options = array( - 'cas' => true, - 'watch' => self::FIFO_INDEX, - 'retry' => 3, - ); - $score = $data[$this->order_key]; - $encoded_data = json_encode($data); - $status = false; - $expiry = $this->expiry; - $this->getConnection()->transaction($options, function ($tx) use ($key, $score, $encoded_data, $expiry, &$status) { - $tx->multi(); - $tx->zadd(self::FIFO_INDEX, $score, $key); - if ($expiry) { - $status = $tx->setex($key, $expiry, $encoded_data); - } else { - $status = $tx->set($key, $encoded_data); - } - }); - return $status; - } - /** @deprecated */ public function getKey($key=null) { @@ -268,10 +236,6 @@ public function get($key=null) return null; } $this->beforeGet($key); - if ($this->order_key) { - $data = $this->getConnection()->get($key); - return json_decode($data, true); - } $type = $this->getConnection()->type($key); switch ($type) { case self::TYPE_STRING: @@ -307,16 +271,7 @@ public function clear($key) { $this->beforeClear($key); - if ($this->order_key) { - $result = $this->getConnection()->pipeline() - ->zrem(self::FIFO_INDEX, $key) - ->del($key) - ->execute(); - - $num_removed = $result[1]; - } else { - $num_removed = $this->getConnection()->del($key); - } + $num_removed = $this->getConnection()->del($key); $this->afterClearRelease(); @@ -334,7 +289,7 @@ public function incrKey($key, $count=1) $status = $this->getConnection()->incrby($key, $count); } - return $status; + return is_int($status); } public function decrKey($key, $count=1) @@ -348,7 +303,7 @@ public function decrKey($key, $count=1) $status = $this->getConnection()->decrby($key, $count); } - return $status; + return is_int($status); } public function keyExists($key) @@ -361,4 +316,8 @@ public function hasQueue() { return !empty($this->queue_name); } + + protected static function boolStatus(ResponseInterface $status) { + return ($status == 'OK' || $status == 'QUEUED'); + } } diff --git a/src/PHPQueue/Backend/Stomp.php b/src/PHPQueue/Backend/Stomp.php index e855af3..65bd705 100644 --- a/src/PHPQueue/Backend/Stomp.php +++ b/src/PHPQueue/Backend/Stomp.php @@ -7,7 +7,6 @@ use PHPQueue\Exception\BackendException; use PHPQueue\Exception\JobNotFoundException; use PHPQueue\Interfaces\FifoQueueStore; -use PHPQueue\Interfaces\KeyValueStore; /** * Wrap a STOMP queue @@ -18,7 +17,7 @@ */ class Stomp extends Base - implements FifoQueueStore, KeyValueStore + implements FifoQueueStore { public $queue_name; public $uri; diff --git a/src/PHPQueue/Exception/JsonException.php b/src/PHPQueue/Exception/JsonException.php new file mode 100644 index 0000000..2f825b7 --- /dev/null +++ b/src/PHPQueue/Exception/JsonException.php @@ -0,0 +1,9 @@ +markTestSkipped('Memcache not installed'); - } else { - $options = array( - 'servers' => array( - array('localhost', 11211) - ) - , 'expiry' => 600 - ); - $this->object = new Memcache($options); } + + $options = array( + 'servers' => array( + array('localhost', 11211) + ), + 'expiry' => 600, + ); + + // Try to connect to Memcache, skip test politely if unavailable. + try { + $connection = new \Memcache(); + $connection->addserver($options['servers'][0][0], $options['servers'][0][1]); + $success = $connection->set('test' . mt_rand(), 'foo', 1); + if ( !$success ) { + throw new \Exception("Couldn't store to Memcache"); + } + } catch (\Exception $ex) { + $this->markTestSkipped($ex->getMessage()); + } + + $this->object = new Memcache($options); } public function testAdd() diff --git a/test/PHPQueue/Backend/PDOBaseTest.php b/test/PHPQueue/Backend/PDOBaseTest.php new file mode 100644 index 0000000..267e952 --- /dev/null +++ b/test/PHPQueue/Backend/PDOBaseTest.php @@ -0,0 +1,174 @@ +markTestSkipped( 'PDO extension is not installed' ); + } + } + + public function tearDown() + { + if ($this->object) { + $result = $this->object->deleteTable('pdotest'); + $this->assertTrue($result); + } + + parent::tearDown(); + } + + public function testAddGet() + { + + $data1 = array('2', 'Boo', 'Moeow'); + $data2 = array('1','Willy','Wonka'); + + // Queue first message + $this->assertTrue($this->object->add($data1)); + $this->assertEquals(1, $this->object->last_job_id); + + // Queue second message + $this->assertTrue($this->object->add($data2)); + + // Check get method + $this->assertEquals($data2, $this->object->get($this->object->last_job_id)); + + // Check get method with no message ID. + $this->assertEquals($data1, $this->object->get()); + } + + /** + * @depends testAddGet + */ + public function testClear() + { + // TODO: Include test fixtures instead of relying on side effect. + $this->testAddGet(); + + $jobId = 1; + $result = $this->object->clear($jobId); + $this->assertTrue($result); + + $result = $this->object->get($jobId); + $this->assertNull($result); + } + + public function testSet() + { + $data = array(mt_rand(), 'Gas', 'Prom'); + + // Set message. + $this->object->set(3, $data); + + $this->assertEquals($data, $this->object->get(3)); + } + + public function testPush() + { + $data = array(mt_rand(), 'Snow', 'Den'); + + // Set message. + $id = $this->object->push($data); + $this->assertTrue($id > 0); + $this->assertEquals($data, $this->object->get($id)); + } + + public function testPop() + { + $data = array(mt_rand(), 'Snow', 'Den'); + + // Set message. + $id = $this->object->push($data); + $this->assertTrue($id > 0); + $this->assertEquals($data, $this->object->pop()); + } + + public function testPopEmpty() + { + $this->assertNull( $this->object->pop() ); + } + + /** + * popAtomic should pop if the processor callback is successful. + */ + public function testPopAtomicCommit() + { + $data = array(mt_rand(), 'Abbie', 'Hoffman'); + + $this->object->push($data); + $self = $this; + $did_run = false; + $callback = function ($message) use ($self, &$did_run, $data) { + $self->assertEquals($data, $message); + $did_run = true; + }; + $this->assertEquals($data, $this->object->popAtomic($callback)); + $this->assertEquals(true, $did_run); + // Record has really gone away. + $this->assertEquals(null, $this->object->pop()); + } + + /** + * popAtomic should not pop if the processor throws an error. + */ + public function testPopAtomicRollback() + { + $data = array(mt_rand(), 'Abbie', 'Hoffman'); + + $this->object->push($data); + $self = $this; + $callback = function ($message) use ($self, $data) { + $self->assertEquals($data, $message); + throw new \Exception("Foiled!"); + }; + try { + $this->assertEquals($data, $this->object->popAtomic($callback)); + $this->fail("Should have failed by this point"); + } catch (\Exception $ex) { + $this->assertEquals("Foiled!", $ex->getMessage()); + } + + // Punchline: data should still be available for the retry pop. + $this->assertEquals($data, $this->object->pop()); + } + + /** + * popAtomic should not call the callback if there are no messages + */ + public function testPopAtomicEmpty() + { + $did_run = false; + $callback = function ($unused) use (&$did_run) { + $did_run = true; + }; + $data = $this->object->popAtomic($callback); + $this->assertNull($data, 'Should return null on an empty queue'); + $this->assertFalse($did_run, 'Should not call callback without a message'); + } + + /** + * Should be able to push without creating the table first + */ + public function testImplicitCreateTable() + { + $this->object->deleteTable('pdotest'); // created in setUp + $data = array(mt_rand(), 'Daniel', 'Berrigan'); + try { + $id = $this->object->push($data); + $this->assertTrue($id > 0); + $this->assertEquals($data, $this->object->get($id)); + } catch (\Exception $ex) { + $this->fail('Should not throw exception when no table'); + } + } +} diff --git a/test/PHPQueue/Backend/PDOMysqlTest.php b/test/PHPQueue/Backend/PDOMysqlTest.php new file mode 100644 index 0000000..e19b1bb --- /dev/null +++ b/test/PHPQueue/Backend/PDOMysqlTest.php @@ -0,0 +1,28 @@ + 'mysql:host=localhost;dbname=phpqueuetest' + , 'db_table' => 'pdotest' + , 'pdo_options' => array( + \PDO::ATTR_PERSISTENT => true + ) + ); + + // Check that the database exists, and politely skip if not. + try { + new \PDO($options['connection_string']); + } catch ( \PDOException $ex ) { + $this->markTestSkipped('Database access failed: ' . $ex->getMessage()); + } + + $this->object = new PDO($options); + // Create table + $this->assertTrue($this->object->createTable('pdotest')); + $this->object->clearAll(); + } +} diff --git a/test/PHPQueue/Backend/PDOSqliteTest.php b/test/PHPQueue/Backend/PDOSqliteTest.php new file mode 100644 index 0000000..9a7f527 --- /dev/null +++ b/test/PHPQueue/Backend/PDOSqliteTest.php @@ -0,0 +1,18 @@ + 'sqlite::memory:' + , 'db_table' => 'pdotest' + ); + $this->object = new PDO($options); + // Create table + $this->assertTrue($this->object->createTable('pdotest')); + $this->object->clearAll(); + } +} + diff --git a/test/PHPQueue/Backend/PDOTest.php b/test/PHPQueue/Backend/PDOTest.php deleted file mode 100644 index a2cb8e5..0000000 --- a/test/PHPQueue/Backend/PDOTest.php +++ /dev/null @@ -1,110 +0,0 @@ -markTestSkipped('PDO extension is not installed'); - } - $options = array( - 'connection_string' => 'mysql:host=localhost;dbname=phpqueuetest' - , 'db_table' => 'pdotest' - , 'pdo_options' => array( - \PDO::ATTR_PERSISTENT => true - ) - ); - $this->object = new PDO($options); - - // Create table - $this->assertTrue($this->object->createTable('pdotest')); - $this->object->clearAll(); - } - - public function tearDown() - { - if ($this->object) { - $result = $this->object->deleteTable('pdotest'); - $this->assertTrue($result); - } - - parent::tearDown(); - } - - public function testAddGet() - { - - $data1 = array('2', 'Boo', 'Moeow'); - $data2 = array('1','Willy','Wonka'); - - // Queue first message - $this->assertTrue($this->object->add($data1)); - $this->assertEquals(1, $this->object->last_job_id); - - // Queue second message - $this->assertTrue($this->object->add($data2)); - - // Check get method - $this->assertEquals($data2, $this->object->get($this->object->last_job_id)); - - // Check get method with no message ID. - $this->assertEquals($data1, $this->object->get()); - } - - /** - * @depends testAddGet - */ - public function testClear() - { - // TODO: Include test fixtures instead of relying on side effect. - $this->testAddGet(); - - $jobId = 1; - $result = $this->object->clear($jobId); - $this->assertTrue($result); - - $result = $this->object->get($jobId); - $this->assertNull($result); - } - - public function testSet() - { - $data = array(mt_rand(), 'Gas', 'Prom'); - - // Set message. - $this->object->set(3, $data); - - $this->assertEquals($data, $this->object->get(3)); - } - - public function testPush() - { - $data = array(mt_rand(), 'Snow', 'Den'); - - // Set message. - $id = $this->object->push($data); - $this->assertTrue($id > 0); - $this->assertEquals($data, $this->object->get($id)); - } - - public function testPop() - { - $data = array(mt_rand(), 'Snow', 'Den'); - - // Set message. - $id = $this->object->push($data); - $this->assertTrue($id > 0); - $this->assertEquals($data, $this->object->pop()); - } - - public function testPopEmpty() - { - $this->assertNull( $this->object->pop() ); - } -} diff --git a/test/PHPQueue/Backend/PredisTest.php b/test/PHPQueue/Backend/PredisTest.php index 04d6384..ed35dfd 100644 --- a/test/PHPQueue/Backend/PredisTest.php +++ b/test/PHPQueue/Backend/PredisTest.php @@ -1,5 +1,8 @@ object->push($data); $this->assertEquals($data, $this->object->pop()); + + // Check that we did remove the object. + $this->assertNull($this->object->pop()); + } + + /** + * @expectedException PHPQueue\Exception\JsonException + */ + public function testPopBadJson() + { + // Bad JSON + $data = '{"a": bad "Weezle-' . mt_rand() . '"}'; + $this->object->getConnection()->rpush($this->object->queue_name, $data); + + $this->object->pop(); + + $this->fail(); } public function testPopEmpty() { $this->assertNull($this->object->pop()); } + + public function testPeek() + { + $data = 'Weezle-' . mt_rand(); + $this->object->push($data); + + $this->assertEquals($data, $this->object->peek()); + + // Check that we didn't remove the object by peeking. + $this->assertEquals($data, $this->object->pop()); + } } diff --git a/test/PHPQueue/Backend/PredisZsetTest.php b/test/PHPQueue/Backend/PredisZsetTest.php deleted file mode 100644 index ed3d1cf..0000000 --- a/test/PHPQueue/Backend/PredisZsetTest.php +++ /dev/null @@ -1,106 +0,0 @@ -markTestSkipped('Predis not installed'); - } else { - $options = array( - 'servers' => array('host' => '127.0.0.1', 'port' => 6379) - , 'queue' => 'testqueue-' . mt_rand() - , 'order_key' => 'timestamp' - , 'correlation_key' => 'txn_id' - ); - $this->object = new Predis($options); - } - } - - public function tearDown() - { - if ($this->object) { - $this->object->getConnection()->flushall(); - } - parent::tearDown(); - } - - public function testSet() - { - $key = 'A0001'; - $data = array('name' => 'Michael', 'timestamp' => 1); - $this->object->set($key, $data); - - $key = 'A0001'; - $data = array('name' => 'Michael Cheng', 'timestamp' => 2); - $this->object->set($key, $data); - - $key = 'A0002'; - $data = array('name' => 'Michael Cheng', 'timestamp' => 3); - $this->object->set($key, $data); - } - - public function testGet() - { - $key = 'A0001'; - $data1 = array('name' => 'Michael', 'timestamp' => 1); - $this->object->set($key, $data1); - - $key = 'A0001'; - $data2 = array('name' => 'Michael Cheng', 'timestamp' => 2); - $this->object->set($key, $data2); - - $key = 'A0002'; - $data3 = array('name' => 'Michael Cheng', 'timestamp' => 3); - $this->object->set($key, $data3); - - $result = $this->object->get('A0001'); - $this->assertEquals($data2, $result); - - $result = $this->object->getKey('A0002'); - $this->assertEquals($data3, $result); - } - - public function testClear() - { - $key = 'A0002'; - $data = array('name' => 'Adam Wight', 'timestamp' => 2718); - $result = $this->object->set($key, $data); - - $result = $this->object->clear($key); - $this->assertTrue($result); - - $result = $this->object->get($key); - $this->assertNull($result); - } - - public function testClearEmpty() - { - $jobId = 'xxx'; - $this->assertFalse($this->object->clear($jobId)); - } - - public function testPushPop() - { - $data = array( - 'name' => 'Weezle-' . mt_rand(), - 'timestamp' => mt_rand(), - 'txn_id' => mt_rand(), - ); - $this->object->push($data); - - $this->assertEquals($data, $this->object->get($data['txn_id'])); - - $this->assertEquals($data, $this->object->pop()); - - $this->assertNull($this->object->get($data['txn_id'])); - } - - public function testPopEmpty() - { - $this->assertNull($this->object->pop()); - } -}