Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

**php-resque is a Redis-based library for enqueuing and running background jobs.**

This is a lightly maintained fork of **chrisboulton/php-resque**, compatible with PHP 8.2+.
This is a lightly maintained fork of **chrisboulton/php-resque**, with fixes and improvements, compatible with PHP 8.2+.

⚠️ Not recommended for new projects. We are only maintaining this for legacy projects.

Expand Down Expand Up @@ -74,14 +74,15 @@ QUEUE=default php vendor/bin/resque

You can set the following environment variables on the worker:

| Name | Description |
|---------------|-------------------------------------------------------------------------------------------------------------------------|
| `QUEUE` | Required. Defines one or more comma-separated queues to process tasks from. Set to `*` to process from any queue. |
| `APP_INCLUDE` | Optional. Defines a bootstrap script to run before starting the worker. |
| `PREFIX` | Optional. Prefix to use in Redis. |
| `COUNT` | Optional. Amount of worker forks to start. If set to > 1, the process will start the workers and then exit immediately. |
| `VERBOSE` | Optional. Forces verbose log output. |
| `VVERBOSE` | Optional. Forces detailed verbose log output. |
| Name | Description |
|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `QUEUE` | Required. Defines one or more comma-separated queues to process tasks from. Set to `*` to process from any queue. |
| `APP_INCLUDE` | Optional. Defines a bootstrap script to run before starting the worker. |
| `PREFIX` | Optional. Prefix to use in Redis. |
| `COUNT` | Optional. Amount of worker forks to start. If set to > 1, the process will start the workers and then exit immediately. |
| `SCHEDULE` | Optional. An expression with a from/until time, e.g. `22:00-06:00` to only run tasks between 10pm and 6am. The worker is paused outside of the schedule. Relative to default timezone (`date_default_timezone_set`). |
| `VERBOSE` | Optional. Forces verbose log output. |
| `VVERBOSE` | Optional. Forces detailed verbose log output. |

### Events

Expand Down
12 changes: 12 additions & 0 deletions bin/resque
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ if(!empty($PREFIX)) {
Resque_Redis::prefix($PREFIX);
}

$SCHEDULE = getenv('SCHEDULE');
$scheduleParsed = null;
if (!empty($SCHEDULE)) {
$scheduleParsed = Resque_Time_Schedule::tryParse($SCHEDULE);
if (!$scheduleParsed) {
die('SCHEDULE ('.$SCHEDULE.") expression invalid (parse error).\n");
}
}

