Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ jobs:
matrix:
php: ['8.1', '8.2', '8.3']
stability: ['prefer-lowest', 'prefer-stable']
laravel: ['^10.0', '^11.0']
laravel: ['^10.0', '^11.0', '^12.0']
exclude:
- php: '8.1'
laravel: '^11.0'
- php: '8.1'
laravel: '^12.0'

name: 'PHP ${{ matrix.php }} - Laravel: ${{matrix.laravel}} - ${{ matrix.stability }}'

Expand All @@ -33,15 +35,8 @@ jobs:
extensions: dom, curl, libxml, mbstring, zip
coverage: none

- name: Set up Docker
run: |
sudo rm /usr/local/bin/docker-compose
curl -L https://github.com/docker/compose/releases/download/1.24.1/docker-compose-`uname -s`-`uname -m` > docker-compose
chmod +x docker-compose
sudo mv docker-compose /usr/local/bin

- name: Start Docker container
run: docker-compose up -d rabbitmq
run: docker compose up -d rabbitmq

- name: Install dependencies
run: composer update --with='laravel/framework:${{matrix.laravel}}' --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress
Expand Down
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,33 @@ Available protocols : `tcp`, `ssl`, `tls`
],
```

### Network Timeouts

For network timeouts configuration you can use option parameters.
All float values are in seconds and zero value can mean infinite timeout.
Example contains default values.

```php
'connections' => [
// ...

'rabbitmq' => [
// ...

'options' => [
// ...

'connection_timeout' => 3.0,
'read_timeout' => 3.0,
'write_timeout' => 3.0,
'channel_rpc_timeout' => 0.0,
],
],

// ...
],
```

### Octane support

Starting with 13.3.0, this package supports [Laravel Octane](https://laravel.com/docs/octane) out of the box.
Expand Down
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@
"require": {
"php": "^8.0",
"ext-json": "*",
"illuminate/queue": "^10.0|^11.0",
"illuminate/queue": "^10.0|^11.0|^12.0",
"php-amqplib/php-amqplib": "^v3.6"
},
"require-dev": {
"phpunit/phpunit": "^10.0|^11.0",
"mockery/mockery": "^1.0",
"laravel/horizon": "^5.0",
"orchestra/testbench": "^7.0|^8.0|^9.0",
"orchestra/testbench": "^7.0|^8.0|^9.0|^10.0",
"laravel/pint": "^1.2",
"laravel/framework": "^9.0|^10.0|^11.0"
"laravel/framework": "^9.0|^10.0|^11.0|^12.0"
},
"autoload": {
"psr-4": {
Expand Down
4 changes: 2 additions & 2 deletions pint.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"preset": "laravel",
"rules": {
"nullable_type_declaration_for_default_null_value": {
"use_nullable_type_declaration": false
"php_unit_method_casing": {
"case": "camel_case"
}
}
}
3 changes: 2 additions & 1 deletion src/Console/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ class ConsumeCommand extends WorkCommand
{--force : Force the worker to run even in maintenance mode}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--rest=0 : Number of seconds to rest between jobs}
{--timeout=60 : The number of seconds a child process can run}
{--tries=1 : Number of times to attempt a job before logging it failed}
{--rest=0 : Number of seconds to rest between jobs}
{--json : Output the queue worker information as JSON}

{--max-priority=}
{--consumer-tag}
Expand Down
2 changes: 1 addition & 1 deletion src/Horizon/RabbitMQQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RabbitMQQueue extends BaseRabbitMQQueue
*
* @throws AMQPProtocolChannelException
*/
public function readyNow(string $queue = null): int
public function readyNow(?string $queue = null): int
{
return $this->size($queue);
}
Expand Down
26 changes: 25 additions & 1 deletion src/Queue/Connection/ConfigFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ConfigFactory
*/
public static function make(array $config = []): AMQPConnectionConfig
{
return tap(new AMQPConnectionConfig(), function (AMQPConnectionConfig $connectionConfig) use ($config) {
return tap(new AMQPConnectionConfig, function (AMQPConnectionConfig $connectionConfig) use ($config) {
// Set the connection to a Lazy by default
$connectionConfig->setIsLazy(! in_array(
Arr::get($config, 'lazy') ?? true,
Expand All @@ -38,6 +38,7 @@ public static function make(array $config = []): AMQPConnectionConfig
self::getHostFromConfig($connectionConfig, $config);
self::getHeartbeatFromConfig($connectionConfig, $config);
self::getNetworkProtocolFromConfig($connectionConfig, $config);
self::getTimeoutsFromConfig($connectionConfig, $config);
});
}

Expand Down Expand Up @@ -99,4 +100,27 @@ protected static function getNetworkProtocolFromConfig(AMQPConnectionConfig $con
$connectionConfig->setNetworkProtocol($networkProtocol);
}
}

protected static function getTimeoutsFromConfig(AMQPConnectionConfig $connectionConfig, array $config): void
{
$connectionTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.connection_timeout');
if (is_numeric($connectionTimeout) && floatval($connectionTimeout) >= 0) {
$connectionConfig->setConnectionTimeout((float) $connectionTimeout);
}

$readTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.read_timeout');
if (is_numeric($readTimeout) && floatval($readTimeout) >= 0) {
$connectionConfig->setReadTimeout((float) $readTimeout);
}

$writeTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.write_timeout');
if (is_numeric($writeTimeout) && floatval($writeTimeout) >= 0) {
$connectionConfig->setWriteTimeout((float) $writeTimeout);
}

$chanelRpcTimeout = Arr::get($config, self::CONFIG_OPTIONS.'.channel_rpc_timeout');
if (is_numeric($chanelRpcTimeout) && floatval($chanelRpcTimeout) >= 0) {
$connectionConfig->setChannelRPCTimeout((float) $chanelRpcTimeout);
}
}
}
2 changes: 1 addition & 1 deletion src/Queue/QueueConfigFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class QueueConfigFactory
*/
public static function make(array $config = []): QueueConfig
{
return tap(new QueueConfig(), function (QueueConfig $queueConfig) use ($config) {
return tap(new QueueConfig, function (QueueConfig $queueConfig) use ($config) {
if (! empty($queue = Arr::get($config, 'queue'))) {
$queueConfig->setQueue($queue);
}
Expand Down
14 changes: 7 additions & 7 deletions src/Queue/RabbitMQQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected function publishBatch($jobs, $data = '', $queue = null): void
/**
* @throws AMQPProtocolChannelException
*/
public function bulkRaw(string $payload, string $queue = null, array $options = []): int|string|null
public function bulkRaw(string $payload, ?string $queue = null, array $options = []): int|string|null
{
[$destination, $exchange, $exchangeType, $attempts] = $this->publishProperties($queue, $options);

Expand Down Expand Up @@ -397,7 +397,7 @@ public function deleteExchange(string $name, bool $unused = false): void
*
* @throws AMQPProtocolChannelException
*/
public function isQueueExists(string $name = null): bool
public function isQueueExists(?string $name = null): bool
{
$queueName = $this->getQueue($name);

Expand Down Expand Up @@ -484,7 +484,7 @@ public function bindQueue(string $queue, string $exchange, string $routingKey =
/**
* Purge the queue of messages.
*/
public function purge(string $queue = null): void
public function purge(?string $queue = null): void
{
// create a temporary channel, so the main channel will not be closed on exception
$channel = $this->createChannel();
Expand Down Expand Up @@ -637,7 +637,7 @@ protected function getDelayQueueArguments(string $destination, int $ttl): array
/**
* Get the exchange name, or empty string; as default value.
*/
protected function getExchange(string $exchange = null): string
protected function getExchange(?string $exchange = null): string
{
return $exchange ?? $this->getConfig()->getExchange();
}
Expand All @@ -654,7 +654,7 @@ protected function getRoutingKey(string $destination): string
/**
* Get the exchangeType, or AMQPExchangeType::DIRECT as default.
*/
protected function getExchangeType(string $type = null): string
protected function getExchangeType(?string $type = null): string
{
$constant = AMQPExchangeType::class.'::'.Str::upper($type ?: $this->getConfig()->getExchangeType());

Expand All @@ -664,7 +664,7 @@ protected function getExchangeType(string $type = null): string
/**
* Get the exchange for failed messages.
*/
protected function getFailedExchange(string $exchange = null): string
protected function getFailedExchange(?string $exchange = null): string
{
return $exchange ?? $this->getConfig()->getFailedExchange();
}
Expand Down Expand Up @@ -699,7 +699,7 @@ protected function isQueueDeclared(string $name): bool
*
* @throws AMQPProtocolChannelException
*/
protected function declareDestination(string $destination, string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void
protected function declareDestination(string $destination, ?string $exchange = null, string $exchangeType = AMQPExchangeType::DIRECT): void
{
// When an exchange is provided and no exchange is present in RabbitMQ, create an exchange.
if ($exchange && ! $this->isExchangeExists($exchange)) {
Expand Down
8 changes: 4 additions & 4 deletions tests/Feature/ConnectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class ConnectorTest extends \VladimirYuldashev\LaravelQueueRabbitMQ\Tests\TestCase
{
public function testLazyConnection(): void
public function test_lazy_connection(): void
{
$this->app['config']->set('queue.connections.rabbitmq', [
'driver' => 'rabbitmq',
Expand Down Expand Up @@ -55,7 +55,7 @@ public function testLazyConnection(): void
$this->assertTrue($connection->getConnection()->isConnected());
}

public function testLazyStreamConnection(): void
public function test_lazy_stream_connection(): void
{
$this->app['config']->set('queue.connections.rabbitmq', [
'driver' => 'rabbitmq',
Expand Down Expand Up @@ -98,7 +98,7 @@ public function testLazyStreamConnection(): void
$this->assertTrue($connection->getConnection()->isConnected());
}

public function testSslConnection(): void
public function test_ssl_connection(): void
{
$this->markTestSkipped();

Expand Down Expand Up @@ -142,7 +142,7 @@ public function testSslConnection(): void
}

// Test to validate ssl connection params
public function testNoVerificationSslConnection(): void
public function test_no_verification_ssl_connection(): void
{
$this->app['config']->set('queue.connections.rabbitmq', [
'driver' => 'rabbitmq',
Expand Down
10 changes: 5 additions & 5 deletions tests/Feature/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class QueueTest extends TestCase
{
public function setUp(): void
protected function setUp(): void
{
parent::setUp();

Expand All @@ -20,16 +20,16 @@ public function setUp(): void
]);
}

public function testConnection(): void
public function test_connection(): void
{
$this->assertInstanceOf(AMQPStreamConnection::class, $this->connection()->getChannel()->getConnection());
}

public function testWithoutReconnect(): void
public function test_without_reconnect(): void
{
$queue = $this->connection('rabbitmq');

$queue->push(new TestJob());
$queue->push(new TestJob);
sleep(1);
$this->assertSame(1, $queue->size());

Expand All @@ -38,6 +38,6 @@ public function testWithoutReconnect(): void
$this->assertFalse($queue->getConnection()->isConnected());

$this->expectException(AMQPChannelClosedException::class);
$queue->push(new TestJob());
$queue->push(new TestJob);
}
}
4 changes: 2 additions & 2 deletions tests/Feature/SslQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

class SslQueueTest extends TestCase
{
public function setUp(): void
protected function setUp(): void
{
$this->markTestSkipped();
}
Expand Down Expand Up @@ -43,7 +43,7 @@ protected function getEnvironmentSetUp($app): void
]);
}

public function testConnection(): void
public function test_connection(): void
{
$this->assertInstanceOf(AMQPSSLConnection::class, $this->connection()->getChannel()->getConnection());
}
Expand Down
Loading