Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Fastcgi #81

Open
wants to merge 8 commits into from

2 participants

ebernhardson Chris Boulton
ebernhardson

As discussed in issue #76 this is a refactor of the the current fork/in-process strategy into independent objects along with the addition of a fastcgi strategy to utilize php-fpm.

A couple notes:

To pass the job between the worker and fastcgi I settled on serializing the job and passing as an environment variable. Specifically when failing a job both the worker and job instances are required. I thought it best to serialize and pass both rather than make changes to the failure code.

I pulled the FCGIClient into the repository, currently under the BitTP namespace as that is where i found it. I had to make a few changes to this class to make it fit our usage; Specifically it was originally written using non-blocking sockets and busy-waiting to read the socket. For our purposes a blocking fastcgi client is more appropriate so i made that change. It may be more appropriate to re-namespace or put into a repo on its own to be referenced by composer.

I could not come up with a good way to programmatically test this code. Its fairly straight forward code, and I've run numerous tests by hand, but needs automated on-going testing one way or the other. I'm open to suggestions which i can flush out into proper tests.

Chris Boulton

This looks good :thumbsup:.

Before I bring it in, I want to work on externalizing the FCGIClient (essentially creating a Composer package for it), and obviously work on the testing. If you don't have the time to do this, I'll probably begin work on getting this down sometime in the next two weeks.

Nice job!

ebernhardson

The FastCGI client has been extracted into a suggested package, https://github.com/ebernhardson/fastcgi , and the pull request has been updated.

Still need to figure out a full featured way to test the JobStrategy interactions.

Chris Boulton
Owner

Awesome. :thumbsup:

Sorry, I meant to spend some time on this but dropped the ball because I've been busy with work and life at the moment. I definitely appreciate the effort you're putting into this.

Jesse homeyjd referenced this pull request from a commit in dmvorg/php-resque
Jesse homeyjd refactoring to JobStrategy, adding FastCGI option, fixing unit test i…
…ssues

