Skip to content

Commit

Permalink
Merge pull request #2 from michalsn/feat/priority
Browse files Browse the repository at this point in the history
feat: Add priority to the queue
  • Loading branch information
michalsn committed Nov 9, 2023
2 parents 2ff57f5 + 9a85afe commit b27a50e
Show file tree
Hide file tree
Showing 21 changed files with 500 additions and 119 deletions.
5 changes: 5 additions & 0 deletions docs/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Allows you to consume jobs from a specific queue.
* `-max-jobs` - The maximum number of jobs to handle before worker should exit. Disabled by default.
* `-max-time` - The maximum number of seconds worker should run. Disabled by default.
* `-memory` - The maximum memory in MB that worker can take. Default value: `128`.
* `-priority` - The priority for the jobs from the queue (comma separated). If not provided explicit, will follow the priorities defined in the config via `$queuePriorities` for the given queue. Disabled by default.
* `-tries` - The number of attempts after which the job will be considered as failed. Overrides settings from the Job class. Disabled by default.
* `-retry-after` - The number of seconds after which the job is to be restarted in case of failure. Overrides settings from the Job class. Disabled by default.
* `--stop-when-empty` - Stop when the queue is empty.
Expand All @@ -70,6 +71,10 @@ Allows you to consume jobs from a specific queue.

It will listen for 5 jobs from the `emails` queue and then stop.

php spark queue:work emails -max-jobs 5 -priority low,high

It will work the same as the previous command but will first consume jobs from the `emails` queue that were added with the `low` priority.

### queue:stop