if($count > 1) {
for($i = 0; $i < $count; ++$i) {
$pid = Resque::fork();
Expand All @@ -117,6 +126,9 @@ else {
$worker = new Resque_Worker($queues);
$worker->setLogger($logger);

if ($scheduleParsed)
$worker->setSchedule($scheduleParsed);

$PIDFILE = getenv('PIDFILE');
if ($PIDFILE) {
file_put_contents($PIDFILE, getmypid()) or
Expand Down
63 changes: 63 additions & 0 deletions lib/Resque/Time/Expression.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

/**
* Resque hour:minute time expression for scheduling.
*
* @package Resque/Time
* @author Roy de Jong <roy@softwarepunt.nl>
* @license http://www.opensource.org/licenses/mit-license.php
*
* @see Resque_Time_Schedule
*/
class Resque_Time_Expression
{
public int $hour;
public int $minute;

public function __construct(int $hour, int $minute = 0)
{
$this->hour = $hour;
$this->minute = $minute;
}

// -----------------------------------------------------------------------------------------------------------------
// Format

public function __toString(): string
{
return self::pad2($this->hour) . ':' . self::pad2($this->minute);
}

public static function pad2(int $number): string
{
$strVal = strval($number);
if (strlen($strVal) === 1)
return "0{$strVal}";
return $strVal;
}

// -----------------------------------------------------------------------------------------------------------------
// Parse

/**
* Tries to parse a time expression, e.g. "12:34" into a Resque_TimeExpression.
*
* @param string $input Time expression with hours and minutes.
* @return Resque_Time_Expression|null Parsed time expression, or NULL if parsing failed.
*/
public static function tryParse(string $input): ?Resque_Time_Expression
{
$parts = explode(':', $input);

if (count($parts) < 2)
return null;

$hours = intval($parts[0]);
$minutes = intval($parts[1]);

if ($hours < 0 || $hours >= 24 || $minutes < 0 || $minutes >= 60)
return null;

return new Resque_Time_Expression($hours, $minutes);
}
}
95 changes: 95 additions & 0 deletions lib/Resque/Time/Schedule.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
<?php

/**
* Resque time schedule expression.
*
* @package Resque/Time
* @author Roy de Jong <roy@softwarepunt.nl>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Time_Schedule
{
public Resque_Time_Expression $from;
public Resque_Time_Expression $until;

public function __construct(Resque_Time_Expression $from, Resque_Time_Expression $until)
{
$this->from = $from;
$this->until = $until;
}

// -----------------------------------------------------------------------------------------------------------------
// Schedule checking

public function isInSchedule(?DateTime $now = null): bool
{
if (!$now)
$now = new DateTime('now');

$todayFrom = $this->getFromDateTime($now);

if ($todayFrom > $now) {
// Outside of start range, check if we are in yesterday's range
$yesterdayFrom = (clone $todayFrom)->modify('-1 day');
$yesterdayUntil = $this->getUntilDateTime($yesterdayFrom);

return $now >= $yesterdayFrom && $now <= $yesterdayUntil;
}

$todayUntil = $this->getUntilDateTime($todayFrom);
return $now <= $todayUntil;
}

public function getFromDateTime(?DateTime $now = null): DateTime
{
if (!$now)
$now = new DateTime('now');

$dt = clone $now;
$dt->setTime($this->from->hour, $this->from->minute, 0, 0);

return $dt;
}

public function getUntilDateTime(?DateTime $fromDateTime = null): DateTime
{
if (!$fromDateTime)
$fromDateTime = new DateTime('now');

$dt = clone $fromDateTime;
$dt->setTime($this->until->hour, $this->until->minute, 59, 999999);

if ($dt < $fromDateTime)
// Midnight rollover
$dt->modify('+1 day');

return $dt;
}

// -----------------------------------------------------------------------------------------------------------------
// Expression parsing

public static function tryParse(string $input): ?Resque_Time_Schedule
{
$parts = explode('-', $input, 2);

if (count($parts) !== 2)
return null;

$from = Resque_Time_Expression::tryParse(trim($parts[0]));
$until = Resque_Time_Expression::tryParse(trim($parts[1]));

if ($from === null || $until === null)
return null;

return new Resque_Time_Schedule($from, $until);
}

// -----------------------------------------------------------------------------------------------------------------
// Schedule formatting

public function __toString(): string
{
return "{$this->from} - {$this->until}";
}
}
33 changes: 33 additions & 0 deletions lib/Resque/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class Resque_Worker
*/
private $child = null;

/**
* The schedule constraints for this worker.
* If a schedule is set, the worker will not perform any tasks outside of it.
*/
private ?Resque_Time_Schedule $schedule = null;

/**
* Instantiate a new worker, given a list of queues that it should be working
* on. The list of queues should be supplied in the priority that they should
Expand Down Expand Up @@ -192,6 +198,28 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
break;
}

// Check schedule constraints
if ($this->schedule) {
$didScheduleDelay = false;
while (!$this->schedule->isInSchedule()) {
if (!$didScheduleDelay) {
// Announce schedule pause
$this->logger->log(Psr\Log\LogLevel::NOTICE, "Pausing, outside schedule constraint ({$this->schedule})");
$this->updateProcLine("Paused for " . implode(',', $this->queues) . " with schedule constraint {$this->schedule}");
$didScheduleDelay = true;
}
if ($this->shutdown) {
// Interrupted, immediate shutdown
$this->shutdownNow();
return;
}
usleep(10000000); // 10s
}
if ($didScheduleDelay) {
$this->logger->log(Psr\Log\LogLevel::NOTICE, "Resuming, now within schedule constraint ({$this->schedule})");
}
}

// Attempt to find and reserve a job
$job = false;
if(!$this->paused) {
Expand Down Expand Up @@ -601,4 +629,9 @@ public function setLogger(Psr\Log\LoggerInterface $logger)
{
$this->logger = $logger;
}

public function setSchedule(?Resque_Time_Schedule $schedule): void
{
$this->schedule = $schedule;
}
}
Loading