Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fastcgi #81

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 34 additions & 2 deletions README.md
Expand Up @@ -223,7 +223,15 @@ the `COUNT` environment variable:

$ COUNT=5 bin/resque

### Forking ###
### Job Srategys ###

Php-resque implements multiple ways to seperate the worker process
from the job process to improce resilience. Supported platforms
default to the fork strategy, falling back to in-process execution.
Specific strategys can be chosen by supplyingthe `JOB_STRATEGY`
environment variable.

#### Forking ####

Similarly to the Ruby versions, supported platforms will immediately
fork after picking up a job. The forked child will exit as soon as
Expand All @@ -233,6 +241,30 @@ The difference with php-resque is that if a forked child does not
exit nicely (PHP error or such), php-resque will automatically fail
the job.

$ JOB_STRATEGY=fork php resque.php

#### Fastcgi ####

The fastcgi strategy executes jobs over a fastcgi connection to php-fpm.
It may offer a lower overhead per job in environments with lots of very short
jobs. To use fastcgi you must install the suggested composer package
`ebernhardson/fastcgi`

$ JOB_STRATEGY=fastcgi php resque.php

Fastcgi accepts two additional parameters. `FASTCGI_LOCATION` sets the
location of the php-fpm server. This can either be a host:port combination
or a path to a unix socket. `FASTCGI_SCRIPT` sets the path to the script used
to receive and run the job in the php-fpm process.

#### In Process ####

For cases when the other two strategys are not available the in-process
strategy will run jobs in the same process as the worker. This is not
recommended as failures in the job may turn into failures in the worker.

$ JOB_STRATEGY=inprocess php resque.php

### Signals ###

Signals also work on supported platforms exactly as in the Ruby
Expand Down Expand Up @@ -370,4 +402,4 @@ Called after a job has been queued using the `Resque::enqueue` method. Arguments
* maetl
* Matt Heath
* jjfrey
* scragg0x
* scragg0x
40 changes: 40 additions & 0 deletions bin/resque
Expand Up @@ -66,6 +66,40 @@ if(!empty($COUNT) && $COUNT > 1) {
$count = $COUNT;
}

$jobStrategy=null;
$JOB_STRATEGY = getenv('JOB_STRATEGY');
switch($JOB_STRATEGY) {
case 'inprocess':
$jobStrategy = new Resque_JobStrategy_InProcess;
break;
case 'fork':
$jobStrategy = new Resque_JobStrategy_Fork;
break;
case 'fastcgi':
$fastcgiLocation = '127.0.0.1:9000';
$FASTCGI_LOCATION = getenv('FASTCGI_LOCATION');
if (!empty($FASTCGI_LOCATION)) {
$fastcgiLocation = $FASTCGI_LOCATION;
}

$fastcgiScript = dirname(__FILE__).'/extras/fastcgi_worker.php';
$FASTCGI_SCRIPT = getenv('FASTCGI_SCRIPT');
if (!empty($FASTCGI_SCRIPT)) {
$fastcgiScript = $FASTCGI_SCRIPT;
}

require_once dirname(__FILE__).'/lib/Resque/JobStrategy/Fastcgi.php';
$jobStrategy = new Resque_JobStrategy_Fastcgi(
$fastcgiLocation,
$fastcgiScript,
array(
'APP_INCLUDE' => $APP_INCLUDE,
'REDIS_BACKEND' => $REDIS_BACKEND,
)
);
break;
}

