diff --git a/src/AsyncQueue.php b/src/AsyncQueue.php index cffc5bd..59d2f6b 100644 --- a/src/AsyncQueue.php +++ b/src/AsyncQueue.php @@ -1,6 +1,8 @@ startProcess($queue, $id); + $this->startProcess($id); return $id; } @@ -61,7 +63,7 @@ public function push($job, $data = '', $queue = null) public function pushRaw($payload, $queue = null, array $options = array()) { $id = parent::push($job, $data, $queue); - $this->startProcess($queue, $id); + $this->startProcess($id); return $id; } @@ -79,7 +81,7 @@ public function pushRaw($payload, $queue = null, array $options = array()) public function later($delay, $job, $data = '', $queue = null) { $id = parent::later($delay, $job, $data, $queue); - $this->startProcess($queue, $id); + $this->startProcess($id); return $id; } @@ -95,36 +97,46 @@ public function later($delay, $job, $data = '', $queue = null) public function release($queue, $job, $delay) { $id = parent::release($queue, $job, $delay); - $this->startProcess($queue, $id); + $this->startProcess($id); return $id; } + protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) + { + $availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay); + + return $this->database->table($this->table)->insertGetId([ + 'queue' => $this->getQueue($queue), + 'payload' => $payload, + 'attempts' => $attempts, + 'reserved' => 1, + 'reserved_at' => $this->getTime(), + 'available_at' => $availableAt->getTimestamp(), + 'created_at' => $this->getTime(), + ]); + } + /** * Get the next available job for the queue. * * @param string|null $queue * @return \StdClass|null */ - public function getJobFromId($queue, $id) + public function getJobFromId($id) { - $this->database->beginTransaction(); $job = $this->database->table($this->table) - ->lockForUpdate() - ->where('queue', $this->getQueue($queue)) - ->where('reserved', 0) ->where('id', $id) ->first(); if($job) { - $this->markJobAsReserved($job->id); return new DatabaseJob( - $this->container, $this, $job, $queue + $this->container, $this, $job, $job->queue ); } } - + /** * Make a Process for the Artisan command for the job id. * @@ -133,9 +145,9 @@ public function getJobFromId($queue, $id) * * @return void */ - public function startProcess($queue, $id) + public function startProcess($id) { - $command = $this->getCommand($queue, $id); + $command = $this->getCommand($id); $cwd = base_path(); $process = new Process($command, $cwd); @@ -150,16 +162,15 @@ public function startProcess($queue, $id) * * @return string */ - protected function getCommand($queue, $id) + protected function getCommand($id) { $connection = $this->connectionName; - $cmd = '%s artisan queue:async %d %s --env=%s --queue=%s'; + $cmd = '%s artisan queue:async %d %s'; $cmd = $this->getBackgroundCommand($cmd); $binary = $this->getPhpBinary(); - $environment = $this->container->environment(); - return sprintf($cmd, $binary, $id, $connection, $environment, $this->getQueue($queue)); + return sprintf($cmd, $binary, $id, $connection); } /** diff --git a/src/Console/AsyncCommand.php b/src/Console/AsyncCommand.php index 46e82b7..c6f1ef4 100644 --- a/src/Console/AsyncCommand.php +++ b/src/Console/AsyncCommand.php @@ -49,16 +49,11 @@ public function __construct(Worker $worker) */ public function fire() { - $queue = $this->option('queue'); - - $id = $this->argument('id'); $connection = $this->argument('connection'); - $delay = $this->option('delay'); - $tries = $this->option('tries'); $this->processJob( - $connection, $queue, $id, $delay, $tries + $connection, $id ); } @@ -67,20 +62,22 @@ public function fire() * Process the job * */ - protected function processJob($connectionName, $queue, $id, $delay, $maxTries) + protected function processJob($connectionName, $id) { $manager = $this->worker->getManager(); $connection = $manager->connection($connectionName); - $job = $connection->getJobFromId($queue, $id); + $job = $connection->getJobFromId($id); // If we're able to pull a job off of the stack, we will process it and // then immediately return back out. If there is no job on the queue // we will "sleep" the worker for the specified number of seconds. if ( ! is_null($job)) { + $sleep = max($job->getDatabaseJob()->available_at - time(), 0); + sleep($sleep); return $this->worker->process( - $manager->getName($connectionName), $job, $maxTries, $delay + $manager->getName($connectionName), $job ); } @@ -110,11 +107,7 @@ protected function getArguments() protected function getOptions() { return array( - array('queue', null, InputOption::VALUE_OPTIONAL, 'The queue name', null), - - array('delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0), - - array('tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0), + ); } }