Skip to content

Commit

Permalink
add initial version. test.php shows sample usage/useful for testing u…
Browse files Browse the repository at this point in the history
…ntil tests are ready
  • Loading branch information
Chris Boulton committed Mar 18, 2012
0 parents commit ff3fc17
Show file tree
Hide file tree
Showing 8 changed files with 579 additions and 0 deletions.
20 changes: 20 additions & 0 deletions LICENSE
@@ -0,0 +1,20 @@
(c) 2012 Chris Boulton <chris@bigcommerce.com>

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
103 changes: 103 additions & 0 deletions README.markdown
@@ -0,0 +1,103 @@
php-resque-scheduler: PHP Resque Scheduler
==========================================

php-resque-scheduler is a PHP port of [resque-scheduler](http://github.com/defunkt/resque),
which adds support for scheduling items in the future to Resque.

The PHP port of resque-scheduler has been designed to be an almost direct-copy
of the Ruby plugin, and is designed to work with the PHP port of resque,
[php-resque](http://github.com/chrisboulton/php-resque).

At the moment, php-resque-scheduler only supports delayed jobs, which is the
ability to push a job to the queue and have it run at a certain timestamp, or
in a number of seconds. Support for recurring jobs (similar to CRON) is planned
for a future release.

Because the PHP port is almost a direct API copy of the Ruby version, it is also
compatible with the web interface of the Ruby version, which provides the
ability to view and manage delayed jobs.

## Delayed Jobs

To quote the documentation for the Ruby resque-scheduler:

> Delayed jobs are one-off jobs that you want to be put into a queue at some
point in the future. The classic example is sending an email:

require 'Resque/Resque.php';
require 'ResqueScheduler/ResqueScheduler.php';

$in = strtotime('+5 days');
$args = array('id' => $user->id);
ResqueScheduler::enqueueIn($in, 'email', 'SendFollowUpEmail', $args);

The above will store the job for 5 days in the delayed queue, and then pull the
job off and submit it to the `email` queue in Resque for processing as soon as
a worker is available.

Instead of passing a relative time in seconds, you can also supply a timestamp
as either a DateTime object or integer containing a UNIX timestamp to the
`enqueueAt` method:

require 'Resque/Resque.php';
require 'ResqueScheduler/ResqueScheduler.php';

$time = 1332067214;
ResqueScheduler::enqueueAt($time, 'email', 'SendFollowUpEmail', $args);

$datetime = new DateTime('2012-03-18 13:21:49');
ResqueScheduler::enqueueAt(datetime, 'email', 'SendFollowUpEmail', $args);

NOTE: resque-scheduler does not guarantee a job will fire at the time supplied.
At the time supplied, resque-scheduler will take the job out of the delayed
queue and push it to the appropriate queue in Resque. Your next available Resque
worker will pick the job up. To keep processing as quick as possible, keep your
queues as empty as possible.

## Worker

Like resque, resque-scheduler includes a worker that runs in the background. This
worker is responsible for pulling items off the schedule/delayed queue and adding
them to the queue for resque. This means that for delayed or scheduled jobs to be
executed, the worker needs to be running.

A basic "up-and-running" resque-scheduler.php file is included that sets up a
running worker environment is included in the root directory. It accepts many
of the same environment variables as php-resque:

* `REDIS_BACKEND` - Redis server to connect to
* `LOGGING` - Enable logging to STDOUT
* `VERBOSE` - Enable verbose logging
* `VVERBOSE` - Enable very verbose logging
* `INTERVAL` - Sleep for this long before checking scheduled/delayed queues
* `APP_INCLUDE` - Include this file when starting (to launch your app)
* `PIDFILE` - Write the PID of the worker out to this file

The resque-scheduler worker requires resque to function. The demo
resque-scheduler.php worker allows you to supply a `RESQUE_PHP` environment
variable with the path to Resque.php. If not supplied and resque is not already
loaded, resque-scheduler will attempt to load it from your include path
(`require_once 'Resque/Resque.php';'`)

It's easy to start the resque-scheduler worker using resque-scheduler.php:
$ RESQUE_PHP=../resque/lib/Resque/Resque.php php resque-scheduler.php

## Event/Hook System

php-resque-scheduler uses the same event system used by php-resque and exposes
the following events.

### afterSchedule

Called after a job has been added to the schedule. Arguments passed are the
timestamp, queue of the job, the class name of the job, and the job's arguments.

### beforeDelayedEnqueue

Called immediately after a job has been pulled off the delayed queue and right
before the job is added to the queue in resque. Arguments passed are the queue
of the job, the class name of the job, and the job's arguments.

## Contributors ##

* chrisboulton
13 changes: 13 additions & 0 deletions extras/resque-scheduler.monit
@@ -0,0 +1,13 @@
# Replace these with your own:
# [PATH/TO/RESQUE]
# [PATH/TO/RESQUE-SCHEDULER]
# [UID]
# [GID]
# [APP_INCLUDE]

check process resque-scheduler_worker
with pidfile /var/run/resque/scheduler-worker.pid
start program = "/bin/sh -c 'APP_INCLUDE=[APP_INCLUDE] RESQUE_PHP=[PATH/TO/RESQUE] PIDFILE=/var/run/resque/scheduler-worker.pid nohup php -f [PATH/TO/RESQUE-SCHEDULER]/resque-scheduler.php > /var/log/resque/scheduler-worker.log &'" as uid [UID] and gid [GID]
stop program = "/bin/sh -c 'kill -s QUIT `cat /var/run/resque/scheduler-worker.pid` && rm -f /var/run/resque/scheduler-worker.pid; exit 0;'"
if totalmem is greater than 300 MB for 10 cycles then restart # eating up memory?
group resque-scheduler_workers
215 changes: 215 additions & 0 deletions lib/ResqueScheduler.php
@@ -0,0 +1,215 @@
<?php
/**
* ResqueScheduler core class to handle scheduling of jobs in the future.
*
* @package ResqueScheduler
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2012 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class ResqueScheduler
{
/**
* Enqueue a job in a given number of seconds from now.
*
* Identical to Resque::enqueue, however the first argument is the number
* of seconds before the job should be executed.
*
* @param int $in Number of seconds from now when the job should be executed.
* @param string $queue The name of the queue to place the job in.
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
*/
public static function enqueueIn($in, $queue, $class, array $args = array())
{
self::enqueueAt(time() + $in, $queue, $class, $args);
}

/**
* Enqueue a job for execution at a given timestamp.
*
* Identical to Resque::enqueue, however the first argument is a timestamp
* (either UNIX timestamp in integer format or an instance of the DateTime
* class in PHP).
*
* @param DateTime|int $at Instance of PHP DateTime object or int of UNIX timestamp.
* @param string $queue The name of the queue to place the job in.
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
*/
public static function enqueueAt($at, $queue, $class, $args = array())
{
self::validateJob($class, $queue);

$job = self::jobToHash($queue, $class, $args);
self::delayedPush($at, $job);

Resque_Event::trigger('afterSchedule', array(
'at' => $at,
'queue' => $queue,
'class' => $class,
'args' => $args,
));
}

/**
* Directly append an item to the delayed queue schedule.
*
* @param DateTime|int $timestamp Timestamp job is scheduled to be run at.
* @param array $item Hash of item to be pushed to schedule.
*/
public static function delayedPush($timestamp, $item)
{
$timestamp = self::getTimestamp($timestamp);
$redis = Resque::redis();
$redis->rpush('delayed:' . $timestamp, json_encode($item));

$redis->zadd('delayed_queue_schedule', $timestamp, $timestamp);
}

/**
* Get the total number of jobs in the delayed schedule.
*
* @return int Number of scheduled jobs.
*/
public static function getDelayedQueueScheduleSize()
{
return (int)Resque::redis()->zcard('delayed_queue_schedule');
}

/**
* Get the number of jobs for a given timestamp in the delayed schedule.
*
* @param DateTime|int $timestamp Timestamp
* @return int Number of scheduled jobs.
*/
public static function getDelayedTimestampSize($timestamp)
{
$timestamp = self::toTimestamp($timestamp);
return Resque::redis()->llen('delayed:' . $timestamp, $timestamp);
}

/**
* Generate hash of all job properties to be saved in the scheduled queue.
*
* @param string $queue Name of the queue the job will be placed on.
* @param string $class Name of the job class.
* @param array $args Array of job arguments.
*/

private static function jobToHash($queue, $class, $args)
{
return array(
'class' => $class,
'args' => $args,
'queue' => $queue,
);
}

/**
* If there are no jobs for a given key/timestamp, delete references to it.
*
* Used internally to remove empty delayed: items in Redis when there are
* no more jobs left to run at that timestamp.
*
* @param string $key Key to count number of items at.
* @param int $timestamp Matching timestamp for $key.
*/
private static function cleanupTimestamp($key, $timestamp)
{
$timestamp = self::getTimestamp($timestamp);
$redis = Resque::redis();

if ($redis->llen($key) == 0) {
$redis->del($key);
$redis->zrem('delayed_queue_schedule', $timestamp);
}
}

/**
* Convert a timestamp in some format in to a unix timestamp as an integer.
*
* @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
* @return int Timestamp
* @throws ResqueScheduler_InvalidTimestampException
*/
private static function getTimestamp($timestamp)
{
if ($timestamp instanceof DateTime) {
$timestamp = $timestamp->getTimestamp();
}

if ((int)$timestamp != $timestamp) {
throw new ResqueScheduler_InvalidTimestampExeption(
'The supplied timestamp value could not be converted to an integer.'
);
}

return (int)$timestamp;
}

/**
* Find the first timestamp in the delayed schedule before/including the timestamp.
*
* Will find and return the first timestamp upto and including the given
* timestamp. This is the heart of the ResqueScheduler that will make sure
* that any jobs scheduled for the past when the worker wasn't running are
* also queued up.
*
* @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
* Defaults to now.
* @return int|false UNIX timestamp, or false if nothing to run.
*/
public function nextDelayedTimestamp($at = null)
{
if ($at === null) {
$at = time();
}
else {
$at = self::getTimestamp($at);
}

$items = Resque::redis()->zrangebyscore('delayed_queue_schedule', '-inf', $at, 'LIMIT', 0, 1);
if (!empty($items)) {
return $items[0];
}

return false;
}

/**
* Pop a job off the delayed queue for a given timestamp.
*
* @param DateTime|int $timestamp Instance of DateTime or UNIX timestamp.
* @return array Matching job at timestamp.
*/
public function nextItemForTimestamp($timestamp)
{
$timestamp = self::getTimestamp($timestamp);
$key = 'delayed:' . $timestamp;

$item = json_decode(Resque::redis()->lpop($key), true);

self::cleanupTimestamp($key, $timestamp);
return $item;
}

/**
* Ensure that supplied job class/queue is valid.
*
* @param string $class Name of job class.
* @param string $queue Name of queue.
* @throws Resque_Exception
*/
private static function validateJob($class, $queue)
{
if (empty($class)) {
throw new Resque_Exception('Jobs must be given a class.');
}
else if (empty($queue)) {
throw new Resque_Exception('Jobs must be put in a queue.');
}

return true;
}
}
13 changes: 13 additions & 0 deletions lib/ResqueScheduler/InvalidTimestampException.php
@@ -0,0 +1,13 @@
<?php
/**
* Exception thrown whenever an invalid timestamp has been passed to a job.
*
* @package ResqueScheduler
* @author Chris Boulton <chris.boulton@interspire.com>
* @copyright (c) 2012 Chris Boulton
* @license http://www.opensource.org/licenses/mit-license.php
*/
class ResqueScheduler_InvalidTimestampException extends Resque_Exception
{

}

0 comments on commit ff3fc17

Please sign in to comment.