Permalink
Browse files

fix compatibility with phpredis

* implement a fork helper method that closes the connection to redis before forking (instead of resetting after)
  to work around bugs with phpredis/socket fork handling
* phpredis does not automatically typecast to string, so worker name must be typecasted when registering
  • Loading branch information...
1 parent f082ec8 commit 6800fbe5ac8000c617f53c94a3621b74908f52e7 @chrisboulton committed Jan 12, 2013
Showing with 31 additions and 39 deletions.
  1. +1 −1 bin/resque
  2. +28 −15 lib/Resque.php
  3. +2 −23 lib/Resque/Worker.php
View
@@ -68,7 +68,7 @@ if(!empty($COUNT) && $COUNT > 1) {
if($count > 1) {
for($i = 0; $i < $count; ++$i) {
- $pid = pcntl_fork();
+ $pid = Resque::fork();
if($pid == -1) {
die("Could not fork worker ".$i."\n");
}
View
@@ -27,12 +27,6 @@ class Resque
protected static $redisDatabase = 0;
/**
- * @var int PID of current process. Used to detect changes when forking
- * and implement "thread" safety to avoid race conditions.
- */
- protected static $pid = null;
-
- /**
* Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to.
*
@@ -54,15 +48,7 @@ public static function setBackend($server, $database = 0)
*/
public static function redis()
{
- // Detect when the PID of the current process has changed (from a fork, etc)
- // and force a reconnect to redis.
- $pid = getmypid();
- if (self::$pid !== $pid) {
- self::$redis = null;
- self::$pid = $pid;
- }
-
- if(!is_null(self::$redis)) {
+ if (self::$redis !== null) {
return self::$redis;
}
@@ -76,6 +62,33 @@ public static function redis()
}
/**
+ * fork() helper method for php-resque that handles issues PHP socket
+ * and phpredis have with passing around sockets between child/parent
+ * processes.
+ *
+ * Will close connection to Redis before forking.
+ *
+ * @return int Return vars as per pcntl_fork()
+ */
+ public static function fork()
+ {
+ if(!function_exists('pcntl_fork')) {
+ return -1;
+ }
+
+ // Close the connection to Redis before forking.
+ // This is a workaround for issues phpredis has.
+ self::$redis = null;
+
+ $pid = pcntl_fork();
+ if($pid === -1) {
+ throw new RuntimeException('Unable to fork child worker.');
+ }
+
+ return $pid;
+ }
+
+ /**
* Push a job to the end of a specific queue. If the queue does not
* exist, then create it as well.
*
View
@@ -184,7 +184,7 @@ public function work($interval = 5)
Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job);
- $this->child = $this->fork();
+ $this->child = Resque::fork();
// Forked and we're the child. Run the job.
if ($this->child === 0 || $this->child === false) {
@@ -287,27 +287,6 @@ public function queues($fetch = true)
}
/**
- * Attempt to fork a child process from the parent to run a job in.
- *
- * Return values are those of pcntl_fork().
- *
- * @return int -1 if the fork failed, 0 for the forked child, the PID of the child for the parent.
- */
- private function fork()
- {
- if(!function_exists('pcntl_fork')) {
- return false;
- }
-
- $pid = pcntl_fork();
- if($pid === -1) {
- throw new RuntimeException('Unable to fork child worker.');
- }
-
- return $pid;
- }
-
- /**
* Perform necessary actions to start a worker.
*/
private function startup()
@@ -474,7 +453,7 @@ public function workerPids()
*/
public function registerWorker()
{
- Resque::redis()->sadd('workers', $this);
+ Resque::redis()->sadd('workers', (string)$this);
Resque::redis()->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y'));
}

0 comments on commit 6800fbe

Please sign in to comment.