Navigation Menu

Skip to content

Commit

Permalink
Refactor forking code into Resque_JobStrategy_Interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ebernhardson committed Feb 2, 2013
1 parent aae1683 commit e0c115d
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 50 deletions.
106 changes: 106 additions & 0 deletions lib/Resque/JobStrategy/Fork.php
@@ -0,0 +1,106 @@
<?php
/**
* Seperates the job execution environment from the worker via pcntl_fork
*
* @package Resque/JobStrategy
* @author Chris Boulton <chris@bigcommerce.com>
* @author Erik Bernharsdon <bernhardsonerik@gmail.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_JobStrategy_Fork extends Resque_JobStrategy_InProcess
{
/**
* @param int|null 0 for the forked child, the PID of the child for the parent, or null if no child.
*/
protected $child;

/**
* @param Resque_Worker Instance of Resque_Worker that is starting jobs
*/
protected $worker;

/**
* Set the Resque_Worker instance
*
* @param Resque_Worker $worker
*/
public function setWorker(Resque_Worker $worker)
{
$this->worker = $worker;
}

/**
* Seperate the job from the worker via pcntl_fork
*
* @param Resque_Job $job
*/
public function perform(Resque_Job $job)
{
$this->child = $this->fork();

// Forked and we're the child. Run the job.
if ($this->child === 0) {
parent::perform($job);
exit(0);
}

// Parent process, sit and wait
if($this->child > 0) {
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->worker->updateProcLine($status);
$this->worker->log($status, Resque_Worker::LOG_VERBOSE);

// Wait until the child process finishes before continuing
pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
}
}

$this->child = null;
}

/**
* Force an immediate shutdown of the worker, killing any child jobs
* currently working
*/
public function shutdown()
{
if (!$this->child) {
$this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE);
return;
}

$this->worker->log('Killing child at '.$this->child, Resque_Worker::LOG_VERBOSE);
if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
$this->worker->log('Killing child at ' . $this->child, Resque_Worker::LOG_VERBOSE);
posix_kill($this->child, SIGKILL);
$this->child = null;
}
else {
$this->worker->log('Child ' . $this->child . ' not found, restarting.', Resque_Worker::LOG_VERBOSE);
$this->worker->shutdown();
}
}

/**
* Attempt to fork a child process from the parent to run a job in.
*
* Return values are those of pcntl_fork().
*
* @return int 0 for the forked child, or the PID of the child for the parent.
* @throws RuntimeException When pcntl_fork returns -1
*/
private function fork()
{
$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}

return $pid;
}
}
48 changes: 48 additions & 0 deletions lib/Resque/JobStrategy/InProcess.php
@@ -0,0 +1,48 @@
<?php
/**
* Runs the job in the same process as Resque_Worker
*
* @package Resque/JobStrategy
* @author Chris Boulton <chris@bigcommerce.com>
* @author Erik Bernharsdon <bernhardsonerik@gmail.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_JobStrategy_InProcess implements Resque_JobStrategy_Interface
{
/**
* @param Resque_Worker Instance of Resque_Worker that is starting jobs
*/
protected $worker;

/**
* Set the Resque_Worker instance
*
* @param Resque_Worker $worker
*/
public function setWorker(Resque_Worker $worker)
{
$this->worker = $worker;
}

/**
* Run the job in the worker process
*
* @param Resque_Job $job
*/
public function perform(Resque_Job $job)
{
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->worker->updateProcLine($status);
$this->worker->log($status, Resque_Worker::LOG_VERBOSE);
$this->worker->perform($job);
}

/**
* Force an immediate shutdown of the worker, killing any child jobs
* currently working
*/
public function shutdown()
{
$this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE);
}
}
31 changes: 31 additions & 0 deletions lib/Resque/JobStrategy/Interface.php
@@ -0,0 +1,31 @@
<?php
/**
* Interface that all job strategy backends should implement.
*
* @package Resque/JobStrategy
* @author Chris Boulton <chris@bigcommerce.com>
* @author Erik Bernharsdon <bernhardsonerik@gmail.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
interface Resque_JobStrategy_Interface
{
/**
* Set the Resque_Worker instance
*
* @param Resque_Worker $worker
*/
function setWorker(Resque_Worker $worker);

/**
* Seperates the job execution context from the worker and calls $worker->perform($job).
*
* @param Resque_Job $job
*/
function perform(Resque_Job $job);

/**
* Force an immediate shutdown of the worker, killing any child jobs
* currently working
*/
function shutdown();
}
70 changes: 20 additions & 50 deletions lib/Resque/Worker.php
Expand Up @@ -48,11 +48,6 @@ class Resque_Worker
*/
private $currentJob = null;

/**
* @var int Process ID of child worker processes.
*/
private $child = null;

/**
* Return all workers known to Resque as instantiated instances.
* @return array
Expand Down Expand Up @@ -137,6 +132,23 @@ public function __construct($queues)
}
$this->hostname = $hostname;
$this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues);

if (function_exists('pcntl_fork')) {
$this->setJobStrategy(new Resque_JobStrategy_Fork);
} else {
$this->setJobStrategy(new Resque_JobStrategy_InProcess);
}
}

/**
* Set the JobStrategy used to seperate the job execution context from the worker
*
* @param Resque_JobStrategy_Interface
*/
public function setJobStrategy(Resque_JobStrategy_Interface $jobStrategy)
{
$this->jobStrategy = $jobStrategy;
$this->jobStrategy->setWorker($this);
}

/**
Expand Down Expand Up @@ -184,36 +196,8 @@ public function work($interval = 5)
Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job);

$this->child = Resque::fork();
$this->jobStrategy->perform($job);

// Forked and we're the child. Run the job.
if ($this->child === 0 || $this->child === false) {
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);
$this->perform($job);
if ($this->child === 0) {
exit(0);
}
}

if($this->child > 0) {
// Parent process, sit and wait
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->updateProcLine($status);
$this->log($status, self::LOG_VERBOSE);

// Wait until the child process finishes before continuing
pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
}
}

$this->child = null;
$this->doneWorking();
}

Expand Down Expand Up @@ -304,7 +288,7 @@ private function startup()
*
* @param string $status The updated process title.
*/
private function updateProcLine($status)
public function updateProcLine($status)
{
if(function_exists('setproctitle')) {
setproctitle('resque-' . Resque::VERSION . ': ' . $status);
Expand Down Expand Up @@ -391,21 +375,7 @@ public function shutdownNow()
*/
public function killChild()
{
if(!$this->child) {
$this->log('No child to kill.', self::LOG_VERBOSE);
return;
}

$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
$this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
posix_kill($this->child, SIGKILL);
$this->child = null;
}
else {
$this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE);
$this->shutdown();
}
$this->jobStrategy->shutdown();
}

/**
Expand Down

0 comments on commit e0c115d

Please sign in to comment.