From 0a83fa653a15dea1f6fcdf255cf38338013246e0 Mon Sep 17 00:00:00 2001 From: A G Date: Mon, 23 Dec 2024 00:01:14 -0500 Subject: [PATCH 1/6] Child process queue workers - Remove `ShouldBroadcast` in favor of `ShouldBroadcastNow` in `Events\\ChildProcess` namespace - Implement `QueueWorker::class` - Add new binding to `NativeServiceProvider::class` - Fire up workers: iterate through queue worker config in `NativeServiceProvider::configureApp()` - Test `QueueWorker::up()`, `QueueWorker::down()` - Test `QueueWorkerFake::class` assertions work as expected --- config/nativephp.php | 8 +++ src/Contracts/QueueWorker.php | 12 ++++ src/DTOs/QueueConfig.php | 39 ++++++++++++ src/Events/ChildProcess/ErrorReceived.php | 4 +- src/Events/ChildProcess/MessageReceived.php | 4 +- src/Events/ChildProcess/ProcessExited.php | 4 +- src/Events/ChildProcess/ProcessSpawned.php | 4 +- src/Facades/QueueWorker.php | 29 +++++++++ src/Fakes/QueueWorkerFake.php | 61 ++++++++++++++++++ src/NativeServiceProvider.php | 17 +++++ src/QueueWorker.php | 37 +++++++++++ tests/DTOs/QueueWorkerTest.php | 66 ++++++++++++++++++++ tests/Fakes/FakeQueueWorkerTest.php | 69 +++++++++++++++++++++ tests/QueueWorker/QueueWorkerTest.php | 39 ++++++++++++ 14 files changed, 385 insertions(+), 8 deletions(-) create mode 100644 src/Contracts/QueueWorker.php create mode 100644 src/DTOs/QueueConfig.php create mode 100644 src/Facades/QueueWorker.php create mode 100644 src/Fakes/QueueWorkerFake.php create mode 100644 src/QueueWorker.php create mode 100644 tests/DTOs/QueueWorkerTest.php create mode 100644 tests/Fakes/FakeQueueWorkerTest.php create mode 100644 tests/QueueWorker/QueueWorkerTest.php diff --git a/config/nativephp.php b/config/nativephp.php index b24afec0..5c6e7a8f 100644 --- a/config/nativephp.php +++ b/config/nativephp.php @@ -114,4 +114,12 @@ ], ], ], + + 'queue_workers' => [ + 'default' => [ + 'queues' => ['default'], + 'memory_limit' => 128, + 'timeout' => 60, + ], + ], ]; diff --git a/src/Contracts/QueueWorker.php b/src/Contracts/QueueWorker.php new file mode 100644 index 00000000..a2c4cf9c --- /dev/null +++ b/src/Contracts/QueueWorker.php @@ -0,0 +1,12 @@ + $queuesToConsume + */ + public function __construct( + public readonly string $alias, + public readonly array $queuesToConsume, + public readonly int $memoryLimit, + public readonly int $timeout, + ) {} + + /** + * @return array + */ + public static function fromConfigArray(array $config): array + { + return array_map( + function (array|string $worker, string $alias) { + if (is_string($worker)) { + return new self($worker, ['default'], 128, 60); + } + + return new self( + $alias, + $worker['queues'] ?? ['default'], + $worker['memory_limit'] ?? 128, + $worker['timeout'] ?? 60, + ); + }, + $config, + array_keys($config), + ); + } +} diff --git a/src/Events/ChildProcess/ErrorReceived.php b/src/Events/ChildProcess/ErrorReceived.php index 65db9c66..334e9ccd 100644 --- a/src/Events/ChildProcess/ErrorReceived.php +++ b/src/Events/ChildProcess/ErrorReceived.php @@ -3,11 +3,11 @@ namespace Native\Laravel\Events\ChildProcess; use Illuminate\Broadcasting\Channel; -use Illuminate\Contracts\Broadcasting\ShouldBroadcast; +use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; -class ErrorReceived implements ShouldBroadcast +class ErrorReceived implements ShouldBroadcastNow { use Dispatchable, SerializesModels; diff --git a/src/Events/ChildProcess/MessageReceived.php b/src/Events/ChildProcess/MessageReceived.php index 5f7a432c..04a51c2f 100644 --- a/src/Events/ChildProcess/MessageReceived.php +++ b/src/Events/ChildProcess/MessageReceived.php @@ -3,11 +3,11 @@ namespace Native\Laravel\Events\ChildProcess; use Illuminate\Broadcasting\Channel; -use Illuminate\Contracts\Broadcasting\ShouldBroadcast; +use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; -class MessageReceived implements ShouldBroadcast +class MessageReceived implements ShouldBroadcastNow { use Dispatchable, SerializesModels; diff --git a/src/Events/ChildProcess/ProcessExited.php b/src/Events/ChildProcess/ProcessExited.php index bf570d84..0dcd5891 100644 --- a/src/Events/ChildProcess/ProcessExited.php +++ b/src/Events/ChildProcess/ProcessExited.php @@ -3,11 +3,11 @@ namespace Native\Laravel\Events\ChildProcess; use Illuminate\Broadcasting\Channel; -use Illuminate\Contracts\Broadcasting\ShouldBroadcast; +use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; -class ProcessExited implements ShouldBroadcast +class ProcessExited implements ShouldBroadcastNow { use Dispatchable, SerializesModels; diff --git a/src/Events/ChildProcess/ProcessSpawned.php b/src/Events/ChildProcess/ProcessSpawned.php index 91fc9170..a49b31bd 100644 --- a/src/Events/ChildProcess/ProcessSpawned.php +++ b/src/Events/ChildProcess/ProcessSpawned.php @@ -3,11 +3,11 @@ namespace Native\Laravel\Events\ChildProcess; use Illuminate\Broadcasting\Channel; -use Illuminate\Contracts\Broadcasting\ShouldBroadcast; +use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow; use Illuminate\Foundation\Events\Dispatchable; use Illuminate\Queue\SerializesModels; -class ProcessSpawned implements ShouldBroadcast +class ProcessSpawned implements ShouldBroadcastNow { use Dispatchable, SerializesModels; diff --git a/src/Facades/QueueWorker.php b/src/Facades/QueueWorker.php new file mode 100644 index 00000000..3b504bec --- /dev/null +++ b/src/Facades/QueueWorker.php @@ -0,0 +1,29 @@ +make(QueueWorkerFake::class), function ($fake) { + static::swap($fake); + }); + } + + protected static function getFacadeAccessor(): string + { + self::clearResolvedInstance(QueueWorkerContract::class); + + return QueueWorkerContract::class; + } +} diff --git a/src/Fakes/QueueWorkerFake.php b/src/Fakes/QueueWorkerFake.php new file mode 100644 index 00000000..6482dd9f --- /dev/null +++ b/src/Fakes/QueueWorkerFake.php @@ -0,0 +1,61 @@ + + */ + public array $ups = []; + + /** + * @var array + */ + public array $downs = []; + + public function up(QueueConfig $config): void + { + $this->ups[] = $config; + } + + public function down(string $alias): void + { + $this->downs[] = $alias; + } + + public function assertUp(Closure $callback): void + { + $hit = empty( + array_filter( + $this->ups, + fn (QueueConfig $up) => $callback($up) === true + ) + ) === false; + + PHPUnit::assertTrue($hit); + } + + public function assertDown(string|Closure $alias): void + { + if (is_callable($alias) === false) { + PHPUnit::assertContains($alias, $this->downs); + + return; + } + + $hit = empty( + array_filter( + $this->downs, + fn (string $down) => $alias($down) === true + ) + ) === false; + + PHPUnit::assertTrue($hit); + } +} diff --git a/src/NativeServiceProvider.php b/src/NativeServiceProvider.php index 22e39913..d459d88d 100644 --- a/src/NativeServiceProvider.php +++ b/src/NativeServiceProvider.php @@ -17,7 +17,9 @@ use Native\Laravel\Contracts\ChildProcess as ChildProcessContract; use Native\Laravel\Contracts\GlobalShortcut as GlobalShortcutContract; use Native\Laravel\Contracts\PowerMonitor as PowerMonitorContract; +use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract; use Native\Laravel\Contracts\WindowManager as WindowManagerContract; +use Native\Laravel\DTOs\QueueConfig; use Native\Laravel\Events\EventWatcher; use Native\Laravel\Exceptions\Handler; use Native\Laravel\GlobalShortcut as GlobalShortcutImplementation; @@ -73,6 +75,10 @@ public function packageRegistered() return $app->make(PowerMonitorImplementation::class); }); + $this->app->bind(QueueWorkerContract::class, function (Foundation $app) { + return $app->make(QueueWorker::class); + }); + if (config('nativephp-internal.running')) { $this->app->singleton( \Illuminate\Contracts\Debug\ExceptionHandler::class, @@ -112,6 +118,8 @@ protected function configureApp() config(['session.driver' => 'file']); config(['queue.default' => 'database']); + + $this->fireUpQueueWorkers(); } protected function rewriteStoragePath() @@ -210,4 +218,13 @@ protected function configureDisks(): void ]); } } + + protected function fireUpQueueWorkers(): void + { + $queueConfigs = QueueConfig::fromConfigArray(config('nativephp.queue_workers')); + + foreach ($queueConfigs as $queueConfig) { + $this->app->make(QueueWorkerContract::class)->up($queueConfig); + } + } } diff --git a/src/QueueWorker.php b/src/QueueWorker.php new file mode 100644 index 00000000..bfce2105 --- /dev/null +++ b/src/QueueWorker.php @@ -0,0 +1,37 @@ +childProcess->php( + [ + '-d', + "memory_limit={$config->memoryLimit}M", + 'artisan', + 'queue:work', + "--name={$config->alias}", + '--queue='.implode(',', $config->queuesToConsume), + "--memory={$config->memoryLimit}", + "--timeout={$config->timeout}", + ], + $config->alias, + persistent: true, + ); + } + + public function down(string $alias): void + { + $this->childProcess->stop($alias); + } +} diff --git a/tests/DTOs/QueueWorkerTest.php b/tests/DTOs/QueueWorkerTest.php new file mode 100644 index 00000000..0d9bed39 --- /dev/null +++ b/tests/DTOs/QueueWorkerTest.php @@ -0,0 +1,66 @@ +toBeArray(); + expect($configObject)->toHaveCount(count($config)); + + foreach ($config as $alias => $worker) { + if (is_string($worker)) { + expect( + Arr::first( + array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)) + )->queuesToConsume->toBe(['default'] + ); + + expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->memoryLimit->toBe(128); + expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->timeout->toBe(60); + + continue; + } + + expect( + Arr::first( + array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)) + )->queuesToConsume->toBe($worker['queues'] ?? ['default'] + ); + + expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->memoryLimit->toBe($worker['memory_limit'] ?? 128); + expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->timeout->toBe($worker['timeout'] ?? 60); + } +})->with([ + [ + 'queue_workers' => [ + 'some_worker' => [ + 'queues' => ['default'], + 'memory_limit' => 64, + 'timeout' => 60, + ], + ], + ], + [ + 'queue_workers' => [ + 'some_worker', + 'another_worker', + ], + ], + [ + 'queue_workers' => [ + 'some_worker' => [ + ], + 'another_worker' => [ + 'queues' => ['default', 'another'], + ], + 'yet_another_worker' => [ + 'memory_limit' => 256, + ], + 'one_more_worker' => [ + 'timeout' => 120, + ], + ], + ], +]); diff --git a/tests/Fakes/FakeQueueWorkerTest.php b/tests/Fakes/FakeQueueWorkerTest.php new file mode 100644 index 00000000..4b22f34d --- /dev/null +++ b/tests/Fakes/FakeQueueWorkerTest.php @@ -0,0 +1,69 @@ +toBeInstanceOf(QueueWorkerFake::class); +}); + +it('asserts up using callable', function () { + swap(QueueWorkerContract::class, $fake = app(QueueWorkerFake::class)); + + $fake->up(new QueueConfig('testA', ['default'], 123, 123)); + $fake->up(new QueueConfig('testB', ['default'], 123, 123)); + + $fake->assertUp(fn (QueueConfig $up) => $up->alias === 'testA'); + $fake->assertUp(fn (QueueConfig $up) => $up->alias === 'testB'); + + try { + $fake->assertUp(fn (QueueConfig $up) => $up->alias === 'testC'); + } catch (AssertionFailedError) { + return; + } + + $this->fail('Expected assertion to fail'); +}); + +it('asserts down using string', function () { + swap(QueueWorkerContract::class, $fake = app(QueueWorkerFake::class)); + + $fake->down('testA'); + $fake->down('testB'); + + $fake->assertDown('testA'); + $fake->assertDown('testB'); + + try { + $fake->assertDown('testC'); + } catch (AssertionFailedError) { + return; + } + + $this->fail('Expected assertion to fail'); +}); + +it('asserts down using callable', function () { + swap(QueueWorkerContract::class, $fake = app(QueueWorkerFake::class)); + + $fake->down('testA'); + $fake->down('testB'); + + $fake->assertDown(fn (string $alias) => $alias === 'testA'); + $fake->assertDown(fn (string $alias) => $alias === 'testB'); + + try { + $fake->assertDown(fn (string $alias) => $alias === 'testC'); + } catch (AssertionFailedError) { + return; + } + + $this->fail('Expected assertion to fail'); +}); diff --git a/tests/QueueWorker/QueueWorkerTest.php b/tests/QueueWorker/QueueWorkerTest.php new file mode 100644 index 00000000..a3fbd576 --- /dev/null +++ b/tests/QueueWorker/QueueWorkerTest.php @@ -0,0 +1,39 @@ +toBe([ + '-d', + 'memory_limit=128M', + 'artisan', + 'queue:work', + "--name={$alias}", + '--queue=default', + '--memory=128', + '--timeout=61', + ]); + + expect($alias)->toBe('some_worker'); + expect($env)->toBeNull(); + expect($persistent)->toBeTrue(); + + return true; + }); +}); + +it('hits the child process with relevant alias spin down a queue worker', function () { + ChildProcess::fake(); + + QueueWorker::down('some_worker'); + + ChildProcess::assertStop('some_worker'); +}); From 4ff605e14b683c30ba3a9b8200bbb6d71b98dcfd Mon Sep 17 00:00:00 2001 From: Simon Hamp Date: Sun, 29 Dec 2024 14:26:14 +0000 Subject: [PATCH 2/6] Prevent attempting to boot workers on CLI calls --- src/NativeServiceProvider.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/NativeServiceProvider.php b/src/NativeServiceProvider.php index d459d88d..6ca8d626 100644 --- a/src/NativeServiceProvider.php +++ b/src/NativeServiceProvider.php @@ -119,7 +119,10 @@ protected function configureApp() config(['session.driver' => 'file']); config(['queue.default' => 'database']); - $this->fireUpQueueWorkers(); + // XXX: This logic may need to change when we ditch the internal web server + if (! $this->app->runningInConsole()) { + $this->fireUpQueueWorkers(); + } } protected function rewriteStoragePath() From a997774b61901ed95cec216d465c2bdaced538b8 Mon Sep 17 00:00:00 2001 From: Simon Hamp Date: Sun, 29 Dec 2024 14:46:05 +0000 Subject: [PATCH 3/6] Remove creating workers just by string Rely on keys of the config array to assert uniqueness of worker aliases --- src/DTOs/QueueConfig.php | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/DTOs/QueueConfig.php b/src/DTOs/QueueConfig.php index 1974d243..0ec3ca01 100644 --- a/src/DTOs/QueueConfig.php +++ b/src/DTOs/QueueConfig.php @@ -21,10 +21,6 @@ public static function fromConfigArray(array $config): array { return array_map( function (array|string $worker, string $alias) { - if (is_string($worker)) { - return new self($worker, ['default'], 128, 60); - } - return new self( $alias, $worker['queues'] ?? ['default'], From 82e462b63b21f62f14ff76bd39628d39985b1f3f Mon Sep 17 00:00:00 2001 From: Simon Hamp Date: Sun, 29 Dec 2024 15:09:48 +0000 Subject: [PATCH 4/6] Allow workers to be instantiated directly by alias --- src/QueueWorker.php | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/QueueWorker.php b/src/QueueWorker.php index bfce2105..508cc2fc 100644 --- a/src/QueueWorker.php +++ b/src/QueueWorker.php @@ -12,8 +12,18 @@ public function __construct( private readonly ChildProcessContract $childProcess, ) {} - public function up(QueueConfig $config): void + public function up(string|QueueConfig $config): void { + if (is_string($config) && config()->has("nativephp.queue_workers.{$config}")) { + $config = QueueConfig::fromConfigArray([ + $config => config("nativephp.queue_workers.{$config}") + ])[0]; + } + + if (! $config instanceof QueueConfig) { + throw new \InvalidArgumentException("Invalid queue configuration alias [$config]"); + } + $this->childProcess->php( [ '-d', From 0a470be63db3865be8be434a4044adea55b94600 Mon Sep 17 00:00:00 2001 From: simonhamp Date: Sun, 29 Dec 2024 15:10:28 +0000 Subject: [PATCH 5/6] Fix styling --- src/QueueWorker.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/QueueWorker.php b/src/QueueWorker.php index 508cc2fc..1eb0c000 100644 --- a/src/QueueWorker.php +++ b/src/QueueWorker.php @@ -16,7 +16,7 @@ public function up(string|QueueConfig $config): void { if (is_string($config) && config()->has("nativephp.queue_workers.{$config}")) { $config = QueueConfig::fromConfigArray([ - $config => config("nativephp.queue_workers.{$config}") + $config => config("nativephp.queue_workers.{$config}"), ])[0]; } From 3727edbc3fccf930f26082d8e8e813186d94a4c0 Mon Sep 17 00:00:00 2001 From: Simon Hamp Date: Sun, 29 Dec 2024 15:15:31 +0000 Subject: [PATCH 6/6] Fix tests --- tests/DTOs/QueueWorkerTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/DTOs/QueueWorkerTest.php b/tests/DTOs/QueueWorkerTest.php index 0d9bed39..bc1b764f 100644 --- a/tests/DTOs/QueueWorkerTest.php +++ b/tests/DTOs/QueueWorkerTest.php @@ -44,8 +44,8 @@ ], [ 'queue_workers' => [ - 'some_worker', - 'another_worker', + 'some_worker' => [], + 'another_worker' => [], ], ], [