Allows you to stop a specific queue in a safe way. It does this as soon as the job that is running in the queue is completed.
Expand Down
34 changes: 34 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Available options:
- [$database](#database)
- [$keepDoneJobs](#keepDoneJobs)
- [$keepFailedJobs](#keepFailedJobs)
- [$queueDefaultPriority](#queueDefaultPriority)
- [$queuePriorities](#queuePriorities)
- [$jobHandlers](#jobHandlers)

### $defaultHandler
Expand Down Expand Up @@ -44,6 +46,38 @@ If the job failed, should we move it to the failed jobs table? Default value: `t

This is very useful when you want to be able to see which tasks are failing and why.

### $queueDefaultPriority

The default priority for the `queue` if non default `queuePriorities` are set. Not set by default.

This is needed only if you have defined non default priorities for the queue and the default priority should be different from the `default` value.

Example:

```php
public array $queueDefaultPriority = [
'emails' => 'low',
];
```

This means that all the jobs added to the `emails` queue will have the default priority set to `low`.

### $queuePriorities

The valid priorities for the `queue` in the order they will be consumed first. Not set by default.

By default, the priority is set to `['default']`. If you want to have multiple priorities in the queue, you can define them here.

Example:

```php
public array $queuePriorities = [
'emails' => ['high', 'low'],
];
```

This means that the jobs added to the `emails` queue can have either `high` or `low` priority.

### $jobHandlers

An array of available jobs as key-value. Every job that you want to use with the queue has to be defined here.
Expand Down
39 changes: 39 additions & 0 deletions docs/running_queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,45 @@ So choosing the right command is not so obvious. We have to estimate how many jo

You might use CodeIgniter [Tasks](https://github.com/codeigniter4/tasks) library to schedule queue worker instead of working directly with CRON.

### Working with priorities

By default, every job in the queue has the same priority. However, we can send the jobs to the queue with different priorities. This way some jobs may be handled earlier.

As an example, we will define priorities for the `emails` queue:

```php
// app/Config/Queue.php

public array $queueDefaultPriority = [
'emails' => 'low',
];

public array $queuePriorities = [
'emails' => ['high', 'low'],
];
```

With this configuration, we can now add new jobs to the queue like this:

```php
// This job will have low priority:
service('queue')->push('emails', 'email', ['message' => 'Email message with low priority']);
// But this one will have high priority
service('queue')->setPriority('high')->push('emails', 'email', ['message' => 'Email message with high priority']);
```

Now, if we run the worker:

php spark queue:work emails

It will consume the jobs from the queue based on priority set in the config: `$queuePriorities`. So, first `high` priority and then `low` priority.

But we can also run the worker like this:

php spark queue:work emails -priority low,high

This way, worker will consume jobs with the `low` priority and then with `high`. The order set in the config file is override.

### Running many instances of the same queue

As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases except `SQLite3` since it doesn't guarantee that the job will be selected only by one process.
Expand Down
9 changes: 7 additions & 2 deletions phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,24 @@ parameters:
message: '#Variable \$config on left side of \?\?\= always exists and is not nullable.#'
paths:
- src/Config/Services.php
-
message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Handlers\\BaseHandler::push\(\).#'
paths:
- src/Handlers/BaseHandler.php
-
message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#'
paths:
- src/Handlers/BaseHandler.php
- src/Handlers/DatabaseHandler.php
- src/Models/QueueJobModel.php
-
message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::affectedRows\(\).#'
paths:
- src/Handlers/DatabaseHandler.php
- src/Handlers/BaseHandler.php
-
message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::truncate\(\).#'
paths:
- src/Handlers/DatabaseHandler.php
- src/Handlers/BaseHandler.php
-
message: '#Parameter \#3 \$tries of method Michalsn\\CodeIgniterQueue\\Commands\\QueueWork::handleWork\(\) expects int\|null, string\|true\|null given.#'
paths:
Expand Down
12 changes: 12 additions & 0 deletions src/Commands/QueuePublish.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public function resolveJobClass(string $name): string
return $this->jobHandlers[$name];
}
/**
* Stringify queue priorities.
*/
public function getQueuePriorities(string $name): ?string
{
if (! isset($this->queuePriorities[$name])) {
return null;
}
return implode(',', $this->queuePriorities[$name]);
}
EOT;
$contents = str_replace($method, '', $contents);
file_put_contents($file, $contents);
Expand Down
20 changes: 16 additions & 4 deletions src/Commands/QueueWork.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class QueueWork extends BaseCommand
'-max-jobs' => 'The maximum number of jobs to handle before worker should exit. Disabled by default.',
'-max-time' => 'The maximum number of seconds worker should run. Disabled by default.',
'-memory' => 'The maximum memory in MB that worker can take. Default value: 128',
'-priority' => 'The priority for the jobs from the queue (comma separated). If not provided explicit, will follow the priorities defined in the config via $queuePriorities for the given queue. Disabled by default.',
'-tries' => 'The number of attempts after which the job will be considered as failed. Overrides settings from the Job class. Disabled by default.',
'-retry-after' => 'The number of seconds after which the job is to be restarted in case of failure. Overrides settings from the Job class. Disabled by default.',
'--stop-when-empty' => 'Stop when the queue is empty.',
Expand All @@ -71,6 +72,8 @@ class QueueWork extends BaseCommand
*/
public function run(array $params)
{
set_time_limit(0);

/** @var QueueConfig $config */
$config = config('Queue');
$stopWhenEmpty = false;
Expand All @@ -89,6 +92,7 @@ public function run(array $params)
$maxJobs = $params['max-jobs'] ?? CLI::getOption('max-jobs') ?? 0;
$maxTime = $params['max-time'] ?? CLI::getOption('max-time') ?? 0;
$memory = $params['memory'] ?? CLI::getOption('memory') ?? 128;
$priority = $params['priority'] ?? CLI::getOption('priority') ?? $config->getQueuePriorities($queue) ?? 'default';
$tries = $params['tries'] ?? CLI::getOption('tries');
$retryAfter = $params['retry-after'] ?? CLI::getOption('retry-after');
$countJobs = 0;
Expand All @@ -99,10 +103,18 @@ public function run(array $params)

$startTime = microtime(true);

CLI::write('Listening for the jobs with the queue: ' . CLI::color($queue, 'light_cyan') . PHP_EOL, 'cyan');
CLI::write('Listening for the jobs with the queue: ' . CLI::color($queue, 'light_cyan'), 'cyan');

if ($priority !== 'default') {
CLI::write('Jobs will be consumed according to priority: ' . CLI::color($priority, 'light_cyan'), 'cyan');
}

CLI::write(PHP_EOL);

$priority = array_map('trim', explode(',', $priority));

while (true) {
$work = service('queue')->pop($queue);
$work = service('queue')->pop($queue, $priority);

if ($work === null) {
if ($stopWhenEmpty) {
Expand Down Expand Up @@ -216,7 +228,7 @@ private function maxTimeCheck(int $maxTime, float $startTime): bool

private function checkMemory(int $memory): bool
{
if (memory_get_peak_usage() > $memory * 1024 * 1024) {
if (memory_get_usage(true) > $memory * 1024 * 1024) {
CLI::write(sprintf('The memory limit of %s MB was reached. Stopping.', $memory), 'yellow');

return true;
Expand All @@ -234,7 +246,7 @@ private function checkStop(string $queue, float $startTime): bool
}

if ($startTime < (float) $time) {
CLI::write('This worker has been scheduled to end. Stopping.', 'yellow');
CLI::write('The termination of this worker has been planned. Stopping.', 'yellow');

return true;
}
Expand Down
24 changes: 24 additions & 0 deletions src/Config/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ class Queue extends BaseConfig
*/
public bool $keepFailedJobs = true;

/**
* Default priorities for the queue
* if different from the "default".
*/
public array $queueDefaultPriority = [];

/**
* Valid priorities in the order for the queue,
* if different from the "default".
*/
public array $queuePriorities = [];

/**
* Your jobs handlers.
*/
Expand All @@ -63,4 +75,16 @@ public function resolveJobClass(string $name): string

return $this->jobHandlers[$name];
}

/**
* Stringify queue priorities.
*/
public function getQueuePriorities(string $name): ?string
{
if (! isset($this->queuePriorities[$name])) {
return null;
}

return implode(',', $this->queuePriorities[$name]);
}
}
3 changes: 2 additions & 1 deletion src/Config/Services.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

use CodeIgniter\Config\BaseService;
use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig;
use Michalsn\CodeIgniterQueue\Interfaces\QueueInterface;
use Michalsn\CodeIgniterQueue\Queue;

class Services extends BaseService
{
public static function queue(?QueueConfig $config = null, $getShared = true)
public static function queue(?QueueConfig $config = null, $getShared = true): QueueInterface
{
if ($getShared) {
return static::getSharedInstance('queue', $config);
Expand Down
61 changes: 61 additions & 0 deletions src/Database/Migrations/2023-11-05-064053_AddPriorityField.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

namespace Michalsn\CodeIgniterQueue\Database\Migrations;

use CodeIgniter\Database\BaseConnection;
use CodeIgniter\Database\Migration;

/**
* @property BaseConnection $db
*/
class AddPriorityField extends Migration
{
public function up()
{
$fields = [
'priority' => [
'type' => 'varchar',
'constraint' => 64,
'null' => false,
'default' => 'default',
'after' => 'payload',
],
];

$this->forge->addColumn('queue_jobs', $fields);
$this->forge->addColumn('queue_jobs_failed', $fields);

// Ugly fix for dropping the correct index
// since it had no name given
$keys = $this->db->getIndexData('queue_jobs');

foreach ($keys as $key) {
if ($key->fields === ['queue', 'status', 'available_at']) {
$this->forge->dropKey('queue_jobs', $key->name, false);
break;
}
}

$this->forge->addKey(['queue', 'priority', 'status', 'available_at'], false, false, 'queue_priority_status_available_at');
$this->forge->processIndexes('queue_jobs');
}

public function down()
{
// Ugly fix for dropping the correct index
$keys = $this->db->getIndexData('queue_jobs');

foreach ($keys as $key) {
if ($key->fields === ['queue', 'priority', 'status', 'available_at']) {
$this->forge->dropKey('queue_jobs', $key->name, false);
break;
}
}

$this->forge->addKey(['queue', 'status', 'available_at']);
$this->forge->processIndexes('queue_jobs');

$this->forge->dropColumn('queue_jobs', 'priority');
$this->forge->dropColumn('queue_jobs_failed', 'priority');
}
}
1 change: 1 addition & 0 deletions src/Entities/QueueJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class QueueJob extends Entity
'id' => 'integer',
'queue' => 'string',
'payload' => 'json-array',
'priority' => 'string',
'status' => 'integer',
'attempts' => 'integer',
];
Expand Down
1 change: 1 addition & 0 deletions src/Entities/QueueJobFailed.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class QueueJobFailed extends Entity
'connection' => 'string',
'queue' => 'string',
'payload' => 'json-array',
'priority' => 'string',
'exceptions' => 'string',
];
}
15 changes: 15 additions & 0 deletions src/Exceptions/QueueException.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,19 @@ public static function forIncorrectJobHandler(): static
{
return new self(lang('Queue.incorrectJobHandler'));
}

public static function forIncorrectPriorityFormat(): static
{
return new self(lang('Queue.incorrectPriorityFormat'));
}

public static function forTooLongPriorityName(): static
{
return new self(lang('Queue.tooLongPriorityName'));
}

public static function forIncorrectQueuePriority(string $priority, string $queue): static
{
return new self(lang('Queue.incorrectQueuePriority', [$priority, $queue]));
}
}
Loading

0 comments on commit b27a50e

Please sign in to comment.