Permalink
Browse files

merge latest fixes from upstream

  • Loading branch information...
2 parents 737ef97 + 45c49cf commit 8b37122337bd2c1a7803f953ea065716b27a027f Salimane Adjao Moustapha committed Dec 1, 2011
Showing with 124 additions and 59 deletions.
  1. +2 −0 CHANGELOG.markdown
  2. +52 −49 lib/Redisent/Redisent.php
  3. +3 −3 lib/Resque/Job.php
  4. +20 −3 lib/Resque/Redis.php
  5. +19 −2 lib/Resque/RedisCluster.php
  6. +11 −0 lib/Resque/Worker.php
  7. +17 −2 test/Resque/Tests/JobTest.php
View
@@ -2,6 +2,8 @@
* Allow alternate redis database to be selected when calling setBackend by supplying a second argument (patrickbajao)
* Use `require_once` when including php-resque after the app has been included in the sample resque.php to prevent include conflicts (andrewjshults)
+* Wrap job arguments in an array to improve compatibility with ruby resque (warezthebeef)
+* Fix a bug where the worker would spin out of control taking the server with it, if the redis connection was interrupted even briefly. Use SIGPIPE to trap this scenario cleanly. (d11wtq)
## 1.1 (2011-03-27) ##
View
@@ -23,58 +23,62 @@ class RedisException extends Exception {
*/
class Redisent {
- /**
- * Socket connection to the Redis server
- * @var resource
- * @access private
- */
- private $__sock;
+ /**
+ * Socket connection to the Redis server
+ * @var resource
+ * @access private
+ */
+ private $__sock;
- /**
- * Host of the Redis server
- * @var string
- * @access public
- */
- public $host;
+ /**
+ * Host of the Redis server
+ * @var string
+ * @access public
+ */
+ public $host;
- /**
- * Port on which the Redis server is running
- * @var integer
- * @access public
- */
- public $port;
+ /**
+ * Port on which the Redis server is running
+ * @var integer
+ * @access public
+ */
+ public $port;
- /**
- * Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}.
- * @param string $host The hostname of the Redis server
- * @param integer $port The port number of the Redis server
- */
- function __construct($host, $port = 6379) {
- $this->host = $host;
- $this->port = $port;
- $this->__sock = fsockopen($this->host, $this->port, $errno, $errstr);
- if (!$this->__sock) {
- throw new Exception("{$errno} - {$errstr}");
- }
- }
+ /**
+ * Creates a Redisent connection to the Redis server on host {@link $host} and port {@link $port}.
+ * @param string $host The hostname of the Redis server
+ * @param integer $port The port number of the Redis server
+ */
+ function __construct($host, $port = 6379) {
+ $this->host = $host;
+ $this->port = $port;
+ $this->establishConnection();
+ }
- function __destruct() {
- fclose($this->__sock);
- }
+ function establishConnection() {
+ $this->__sock = fsockopen($this->host, $this->port, $errno, $errstr);
+ if (!$this->__sock) {
+ throw new Exception("{$errno} - {$errstr}");
+ }
+ }
- function __call($name, $args) {
+ function __destruct() {
+ fclose($this->__sock);
+ }
- /* Build the Redis unified protocol command */
- array_unshift($args, strtoupper($name));
- $command = sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(array($this, 'formatArgument'), $args), CRLF), CRLF);
+ function __call($name, $args) {
- /* Open a Redis connection and execute the command */
- for ($written = 0; $written < strlen($command); $written += $fwrite) {
- $fwrite = fwrite($this->__sock, substr($command, $written));
- if ($fwrite === FALSE) {
- throw new Exception('Failed to write entire command to stream');
- }
- }
+ /* Build the Redis unified protocol command */
+ array_unshift($args, strtoupper($name));
+ $command = sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(array($this, 'formatArgument'), $args), CRLF), CRLF);
+
+ /* Open a Redis connection and execute the command */
+ for ($written = 0; $written < strlen($command); $written += $fwrite) {
+ $fwrite = fwrite($this->__sock, substr($command, $written));
+ if ($fwrite === FALSE) {
+ throw new Exception('Failed to write entire command to stream');
+ }
+ }
/* Parse the response based on the reply identifier */
$reply = trim(fgets($this->__sock, 512));
@@ -144,8 +148,7 @@ function __call($name, $args) {
return $response;
}
- private function formatArgument($arg) {
- return sprintf('$%d%s%s', strlen($arg), CRLF, $arg);
- }
- }
+ private function formatArgument($arg) {
+ return sprintf('$%d%s%s', strlen($arg), CRLF, $arg);
+ }
}
View
@@ -63,7 +63,7 @@ public static function create($queue, $class, $args = null, $monitor = false)
$id = md5(uniqid('', true));
Resque::push($queue, array(
'class' => $class,
- 'args' => $args,
+ 'args' => array($args),
'id' => $id,
));
@@ -128,7 +128,7 @@ public function getArguments()
return array();
}
- return $this->payload['args'];
+ return $this->payload['args'][0];
}
/**
@@ -248,4 +248,4 @@ public function __toString()
return '(' . implode(' | ', $name) . ')';
}
}
-?>
+?>
View
@@ -16,6 +16,11 @@
*/
class Resque_Redis extends Redisent
{
+ /**
+ * Redis namespace
+ * @var string
+ */
+ private static $defaultNamespace = 'resque:';
/**
* @var array List of all commands in Redis that supply a key as their
* first argument. Used to prefix keys with the Resque namespace.
@@ -76,10 +81,22 @@ class Resque_Redis extends Redisent
// msetnx
// mset
// renamenx
-
+
+ /**
+ * Set Redis namespace (prefix) default: resque
+ * @param string $namespace
+ */
+ public static function prefix($namespace)
+ {
+ if (strpos($namespace, ':') === false) {
+ $namespace .= ':';
+ }
+ self::$defaultNamespace = $namespace;
+ }
+
/**
* Magic method to handle all function requests and prefix key based
- * operations with the 'resque:' key prefix.
+ * operations with the {self::$defaultNamespace} key prefix.
*
* @param string $name The name of the method called.
* @param array $args Array of supplied arguments to the method.
@@ -88,7 +105,7 @@ class Resque_Redis extends Redisent
public function __call($name, $args) {
$args = func_get_args();
if(in_array($name, $this->keyCommands)) {
- $args[1][0] = 'resque:' . $args[1][0];
+ $args[1][0] = self::$defaultNamespace . $args[1][0];
}
try {
return parent::__call($name, $args[1]);
@@ -16,6 +16,11 @@
*/
class Resque_RedisCluster extends RedisentCluster
{
+ /**
+ * Redis namespace
+ * @var string
+ */
+ private static $defaultNamespace = 'resque:';
/**
* @var array List of all commands in Redis that supply a key as their
* first argument. Used to prefix keys with the Resque namespace.
@@ -76,10 +81,22 @@ class Resque_RedisCluster extends RedisentCluster
// msetnx
// mset
// renamenx
+
+ /**
+ * Set Redis namespace (prefix) default: resque
+ * @param string $namespace
+ */
+ public static function prefix($namespace)
+ {
+ if (strpos($namespace, ':') === false) {
+ $namespace .= ':';
+ }
+ self::$defaultNamespace = $namespace;
+ }
/**
* Magic method to handle all function requests and prefix key based
- * operations with the 'resque:' key prefix.
+ * operations with the '{self::$defaultNamespace}' key prefix.
*
* @param string $name The name of the method called.
* @param array $args Array of supplied arguments to the method.
@@ -88,7 +105,7 @@ class Resque_RedisCluster extends RedisentCluster
public function __call($name, $args) {
$args = func_get_args();
if(in_array($name, $this->keyCommands)) {
- $args[1][0] = 'resque:' . $args[1][0];
+ $args[1][0] = self::$defaultNamespace . $args[1][0];
}
try {
return parent::__call($name, $args[1]);
View
@@ -358,6 +358,7 @@ private function registerSigHandlers()
pcntl_signal(SIGUSR1, array($this, 'killChild'));
pcntl_signal(SIGUSR2, array($this, 'pauseProcessing'));
pcntl_signal(SIGCONT, array($this, 'unPauseProcessing'));
+ pcntl_signal(SIGPIPE, array($this, 'reestablishRedisConnection'));
$this->log('Registered signals', self::LOG_VERBOSE);
}
@@ -381,6 +382,16 @@ public function unPauseProcessing()
}
/**
+ * Signal handler for SIGPIPE, in the event the redis connection has gone away.
+ * Attempts to reconnect to redis, or raises an Exception.
+ */
+ public function reestablishRedisConnection()
+ {
+ $this->log('SIGPIPE received; attempting to reconnect');
+ Resque::redis()->establishConnection();
+ }
+
+ /**
* Schedule a worker for shutdown. Will finish processing the current job
* and when the timeout interval is reached, the worker will shut down.
*/
@@ -65,7 +65,7 @@ public function testQueuedJobReturnsExactSamePassedInArguments()
Resque::enqueue('jobs', 'Test_Job', $args);
$job = Resque_Job::reserve('jobs');
- $this->assertEquals($args, $job->payload['args']);
+ $this->assertEquals($args, $job->getArguments());
}
public function testAfterJobIsReservedItIsRemoved()
@@ -97,9 +97,10 @@ public function testRecreatedJobMatchesExistingJob()
$newJob = Resque_Job::reserve('jobs');
$this->assertEquals($job->payload['class'], $newJob->payload['class']);
- $this->assertEquals($job->payload['args'], $newJob->payload['args']);
+ $this->assertEquals($job->payload['args'], $newJob->getArguments());
}
+
public function testFailedJobExceptionsAreCaught()
{
$payload = array(
@@ -166,4 +167,18 @@ public function testJobWithTearDownCallbackFiresTearDown()
$this->assertTrue(Test_Job_With_TearDown::$called);
}
+
+ public function testJobWithNamespace()
+ {
+ Resque_Redis::prefix('php');
+ $queue = 'jobs';
+ $payload = array('another_value');
+ Resque::enqueue($queue, 'Test_Job_With_TearDown', $payload);
+
+ $this->assertEquals(Resque::queues(), array('jobs'));
+ $this->assertEquals(Resque::size($queue), 1);
+
+ Resque_Redis::prefix('resque');
+ $this->assertEquals(Resque::size($queue), 0);
+ }
}

0 comments on commit 8b37122

Please sign in to comment.