if($count > 1) {
for($i = 0; $i < $count; ++$i) {
$pid = Resque::fork();
Expand All @@ -77,6 +111,9 @@ if($count > 1) {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
if ($jobStrategy) {
$worker->setJobStrategy($jobStrategy);
}
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
$worker->work($interval);
break;
Expand All @@ -88,6 +125,9 @@ else {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
if ($jobStrategy) {
$worker->setJobStrategy($jobStrategy);
}

$PIDFILE = getenv('PIDFILE');
if ($PIDFILE) {
Expand Down
5 changes: 3 additions & 2 deletions composer.json
Expand Up @@ -23,7 +23,8 @@
},
"suggest": {
"ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.",
"ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available."
"ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available.",
"ebernhardson/fastcgi": "Allows php-resque to execute jobs via php-fpm."
},
"require-dev": {
"phpunit/phpunit": "3.7.*"
Expand All @@ -36,4 +37,4 @@
"Resque": "lib"
}
}
}
}
30 changes: 30 additions & 0 deletions extras/fastcgi_worker.php
@@ -0,0 +1,30 @@
<?php

if (!isset($_SERVER['RESQUE_JOB'])) {
header('Status: 500 No Job');
return;
}

require_once dirname(__FILE__).'/../lib/Resque.php';
require_once dirname(__FILE__).'/../lib/Resque/Worker.php';

if (isset($_SERVER['REDIS_BACKEND'])) {
Resque::setBackend($_SERVER['REDIS_BACKEND']);
}

try {
if (isset($_SERVER['APP_INCLUDE'])) {
require_once $_SERVER['APP_INCLUDE'];
}

$job = unserialize(urldecode($_SERVER['RESQUE_JOB']));
$job->worker->perform($job);
} catch (\Exception $e) {
if (isset($job)) {
$job->fail($e);
} else {
header('Status: 500');
}
}

?>
113 changes: 113 additions & 0 deletions lib/Resque/JobStrategy/Fastcgi.php
@@ -0,0 +1,113 @@
<?php

use EBernhardson\FastCGI\Client;
use EBernhardson\FastCGI\CommunicationException;

/**
* @package Resque/JobStrategy
* @author Erik Bernhardson <bernhardsonerik@gmail.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_JobStrategy_Fastcgi implements Resque_JobStrategy_Interface
{
/**
* @var bool True when waiting for a response from fcgi server
*/
private $waiting = false;

/**
* @var array Default enironment for FCGI requests
*/
protected $requestData = array(
'GATEWAY_INTERFACE' => 'FastCGI/1.0',
'REQUEST_METHOD' => 'GET',
'SERVER_SOFTWARE' => 'php-resque-fastcgi/1.3-dev',
'REMOTE_ADDR' => '127.0.0.1',
'REMOTE_PORT' => 8888,
'SERVER_ADDR' => '127.0.0.1',
'SERVER_PORT' => 8888,
'SERVER_PROTOCOL' => 'HTTP/1.1'
);

/**
* @param string $location When the location contains a `:` it will be considered a host/port pair
* otherwise a unix socket path
* @param string $script Absolute path to the script that will load resque and perform the job
* @param array $environment Additional environment variables available in $_SERVER to the fcgi script
*/
public function __construct($location, $script, $environment = array())
{
$this->location = $location;

$port = false;
if (false !== strpos($location, ':')) {
list($location, $port) = explode(':', $location, 2);
}

$this->fcgi = new Client($location, $port);
$this->fcgi->setKeepAlive(true);

$this->requestData = $environment + $this->requestData + array(
'SCRIPT_FILENAME' => $script,
'SERVER_NAME' => php_uname('n'),
'RESQUE_DIR' => __DIR__.'/../../../',
);
}

/**
* @param Resque_Worker $worker
*/
public function setWorker(Resque_Worker $worker)
{
$this->worker = $worker;
}

/**
* Executes the provided job over a fastcgi connection
*
* @param Resque_Job $job
*/
public function perform(Resque_Job $job)
{
$status = 'Requested fcgi job execution from ' . $this->location . ' at ' . strftime('%F %T');
$this->worker->updateProcLine($status);
$this->worker->log($status, Resque_Worker::LOG_VERBOSE);

$this->waiting = true;

try {
$this->fcgi->request(array(
'RESQUE_JOB' => urlencode(serialize($job)),
) + $this->requestData, '');

$response = $this->fcgi->response();
$this->waiting = false;
} catch (CommunicationException $e) {
$this->waiting = false;
$job->fail($e);
return;
}

if ($response['statusCode'] !== 200) {
$job->fail(new Exception(sprintf(
'FastCGI job returned non-200 status code: %s Stdout: %s Stderr: %s',
$response['headers']['status'],
$response['body'],
$response['stderr']
)));
}
}

/**
* Shutdown the worker process.
*/
public function shutdown()
{
if ($this->waiting === false) {
$this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE);
} else {
$this->worker->log('Closing fcgi connection with job in progress.', Resque_Worker::LOG_VERBOSE);
}
$this->fcgi->close();
}
}
106 changes: 106 additions & 0 deletions lib/Resque/JobStrategy/Fork.php
@@ -0,0 +1,106 @@
<?php
/**
* Seperates the job execution environment from the worker via pcntl_fork
*
* @package Resque/JobStrategy
* @author Chris Boulton <chris@bigcommerce.com>
* @author Erik Bernharsdon <bernhardsonerik@gmail.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_JobStrategy_Fork extends Resque_JobStrategy_InProcess
{
/**
* @param int|null 0 for the forked child, the PID of the child for the parent, or null if no child.
*/
protected $child;

/**
* @param Resque_Worker Instance of Resque_Worker that is starting jobs
*/
protected $worker;

/**
* Set the Resque_Worker instance
*
* @param Resque_Worker $worker
*/
public function setWorker(Resque_Worker $worker)
{
$this->worker = $worker;
}

/**
* Seperate the job from the worker via pcntl_fork
*
* @param Resque_Job $job
*/
public function perform(Resque_Job $job)
{
$this->child = $this->fork();

// Forked and we're the child. Run the job.
if ($this->child === 0) {
parent::perform($job);
exit(0);
}

// Parent process, sit and wait
if($this->child > 0) {
$status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
$this->worker->updateProcLine($status);
$this->worker->log($status, Resque_Worker::LOG_VERBOSE);

// Wait until the child process finishes before continuing
pcntl_wait($status);
$exitStatus = pcntl_wexitstatus($status);
if($exitStatus !== 0) {
$job->fail(new Resque_Job_DirtyExitException(
'Job exited with exit code ' . $exitStatus
));
}
}

$this->child = null;
}

/**
* Force an immediate shutdown of the worker, killing any child jobs
* currently working
*/
public function shutdown()
{
if (!$this->child) {
$this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE);
return;
}

$this->worker->log('Killing child at '.$this->child, Resque_Worker::LOG_VERBOSE);
if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
$this->worker->log('Killing child at ' . $this->child, Resque_Worker::LOG_VERBOSE);
posix_kill($this->child, SIGKILL);
$this->child = null;
}
else {
$this->worker->log('Child ' . $this->child . ' not found, restarting.', Resque_Worker::LOG_VERBOSE);
$this->worker->shutdown();
}
}

/**
* Attempt to fork a child process from the parent to run a job in.
*
* Return values are those of pcntl_fork().
*
* @return int 0 for the forked child, or the PID of the child for the parent.
* @throws RuntimeException When pcntl_fork returns -1
*/
private function fork()
{
$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
}

return $pid;
}
}