Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lost jobs in multiple workers with count > 1 (issue #32) #43

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 33 additions & 1 deletion lib/Resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ class Resque
*/
public static $redis = null;

/**
* @var redis backend server address
*/
public static $server = null;

/**
* @var integer Db on which the Redis backend server is selected
*/
public static $database = null;

/**
* Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to.
Expand All @@ -28,6 +38,10 @@ class Resque
*/
public static function setBackend($server, $database = 0)
{
//save the params for later use
self::$server = $server;
self::$database = $database;

if(is_array($server)) {
require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
self::$redis = new Resque_RedisCluster($server);
Expand All @@ -41,14 +55,32 @@ public static function setBackend($server, $database = 0)
self::redis()->select($database);
}

/**
* Reconnect to the redis backend specified in during __construct
* @return Resque_Redis Instance of Resque_Redis. (redis ressource handle)
*/
public static function ResetBackend()
{
if(is_array(self::$server)) {
self::$redis = new Resque_RedisCluster(self::$server);
}
else {
list($host, $port) = explode(':', self::$server);
self::$redis = new Resque_Redis($host, $port);
}
self::$redis->select(self::$database);
return self::$redis;
}

/**
* Return an instance of the Resque_Redis class instantiated for Resque.
*
* @return Resque_Redis Instance of Resque_Redis.
*/
public static function redis()
{
if(is_null(self::$redis)) {
//try to reset the backend specified during __construct
if(is_null(self::$redis) && is_null(self::ResetBackend())) {
self::setBackend('localhost:6379');
}

Expand Down
5 changes: 4 additions & 1 deletion lib/Resque/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Resque_Worker
* @var Resque_Job Current job, if any, being processed by this worker.
*/
private $currentJob = null;

/**
* @var int Process ID of child worker processes.
*/
Expand Down Expand Up @@ -193,6 +193,9 @@ public function work($interval = 5)

// Forked and we're the child. Run the job.
if($this->child === 0 || $this->child === false) {
//reconnect to Redis in child to avoid sharing same connection with parent process
// leading to race condition reading from the same socket, thus errors
Resque::ResetBackend();
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
Expand Down
54 changes: 36 additions & 18 deletions resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,48 @@
$count = $COUNT;
}

if($count > 1) {
for($i = 0; $i < $count; ++$i) {
$pid = pcntl_fork();
if($pid == -1) {
die("Could not fork worker ".$i."\n");
}
// Child, start the worker
else if(!$pid) {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
$worker->work($interval);
break;
}
}
$PIDFILE = getenv('PIDFILE');
if($count > 1) {//start multiple workers by forking this process
$pids = '';
for($i = 0; $i < $count; ++$i) {
$pid = pcntl_fork();
if($pid === -1) {
die("Could not fork worker ".$i."\n");
}
// Child, start the worker
else if($pid === 0) {
//reconnect to Redis in child to avoid sharing same connection with parent process
// leading to race condition reading from the same socket, thus errors
Resque::ResetBackend();
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
//to kill self before exit()
register_shutdown_function(create_function('$pars', 'posix_kill(getmypid(), SIGKILL);'), array());
$worker->work($interval);
//to avoid foreach loop in child process, exit with its order of creation $i
exit($i);
}
}

if ($PIDFILE) {
file_put_contents($PIDFILE, getmypid()) or
die('Could not write PID information to ' . $PIDFILE);
}

//in parent, wait for all child processes to terminate
while (pcntl_waitpid(0, $status) != -1) {
$status = pcntl_wexitstatus($status);
fwrite(STDOUT, '*** Worker '.$status." terminated\n");
}
}
// Start a single worker
else {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;

$PIDFILE = getenv('PIDFILE');

if ($PIDFILE) {
file_put_contents($PIDFILE, getmypid()) or
die('Could not write PID information to ' . $PIDFILE);
Expand Down