Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple demo framework for Bedrock::Jobs; add requestID; add retries #15

Merged
merged 19 commits into from Apr 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
179 changes: 97 additions & 82 deletions bin/BedrockWorkerManager.php
Expand Up @@ -16,89 +16,77 @@
* After N cycle in the loop, we exit
* If the versionWatchFile modified time changes, we stop processing new jobs and exit after finishing all running jobs.
*
* Usage: `Usage: sudo -u user php ./bin/BedrockWorkerManager.php --jobName=<jobName> --workerPath=<workerPath> --maxLoad=<maxLoad> [--host=<host> --port=<port> --maxIterations=<loopIteration> --versionWatchFile=<file> --writeConsistency=<consistency>]`
* Usage: `Usage: sudo -u user php ./bin/BedrockWorkerManager.php --jobName=<jobName> --workerPath=<workerPath> --maxLoad=<maxLoad> [--host=<host> --port=<port> --failoverHost=<host> --failoverPort=<port> --maxIterations=<iteration> --versionWatchFile=<file> --writeConsistency=<consistency>]`
*/

// Verify it's being started correctly
if (php_sapi_name() !== "cli") {
// Throw an exception rather than just output because we assume this is
// being executed on a webserver, so no STDOUT. Hopefully they've
// configured a general uncaught exception handler, and this will trigger
// that.
throw new Exception('This script is cli only');
}

