Skip to content
This repository was archived by the owner on Nov 5, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
],
"require": {
"php": ">=7",
"illuminate/support": "^6|^7|^8|^9|^10.0|^11.0",
"illuminate/console": "^6|^7|^8|^9|^10.0|^11.0",
"symfony/process": "^4|^5|^6|^7.0"
"illuminate/support": "^11.0|^12.0",
"illuminate/queue": "^11.0|^12.0"
},
"autoload": {
"psr-4": {
Expand All @@ -27,7 +26,7 @@
},
"extra": {
"branch-alias": {
"dev-master": "0.7-dev"
"dev-master": "0.8-dev"
},
"laravel": {
"providers": [
Expand Down
39 changes: 3 additions & 36 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,62 +1,29 @@
# Laravel 5 Async Queue Driver
# Laravel Async Queue Driver

## Push a function/closure to the background.


### For Laravel 5.4, check the [0.6 branch](https://github.com/barryvdh/laravel-async-queue/tree/v0.6.0)

### For Laravel 5.3, check the [0.5 branch](https://github.com/barryvdh/laravel-async-queue/tree/v0.5.0)

Just like the 'sync' driver, this is not a real queue driver. It is always fired immediatly.
Just like the 'sync' or 'deferred' connection, this is not a real queue. It is always fired immediately.
The only difference is that the closure is sent to the background without waiting for the response.
This package is more usable as an alternative for running incidental tasks in the background, without setting up a 'real' queue driver.

> **Note:** This is using the DatabaseQueue, so make sure you set that up first, including migrations.
It is similar to the 'deferred' connection, but it runs in a background process, so might be more suitable for long running tasks.

### Install

Require the latest version of this package with Composer

composer require barryvdh/laravel-async-queue

Add the Service Provider to the providers array in config/app.php

Barryvdh\Queue\AsyncServiceProvider::class,

You need to create the migration table for queues and run it.

$ php artisan queue:table
$ php artisan migrate

You should now be able to use the async driver in config/queue.php. Use the same config as for the database, but use async as driver.

'connections' => array(
...
'async' => array(
'driver' => 'async',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
),
...
}

Set the default to `async`, either by changing to config or setting `QUEUE_DRIVER` in your `.env` file to `async`.

> Note: By default, `php` is used as the binary path to PHP. You can change this by adding the `binary` option to the queue config. You can also add extra arguments (for HHVM for example)

'connections' => array(
...
'async' => array(
'driver' => 'async',
'table' => 'jobs',
'queue' => 'default',
'expire' => 60,
'binary' => 'php',
'binary_args' => '',
),
...
}

It should work the same as the sync driver, so no need to run a queue listener. Downside is that you cannot actually queue or plan things. Queue::later() is also fired directly. For more info see http://laravel.com/docs/queues

183 changes: 7 additions & 176 deletions src/AsyncQueue.php
Original file line number Diff line number Diff line change
@@ -1,187 +1,18 @@
<?php
namespace Barryvdh\Queue;

use Illuminate\Database\Connection;
use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Jobs\DatabaseJob;
use Symfony\Component\Process\Process;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use Illuminate\Queue\SyncQueue;
use Illuminate\Support\Facades\Concurrency;
use Illuminate\Support\Facades\Queue;

class AsyncQueue extends DatabaseQueue
class AsyncQueue extends SyncQueue
{
/** @var string */
protected $binary;

/** @var string */
protected $binaryArgs;

/** @var string */
protected $connectionName;

/**
* @param \Illuminate\Database\Connection $database
* @param string $table
* @param string $default
* @param int $expire
* @param string $binary
* @param string|array $binaryArgs
*/
public function __construct(Connection $database, $table, $default = 'default', $expire = 60, $binary = 'php', $binaryArgs = '', $connectionName = '')
{
parent::__construct($database, $table, $default, $expire);
$this->binary = $binary;
$this->binaryArgs = $binaryArgs;
$this->connectionName = $connectionName;
}

/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string|null $queue
*
* @return int
* {@inheritdoc}
*/
public function push($job, $data = '', $queue = null)
{
$id = parent::push($job, $data, $queue);
$this->startProcess($id);

return $id;
}

/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$id = parent::pushRaw($payload, $queue, $options);
$this->startProcess($id);

return $id;
}

/**
* Push a new job onto the queue after a delay.
*
* @param \DateTime|int $delay
* @param string $job
* @param mixed $data
* @param string|null $queue
*
* @return int
*/
public function later($delay, $job, $data = '', $queue = null)
{
$id = parent::later($delay, $job, $data, $queue);
$this->startProcess($id);

return $id;
}

/**
* Create an array to insert for the given job.
*
* @param string|null $queue
* @param string $payload
* @param int $availableAt
* @param int $attempts
* @return array
*/
protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
{
$record = parent::buildDatabaseRecord($queue, $payload, $availableAt, $attempts);
$record['reserved_at'] = $this->currentTime();

return $record;
}

/**
* Get the next available job for the queue.
*
* @param int $id
* @return DatabaseJob
*/
public function getJobFromId($id)
{
$job = $this->database->table($this->table)
->where('id', $id)
->first();

if ($job) {
$job = $this->markJobAsReserved(new DatabaseJobRecord((object) $job));
return new DatabaseJob(
$this->container, $this, $job, $this->connectionName, $job->queue
);
}
}

/**
* Make a Process for the Artisan command for the job id.
*
* @param int $id
*
* @return void
*/
public function startProcess($id)
{
$command = $this->getCommand($id);
$cwd = base_path();

$process = new Process([$command], $cwd);
$process->run();
}

/**
* Get the Artisan command as a string for the job id.
*
* @param int $id
*
* @return string
*/
protected function getCommand($id)
{
$connection = $this->connectionName;
$cmd = '%s artisan queue:async %d %s';
$cmd = $this->getBackgroundCommand($cmd);

$binary = $this->getPhpBinary();

return sprintf($cmd, $binary, $id, $connection);
}

/**
* Get the escaped PHP Binary from the configuration
*
* @return string
*/
protected function getPhpBinary()
{
$path = $this->binary;
if ( ! defined('PHP_WINDOWS_VERSION_BUILD')) {
$path = escapeshellarg($path);
}

$args = $this->binaryArgs;
if (is_array($args)) {
$args = implode(' ', $args);
}

return trim($path . ' ' . $args);
}

protected function getBackgroundCommand($cmd)
{
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
return 'start /B ' . $cmd . ' > NUL';
}

return $cmd . ' > /dev/null 2>&1 &';
Concurrency::driver('process')
->defer(fn () => Queue::connection('sync')->push($job, $data, $queue));
}
}
47 changes: 3 additions & 44 deletions src/AsyncServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,11 @@

use Barryvdh\Queue\Connectors\AsyncConnector;
use Barryvdh\Queue\Console\AsyncCommand;
use Illuminate\Contracts\Support\DeferrableProvider;
use Illuminate\Support\ServiceProvider;

class AsyncServiceProvider extends ServiceProvider
class AsyncServiceProvider extends ServiceProvider implements DeferrableProvider
{
/**
* Indicates if loading of the provider is deferred.
*
* @var bool
*/
protected $defer = false;

/**
* Add the connector to the queue drivers.
*
Expand All @@ -23,31 +17,6 @@ class AsyncServiceProvider extends ServiceProvider
public function boot()
{
$this->registerAsyncConnector($this->app['queue']);

$this->commands('command.queue.async');
}

/**
* Register the service provider.
*
* @return void
*/
public function register()
{
$this->registerAsyncCommand();
}

/**
* Register the queue listener console command.
*
*
* @return void
*/
protected function registerAsyncCommand()
{
$this->app->singleton('command.queue.async', function () {
return new AsyncCommand($this->app['queue.worker']);
});
}

/**
Expand All @@ -60,17 +29,7 @@ protected function registerAsyncCommand()
protected function registerAsyncConnector($manager)
{
$manager->addConnector('async', function () {
return new AsyncConnector($this->app['db']);
return new AsyncConnector;
});
}

/**
* Get the services provided by the provider.
*
* @return array
*/
public function provides()
{
return ['command.queue.async'];
}
}
22 changes: 4 additions & 18 deletions src/Connectors/AsyncConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,15 @@
namespace Barryvdh\Queue\Connectors;

use Barryvdh\Queue\AsyncQueue;
use Illuminate\Queue\Connectors\DatabaseConnector;
use Illuminate\Support\Arr;
use Illuminate\Queue\Connectors\SyncConnector;

class AsyncConnector extends DatabaseConnector
class AsyncConnector extends SyncConnector
{

/**
* Establish a queue connection.
*
* @param array $config
*
* @return \Illuminate\Contracts\Queue\Queue
* {@inheritdoc}
*/
public function connect(array $config)
{
return new AsyncQueue(
$this->connections->connection(Arr::get($config, 'connection')),
$config['table'],
$config['queue'],
Arr::get($config, 'expire', 60),
Arr::get($config, 'binary', 'php'),
Arr::get($config, 'binary_args', ''),
Arr::get($config, 'connection_name', '')
);
return new AsyncQueue($config['after_commit'] ?? null);
}
}
Loading