Skip to content
This repository was archived by the owner on Nov 5, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions src/AsyncQueue.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
<?php
namespace Barryvdh\Queue;

use Carbon\Carbon;
use DateTime;
use Illuminate\Database\Connection;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Jobs\DatabaseJob;
Expand Down Expand Up @@ -45,7 +47,7 @@ public function __construct(Connection $database, $table, $default = 'default',
public function push($job, $data = '', $queue = null)
{
$id = parent::push($job, $data, $queue);
$this->startProcess($queue, $id);
$this->startProcess($id);

return $id;
}
Expand All @@ -61,7 +63,7 @@ public function push($job, $data = '', $queue = null)
public function pushRaw($payload, $queue = null, array $options = array())
{
$id = parent::push($job, $data, $queue);
$this->startProcess($queue, $id);
$this->startProcess($id);

return $id;
}
Expand All @@ -79,7 +81,7 @@ public function pushRaw($payload, $queue = null, array $options = array())
public function later($delay, $job, $data = '', $queue = null)
{
$id = parent::later($delay, $job, $data, $queue);
$this->startProcess($queue, $id);
$this->startProcess($id);

return $id;
}
Expand All @@ -95,36 +97,46 @@ public function later($delay, $job, $data = '', $queue = null)
public function release($queue, $job, $delay)
{
$id = parent::release($queue, $job, $delay);
$this->startProcess($queue, $id);
$this->startProcess($id);

return $id;
}

protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
{
$availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to import DateTime


return $this->database->table($this->table)->insertGetId([
'queue' => $this->getQueue($queue),
'payload' => $payload,
'attempts' => $attempts,
'reserved' => 1,
'reserved_at' => $this->getTime(),
'available_at' => $availableAt->getTimestamp(),
'created_at' => $this->getTime(),
]);
}

/**
* Get the next available job for the queue.
*
* @param string|null $queue
* @return \StdClass|null
*/
public function getJobFromId($queue, $id)
public function getJobFromId($id)
{
$this->database->beginTransaction();
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 0)
->where('id', $id)
->first();

if($job) {
$this->markJobAsReserved($job->id);

return new DatabaseJob(
$this->container, $this, $job, $queue
$this->container, $this, $job, $job->queue
);
}
}

/**
* Make a Process for the Artisan command for the job id.
*
Expand All @@ -133,9 +145,9 @@ public function getJobFromId($queue, $id)
*
* @return void
*/
public function startProcess($queue, $id)
public function startProcess($id)
{
$command = $this->getCommand($queue, $id);
$command = $this->getCommand($id);
$cwd = base_path();

$process = new Process($command, $cwd);
Expand All @@ -150,16 +162,15 @@ public function startProcess($queue, $id)
*
* @return string
*/
protected function getCommand($queue, $id)
protected function getCommand($id)
{
$connection = $this->connectionName;
$cmd = '%s artisan queue:async %d %s --env=%s --queue=%s';
$cmd = '%s artisan queue:async %d %s';
$cmd = $this->getBackgroundCommand($cmd);

$binary = $this->getPhpBinary();
$environment = $this->container->environment();

return sprintf($cmd, $binary, $id, $connection, $environment, $this->getQueue($queue));
return sprintf($cmd, $binary, $id, $connection);
}

/**
Expand Down
21 changes: 7 additions & 14 deletions src/Console/AsyncCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,11 @@ public function __construct(Worker $worker)
*/
public function fire()
{
$queue = $this->option('queue');


$id = $this->argument('id');
$connection = $this->argument('connection');
$delay = $this->option('delay');
$tries = $this->option('tries');

$this->processJob(
$connection, $queue, $id, $delay, $tries
$connection, $id
);
}

Expand All @@ -67,20 +62,22 @@ public function fire()
* Process the job
*
*/
protected function processJob($connectionName, $queue, $id, $delay, $maxTries)
protected function processJob($connectionName, $id)
{
$manager = $this->worker->getManager();
$connection = $manager->connection($connectionName);

$job = $connection->getJobFromId($queue, $id);
$job = $connection->getJobFromId($id);

// If we're able to pull a job off of the stack, we will process it and
// then immediately return back out. If there is no job on the queue
// we will "sleep" the worker for the specified number of seconds.
if ( ! is_null($job))
{
$sleep = max($job->getDatabaseJob()->available_at - time(), 0);
sleep($sleep);
return $this->worker->process(
$manager->getName($connectionName), $job, $maxTries, $delay
$manager->getName($connectionName), $job
);
}

Expand Down Expand Up @@ -110,11 +107,7 @@ protected function getArguments()
protected function getOptions()
{
return array(
array('queue', null, InputOption::VALUE_OPTIONAL, 'The queue name', null),

array('delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0),

array('tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0),

);
}
}