Skip to content

Commit

Permalink
Merge pull request #15 from Expensify/dbarrett_requestIDTweaks
Browse files Browse the repository at this point in the history
Add simple demo framework for Bedrock::Jobs; add requestID; add retries
  • Loading branch information
iwiznia committed Apr 17, 2017
2 parents 71c610f + 4433ef1 commit 52ba40d
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 111 deletions.
179 changes: 97 additions & 82 deletions bin/BedrockWorkerManager.php
Expand Up @@ -16,89 +16,77 @@
* After N cycles in the loop, we exit
* If the versionWatchFile modified time changes, we stop processing new jobs and exit after finishing all running jobs.
*
* Usage: `Usage: sudo -u user php ./bin/BedrockWorkerManager.php --jobName=<jobName> --workerPath=<workerPath> --maxLoad=<maxLoad> [--host=<host> --port=<port> --maxIterations=<loopIteration> --versionWatchFile=<file> --writeConsistency=<consistency>]`
* Usage: `Usage: sudo -u user php ./bin/BedrockWorkerManager.php --jobName=<jobName> --workerPath=<workerPath> --maxLoad=<maxLoad> [--host=<host> --port=<port> --failoverHost=<host> --failoverPort=<port> --maxIterations=<iteration> --versionWatchFile=<file> --writeConsistency=<consistency>]`
*/

// Verify it's being started correctly
if (php_sapi_name() !== "cli") {
// Throw an exception rather than just output because we assume this is
// being executed on a webserver, so no STDOUT. Hopefully they've
// configured a general uncaught exception handler, and this will trigger
// that.
throw new Exception('This script is cli only');
}

