Skip to content

Commit

Permalink
refactor: replace loose comparisons
Browse files Browse the repository at this point in the history
  • Loading branch information
kenjis authored and michalsn committed Dec 18, 2023
1 parent ae68bd8 commit b9df900
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/Commands/QueueClear.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class QueueClear extends BaseCommand
public function run(array $params)
{
// Read params
if (! $queue = array_shift($params)) {
$queue = array_shift($params);
if ($queue === null) {
CLI::error('The queueName is not specified.');

return EXIT_ERROR;
Expand Down
3 changes: 2 additions & 1 deletion src/Commands/QueueForget.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class QueueForget extends BaseCommand
public function run(array $params)
{
// Read params
if (! $id = array_shift($params)) {
$id = array_shift($params);
if ($id === null) {
CLI::error('The ID of the failed job is not specified.');

return EXIT_ERROR;
Expand Down
3 changes: 2 additions & 1 deletion src/Commands/QueueRetry.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class QueueRetry extends BaseCommand
public function run(array $params)
{
// Read params
if (! $id = array_shift($params)) {
$id = array_shift($params);
if ($id === null) {
CLI::error('The ID of the failed job is not specified.');

return EXIT_ERROR;
Expand Down
3 changes: 2 additions & 1 deletion src/Commands/QueueStop.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class QueueStop extends BaseCommand
public function run(array $params)
{
// Read params
if (! $queue = array_shift($params)) {
$queue = array_shift($params);
if ($queue === null) {
CLI::error('The queueName is not specified.');

return EXIT_ERROR;
Expand Down
3 changes: 2 additions & 1 deletion src/Commands/QueueWork.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public function run(array $params)
$waiting = false;

// Read queue name from params
if (! $queue = array_shift($params)) {
$queue = array_shift($params);
if ($queue === null) {
CLI::error('The queueName is not specified.');

return EXIT_ERROR;
Expand Down
25 changes: 19 additions & 6 deletions src/Handlers/PredisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,22 @@ public function pop(string $queue, array $priorities): ?QueueJob
$now = Time::now()->timestamp;

foreach ($priorities as $priority) {
if ($tasks = $this->predis->zrangebyscore("queues:{$queue}:{$priority}", '-inf', $now, ['LIMIT' => [0, 1]])) {
if ($this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks)) {
$tasks = $this->predis->zrangebyscore(
"queues:{$queue}:{$priority}",
'-inf',
$now,
['LIMIT' => [0, 1]]
);
if ($tasks !== []) {
$removed = $this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks);
if ($removed !== 0) {
break;
}
$tasks = [];
}
}

if (empty($tasks[0])) {
if ($tasks === []) {
return null;
}

Expand All @@ -93,7 +100,11 @@ public function later(QueueJob $queueJob, int $seconds): bool
$queueJob->status = Status::PENDING->value;
$queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp;

if ($result = $this->predis->zadd("queues:{$queueJob->queue}:{$queueJob->priority}", [json_encode($queueJob) => $queueJob->available_at->timestamp])) {
$result = $this->predis->zadd(
"queues:{$queueJob->queue}:{$queueJob->priority}",
[json_encode($queueJob) => $queueJob->available_at->timestamp]
);
if ($result !== 0) {
$this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]);
}

Expand Down Expand Up @@ -131,14 +142,16 @@ public function done(QueueJob $queueJob, bool $keepJob): bool
public function clear(?string $queue = null): bool
{
if ($queue !== null) {
if ($keys = $this->predis->keys("queues:{$queue}:*")) {
$keys = $this->predis->keys("queues:{$queue}:*");
if ($keys !== []) {
return $this->predis->del($keys) > 0;
}

return true;
}

if ($keys = $this->predis->keys('queues:*')) {
$keys = $this->predis->keys('queues:*');
if ($keys !== []) {
return $this->predis->del($keys) > 0;
}

Expand Down
9 changes: 7 additions & 2 deletions src/Handlers/RedisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public function pop(string $queue, array $priorities): ?QueueJob
}
}

if (empty($tasks[0])) {
if ($tasks === []) {
return null;
}

Expand All @@ -114,7 +114,12 @@ public function later(QueueJob $queueJob, int $seconds): bool
$queueJob->status = Status::PENDING->value;
$queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp;

if ($result = (int) $this->redis->zAdd("queues:{$queueJob->queue}:{$queueJob->priority}", $queueJob->available_at->timestamp, json_encode($queueJob))) {
$result = (int) $this->redis->zAdd(
"queues:{$queueJob->queue}:{$queueJob->priority}",
$queueJob->available_at->timestamp,
json_encode($queueJob)
);
if ($result !== 0) {
$this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id);
}

Expand Down

0 comments on commit b9df900

Please sign in to comment.