Merging changes from chrisboulton#81
6836c76
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
36 README.md
View
@@ -223,7 +223,15 @@ the `COUNT` environment variable:
$ COUNT=5 bin/resque
-### Forking ###
+### Job Srategys ###
+
+Php-resque implements multiple ways to seperate the worker process
+from the job process to improce resilience. Supported platforms
+default to the fork strategy, falling back to in-process execution.
+Specific strategys can be chosen by supplyingthe `JOB_STRATEGY`
+environment variable.
+
+#### Forking ####
Similarly to the Ruby versions, supported platforms will immediately
fork after picking up a job. The forked child will exit as soon as
@@ -233,6 +241,30 @@ The difference with php-resque is that if a forked child does not
exit nicely (PHP error or such), php-resque will automatically fail
the job.
+ $ JOB_STRATEGY=fork php resque.php
+
+#### Fastcgi ####
+
+The fastcgi strategy executes jobs over a fastcgi connection to php-fpm.
+It may offer a lower overhead per job in environments with lots of very short
+jobs. To use fastcgi you must install the suggested composer package
+`ebernhardson/fastcgi`
+
+ $ JOB_STRATEGY=fastcgi php resque.php
+
+Fastcgi accepts two additional parameters. `FASTCGI_LOCATION` sets the
+location of the php-fpm server. This can either be a host:port combination
+or a path to a unix socket. `FASTCGI_SCRIPT` sets the path to the script used
+to receive and run the job in the php-fpm process.
+
+#### In Process ####
+
+For cases when the other two strategys are not available the in-process
+strategy will run jobs in the same process as the worker. This is not
+recommended as failures in the job may turn into failures in the worker.
+
+ $ JOB_STRATEGY=inprocess php resque.php
+
### Signals ###
Signals also work on supported platforms exactly as in the Ruby
@@ -370,4 +402,4 @@ Called after a job has been queued using the `Resque::enqueue` method. Arguments
* maetl
* Matt Heath
* jjfrey
-* scragg0x
+* scragg0x
40 bin/resque
View
@@ -66,6 +66,40 @@ if(!empty($COUNT) && $COUNT > 1) {
$count = $COUNT;
}
+$jobStrategy=null;
+$JOB_STRATEGY = getenv('JOB_STRATEGY');
+switch($JOB_STRATEGY) {
+ case 'inprocess':
+ $jobStrategy = new Resque_JobStrategy_InProcess;
+ break;
+ case 'fork':
+ $jobStrategy = new Resque_JobStrategy_Fork;
+ break;
+ case 'fastcgi':
+ $fastcgiLocation = '127.0.0.1:9000';
+ $FASTCGI_LOCATION = getenv('FASTCGI_LOCATION');
+ if (!empty($FASTCGI_LOCATION)) {
+ $fastcgiLocation = $FASTCGI_LOCATION;
+ }
+
+ $fastcgiScript = dirname(__FILE__).'/extras/fastcgi_worker.php';
+ $FASTCGI_SCRIPT = getenv('FASTCGI_SCRIPT');
+ if (!empty($FASTCGI_SCRIPT)) {
+ $fastcgiScript = $FASTCGI_SCRIPT;
+ }
+
+ require_once dirname(__FILE__).'/lib/Resque/JobStrategy/Fastcgi.php';
+ $jobStrategy = new Resque_JobStrategy_Fastcgi(
+ $fastcgiLocation,
+ $fastcgiScript,
+ array(
+ 'APP_INCLUDE' => $APP_INCLUDE,
+ 'REDIS_BACKEND' => $REDIS_BACKEND,
+ )
+ );
+ break;
+}
+
if($count > 1) {
for($i = 0; $i < $count; ++$i) {
$pid = Resque::fork();
@@ -77,6 +111,9 @@ if($count > 1) {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
+ if ($jobStrategy) {
+ $worker->setJobStrategy($jobStrategy);
+ }
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
$worker->work($interval);
break;
@@ -88,6 +125,9 @@ else {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
+ if ($jobStrategy) {
+ $worker->setJobStrategy($jobStrategy);
+ }
$PIDFILE = getenv('PIDFILE');
if ($PIDFILE) {
5 composer.json
View
@@ -23,7 +23,8 @@
},
"suggest": {
"ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.",
- "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available."
+ "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available.",
+ "ebernhardson/fastcgi": "Allows php-resque to execute jobs via php-fpm."
},
"require-dev": {
"phpunit/phpunit": "3.7.*"
@@ -36,4 +37,4 @@
"Resque": "lib"
}
}
-}
+}
30 extras/fastcgi_worker.php
View
@@ -0,0 +1,30 @@
+<?php
+
+if (!isset($_SERVER['RESQUE_JOB'])) {
+ header('Status: 500 No Job');
+ return;
+}
+
+require_once dirname(__FILE__).'/../lib/Resque.php';
+require_once dirname(__FILE__).'/../lib/Resque/Worker.php';
+
+if (isset($_SERVER['REDIS_BACKEND'])) {
+ Resque::setBackend($_SERVER['REDIS_BACKEND']);
+}
+
+try {
+ if (isset($_SERVER['APP_INCLUDE'])) {
+ require_once $_SERVER['APP_INCLUDE'];
+ }
+
+ $job = unserialize(urldecode($_SERVER['RESQUE_JOB']));
+ $job->worker->perform($job);
+} catch (\Exception $e) {
+ if (isset($job)) {
+ $job->fail($e);
+ } else {
+ header('Status: 500');
+ }
+}
+
+?>
113 lib/Resque/JobStrategy/Fastcgi.php
View
@@ -0,0 +1,113 @@
+<?php
+
+use EBernhardson\FastCGI\Client;
+use EBernhardson\FastCGI\CommunicationException;
+
+/**
+ * @package Resque/JobStrategy
+ * @author Erik Bernhardson <bernhardsonerik@gmail.com>
+ * @license http://www.opensource.org/licenses/mit-license.php
+ */
+class Resque_JobStrategy_Fastcgi implements Resque_JobStrategy_Interface
+{
+ /**
+ * @var bool True when waiting for a response from fcgi server
+ */
+ private $waiting = false;
+
+ /**
+ * @var array Default enironment for FCGI requests
+ */
+ protected $requestData = array(
+ 'GATEWAY_INTERFACE' => 'FastCGI/1.0',
+ 'REQUEST_METHOD' => 'GET',
+ 'SERVER_SOFTWARE' => 'php-resque-fastcgi/1.3-dev',
+ 'REMOTE_ADDR' => '127.0.0.1',
+ 'REMOTE_PORT' => 8888,
+ 'SERVER_ADDR' => '127.0.0.1',
+ 'SERVER_PORT' => 8888,
+ 'SERVER_PROTOCOL' => 'HTTP/1.1'
+ );
+
+ /**
+ * @param string $location When the location contains a `:` it will be considered a host/port pair
+ * otherwise a unix socket path
+ * @param string $script Absolute path to the script that will load resque and perform the job
+ * @param array $environment Additional environment variables available in $_SERVER to the fcgi script
+ */
+ public function __construct($location, $script, $environment = array())
+ {
+ $this->location = $location;
+
+ $port = false;
+ if (false !== strpos($location, ':')) {
+ list($location, $port) = explode(':', $location, 2);
+ }
+
+ $this->fcgi = new Client($location, $port);
+ $this->fcgi->setKeepAlive(true);
+
+ $this->requestData = $environment + $this->requestData + array(
+ 'SCRIPT_FILENAME' => $script,
+ 'SERVER_NAME' => php_uname('n'),
+ 'RESQUE_DIR' => __DIR__.'/../../../',
+ );
+ }
+
+ /**
+ * @param Resque_Worker $worker
+ */
+ public function setWorker(Resque_Worker $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * Executes the provided job over a fastcgi connection
+ *
+ * @param Resque_Job $job
+ */
+ public function perform(Resque_Job $job)
+ {
+ $status = 'Requested fcgi job execution from ' . $this->location . ' at ' . strftime('%F %T');
+ $this->worker->updateProcLine($status);
+ $this->worker->log($status, Resque_Worker::LOG_VERBOSE);
+
+ $this->waiting = true;
+
+ try {
+ $this->fcgi->request(array(
+ 'RESQUE_JOB' => urlencode(serialize($job)),
+ ) + $this->requestData, '');
+
+ $response = $this->fcgi->response();
+ $this->waiting = false;
+ } catch (CommunicationException $e) {
+ $this->waiting = false;
+ $job->fail($e);
+ return;
+ }
+
+ if ($response['statusCode'] !== 200) {
+ $job->fail(new Exception(sprintf(
+ 'FastCGI job returned non-200 status code: %s Stdout: %s Stderr: %s',
+ $response['headers']['status'],
+ $response['body'],
+ $response['stderr']
+ )));
+ }
+ }
+
+ /**
+ * Shutdown the worker process.
+ */
+ public function shutdown()
+ {
+ if ($this->waiting === false) {
+ $this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE);
+ } else {
+ $this->worker->log('Closing fcgi connection with job in progress.', Resque_Worker::LOG_VERBOSE);
+ }
+ $this->fcgi->close();
+ }
+}
106 lib/Resque/JobStrategy/Fork.php
View
@@ -0,0 +1,106 @@
+<?php
+/**
+ * Seperates the job execution environment from the worker via pcntl_fork
+ *
+ * @package Resque/JobStrategy
+ * @author Chris Boulton <chris@bigcommerce.com>
+ * @author Erik Bernharsdon <bernhardsonerik@gmail.com>
+ * @license http://www.opensource.org/licenses/mit-license.php
+ */
+class Resque_JobStrategy_Fork extends Resque_JobStrategy_InProcess
+{
+ /**
+ * @param int|null 0 for the forked child, the PID of the child for the parent, or null if no child.
+ */
+ protected $child;
+
+ /**
+ * @param Resque_Worker Instance of Resque_Worker that is starting jobs
+ */
+ protected $worker;
+
+ /**
+ * Set the Resque_Worker instance
+ *
+ * @param Resque_Worker $worker
+ */
+ public function setWorker(Resque_Worker $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * Seperate the job from the worker via pcntl_fork
+ *
+ * @param Resque_Job $job
+ */
+ public function perform(Resque_Job $job)
+ {
+ $this->child = $this->fork();
+
+ // Forked and we're the child. Run the job.
+ if ($this->child === 0) {
+ parent::perform($job);
+ exit(0);
+ }
+
+ // Parent process, sit and wait
+ if($this->child > 0) {
+ $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
+ $this->worker->updateProcLine($status);
+ $this->worker->log($status, Resque_Worker::LOG_VERBOSE);
+
+ // Wait until the child process finishes before continuing
+ pcntl_wait($status);
+ $exitStatus = pcntl_wexitstatus($status);
+ if($exitStatus !== 0) {
+ $job->fail(new Resque_Job_DirtyExitException(
+ 'Job exited with exit code ' . $exitStatus
+ ));
+ }
+ }
+
+ $this->child = null;
+ }
+
+ /**
+ * Force an immediate shutdown of the worker, killing any child jobs
+ * currently working
+ */
+ public function shutdown()
+ {
+ if (!$this->child) {
+ $this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE);
+ return;
+ }
+
+ $this->worker->log('Killing child at '.$this->child, Resque_Worker::LOG_VERBOSE);
+ if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
+ $this->worker->log('Killing child at ' . $this->child, Resque_Worker::LOG_VERBOSE);
+ posix_kill($this->child, SIGKILL);
+ $this->child = null;
+ }
+ else {
+ $this->worker->log('Child ' . $this->child . ' not found, restarting.', Resque_Worker::LOG_VERBOSE);
+ $this->worker->shutdown();
+ }
+ }
+
+ /**
+ * Attempt to fork a child process from the parent to run a job in.
+ *
+ * Return values are those of pcntl_fork().
+ *
+ * @return int 0 for the forked child, or the PID of the child for the parent.
+ * @throws RuntimeException When pcntl_fork returns -1
+ */
+ private function fork()
+ {
+ $pid = pcntl_fork();
+ if($pid === -1) {
+ throw new RuntimeException('Unable to fork child worker.');
+ }
+
+ return $pid;
+ }
+}
48 lib/Resque/JobStrategy/InProcess.php
View
@@ -0,0 +1,48 @@
+<?php
+/**
+ * Runs the job in the same process as Resque_Worker
+ *
+ * @package Resque/JobStrategy
+ * @author Chris Boulton <chris@bigcommerce.com>
+ * @author Erik Bernharsdon <bernhardsonerik@gmail.com>
+ * @license http://www.opensource.org/licenses/mit-license.php
+ */
+class Resque_JobStrategy_InProcess implements Resque_JobStrategy_Interface
+{
+ /**
+ * @param Resque_Worker Instance of Resque_Worker that is starting jobs
+ */
+ protected $worker;
+
+ /**
+ * Set the Resque_Worker instance
+ *
+ * @param Resque_Worker $worker
+ */
+ public function setWorker(Resque_Worker $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * Run the job in the worker process
+ *
+ * @param Resque_Job $job
+ */
+ public function perform(Resque_Job $job)
+ {
+ $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
+ $this->worker->updateProcLine($status);
+ $this->worker->log($status, Resque_Worker::LOG_VERBOSE);
+ $this->worker->perform($job);
+ }
+
+ /**
+ * Force an immediate shutdown of the worker, killing any child jobs
+ * currently working
+ */
+ public function shutdown()
+ {
+ $this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE);
+ }
+}
31 lib/Resque/JobStrategy/Interface.php
View
@@ -0,0 +1,31 @@
+<?php
+/**
+ * Interface that all job strategy backends should implement.
+ *
+ * @package Resque/JobStrategy
+ * @author Chris Boulton <chris@bigcommerce.com>
+ * @author Erik Bernharsdon <bernhardsonerik@gmail.com>
+ * @license http://www.opensource.org/licenses/mit-license.php
+ */
+interface Resque_JobStrategy_Interface
+{
+ /**
+ * Set the Resque_Worker instance
+ *
+ * @param Resque_Worker $worker
+ */
+ function setWorker(Resque_Worker $worker);
+
+ /**
+ * Seperates the job execution context from the worker and calls $worker->perform($job).
+ *
+ * @param Resque_Job $job
+ */
+ function perform(Resque_Job $job);
+
+ /**
+ * Force an immediate shutdown of the worker, killing any child jobs
+ * currently working
+ */
+ function shutdown();
+}
70 lib/Resque/Worker.php
View
@@ -49,11 +49,6 @@ class Resque_Worker
private $currentJob = null;
/**
- * @var int Process ID of child worker processes.
- */
- private $child = null;
-
- /**
* Return all workers known to Resque as instantiated instances.
* @return array
*/
@@ -137,6 +132,23 @@ public function __construct($queues)
}
$this->hostname = $hostname;
$this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues);
+
+ if (function_exists('pcntl_fork')) {
+ $this->setJobStrategy(new Resque_JobStrategy_Fork);
+ } else {
+ $this->setJobStrategy(new Resque_JobStrategy_InProcess);
+ }
+ }
+
+ /**
+ * Set the JobStrategy used to seperate the job execution context from the worker
+ *
+ * @param Resque_JobStrategy_Interface
+ */
+ public function setJobStrategy(Resque_JobStrategy_Interface $jobStrategy)
+ {
+ $this->jobStrategy = $jobStrategy;
+ $this->jobStrategy->setWorker($this);
}
/**
@@ -184,36 +196,8 @@ public function work($interval = 5)
Resque_Event::trigger('beforeFork', $job);
$this->workingOn($job);
- $this->child = Resque::fork();
+ $this->jobStrategy->perform($job);
- // Forked and we're the child. Run the job.
- if ($this->child === 0 || $this->child === false) {
- $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
- $this->updateProcLine($status);
- $this->log($status, self::LOG_VERBOSE);
- $this->perform($job);
- if ($this->child === 0) {
- exit(0);
- }
- }
-
- if($this->child > 0) {
- // Parent process, sit and wait
- $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T');
- $this->updateProcLine($status);
- $this->log($status, self::LOG_VERBOSE);
-
- // Wait until the child process finishes before continuing
- pcntl_wait($status);
- $exitStatus = pcntl_wexitstatus($status);
- if($exitStatus !== 0) {
- $job->fail(new Resque_Job_DirtyExitException(
- 'Job exited with exit code ' . $exitStatus
- ));
- }
- }
-
- $this->child = null;
$this->doneWorking();
}
@@ -304,7 +288,7 @@ private function startup()
*
* @param string $status The updated process title.
*/
- private function updateProcLine($status)
+ public function updateProcLine($status)
{
if(function_exists('setproctitle')) {
setproctitle('resque-' . Resque::VERSION . ': ' . $status);
@@ -391,21 +375,7 @@ public function shutdownNow()
*/
public function killChild()
{
- if(!$this->child) {
- $this->log('No child to kill.', self::LOG_VERBOSE);
- return;
- }
-
- $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
- if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) {
- $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE);
- posix_kill($this->child, SIGKILL);
- $this->child = null;
- }
- else {
- $this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE);
- $this->shutdown();
- }
+ $this->jobStrategy->shutdown();
}
/**
Something went wrong with that request. Please try again.