$options = getopt('', ['host::', 'port::', 'maxLoad::', 'maxIterations::', 'jobName::', 'logger::', 'stats::', 'workerPath::', 'versionWatchFile::', 'writeConsistency::']);
$jobName = isset($options['jobName']) ? $options['jobName'] : null;
$maxLoad = isset($options['maxLoad']) && floatval($options['maxLoad']) ? floatval($options['maxLoad']) : 0;
$maxLoopIteration = isset($options['maxIterations']) && intval($options['maxIterations']) ? intval($options['maxIterations']) : 0;
if (!$maxLoopIteration) {
$maxLoopIteration = 1000;
}
$bedrockConfig = [];
if (isset($options['host'])) {
$bedrockConfig['host'] = $options['host'];
}
if (isset($options['port'])) {
$bedrockConfig['port'] = $options['port'];
}
if (isset($options['logger'])) {
$bedrockConfig['logger'] = $options['logger'];
}
if (isset($options['stats'])) {
$bedrockConfig['stats'] = $options['stats'];
}
if (isset($options['connectionTimeout'])) {
$bedrockConfig['connectionTimeout'] = $options['connectionTimeout'];
}
if (isset($options['readTimeout'])) {
$bedrockConfig['readTimeout'] = $options['readTimeout'];
}
if (isset($options['failoverHost'])) {
$bedrockConfig['failoverHost'] = $options['failoverHost'];
}
if (isset($options['failoverPort'])) {
$bedrockConfig['failoverPort'] = $options['failoverPort'];
}
if (isset($options['writeConsistency'])) {
$bedrockConfig['writeConsistency'] = $options['writeConsistency'];
}
$versionWatchFile = isset($options['versionWatchFile']) ? $options['versionWatchFile'] : null;
$workerPath = isset($options['workerPath']) ? $options['workerPath'] : null;
if (!$jobName || !$maxLoad || !$workerPath) {
throw new Exception('Usage: sudo -u user php ./bin/BedrockWorkerManager.php --jobName=<jobName> --workerPath=<workerPath> --maxLoad=<maxLoad> [--host=<host> --port=<port> --maxIterations=<loopIteration> --writeConsistency=<consistency>]');
}
if ($maxLoad <= 0) {
throw new Exception('Maximum load must be greater than zero');
// Parse the command line and verify the required settings are provided
$options = getopt('', ['host::', 'port::', 'failoverHost::', 'failoverPort::', 'maxLoad::', 'maxIterations::', 'jobName::', 'logger::', 'stats::', 'workerPath::', 'versionWatchFile::', 'writeConsistency::']);
$workerPath = @$options['workerPath'];
if (!$workerPath) {
echo "Usage: sudo -u user php ./bin/BedrockWorkerManager.php --workerPath=<workerPath> [--jobName=<jobName> --maxLoad=<maxLoad> --host=<host> --port=<port> --maxIterations=<iteration> --writeConsistency=<consistency>]\r\n";
exit(1);
}

Client::configure($bedrockConfig);
// Add defaults
$jobName = $options['jobName'] ?? '*'; // Process all jobs by default
$maxLoad = floatval(@$options['maxLoad']) ?: 1.0; // Max load of 1.0 by default
$maxIterations = intval(@$options['maxIterations']) ?: -1; // Unlimited iterations by default

// Configure the Bedrock client with these command-line options
Client::configure($options);

// Prepare to use the host logger, if configured
$logger = Client::getLogger();
$stats = Client::getStats();
$logger->info('Starting BedrockWorkerManager', ['maxIterations' => $maxIterations]);

// If --versionWatchFile is set, begin watching that version file for changes
$versionWatchFile = @$options['versionWatchFile'];
$versionWatchFileTimestamp = $versionWatchFile && file_exists($versionWatchFile) ? filemtime($versionWatchFile) : false;

// Wrap everything in a general exception handler so we can handle error
// conditions as gracefully as possible.
$stats = Client::getStats();
try {
$logger->info('Starting BedrockWorkerManager', ['maxLoopIteration' => $maxLoopIteration]);

if (!file_exists('/proc/loadavg')) {
throw new Exception('are you in a chroot? If so, please make sure /proc is mounted correctly');
// Validate details now that we have exception handling
if (!is_dir($workerPath)) {
throw new Exception("Invalid --workerPath path '$workerPath'");
}
if ($maxLoad <= 0) {
throw new Exception('--maxLoad must be greater than zero');
}

// Connect to Bedrock -- it'll reconnect if necessary
$bedrock = new Client();
$jobs = new Jobs($bedrock);

// Begin the infinite loop
$loopIteration = 0;
// If --maxIterations is set, loop a finite number of times and then self
// destruct. This is to guard against memory leaks, as we assume there is
// some other script that will restart this when it dies.
$iteration = 0;
while (true) {
if ($loopIteration === $maxLoopIteration) {
// Is it time to self destruct?
if ($maxIterations > 0 && $iteration >= $maxIterations) {
$logger->info("We did all our loops iteration, shutting down");
exit(0);
break;
}

$loopIteration++;
$logger->info("Loop iteration", ['iteration' => $loopIteration]);
$iteration++;
$logger->info("Loop iteration", ['iteration' => $iteration]);

// Step One wait for resources to free up
while (true) {
// Get the latest load
if (!file_exists('/proc/loadavg')) {
throw new Exception('are you in a chroot? If so, please make sure /proc is mounted correctly');
}
$load = sys_getloadavg()[0];
if ($load < $maxLoad) {
$logger->info('Load is under max, checking for more work.', ['load' => $load, 'MAX_LOAD' => $maxLoad]);
Expand All @@ -109,21 +97,28 @@
}
}

// Get any job managed by the BedrockWorkerManager
// Poll the server until we successfully get a job
$response = null;
while (!$response) {
// php's filemtime results are cached, so we need to clear that cache or we'll be getting a stale modified time.
// Watch a version file that will cause us to automatically shut
// down if it changes. This enables triggering a restart if new
// PHP is deployed.
//
// Note: php's filemtime results are cached, so we need to clear
// that cache or we'll be getting a stale modified time.
clearstatcache(true, $versionWatchFile);
$newVersionWatchFileTimestamp = $versionWatchFile && file_exists($versionWatchFile) ? filemtime($versionWatchFile) : false;
$newVersionWatchFileTimestamp = ($versionWatchFile && file_exists($versionWatchFile)) ? filemtime($versionWatchFile) : false;
if ($versionWatchFile && $newVersionWatchFileTimestamp !== $versionWatchFileTimestamp) {
$logger->info('Version watch file changed, stop processing new jobs');

// We break out of this loop and the outer one too. We don't want to process anything more,
// just wait for child processes to finish.
break 2;
}

// Ready to get a new job
try {
// Attempt to get a job
// Query the server for a job
$response = $jobs->getJob($jobName, 60 * 1000); // Wait up to 60s
} catch (Exception $e) {
// Try again in 60 seconds
Expand All @@ -132,29 +127,28 @@
}
}

if (!$response) {
$logger->info('Got no response from bedrock... Continuing.');
continue;
}

// Found a job
if ($response['code'] == 200) {
// BWM jobs are '/' separated names, the last component of which indicates the name of the worker to
// instantiate to execute this job:
// arbitrary/optional/path/to/workerName
// BWM jobs are '/' separated names, the last component of which
// indicates the name of the worker to instantiate to execute this
// job:
//
// arbitrary/optional/path/to/workerName
//
// We look for a file:
//
// <workerPath>/<workerName>.php
// <workerPath>/<workerName>.php
//
// If it's there, we include it, and then create
// an object and run it like:
// If it's there, we include it, and then create an object and run
// it like:
//
// $worker = new $workerName( $job );
// $worker->safeRun( );
// $worker = new $workerName( $job );
// $worker->run( );
//
// The optional path info allows for jobs to be scheduled selectively. I.e., you may have separate jobs
// scheduled as production/jobName and staging/jobName, with a WorkerManager in each environment looking for
// each path.
// The optional path info allows for jobs to be scheduled
// selectively. For example, you may have separate jobs scheduled
// as production/jobName and staging/jobName, with a WorkerManager
// in each environment looking for each path.
$job = $response['body'];
$parts = explode('/', $job['name']);
$jobParts = explode('?', $job['name']);
Expand All @@ -166,24 +160,40 @@
$logger->info("Looking for worker '$workerFilename'");
if (file_exists($workerFilename)) {
// The file seems to exist -- fork it so we can run it.
//
// Note: By explicitly ignoring SIGCHLD we tell the kernel to
// "reap" finished child processes automatically, rather
// than creating "zombie" processes. (We don't care
// about the child's exit code, so we have no use for the
// zombie process.)
$logger->info("Forking and running a worker.", [
'workerFileName' => $workerFilename,
]);
// Ignore SIGCHLD signal, which should help 'reap' zombie processes, forcing zombies to kill themselves
// in the event that the parent process dies before the child/zombie)
pcntl_signal(SIGCHLD, SIG_IGN);
$pid = pcntl_fork();
if ($pid == -1) {
// Something went wrong, couldn't fork
$errorMessage = pcntl_strerror(pcntl_get_last_error());
throw new Exception("Unable to fork because '$errorMessage', aborting.");
} elseif ($pid == 0) {
// If we are using a global REQUEST_ID, reset it to indicate this is a new process.
$logger->info("Fork succeeded, child process, running job", [
'name' => $job['name'],
'id' => $job['jobID'],
'extraParams' => $extraParams,
]);

if (isset($GLOBALS['REQUEST_ID'])) {
// Reset the REQUEST_ID and re-log the line so we see
// it when searching for either the parent or the child
// REQUEST_ID.
mt_srand(getmypid());
$GLOBALS['REQUEST_ID'] = substr(base64_encode(sha1(mt_rand())),0,6); // random 6 character ID
$logger->info("Fork succeeded, child process, running job", [
'name' => $job['name'],
'id' => $job['jobID'],
'extraParams' => $extraParams,
]);
}
$stats->counter('bedrockJob.create.'.$job['name']);

// Include the worker now (not in the parent thread) such
Expand Down Expand Up @@ -223,6 +233,7 @@
}
});

// The forked worker process is all done.
$stats->counter('bedrockJob.finish.'.$job['name']);
exit(1);
} else {
Expand All @@ -245,10 +256,14 @@
}
}

// We wait for all children to finish before dying.
$status = null;
pcntl_wait($status);
} catch (Exception $e) {
$message = $e->getMessage();
$logger->alert('BedrockWorkerManager.php exited abnormally', ['exception' => $e]);
echo "Error: $message\r\n";
}

// We wait for all children to finish before dying.
$logger->info('Stopping BedrockWorkerManager, waiting for children');
$status = null;
pcntl_wait($status);
$logger->info('Stopped BedrockWorkerManager');
2 changes: 1 addition & 1 deletion composer.json
Expand Up @@ -2,7 +2,7 @@
"name": "expensify/Bedrock-PHP",
"description": "Bedrock PHP Library",
"type": "library",
"version": "1.0.11",
"version": "1.0.12",
"authors": [
{
"name": "Expensify",
Expand Down
15 changes: 15 additions & 0 deletions sample/SampleWorker.php
@@ -0,0 +1,15 @@
<?php
/**
 * Minimal example of a worker class that BedrockWorkerManager can run.
 */
class SampleWorker extends BedrockWorker
{
    /**
     * Entry point of the worker; invoked a single time.
     *
     * The process is forked before this method is called, and the process
     * exits once this method returns. This sample only writes one info-level
     * log line that names the job it was given.
     */
    public function run()
    {
        // Fetch the logger from the Bedrock client first, then log the job name.
        $logger = $this->bedrock->getLogger();
        $logger->info("Running SampleWorker for '{$this->job['name']}'");
    }
}
66 changes: 66 additions & 0 deletions sample/demo.sh
@@ -0,0 +1,66 @@
#!/bin/bash
# This is a trivial script to demonstrate Bedrock::Jobs.
#
# It talks to a bedrock server on localhost:8888 using nc, queues a single
# SampleWorker job, starts BedrockWorkerManager in the background, and waits
# until the jobs table is empty again before stopping the manager.
#
# The relative paths below (../bin/BedrockWorkerManager.php and --workerPath=.)
# mean this is expected to be run from the directory containing this script.
#
# Exit status: non-zero if any setup check fails.

# -----------------------
echo "Confirming bedrock is running"
BEDROCK_PID=$(pgrep bedrock)
if [ -z "$BEDROCK_PID" ]
then
    echo "Please start bedrock, eg: sudo ./bedrock -clean -fork"
    # Report failure to the caller; a bare `exit` would return echo's status (0).
    exit 1
fi

# -----------------------
echo "Clean up after the last demo"
# Only the first line of the response (the status line) is kept.
RESULT=$(echo 'Query
query: DELETE FROM jobs;
connection: close
' | nc localhost 8888 | head -n 1)
if [[ "$RESULT" != 200* ]]
then
    echo "ERROR: Cleanup failed ($RESULT)"
    exit 1
fi


# -----------------------
echo 'Creating a SampleWorker job...'
echo "CreateJob
name: SampleWorker
connection: close
" | nc localhost 8888 > /dev/null
sleep 1

# -----------------------
echo "Confirming job is QUEUED"
# The last line of the response is taken as the row count.
COUNT=$(echo "Query: SELECT COUNT(*) FROM jobs;
connection: close
" | nc localhost 8888 | tail -n 1)
if [ "$COUNT" != 1 ]
then
    echo "ERROR: Failed to queue job (count=$COUNT)"
    exit 1
fi

# -----------------------
echo "Starting BWM..."
php ../bin/BedrockWorkerManager.php --workerPath=. &
# Remember the background manager's PID so it can be stopped at the end.
PID=$!

# -----------------------
# Poll once per second until the jobs table reports zero rows.
while [ "$COUNT" != 0 ]
do
    echo "Waiting for job to finish"
    COUNT=$(echo "Query: SELECT COUNT(*) FROM jobs;
connection: close
" | nc localhost 8888 | tail -n 1)
    sleep 1
done

# -----------------------
echo "Done"
kill "$PID"

0 comments on commit 52ba40d

Please sign in to comment.