diff --git a/.gitattributes b/.gitattributes deleted file mode 100644 index 7627da8..0000000 --- a/.gitattributes +++ /dev/null @@ -1,5 +0,0 @@ -* text=auto - -/.gitattributes export-ignore -/.gitignore export-ignore -/readme.md export-ignore diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 987e2a2..0000000 --- a/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -composer.lock -vendor diff --git a/composer.json b/composer.json index e461f7a..030ab7b 100644 --- a/composer.json +++ b/composer.json @@ -1,28 +1,27 @@ { - "name": "barryvdh/laravel-async-queue", - "description": "Async Queue Driver for Laravel (Push to background)", - "keywords": ["laravel", "queue", "async", "background"], + "name": "armxy/laravel-internal-queue", + "description": "Internal Queue Driver for Laravel (Push to background)", + "keywords": ["laravel", "queue", "async", "internal", "background"], "license": "MIT", "authors": [ { - "name": "Barry vd. Heuvel", - "email": "barryvdh@gmail.com" + "name": "Thanayu Thanakitworawat", + "email": "dead1body@hotmail.com" } ], "require": { - "php": ">=5.4.0", - "illuminate/support": "5.0.x|5.1.x|5.2.x|5.3.x", - "illuminate/console": "5.0.x|5.1.x|5.2.x|5.3.x", - "symfony/process": "~2.6|~3.0" + "php": ">=5.3.0", + "illuminate/support": "4.x|5.0.x", + "illuminate/console": "4.x|5.0.x", + "illuminate/container": "4.x|5.0.x", + "illuminate/queue": "4.x|5.0.x" }, "autoload": { + "classmap": [ + "src/migrations" + ], "psr-4": { - "Barryvdh\\Queue\\": "src/" - } - }, - "extra": { - "branch-alias": { - "dev-master": "0.4-dev" + "Armxy\\Queue\\": "src/" } } } diff --git a/readme.md b/readme.md deleted file mode 100644 index 263e234..0000000 --- a/readme.md +++ /dev/null @@ -1,59 +0,0 @@ -# Laravel 5 Async Queue Driver - -## Push a function/closure to the background. - -Just like the 'sync' driver, this is not a real queue driver. It is always fired immediatly. -The only difference is that the closure is sent to the background without waiting for the response. -This package is more usable as an alternative for running incidental tasks in the background, without setting up a 'real' queue driver. - -> **Note:** This is using the DatabaseQueue, so make sure you set that up first, including migrations. - -### Install - -Require the latest version of this package with Composer - - composer require barryvdh/laravel-async-queue:"0.4.x" - -Add the Service Provider to the providers array in config/app.php - - Barryvdh\Queue\AsyncServiceProvider::class, - -You need to create the migration table for queues and run it. - - $ php artisan queue:table - $ php artisan migrate - -You should now be able to use the async driver in config/queue.php. Use the same config as for the database, but use async as driver. - - 'default' => 'async', - - 'connections' => array( - ... - 'async' => array( - 'driver' => 'async', - 'table' => 'queue_jobs', - 'queue' => 'default', - 'expire' => 60, - ), - ... - } - -By default, `php` is used as the binary path to PHP. You can change this by adding the `binary` option to the queue config. You can also add extra arguments (for HHVM for example) - - 'connections' => array( - ... - 'async' => array( - 'driver' => 'async', - 'table' => 'queue_jobs', - 'queue' => 'default', - 'expire' => 60, - 'binary' => 'php', - 'binary_args' => '', - ), - ... - } - -It should work the same as the sync driver, so no need to run a queue listener. Downside is that you cannot actually queue or plan things. -Queue::later() is also fired directly, but just runs `sleep($delay)` in background.. -For more info see http://laravel.com/docs/queues - diff --git a/src/AsyncQueue.php b/src/AsyncQueue.php deleted file mode 100644 index 519aa5b..0000000 --- a/src/AsyncQueue.php +++ /dev/null @@ -1,190 +0,0 @@ -binary = $binary; - $this->binaryArgs = $binaryArgs; - $this->connectionName = $connectionName; - } - - /** - * Push a new job onto the queue. - * - * @param string $job - * @param mixed $data - * @param string|null $queue - * - * @return int - */ - public function push($job, $data = '', $queue = null) - { - $id = parent::push($job, $data, $queue); - $this->startProcess($id); - - return $id; - } - - /** - * Push a raw payload onto the queue. - * - * @param string $payload - * @param string $queue - * @param array $options - * @return mixed - */ - public function pushRaw($payload, $queue = null, array $options = array()) - { - $id = parent::pushRaw($payload, $queue, $options); - $this->startProcess($id); - - return $id; - } - - /** - * Push a new job onto the queue after a delay. - * - * @param \DateTime|int $delay - * @param string $job - * @param mixed $data - * @param string|null $queue - * - * @return int - */ - public function later($delay, $job, $data = '', $queue = null) - { - $id = parent::later($delay, $job, $data, $queue); - $this->startProcess($id); - - return $id; - } - - protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) - { - $availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay); - - return $this->database->table($this->table)->insertGetId([ - 'queue' => $this->getQueue($queue), - 'payload' => $payload, - 'attempts' => $attempts, - 'reserved' => 1, - 'reserved_at' => $this->getTime(), - 'available_at' => $availableAt->getTimestamp(), - 'created_at' => $this->getTime(), - ]); - } - - /** - * Get the next available job for the queue. - * - * @param string|null $queue - * @return \StdClass|null - */ - public function getJobFromId($id) - { - $job = $this->database->table($this->table) - ->where('id', $id) - ->first(); - - if($job) { - - return new DatabaseJob( - $this->container, $this, $job, $job->queue - ); - } - } - - /** - * Make a Process for the Artisan command for the job id. - * - * @param int $jobId - * @param int $delay - * - * @return void - */ - public function startProcess($id) - { - $command = $this->getCommand($id); - $cwd = base_path(); - - $process = new Process($command, $cwd); - $process->run(); - } - - /** - * Get the Artisan command as a string for the job id. - * - * @param int $jobId - * @param int $delay - * - * @return string - */ - protected function getCommand($id) - { - $connection = $this->connectionName; - $cmd = '%s artisan queue:async %d %s'; - $cmd = $this->getBackgroundCommand($cmd); - - $binary = $this->getPhpBinary(); - - return sprintf($cmd, $binary, $id, $connection); - } - - /** - * Get the escaped PHP Binary from the configuration - * - * @return string - */ - protected function getPhpBinary() - { - $path = $this->binary; - if (!defined('PHP_WINDOWS_VERSION_BUILD')) { - $path = escapeshellarg($path); - } - - $args = $this->binaryArgs; - if(is_array($args)){ - $args = implode(' ', $args); - } - return trim($path.' '.$args); - } - - protected function getBackgroundCommand($cmd) - { - if (defined('PHP_WINDOWS_VERSION_BUILD')) { - return 'start /B '.$cmd.' > NUL'; - } else { - return $cmd.' > /dev/null 2>&1 &'; - } - } - - - -} diff --git a/src/AsyncServiceProvider.php b/src/AsyncServiceProvider.php deleted file mode 100644 index af297b6..0000000 --- a/src/AsyncServiceProvider.php +++ /dev/null @@ -1,76 +0,0 @@ -registerAsyncConnector($this->app['queue']); - - $this->commands('command.queue.async'); - } - - /** - * Register the service provider. - * - * @return void - */ - public function register() - { - $this->registerAsyncCommand(); - } - - /** - * Register the queue listener console command. - * - * - * @return void - */ - protected function registerAsyncCommand() - { - $this->app->singleton('command.queue.async', function () { - return new AsyncCommand($this->app['queue.worker']); - }); - } - - /** - * Register the Async queue connector. - * - * @param \Illuminate\Queue\QueueManager $manager - * - * @return void - */ - protected function registerAsyncConnector($manager) - { - $manager->addConnector('async', function () { - return new AsyncConnector($this->app['db']); - }); - } - - /** - * Get the services provided by the provider. - * - * @return array - */ - public function provides() - { - return array('command.queue.async'); - } -} diff --git a/src/Connectors/AsyncConnector.php b/src/Connectors/AsyncConnector.php deleted file mode 100644 index 00e23c1..0000000 --- a/src/Connectors/AsyncConnector.php +++ /dev/null @@ -1,30 +0,0 @@ -connections->connection(array_get($config, 'connection')), - $config['table'], - $config['queue'], - array_get($config, 'expire', 60), - array_get($config, 'binary', 'php'), - array_get($config, 'binary_args', ''), - array_get($config, 'connection_name', '') - ); - } -} diff --git a/src/Connectors/InternalConnector.php b/src/Connectors/InternalConnector.php new file mode 100644 index 0000000..9307241 --- /dev/null +++ b/src/Connectors/InternalConnector.php @@ -0,0 +1,32 @@ + 'php', + 'binary_args' => '', + ); + + /** + * Establish a queue connection. + * + * @param array $config + * + * @return \Illuminate\Queue\QueueInterface + */ + public function connect(array $config) + { + $config = array_merge($this->defaults, $config); + return new InternalQueue($config); + } +} diff --git a/src/Console/AsyncCommand.php b/src/Console/AsyncCommand.php deleted file mode 100644 index c6f1ef4..0000000 --- a/src/Console/AsyncCommand.php +++ /dev/null @@ -1,113 +0,0 @@ -worker = $worker; - } - - /** - * Execute the console command. - * - * @return void - */ - public function fire() - { - $id = $this->argument('id'); - $connection = $this->argument('connection'); - - $this->processJob( - $connection, $id - ); - } - - - /** - * Process the job - * - */ - protected function processJob($connectionName, $id) - { - $manager = $this->worker->getManager(); - $connection = $manager->connection($connectionName); - - $job = $connection->getJobFromId($id); - - // If we're able to pull a job off of the stack, we will process it and - // then immediately return back out. If there is no job on the queue - // we will "sleep" the worker for the specified number of seconds. - if ( ! is_null($job)) - { - $sleep = max($job->getDatabaseJob()->available_at - time(), 0); - sleep($sleep); - return $this->worker->process( - $manager->getName($connectionName), $job - ); - } - - return ['job' => null, 'failed' => false]; - - } - - /** - * Get the console command arguments. - * - * @return array - */ - protected function getArguments() - { - return array( - array('id', InputArgument::REQUIRED, 'The Job ID'), - - array('connection', InputArgument::OPTIONAL, 'The name of connection'), - ); - } - - /** - * Get the console command arguments. - * - * @return array - */ - protected function getOptions() - { - return array( - - ); - } -} diff --git a/src/Console/InternalCommand.php b/src/Console/InternalCommand.php new file mode 100644 index 0000000..ddb1b8d --- /dev/null +++ b/src/Console/InternalCommand.php @@ -0,0 +1,80 @@ +reconnect(); + $item = Job::lock()->findOrFail($this->argument('job_id')); + + if ($delay = (int)$this->option('delay')) { + sleep($delay); + } + + $job = new InternalJob($this->laravel, $item); + + $job->fire(); + } + + /** + * Get the console command arguments. + * + * @return array + */ + protected function getArguments() + { + return array( + array('job_id', InputArgument::REQUIRED, 'The Job ID'), + ); + } + + /** + * Get the console command arguments. + * + * @return array + */ + protected function getOptions() + { + return array( + array('delay', 'D', InputOption::VALUE_OPTIONAL, 'The delay in seconds', 0), + ); + } +} diff --git a/src/InternalQueue.php b/src/InternalQueue.php new file mode 100644 index 0000000..c902d7a --- /dev/null +++ b/src/InternalQueue.php @@ -0,0 +1,181 @@ +config = $config; + } + + /** + * Push a new job onto the queue. + * + * @param string $job + * @param mixed $data + * @param string|null $queue + * @param bool $startNow + * + * @return int + */ + public function push($job, $data = '', $queue = null, $startNow = false) + { + $id = $this->storeJob($job, $data, 0); + + if($startNow){ + + $this->startProcess($id, 0); + } + + return $id; + } + + /** + * Store the job in the database. + * + * Returns the id of the job. + * + * @param string $job + * @param mixed $data + * @param int $delay + * + * @return int + */ + public function storeJob($job, $data, $delay = 0) + { + $payload = $this->createPayload($job, $data); + + $job = new Job(); + $job->status = Job::STATUS_OPEN; + $job->delay = $delay; + $job->payload = $payload; + $job->save(); + + return $job->id; + } + + /** + * Make a Process for the Artisan command for the job id. + * + * @param int $jobId + * @param int $delay + * + * @return void + */ + public function startProcess($jobId, $delay = 0) + { + $command = $this->getCommand($jobId, $delay); + $cwd = $this->container['path.base']; + $process = new Process($command, $cwd); + $process->run(); + + //chdir($this->container['path.base']); + //exec($this->getCommand($jobId, $delay)); + } + + /** + * Get the Artisan command as a string for the job id. + * + * @param int $jobId + * @param int $delay + * + * @return string + */ + protected function getCommand($jobId, $delay = 0) + { + $cmd = '%s artisan queue:internal %d --env=%s --delay=%d'; + $cmd = $this->getBackgroundCommand($cmd); + + $binary = $this->getPhpBinary(); + $environment = $this->container->environment(); + + return sprintf($cmd, $binary, $jobId, $environment, $delay); + } + + /** + * Get the escaped PHP Binary from the configuration + * + * @return string + */ + protected function getPhpBinary() + { + $path = $this->config['binary']; + if (!defined('PHP_WINDOWS_VERSION_BUILD')) { + $path = escapeshellarg($path); + } + + $args = $this->config['binary_args']; + if(is_array($args)){ + $args = implode(' ', $args); + } + return trim($path.' '.$args); + } + + protected function getBackgroundCommand($cmd) + { + return $cmd.' > /dev/null 2>&1 &'; + /* + if (defined('PHP_WINDOWS_VERSION_BUILD')) { + return 'start /B '.$cmd.' > NUL'; + } else { + return $cmd.' > /dev/null 2>&1 &'; + } + */ + } + + /** + * Push a new job onto the queue after a delay. + * + * @param \DateTime|int $delay + * @param string $job + * @param mixed $data + * @param string|null $queue + * + * @return int + */ + public function later($delay, $job, $data = '', $queue = null) + { + $delay = $this->getSeconds($delay); + $id = $this->storeJob($job, $data, $delay); + $this->startProcess($id, $delay); + + return $id; + } + + + /** + * Get next queued job from database. + * + * @param string|null $queue + * + * @return int + */ + public function pop($queue = null){ + + $queueModel = new Queue(); + + $firstJob = $queueModel->getFirstQueue(); + + if($firstJob !== null){ + + $job = new InternalJob($this->container, $firstJob); + + return $job; + } + + return null; + } + +} diff --git a/src/InternalQueueServiceProvider.php b/src/InternalQueueServiceProvider.php new file mode 100644 index 0000000..a47614b --- /dev/null +++ b/src/InternalQueueServiceProvider.php @@ -0,0 +1,77 @@ +registerInternalConnector($this->app['queue']); + + $this->commands('command.queue.internal'); + } + + /** + * Register the service provider. + * + * @return void + */ + public function register() + { + $this->registerInternalCommand($this->app); + } + + /** + * Register the queue listener console command. + * + * @param \Illuminate\Foundation\Application $app + * + * @return void + */ + protected function registerInternalCommand($app) + { + $app['command.queue.internal'] = $app->share(function ($app) { + return new InternalCommand(); + }); + } + + /** + * Register the Internal queue connector. + * + * @param \Illuminate\Queue\QueueManager $manager + * + * @return void + */ + protected function registerInternalConnector($manager) + { + $manager->addConnector('internal', function () { + return new InternalConnector(); + }); + } + + /** + * Get the services provided by the provider. + * + * @return array + */ + public function provides() + { + return array('command.queue.internal'); + } +} diff --git a/src/Jobs/InternalJob.php b/src/Jobs/InternalJob.php new file mode 100644 index 0000000..8df660b --- /dev/null +++ b/src/Jobs/InternalJob.php @@ -0,0 +1,153 @@ +container = $container; + $this->job = $job; + } + + /** + * Fire the job. + * + * @return void + */ + public function fire() + { + \DB::connection()->reconnect(); + // Get the payload from the job + $payload = $this->parsePayload($this->getRawBody()); + + // Mark job as started + $this->job->status = Job::STATUS_STARTED; + $this->job->retries++; + $this->job->save(); + + // Fire the actual job + $this->resolveAndFire($payload); + + // If job is not deleted, mark as finished + if (!$this->deleted && $this->job->status != Job::STATUS_OPEN) { + $this->job->status = Job::STATUS_FINISHED; + $this->job->save(); + } + } + + /** + * Get the raw body string for the job. + * + * @return string + */ + public function getRawBody() + { + return $this->job->payload; + } + + /** + * Release the job back into the queue. + * + * @param int $delay + * @return void + */ + public function release($delay = 0) + { + // Update the Job status + $this->job->status = Job::STATUS_OPEN; + + // Wait for the delay + if ($delay) { + + sleep($this->getSeconds($delay)); + } + + $this->job->save(); + } + + /** + * Get the number of times the job has been attempted. + * + * @return int + */ + public function attempts() + { + return (int) $this->job->retries; + } + + public function retryIncrement() + { + $this->job->retries++; + $this->job->save(); + } + + public function getDelay() + { + if ($this->job->delay !== '') { + return (int)$this->job->delay; + } else { + return 0; + } + } + + /** + * Delete the job from the queue. + * + * @return void + */ + public function delete() + { + parent::delete(); + $this->job->delete(); + } + + /** + * Parse the payload to an array. + * + * @param string $payload + * + * @return array|null + */ + protected function parsePayload($payload) + { + return json_decode($payload, true); + } + + /** + * Get the name of the queue the job belongs to. + * + * @return string + */ + public function getQueue() + { + return $this->getJobId(); + } + + /** + * Get the job identifier. + * + * @return string + */ + public function getJobId() + { + return $this->job->id; + } +} diff --git a/src/Models/Job.php b/src/Models/Job.php new file mode 100644 index 0000000..6d0ed6d --- /dev/null +++ b/src/Models/Job.php @@ -0,0 +1,27 @@ +orderBy('created_at')->first(); + + if(!is_null($firstQueue)){ + + $job = Job::find($firstQueue->id); + + return $job; + } + } +} \ No newline at end of file diff --git a/src/migrations/2014_11_28_193005_create_internal_queue_table.php b/src/migrations/2014_11_28_193005_create_internal_queue_table.php new file mode 100644 index 0000000..d736347 --- /dev/null +++ b/src/migrations/2014_11_28_193005_create_internal_queue_table.php @@ -0,0 +1,35 @@ +increments('id'); + $table->integer('status')->default(0); + $table->integer('retries')->default(0); + $table->integer('delay')->default(0); + $table->longText('payload')->nullable(); + $table->timestamps(); + }); + } + + /** + * Reverse the migrations. + * + * @return void + */ + public function down() + { + Schema::drop('internal_queue'); + } +}