$options = getopt('', ['host::', 'port::', 'maxLoad::', 'maxIterations::', 'jobName::', 'logger::', 'stats::', 'workerPath::', 'versionWatchFile::', 'writeConsistency::']);
$jobName = isset($options['jobName']) ? $options['jobName'] : null;
$maxLoad = isset($options['maxLoad']) && floatval($options['maxLoad']) ? floatval($options['maxLoad']) : 0;
$maxLoopIteration = isset($options['maxIterations']) && intval($options['maxIterations']) ? intval($options['maxIterations']) : 0;
if (!$maxLoopIteration) {
$maxLoopIteration = 1000;
}
$bedrockConfig = [];
if (isset($options['host'])) {
$bedrockConfig['host'] = $options['host'];
}
if (isset($options['port'])) {
$bedrockConfig['port'] = $options['port'];
}
if (isset($options['logger'])) {
$bedrockConfig['logger'] = $options['logger'];
}
if (isset($options['stats'])) {
$bedrockConfig['stats'] = $options['stats'];
}
if (isset($options['connectionTimeout'])) {
$bedrockConfig['connectionTimeout'] = $options['connectionTimeout'];
}
if (isset($options['readTimeout'])) {
$bedrockConfig['readTimeout'] = $options['readTimeout'];
}
if (isset($options['failoverHost'])) {
$bedrockConfig['failoverHost'] = $options['failoverHost'];
}
if (isset($options['failoverPort'])) {
$bedrockConfig['failoverPort'] = $options['failoverPort'];
}
if (isset($options['writeConsistency'])) {
$bedrockConfig['writeConsistency'] = $options['writeConsistency'];
}
$versionWatchFile = isset($options['versionWatchFile']) ? $options['versionWatchFile'] : null;
$workerPath = isset($options['workerPath']) ? $options['workerPath'] : null;
if (!$jobName || !$maxLoad || !$workerPath) {
throw new Exception('Usage: sudo -u user php ./bin/BedrockWorkerManager.php --jobName=<jobName> --workerPath=<workerPath> --maxLoad=<maxLoad> [--host=<host> --port=<port> --maxIterations=<loopIteration> --writeConsistency=<consistency>]');
}
if ($maxLoad <= 0) {
throw new Exception('Maximum load must be greater than zero');
// Parse the command line and verify the required settings are provided
$options = getopt('', ['host::', 'port::', 'failoverHost::', 'failoverPort::', 'maxLoad::', 'maxIterations::', 'jobName::', 'logger::', 'stats::', 'workerPath::', 'versionWatchFile::', 'writeConsistency::']);
$workerPath = @$options['workerPath'];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$options['workerPath'] ?? null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per offline discussion, this is NAB

if (!$workerPath) {
echo "Usage: sudo -u user php ./bin/BedrockWorkerManager.php --workerPath=<workerPath> [--jobName=<jobName> --maxLoad=<maxLoad> --host=<host> --port=<port> --maxIterations=<iteration> --writeConsistency=<consistency>]\r\n";
exit(1);
}

Client::configure($bedrockConfig);
// Add defaults
$jobName = $options['jobName'] ?? '*'; // Process all jobs by default
$maxLoad = floatval(@$options['maxLoad']) ?: 1.0; // Max load of 1.0 by default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

floatval($options['maxLoad'] ?? 1.0)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per offline discussion, this is NAB

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually... on this does the ?? just implicitly do the error suppression on the left expression? So it'll work even if inside a function? That's pretty slick.

Copy link
Contributor

@iwiznia iwiznia Apr 17, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$options['maxLoad'] ?? 1.0 is equivalent to (isset($options['maxLoad']) ? $options['maxLoad'] : 1.0)

$maxIterations = intval(@$options['maxIterations']) ?: -1; // Unlimited iterations by default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intval($options['maxIterations'] ?? -1)


// Configure the Bedrock client with these command-line options
Client::configure($options);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NAB: we were copying the bedrock specific configurations to a new array in order to not pass a ton of garbage options to the bedrock client. Now all bwm specific params are being passed to the client. Anyway, since I can't see any problem with it, I think is fine as is.


// Prepare to use the host logger, if configured
$logger = Client::getLogger();
$stats = Client::getStats();
$logger->info('Starting BedrockWorkerManager', ['maxIterations' => $maxIterations]);

// If --versionWatch is enabled, begin watching a version file for changes
$versionWatchFile = @$options['versionWatchFile'];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$options['versionWatchFile'] ?? null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per offline discussion, this is NAB

$versionWatchFileTimestamp = $versionWatchFile && file_exists($versionWatchFile) ? filemtime($versionWatchFile) : false;

// Wrap everything in a general exception handler so we can handle error
// conditions as gracefully as possible.
$stats = Client::getStats();
try {
$logger->info('Starting BedrockWorkerManager', ['maxLoopIteration' => $maxLoopIteration]);

if (!file_exists('/proc/loadavg')) {
throw new Exception('are you in a chroot? If so, please make sure /proc is mounted correctly');
// Validate details now that we have exception handling
if (!is_dir($workerPath)) {
throw new Exception("Invalid --workerPath path '$workerPath'");
}
if ($maxLoad <= 0) {
throw new Exception('--maxLoad must be greater than zero');
}

// Connect to Bedrock -- it'll reconnect if necessary
$bedrock = new Client();
$jobs = new Jobs($bedrock);

// Begin the infinite loop
$loopIteration = 0;
// If --maxIterations is set, loop a finite number of times and then self
// destruct. This is to guard against memory leaks, as we assume there is
// some other script that will restart this when it dies.
$iteration = 0;
while (true) {
if ($loopIteration === $maxLoopIteration) {
// Is it time to self destruct?
if ($maxIterations > 0 && $iteration >= $maxIterations) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed these variables to just use iterations rather than loopIterations.

$logger->info("We did all our loops iteration, shutting down");
exit(0);
break;
}

$loopIteration++;
$logger->info("Loop iteration", ['iteration' => $loopIteration]);
$iteration++;
$logger->info("Loop iteration", ['iteration' => $iteration]);

// Step One wait for resources to free up
while (true) {
// Get the latest load
if (!file_exists('/proc/loadavg')) {
throw new Exception('are you in a chroot? If so, please make sure /proc is mounted correctly');
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the /proc/loadavg check here to keep it next to the sys_getloadavg() call that depends on it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to do this check every time the loop runs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed we don't need to, but I think it's non-harmful to do it (eg, it's not a super expensive call given the relatively low the frequency of this loop), and makes the code read a lot cleaner to keep it where it's used.

$load = sys_getloadavg()[0];
if ($load < $maxLoad) {
$logger->info('Load is under max, checking for more work.', ['load' => $load, 'MAX_LOAD' => $maxLoad]);
Expand All @@ -109,21 +97,28 @@
}
}

// Get any job managed by the BedrockWorkerManager
// Poll the server until we successfully get a job
$response = null;
while (!$response) {
// php's filemtime results are cached, so we need to clear that cache or we'll be getting a stale modified time.
// Watch a version file that will cause us to automatically shut
// down if it changes. This enables triggering a restart if new
// PHP is deployed.
//
// Note: php's filemtime results are cached, so we need to clear
// that cache or we'll be getting a stale modified time.
clearstatcache(true, $versionWatchFile);
$newVersionWatchFileTimestamp = $versionWatchFile && file_exists($versionWatchFile) ? filemtime($versionWatchFile) : false;
$newVersionWatchFileTimestamp = ($versionWatchFile && file_exists($versionWatchFile)) ? filemtime($versionWatchFile) : false;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just clarifying what this means with parenthesis as the precedence of the operators wasn't obvious to me at first.

if ($versionWatchFile && $newVersionWatchFileTimestamp !== $versionWatchFileTimestamp) {
$logger->info('Version watch file changed, stop processing new jobs');

// We break out of this loop and the outer one too. We don't want to process anything more,
// just wait for child processes to finish.
break 2;
}

// Ready to get a new job
try {
// Attempt to get a job
// Query the server for a job
$response = $jobs->getJob($jobName, 60 * 1000); // Wait up to 60s
} catch (Exception $e) {
// Try again in 60 seconds
Expand All @@ -132,29 +127,28 @@
}
}

if (!$response) {
$logger->info('Got no response from bedrock... Continuing.');
continue;
}

// Found a job
if ($response['code'] == 200) {
// BWM jobs are '/' separated names, the last component of which indicates the name of the worker to
// instantiate to execute this job:
// arbitrary/optional/path/to/workerName
// BWM jobs are '/' separated names, the last component of which
// indicates the name of the worker to instantiate to execute this
// job:
//
// arbitrary/optional/path/to/workerName
//
// We look for a file:
//
// <workerPath>/<workerName>.php
// <workerPath>/<workerName>.php
//
// If it's there, we include it, and then create
// an object and run it like:
// If it's there, we include it, and then create an object and run
// it like:
//
// $worker = new $workerName( $job );
// $worker->safeRun( );
// $worker = new $workerName( $job );
// $worker->run( );
//
// The optional path info allows for jobs to be scheduled selectively. I.e., you may have separate jobs
// scheduled as production/jobName and staging/jobName, with a WorkerManager in each environment looking for
// each path.
// The optional path info allows for jobs to be scheduled
// selectively. For example, you may have separate jobs scheduled
// as production/jobName and staging/jobName, with a WorkerManager
// in each environment looking for each path.
$job = $response['body'];
$parts = explode('/', $job['name']);
$jobParts = explode('?', $job['name']);
Expand All @@ -166,24 +160,40 @@
$logger->info("Looking for worker '$workerFilename'");
if (file_exists($workerFilename)) {
// The file seems to exist -- fork it so we can run it.
//
// Note: By explicitly ignoring SIGCHLD we tell the kernel to
// "reap" finished child processes automatically, rather
// than creating "zombie" processes. (We don't care
// about the child's exit code, so we have no use for the
// zombie process.)
$logger->info("Forking and running a worker.", [
'workerFileName' => $workerFilename,
]);
// Ignore SIGCHLD signal, which should help 'reap' zombie processes, forcing zombies to kill themselves
// in the event that the parent process dies before the child/zombie)
pcntl_signal(SIGCHLD, SIG_IGN);
$pid = pcntl_fork();
if ($pid == -1) {
// Something went wrong, couldn't fork
$errorMessage = pcntl_strerror(pcntl_get_last_error());
throw new Exception("Unable to fork because '$errorMessage', aborting.");
} elseif ($pid == 0) {
// If we are using a global REQUEST_ID, reset it to indicate this is a new process.
$logger->info("Fork succeeded, child process, running job", [
'name' => $job['name'],
'id' => $job['jobID'],
'extraParams' => $extraParams,
]);

if (isset($GLOBALS['REQUEST_ID'])) {
// Reset the REQUEST_ID and re-log the line so we see
// it when searching for either the parent and child
// REQUEST_IDs.
mt_srand(getmypid());
$GLOBALS['REQUEST_ID'] = substr(base64_encode(sha1(mt_rand())),0,6); // random 6 character ID
$logger->info("Fork succeeded, child process, running job", [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change this text a bit so it's easier to search for parent vs child?

'name' => $job['name'],
'id' => $job['jobID'],
'extraParams' => $extraParams,
]);
}
$stats->counter('bedrockJob.create.'.$job['name']);

// Include the worker now (not in the parent thread) such
Expand Down Expand Up @@ -223,6 +233,7 @@
}
});

// The forked worker process is all done.
$stats->counter('bedrockJob.finish.'.$job['name']);
exit(1);
} else {
Expand All @@ -245,10 +256,14 @@
}
}

// We wait for all children to finish before dying.
$status = null;
pcntl_wait($status);
} catch (Exception $e) {
$message = $e->getMessage();
$logger->alert('BedrockWorkerManager.php exited abnormally', ['exception' => $e]);
echo "Error: $message\r\n";
}

// We wait for all children to finish before dying.
$logger->info('Stopping BedrockWorkerManager, waiting for children');
$status = null;
pcntl_wait($status);
$logger->info('Stopped BedrockWorkerManager');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving all this logic to the end of the outer loop, so it runs regardless of whether we're doing a graceful or abnormal shutdown.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iwiznia Are you sure this shutdown code does anything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

2 changes: 1 addition & 1 deletion composer.json
Expand Up @@ -2,7 +2,7 @@
"name": "expensify/Bedrock-PHP",
"description": "Bedrock PHP Library",
"type": "library",
"version": "1.0.11",
"version": "1.0.12",
"authors": [
{
"name": "Expensify",
Expand Down
15 changes: 15 additions & 0 deletions sample/SampleWorker.php
@@ -0,0 +1,15 @@
<?php
/**
* Sample BedrockWorkerManager worker class.
*/
class SampleWorker extends BedrockWorker
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this to provide a quick test and demonstration harness for how BWM works. In doing so I discovered that the Bedrock-PHP repo is incomplete, as BedrockWorker isn't included here. We need to extract that from the Expensify codebase and include in Bedrock-PHP/src instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are not moving BedrockWorker to this repo nor removing the Log call, I say we just leave the SampleWorker in the web repo for now and move it whenever we do that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about I just move SampleWorker to Bedrock-PHP as part of this PR, and remove its call to Log::, but we leave BedrockWorker where it is for now. So I'll make partial progress.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

{
/**
* This is called once to run the worker. The thread will be forked before
* this is called, and after this function exits the process will exit.
*/
public function run()
{
$this->bedrock->getLogger()->info("Running SampleWorker for '{$this->job['name']}'");
}
}
66 changes: 66 additions & 0 deletions sample/demo.sh
@@ -0,0 +1,66 @@
#!/bin/bash
# This is a trival script to demonstrate Bedrock::Jobs

# -----------------------
echo "Confirming bedrock is running"
BEDROCK_PID=`pgrep bedrock`
if [ -z "$BEDROCK_PID" ]
then
echo "Please start bedrock, eg: sudo ./bedrock -clean -fork"
exit
fi

# -----------------------
echo "Clean up after the last demo"
RESULT=`echo 'Query
query: DELETE FROM jobs;
connection: close

' | nc localhost 8888 | head -n 1`
if [[ "$RESULT" != 200* ]]
then
echo "ERROR: Cleanup failed ($RESULT)"
exit
fi


# -----------------------
echo 'Creating a SampleWorker job...'
echo "CreateJob
name: SampleWorker
connection: close

" | nc localhost 8888 > /dev/null
sleep 1

# -----------------------
echo "Confirming job is QUEUED"
COUNT=`echo "Query: SELECT COUNT(*) FROM jobs;
connection: close

" | nc localhost 8888 | tail -n 1`
if [ "$COUNT" != 1 ]
then
echo "ERROR: Failed to queue job (count=$COUNT)"
exit
fi

# -----------------------
echo "Starting BWM..."
php ../bin/BedrockWorkerManager.php --workerPath=. &
PID=$!

# -----------------------
while [ "$COUNT" != 0 ]
do
echo "Waiting for job to finish"
COUNT=`echo "Query: SELECT COUNT(*) FROM jobs;
connection: close

" | nc localhost 8888 | tail -n 1`
sleep 1
done

# -----------------------
echo "Done"
kill $PID