From c2f8198b3139b62a37e752227125c2da3716bed7 Mon Sep 17 00:00:00 2001 From: Vladislav Gurkov Date: Thu, 26 Mar 2015 15:02:07 +0300 Subject: [PATCH 1/3] Sleep --- src/AsyncQueue.php | 43 ++++++++++++++++++++++-------------- src/Console/AsyncCommand.php | 19 +++++----------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/AsyncQueue.php b/src/AsyncQueue.php index cffc5bd..16b37ba 100644 --- a/src/AsyncQueue.php +++ b/src/AsyncQueue.php @@ -45,7 +45,7 @@ public function __construct(Connection $database, $table, $default = 'default', public function push($job, $data = '', $queue = null) { $id = parent::push($job, $data, $queue); - $this->startProcess($queue, $id); + $this->startProcess($id); return $id; } @@ -61,7 +61,7 @@ public function push($job, $data = '', $queue = null) public function pushRaw($payload, $queue = null, array $options = array()) { $id = parent::push($job, $data, $queue); - $this->startProcess($queue, $id); + $this->startProcess($id); return $id; } @@ -79,7 +79,7 @@ public function pushRaw($payload, $queue = null, array $options = array()) public function later($delay, $job, $data = '', $queue = null) { $id = parent::later($delay, $job, $data, $queue); - $this->startProcess($queue, $id); + $this->startProcess($id); return $id; } @@ -95,36 +95,46 @@ public function later($delay, $job, $data = '', $queue = null) public function release($queue, $job, $delay) { $id = parent::release($queue, $job, $delay); - $this->startProcess($queue, $id); + $this->startProcess($id); return $id; } + protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) + { + $availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay); + + return $this->database->table($this->table)->insertGetId([ + 'queue' => $this->getQueue($queue), + 'payload' => $payload, + 'attempts' => $attempts, + 'reserved' => 1, + 'reserved_at' => $this->getTime(), + 'available_at' => $availableAt->getTimestamp(), + 'created_at' => $this->getTime(), + ]); + } + /** * Get the next available job for the queue. * * @param string|null $queue * @return \StdClass|null */ - public function getJobFromId($queue, $id) + public function getJobFromId($id) { - $this->database->beginTransaction(); $job = $this->database->table($this->table) - ->lockForUpdate() - ->where('queue', $this->getQueue($queue)) - ->where('reserved', 0) ->where('id', $id) ->first(); if($job) { - $this->markJobAsReserved($job->id); return new DatabaseJob( $this->container, $this, $job, $queue ); } } - + /** * Make a Process for the Artisan command for the job id. * @@ -133,9 +143,9 @@ public function getJobFromId($queue, $id) * * @return void */ - public function startProcess($queue, $id) + public function startProcess($id) { - $command = $this->getCommand($queue, $id); + $command = $this->getCommand($id); $cwd = base_path(); $process = new Process($command, $cwd); @@ -150,16 +160,15 @@ public function startProcess($queue, $id) * * @return string */ - protected function getCommand($queue, $id) + protected function getCommand($id) { $connection = $this->connectionName; - $cmd = '%s artisan queue:async %d %s --env=%s --queue=%s'; + $cmd = '%s artisan queue:async %d %s'; $cmd = $this->getBackgroundCommand($cmd); $binary = $this->getPhpBinary(); - $environment = $this->container->environment(); - return sprintf($cmd, $binary, $id, $connection, $environment, $this->getQueue($queue)); + return sprintf($cmd, $binary, $id, $connection); } /** diff --git a/src/Console/AsyncCommand.php b/src/Console/AsyncCommand.php index 46e82b7..134eab6 100644 --- a/src/Console/AsyncCommand.php +++ b/src/Console/AsyncCommand.php @@ -49,16 +49,11 @@ public function __construct(Worker $worker) */ public function fire() { - $queue = $this->option('queue'); - - $id = $this->argument('id'); $connection = $this->argument('connection'); - $delay = $this->option('delay'); - $tries = $this->option('tries'); $this->processJob( - $connection, $queue, $id, $delay, $tries + $connection, $id ); } @@ -67,18 +62,20 @@ public function fire() * Process the job * */ - protected function processJob($connectionName, $queue, $id, $delay, $maxTries) + protected function processJob($connectionName, $id) { $manager = $this->worker->getManager(); $connection = $manager->connection($connectionName); - $job = $connection->getJobFromId($queue, $id); + $job = $connection->getJobFromId($id); // If we're able to pull a job off of the stack, we will process it and // then immediately return back out. If there is no job on the queue // we will "sleep" the worker for the specified number of seconds. if ( ! is_null($job)) { + $sleep = $job->available_at - time(); + sleep($sleep); return $this->worker->process( $manager->getName($connectionName), $job, $maxTries, $delay ); @@ -110,11 +107,7 @@ protected function getArguments() protected function getOptions() { return array( - array('queue', null, InputOption::VALUE_OPTIONAL, 'The queue name', null), - - array('delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0), - - array('tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0), + ); } } From 4e950108d9baa1d689c388edf2ea1226fdfc2fa9 Mon Sep 17 00:00:00 2001 From: Vladislav Gurkov Date: Thu, 26 Mar 2015 15:36:19 +0300 Subject: [PATCH 2/3] Sleep --- src/AsyncQueue.php | 3 ++- src/Console/AsyncCommand.php | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/AsyncQueue.php b/src/AsyncQueue.php index 16b37ba..691513c 100644 --- a/src/AsyncQueue.php +++ b/src/AsyncQueue.php @@ -5,6 +5,7 @@ use Illuminate\Queue\DatabaseQueue; use Illuminate\Queue\Jobs\DatabaseJob; use Symfony\Component\Process\Process; +use Carbon\Carbon; class AsyncQueue extends DatabaseQueue { @@ -130,7 +131,7 @@ public function getJobFromId($id) if($job) { return new DatabaseJob( - $this->container, $this, $job, $queue + $this->container, $this, $job, $job->queue ); } } diff --git a/src/Console/AsyncCommand.php b/src/Console/AsyncCommand.php index 134eab6..c6f1ef4 100644 --- a/src/Console/AsyncCommand.php +++ b/src/Console/AsyncCommand.php @@ -74,10 +74,10 @@ protected function processJob($connectionName, $id) // we will "sleep" the worker for the specified number of seconds. if ( ! is_null($job)) { - $sleep = $job->available_at - time(); + $sleep = max($job->getDatabaseJob()->available_at - time(), 0); sleep($sleep); return $this->worker->process( - $manager->getName($connectionName), $job, $maxTries, $delay + $manager->getName($connectionName), $job ); } From 7d0c783def5b319b298229007cf65634eb5bfad1 Mon Sep 17 00:00:00 2001 From: Vladislav Gurkov Date: Thu, 26 Mar 2015 16:18:23 +0300 Subject: [PATCH 3/3] Sleep --- src/AsyncQueue.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/AsyncQueue.php b/src/AsyncQueue.php index 691513c..59d2f6b 100644 --- a/src/AsyncQueue.php +++ b/src/AsyncQueue.php @@ -1,11 +1,12 @@