diff --git a/.github/workflows/phpunit.yml b/.github/workflows/phpunit.yml
index 8401939..7b04668 100644
--- a/.github/workflows/phpunit.yml
+++ b/.github/workflows/phpunit.yml
@@ -123,7 +123,7 @@ jobs:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
ports:
- - 5672
+ - 5672:5672
options: >-
--health-cmd="rabbitmq-diagnostics -q ping"
--health-interval=10s
diff --git a/composer.json b/composer.json
index c0c6084..bf1ace2 100644
--- a/composer.json
+++ b/composer.json
@@ -16,10 +16,10 @@
"php": "^8.1"
},
"require-dev": {
- "codeigniter4/devkit": "^1.0",
+ "codeigniter4/devkit": "^1.3",
"codeigniter4/framework": "^4.3",
"predis/predis": "^2.0",
- "phpstan/phpstan-strict-rules": "^1.5",
+ "phpstan/phpstan-strict-rules": "^2.0",
"php-amqplib/php-amqplib": "^3.7"
},
"minimum-stability": "dev",
diff --git a/docs/configuration.md b/docs/configuration.md
index de7b56d..03d0a3d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -18,7 +18,6 @@ Available options:
- [$redis](#redis)
- [$predis](#predis)
- [$rabbitmq](#rabbitmq)
-- [$keepDoneJobs](#keepdonejobs)
- [$keepFailedJobs](#keepfailedjobs)
- [$queueDefaultPriority](#queuedefaultpriority)
- [$queuePriorities](#queuepriorities)
@@ -77,10 +76,6 @@ The configuration settings for `rabbitmq` handler. You need to have [php-amqplib
* `password` - The password for authentication. Default value: `guest`.
* `vhost` - The virtual host to use. Default value: `/`.
-### $keepDoneJobs
-
-If the job is done, should we keep it in the table? Default value: `false`.
-
### $keepFailedJobs
If the job failed, should we move it to the failed jobs table? Default value: `true`.
diff --git a/docs/events.md b/docs/events.md
new file mode 100644
index 0000000..ccb5bd7
--- /dev/null
+++ b/docs/events.md
@@ -0,0 +1,166 @@
+# Events
+
+The Queue library provides a comprehensive event system that allows you to monitor and react to various queue operations. Events are triggered at key points in the queue lifecycle, enabling you to implement logging, monitoring, alerting, or custom business logic.
+
+---
+
+## Overview
+
+Queue events are built on top of CodeIgniter's native event system and are emitted automatically by the queue handlers and workers. Each event carries contextual information about what happened, when it happened, and any relevant data.
+
+Events allow you to:
+
+- **Monitor** queue performance and job execution
+- **Log** queue activities for debugging and auditing
+- **Alert** administrators when jobs fail or workers stop
+- **Collect metrics** for analytics and reporting
+- **Implement custom logic** based on queue operations
+
+## Available Events
+
+All event names are available as constants in the `CodeIgniter\Queue\Events\QueueEventManager` class.
+
+### Job Events
+
+Events related to individual job lifecycle:
+
+| Event | Constant | When Triggered | Metadata |
+|-------|----------|----------------|----------|
+| `queue.job.pushed` | `JOB_PUSHED` | A job is added to the queue | `job_class`, `job` |
+| `queue.job.push.failed` | `JOB_PUSH_FAILED` | Failed to push a job to the queue | `job_class`, `exception` |
+| `queue.job.processing.started` | `JOB_PROCESSING_STARTED` | A worker starts processing a job | `job_class`, `job`, `worker_id` |
+| `queue.job.processing.completed` | `JOB_PROCESSING_COMPLETED` | A job completes successfully | `job_class`, `job`, `processing_time`, `worker_id` |
+| `queue.job.failed` | `JOB_FAILED` | A job fails after exhausting all retry attempts | `job_class`, `job`, `exception`, `processing_time`, `worker_id` |
+
+### Worker Events
+
+Events related to queue worker lifecycle:
+
+| Event | Constant | When Triggered | Metadata |
+|-------|----------|----------------|----------|
+| `queue.worker.started` | `WORKER_STARTED` | A queue worker starts processing | `priorities`, `config`, `worker_id` |
+| `queue.worker.stopped` | `WORKER_STOPPED` | A queue worker stops | `priorities`, `uptime_seconds`, `jobs_processed`, `worker_id`, `stop_reason`, `memory_usage`, `memory_peak` |
+
+Worker stop reasons include:
+
+- `signal_stop` - Stopped by signal (SIGTERM, SIGINT, etc.)
+- `memory_limit` - Memory limit reached
+- `time_limit` - Max time limit reached
+- `job_limit` - Max jobs processed
+- `planned_stop` - Scheduled stop via `queue:stop` command
+- `empty_queue` - Queue is empty (with `--stop-when-empty` flag)
+
+### Handler Events
+
+Events related to queue handler connections:
+
+| Event | Constant | When Triggered | Metadata |
+|-------|----------|----------------|----------|
+| `queue.handler.connection.established` | `HANDLER_CONNECTION_ESTABLISHED` | Successfully connected to queue backend | `config` |
+| `queue.handler.connection.failed` | `HANDLER_CONNECTION_FAILED` | Failed to connect to queue backend | `exception`, `config` |
+
+### Operation Events
+
+Events related to queue operations:
+
+| Event | Constant | When Triggered | Metadata |
+|-------|----------|----------------|----------|
+| `queue.cleared` | `QUEUE_CLEARED` | Queue is cleared via `queue:clear` command | None |
+
+## Listening to Events
+
+You can listen to queue events using CodeIgniter's standard event system. Add your listeners in `app/Config/Events.php`:
+
+```php
+ $event->getJobClass(),
+ 'time' => $event->getProcessingTime(),
+ ]);
+});
+
+// Listen to job failures
+Events::on(QueueEventManager::JOB_FAILED, static function (QueueEvent $event) {
+ log_message('error', 'Job failed: {job} - {error}', [
+ 'job' => $event->getJobClass(),
+ 'error' => $event->getExceptionMessage(),
+ ]);
+});
+
+// Listen to worker lifecycle
+Events::on(QueueEventManager::WORKER_STARTED, static function (QueueEvent $event) {
+ log_message('info', 'Worker started for queue: {queue}', [
+ 'queue' => $event->getQueue(),
+ ]);
+});
+
+Events::on(QueueEventManager::WORKER_STOPPED, static function (QueueEvent $event) {
+ $metadata = $event->getAllMetadata();
+
+ log_message('info', 'Worker stopped: {reason}, processed {jobs} jobs in {time}s', [
+ 'reason' => $metadata['stop_reason'],
+ 'jobs' => $metadata['jobs_processed'],
+ 'time' => round($metadata['uptime_seconds'], 2),
+ ]);
+});
+```
+
+## Event Object
+
+All event listeners receive a `QueueEvent` object with the following methods:
+
+### Basic Information
+
+```php
+$event->getType(); // Event type (e.g., 'queue.job.pushed')
+$event->getHandler(); // Handler name (e.g., 'database', 'redis')
+$event->getQueue(); // Queue name (e.g., 'emails', 'default')
+$event->getTimestamp(); // Time object when event occurred
+```
+
+### Event Type Checks
+
+```php
+$event->isJobEvent(); // True for job-related events
+$event->isWorkerEvent(); // True for worker-related events
+$event->isConnectionEvent(); // True for connection-related events
+$event->isOperationEvent(); // True for operation events
+```
+
+### Job Information (for job events)
+
+```php
+$event->getJobId(); // Job ID
+$event->getJobClass(); // Fully qualified job class name
+$event->getPriority(); // Job priority
+$event->getAttempts(); // Number of attempts
+$event->getStatus(); // Job status
+$event->getProcessingTime(); // Processing time in seconds
+$event->getProcessingTimeMs(); // Processing time in milliseconds
+```
+
+### Error Information (for failed events)
+
+```php
+$event->getException(); // Exception object (if any)
+$event->getExceptionMessage(); // Exception message
+$event->hasFailed(); // True if event contains an exception
+```
+
+### Metadata Access
+
+```php
+// Get specific metadata
+$event->getMetadata('worker_id');
+$event->getMetadata('priority', 'default');
+
+// Get all metadata
+$metadata = $event->getAllMetadata();
+```
diff --git a/docs/index.md b/docs/index.md
index 58ad9ca..0bd0af0 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -46,6 +46,7 @@ If you use `RabbitMQ` (you still need a relational database to store failed jobs
* [Basic usage](basic-usage.md)
* [Running queues](running-queues.md)
* [Commands](commands.md)
+* [Events](events.md)
* [Troubleshooting](troubleshooting.md)
### Acknowledgements
diff --git a/mkdocs.yml b/mkdocs.yml
index b93a433..dbbaf8b 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -77,4 +77,5 @@ nav:
- Basic usage: basic-usage.md
- Running queues: running-queues.md
- Commands: commands.md
+ - Events: events.md
- Troubleshooting: troubleshooting.md
diff --git a/phpstan.neon.dist b/phpstan.neon.dist
index 45e4dd1..399b947 100644
--- a/phpstan.neon.dist
+++ b/phpstan.neon.dist
@@ -42,6 +42,9 @@ parameters:
allRules: false
disallowedLooseComparison: true
booleansInConditions: true
- disallowedConstructs: true
+ disallowedBacktick: true
+ disallowedEmpty: true
+ disallowedImplicitArrayCreation: true
+ disallowedShortTernary: true
matchingInheritedMethodNames: true
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 48fe085..917c289 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -77,6 +77,7 @@
./src/
+ ./src/Compatibility/SignalTrait.php
./src/Commands/Generators
./src/Commands/Utils
./src/Config
diff --git a/psalm.xml b/psalm.xml
index 47cd59e..753f7e4 100644
--- a/psalm.xml
+++ b/psalm.xml
@@ -9,12 +9,17 @@
cacheDirectory="build/psalm/"
findUnusedBaselineEntry="true"
findUnusedCode="false"
+ ensureOverrideAttribute="false"
>
+
+
+
+
diff --git a/rector.php b/rector.php
index cec36f6..34f22b8 100644
--- a/rector.php
+++ b/rector.php
@@ -21,6 +21,7 @@
use Rector\CodingStyle\Rector\ClassMethod\MakeInheritedMethodVisibilitySameAsParentRector;
use Rector\CodingStyle\Rector\FuncCall\CountArrayToEmptyArrayComparisonRector;
use Rector\Config\RectorConfig;
+use Rector\DeadCode\Rector\ClassMethod\RemoveUnusedPrivateMethodRector;
use Rector\DeadCode\Rector\ClassMethod\RemoveUnusedPromotedPropertyRector;
use Rector\EarlyReturn\Rector\Foreach_\ChangeNestedForeachIfsToEarlyContinueRector;
use Rector\EarlyReturn\Rector\If_\ChangeIfElseValueAssignToEarlyReturnRector;
@@ -28,10 +29,10 @@
use Rector\EarlyReturn\Rector\Return_\PreparedValueToEarlyReturnRector;
use Rector\Php55\Rector\String_\StringClassNameToClassConstantRector;
use Rector\Php73\Rector\FuncCall\StringifyStrNeedlesRector;
-use Rector\Php81\Rector\ClassMethod\NewInInitializerRector;
use Rector\PHPUnit\AnnotationsToAttributes\Rector\Class_\AnnotationWithValueToAttributeRector;
use Rector\PHPUnit\AnnotationsToAttributes\Rector\ClassMethod\DataProviderAnnotationToAttributeRector;
use Rector\PHPUnit\CodeQuality\Rector\Class_\YieldDataProviderRector;
+use Rector\PHPUnit\CodeQuality\Rector\MethodCall\AssertEmptyNullableObjectToAssertInstanceofRector;
use Rector\PHPUnit\Set\PHPUnitSetList;
use Rector\Privatization\Rector\Property\PrivatizeFinalClassPropertyRector;
use Rector\Set\ValueObject\LevelSetList;
@@ -95,8 +96,11 @@
// Supported from PHPUnit 10
DataProviderAnnotationToAttributeRector::class,
- NewInInitializerRector::class => [
- 'src/Payloads/Payload.php',
+ AssertEmptyNullableObjectToAssertInstanceofRector::class,
+
+ // Skip onInterruption method - called dynamically via reflection in SignalTrait
+ RemoveUnusedPrivateMethodRector::class => [
+ __DIR__ . '/src/Commands/QueueWork.php',
],
]);
diff --git a/src/Commands/QueueWork.php b/src/Commands/QueueWork.php
index 4f2bf22..088cb24 100644
--- a/src/Commands/QueueWork.php
+++ b/src/Commands/QueueWork.php
@@ -15,14 +15,18 @@
use CodeIgniter\CLI\BaseCommand;
use CodeIgniter\CLI\CLI;
+use CodeIgniter\Queue\Compatibility\SignalTrait;
use CodeIgniter\Queue\Config\Queue as QueueConfig;
use CodeIgniter\Queue\Entities\QueueJob;
+use CodeIgniter\Queue\Events\QueueEventManager;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use Exception;
use Throwable;
class QueueWork extends BaseCommand
{
+ use SignalTrait;
+
/**
* The Command's Group
*
@@ -30,6 +34,11 @@ class QueueWork extends BaseCommand
*/
protected $group = 'Queue';
+ /**
+ * Worker ID for tracking this worker instance
+ */
+ private string $workerId;
+
/**
* The Command's Name
*
@@ -126,6 +135,9 @@ public function run(array $params)
$startTime = microtime(true);
+ // Generate unique worker ID
+ $this->workerId = sprintf('worker-%s-%d', gethostname(), getmypid());
+
CLI::write('Listening for the jobs with the queue: ' . CLI::color($queue, 'light_cyan'), 'cyan');
if ($priority !== 'default') {
@@ -134,15 +146,38 @@ public function run(array $params)
CLI::write(PHP_EOL);
+ // Convert priority string to array
$priority = array_map('trim', explode(',', (string) $priority));
- while (true) {
+ // Register signals for graceful shutdown
+ $this->registerSignals();
+
+ // Emit worker started event
+ QueueEventManager::workerStarted(
+ handler: service('queue')->name(),
+ queue: $queue,
+ priorities: $priority,
+ config: [
+ 'max_jobs' => $maxJobs,
+ 'max_time' => $maxTime,
+ 'memory_limit' => $memory . 'MB',
+ 'sleep' => $sleep,
+ 'rest' => $rest,
+ ],
+ metadata: [
+ 'worker_id' => $this->workerId,
+ ],
+ );
+
+ while ($this->isRunning()) {
$work = service('queue')->pop($queue, $priority);
if ($work === null) {
if ($stopWhenEmpty) {
CLI::write('No job available. Stopping.', 'yellow');
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'empty_queue');
+
return EXIT_SUCCESS;
}
@@ -154,14 +189,26 @@ public function run(array $params)
sleep((int) $sleep);
if ($this->checkMemory($memory)) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'memory_limit');
+
+ return EXIT_SUCCESS;
+ }
+
+ if ($this->shouldTerminate()) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'signal_stop');
+
return EXIT_SUCCESS;
}
if ($this->checkStop($queue, $startTime)) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'planned_stop');
+
return EXIT_SUCCESS;
}
if ($this->maxTimeCheck($maxTime, $startTime)) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'time_limit');
+
return EXIT_SUCCESS;
}
} else {
@@ -175,19 +222,33 @@ public function run(array $params)
$this->handleWork($work, $config, $tries, $retryAfter);
+ if ($this->shouldTerminate()) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'signal_stop');
+
+ return EXIT_SUCCESS;
+ }
+
if ($this->checkMemory($memory)) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'memory_limit');
+
return EXIT_SUCCESS;
}
if ($this->checkStop($queue, $startTime)) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'planned_stop');
+
return EXIT_SUCCESS;
}
if ($this->maxJobsCheck($maxJobs, $countJobs)) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'job_limit');
+
return EXIT_SUCCESS;
}
if ($this->maxTimeCheck($maxTime, $startTime)) {
+ $this->emitWorkerStoppedEvent($queue, $priority, $startTime, $countJobs, 'time_limit');
+
return EXIT_SUCCESS;
}
@@ -237,10 +298,21 @@ private function readOptions(array $params, QueueConfig $config, string $queue):
private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?int $retryAfter): void
{
timer()->start('work');
- $payload = $work->payload;
+ $startTime = microtime(true);
+ $payload = $work->payload;
$payloadMetadata = null;
+ // Emit job processing started event
+ QueueEventManager::jobProcessingStarted(
+ handler: service('queue')->name(),
+ queue: $work->queue,
+ job: $work,
+ metadata: [
+ 'worker_id' => $this->workerId,
+ ],
+ );
+
try {
// Load payload metadata
$payloadMetadata = PayloadMetadata::fromArray($payload['metadata'] ?? []);
@@ -253,7 +325,18 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
$job->process();
// Mark as done
- service('queue')->done($work, $config->keepDoneJobs);
+ service('queue')->done($work);
+
+ // Emit job processing completed event
+ QueueEventManager::jobProcessingCompleted(
+ handler: service('queue')->name(),
+ queue: $work->queue,
+ job: $work,
+ processingTime: microtime(true) - $startTime,
+ metadata: [
+ 'worker_id' => $this->workerId,
+ ],
+ );
CLI::write('The processing of this job was successful', 'green');
@@ -265,6 +348,17 @@ private function handleWork(QueueJob $work, QueueConfig $config, ?int $tries, ?i
service('queue')->later($work, $retryAfter ?? $job->getRetryAfter());
} else {
// Mark as failed
+ QueueEventManager::jobFailed(
+ handler: service('queue')->name(),
+ queue: $work->queue,
+ job: $work,
+ exception: $err,
+ processingTime: microtime(true) - $startTime,
+ metadata: [
+ 'worker_id' => $this->workerId,
+ ],
+ );
+
service('queue')->failed($work, $err, $config->keepFailedJobs);
}
CLI::write('The processing of this job failed', 'red');
@@ -395,4 +489,36 @@ private function checkStop(string $queue, float $startTime): bool
return false;
}
+
+ /**
+ * Handle interruption
+ */
+ private function onInterruption(int $signal): void
+ {
+ $this->requestTermination();
+
+ CLI::write(sprintf('The termination of this worker has been requested with: %s.', $this->getSignalName($signal)), 'yellow');
+ }
+
+ /**
+ * Emit worker stopped event with runtime statistics
+ */
+ private function emitWorkerStoppedEvent(string $queue, array $priorities, float $startTime, int $jobsProcessed, string $reason): void
+ {
+ $uptime = microtime(true) - $startTime;
+
+ QueueEventManager::workerStopped(
+ handler: service('queue')->name(),
+ queue: $queue,
+ priorities: $priorities,
+ uptime: $uptime,
+ jobsProcessed: $jobsProcessed,
+ metadata: [
+ 'worker_id' => $this->workerId,
+ 'stop_reason' => $reason,
+ 'memory_usage' => memory_get_usage(true),
+ 'memory_peak' => memory_get_peak_usage(true),
+ ],
+ );
+ }
}
diff --git a/src/Compatibility/SignalTrait.php b/src/Compatibility/SignalTrait.php
new file mode 100644
index 0000000..c5532d9
--- /dev/null
+++ b/src/Compatibility/SignalTrait.php
@@ -0,0 +1,410 @@
+
+ *
+ * For the full copyright and license information, please view
+ * the LICENSE file that was distributed with this source code.
+ */
+
+namespace CodeIgniter\Queue\Compatibility;
+
+use Closure;
+use CodeIgniter\CLI\CLI;
+
+if (trait_exists('CodeIgniter\CLI\SignalTrait')) {
+ trait SignalTrait
+ {
+ use \CodeIgniter\CLI\SignalTrait;
+ }
+} else {
+ /**
+ * Signal Trait
+ *
+ * Provides PCNTL signal handling capabilities for CLI commands.
+ * Requires the PCNTL extension (Unix only).
+ *
+ * Bundled compatibility version for CI4 < 4.7
+ */
+ trait SignalTrait
+ {
+ /**
+ * Whether the process should continue running (false = termination requested).
+ */
+ private bool $running = true;
+
+ /**
+ * Whether signals are currently blocked.
+ */
+ private bool $signalsBlocked = false;
+
+ /**
+ * Array of registered signals.
+ *
+ * @var list
+ */
+ private array $registeredSignals = [];
+
+ /**
+ * Signal-to-method mapping.
+ *
+ * @var array
+ */
+ private array $signalMethodMap = [];
+
+ /**
+ * Cached result of PCNTL extension availability.
+ */
+ private static ?bool $isPcntlAvailable = null;
+
+ /**
+ * Cached result of POSIX extension availability.
+ */
+ private static ?bool $isPosixAvailable = null;
+
+ /**
+ * Check if PCNTL extension is available (cached).
+ */
+ protected function isPcntlAvailable(): bool
+ {
+ if (self::$isPcntlAvailable === null) {
+ if (is_windows()) {
+ self::$isPcntlAvailable = false;
+ } else {
+ self::$isPcntlAvailable = extension_loaded('pcntl');
+ if (! self::$isPcntlAvailable) {
+ CLI::write('PCNTL extension is not available. Signal handling will be disabled.', 'yellow');
+ }
+ }
+ }
+
+ return self::$isPcntlAvailable;
+ }
+
+ /**
+ * Check if POSIX extension is available (cached).
+ */
+ protected function isPosixAvailable(): bool
+ {
+ if (self::$isPosixAvailable === null) {
+ self::$isPosixAvailable = is_windows() ? false : extension_loaded('posix');
+ }
+
+ return self::$isPosixAvailable;
+ }
+
+ /**
+ * Register signal handlers.
+ *
+ * @param list $signals List of signals to handle
+ * @param array $methodMap Optional signal-to-method mapping
+ */
+ protected function registerSignals(
+ array $signals = [SIGTERM, SIGINT, SIGHUP, SIGQUIT],
+ array $methodMap = [],
+ ): void {
+ if (! $this->isPcntlAvailable()) {
+ return;
+ }
+
+ if (! $this->isPosixAvailable() && (in_array(SIGTSTP, $signals, true) || in_array(SIGCONT, $signals, true))) {
+ CLI::write('POSIX extension is not available. SIGTSTP and SIGCONT signals will be disabled.', 'yellow');
+ $signals = array_diff($signals, [SIGTSTP, SIGCONT]);
+
+ // Remove from method map as well
+ unset($methodMap[SIGTSTP], $methodMap[SIGCONT]);
+
+ if ($signals === []) {
+ return;
+ }
+ }
+
+ // Enable async signals for immediate response
+ pcntl_async_signals(true);
+
+ $this->signalMethodMap = $methodMap;
+
+ foreach ($signals as $signal) {
+ if (pcntl_signal($signal, [$this, 'handleSignal'])) {
+ $this->registeredSignals[] = $signal;
+ } else {
+ $signal = $this->getSignalName($signal);
+ CLI::write("Failed to register signal handler for {$signal}.", 'red');
+ }
+ }
+ }
+
+ /**
+ * Handle incoming signals.
+ */
+ protected function handleSignal(int $signal): void
+ {
+ $this->callCustomHandler($signal);
+
+ // Apply standard Unix signal behavior for registered signals
+ switch ($signal) {
+ case SIGTERM:
+ case SIGINT:
+ case SIGQUIT:
+ case SIGHUP:
+ $this->running = false;
+ break;
+
+ case SIGTSTP:
+ // Restore default handler and re-send signal to actually suspend
+ pcntl_signal(SIGTSTP, SIG_DFL);
+ posix_kill(posix_getpid(), SIGTSTP);
+ break;
+
+ case SIGCONT:
+ // Re-register SIGTSTP handler after resume
+ pcntl_signal(SIGTSTP, [$this, 'handleSignal']);
+ break;
+ }
+ }
+
+ /**
+ * Call custom signal handler if one is mapped for this signal.
+ * Falls back to generic onInterruption() method if no explicit mapping exists.
+ */
+ private function callCustomHandler(int $signal): void
+ {
+ // Check for explicit mapping first
+ $method = $this->signalMethodMap[$signal] ?? null;
+
+ if ($method !== null && method_exists($this, $method)) {
+ $this->{$method}($signal);
+
+ return;
+ }
+
+ // If no explicit mapping, try generic catch-all method
+ if (method_exists($this, 'onInterruption')) { // @phpstan-ignore-line
+ $this->onInterruption($signal);
+ }
+ }
+
+ /**
+ * Check if command should terminate.
+ */
+ protected function shouldTerminate(): bool
+ {
+ return ! $this->running;
+ }
+
+ /**
+ * Check if the process is currently running (not terminated).
+ */
+ protected function isRunning(): bool
+ {
+ return $this->running;
+ }
+
+ /**
+ * Request immediate termination.
+ */
+ protected function requestTermination(): void
+ {
+ $this->running = false;
+ }
+
+ /**
+ * Reset all states (for testing or restart scenarios).
+ */
+ protected function resetState(): void
+ {
+ $this->running = true;
+
+ // Unblock signals if they were blocked
+ if ($this->signalsBlocked) {
+ $this->unblockSignals();
+ }
+ }
+
+ /**
+ * Execute a callable with ALL signals blocked to prevent ANY interruption during critical operations.
+ *
+ * This blocks ALL interruptible signals including:
+ * - Termination signals (SIGTERM, SIGINT, etc.)
+ * - Pause/resume signals (SIGTSTP, SIGCONT)
+ * - Custom signals (SIGUSR1, SIGUSR2)
+ *
+ * Only SIGKILL (unblockable) can still terminate the process.
+ * Use this for database transactions, file operations, or any critical atomic operations.
+ *
+ * @template TReturn
+ *
+ * @param Closure():TReturn $operation
+ *
+ * @return TReturn
+ */
+ protected function withSignalsBlocked(Closure $operation)
+ {
+ $this->blockSignals();
+
+ try {
+ return $operation();
+ } finally {
+ $this->unblockSignals();
+ }
+ }
+
+ /**
+ * Block ALL interruptible signals during critical sections.
+ * Only SIGKILL (unblockable) can terminate the process.
+ */
+ protected function blockSignals(): void
+ {
+ if (! $this->signalsBlocked && $this->isPcntlAvailable()) {
+ // Block ALL signals that could interrupt critical operations
+ pcntl_sigprocmask(SIG_BLOCK, [
+ SIGTERM, SIGINT, SIGHUP, SIGQUIT, // Termination signals
+ SIGTSTP, SIGCONT, // Pause/resume signals
+ SIGUSR1, SIGUSR2, // Custom signals
+ SIGPIPE, SIGALRM, // Other common signals
+ ]);
+ $this->signalsBlocked = true;
+ }
+ }
+
+ /**
+ * Unblock previously blocked signals.
+ */
+ protected function unblockSignals(): void
+ {
+ if ($this->signalsBlocked && $this->isPcntlAvailable()) {
+ // Unblock the same signals we blocked
+ pcntl_sigprocmask(SIG_UNBLOCK, [
+ SIGTERM, SIGINT, SIGHUP, SIGQUIT, // Termination signals
+ SIGTSTP, SIGCONT, // Pause/resume signals
+ SIGUSR1, SIGUSR2, // Custom signals
+ SIGPIPE, SIGALRM, // Other common signals
+ ]);
+ $this->signalsBlocked = false;
+ }
+ }
+
+ /**
+ * Check if signals are currently blocked.
+ */
+ protected function signalsBlocked(): bool
+ {
+ return $this->signalsBlocked;
+ }
+
+ /**
+ * Add or update signal-to-method mapping at runtime.
+ */
+ protected function mapSignal(int $signal, string $method): void
+ {
+ $this->signalMethodMap[$signal] = $method;
+ }
+
+ /**
+ * Get human-readable signal name.
+ */
+ protected function getSignalName(int $signal): string
+ {
+ return match ($signal) {
+ SIGTERM => 'SIGTERM',
+ SIGINT => 'SIGINT',
+ SIGHUP => 'SIGHUP',
+ SIGQUIT => 'SIGQUIT',
+ SIGUSR1 => 'SIGUSR1',
+ SIGUSR2 => 'SIGUSR2',
+ SIGPIPE => 'SIGPIPE',
+ SIGALRM => 'SIGALRM',
+ SIGTSTP => 'SIGTSTP',
+ SIGCONT => 'SIGCONT',
+ default => "Signal {$signal}",
+ };
+ }
+
+ /**
+ * Unregister all signals (cleanup).
+ */
+ protected function unregisterSignals(): void
+ {
+ if (! $this->isPcntlAvailable()) {
+ return;
+ }
+
+ foreach ($this->registeredSignals as $signal) {
+ pcntl_signal($signal, SIG_DFL);
+ }
+
+ $this->registeredSignals = [];
+ $this->signalMethodMap = [];
+ }
+
+ /**
+ * Check if signals are registered.
+ */
+ protected function hasSignals(): bool
+ {
+ return $this->registeredSignals !== [];
+ }
+
+ /**
+ * Get list of registered signals.
+ *
+ * @return list
+ */
+ protected function getSignals(): array
+ {
+ return $this->registeredSignals;
+ }
+
+ /**
+ * Get comprehensive process state information.
+ *
+ * @return array{
+ * pid: int,
+ * running: bool,
+ * pcntl_available: bool,
+ * registered_signals: int,
+ * registered_signals_names: array,
+ * signals_blocked: bool,
+ * explicit_mappings: int,
+ * memory_usage_mb: float,
+ * memory_peak_mb: float,
+ * session_id?: false|int,
+ * process_group?: false|int,
+ * has_controlling_terminal?: bool
+ * }
+ */
+ protected function getProcessState(): array
+ {
+ $pid = getmypid();
+ $state = [
+ // Process identification
+ 'pid' => $pid,
+ 'running' => $this->running,
+
+ // Signal handling status
+ 'pcntl_available' => $this->isPcntlAvailable(),
+ 'registered_signals' => count($this->registeredSignals),
+ 'registered_signals_names' => array_map([$this, 'getSignalName'], $this->registeredSignals),
+ 'signals_blocked' => $this->signalsBlocked,
+ 'explicit_mappings' => count($this->signalMethodMap),
+
+ // System resources
+ 'memory_usage_mb' => round(memory_get_usage(true) / 1024 / 1024, 2),
+ 'memory_peak_mb' => round(memory_get_peak_usage(true) / 1024 / 1024, 2),
+ ];
+
+ // Add terminal control info if POSIX extension is available
+ if ($this->isPosixAvailable()) {
+ $state['session_id'] = posix_getsid($pid);
+ $state['process_group'] = posix_getpgid($pid);
+ $state['has_controlling_terminal'] = posix_isatty(STDIN);
+ }
+
+ return $state;
+ }
+ }
+}
diff --git a/src/Config/Queue.php b/src/Config/Queue.php
index 3a5ac76..af8a999 100644
--- a/src/Config/Queue.php
+++ b/src/Config/Queue.php
@@ -88,11 +88,6 @@ class Queue extends BaseConfig
'vhost' => '/',
];
- /**
- * Whether to keep the DONE jobs in the queue.
- */
- public bool $keepDoneJobs = false;
-
/**
* Whether to save failed jobs for later review.
*/
diff --git a/src/Events/QueueEvent.php b/src/Events/QueueEvent.php
new file mode 100644
index 0000000..14f0d00
--- /dev/null
+++ b/src/Events/QueueEvent.php
@@ -0,0 +1,222 @@
+
+ *
+ * For the full copyright and license information, please view
+ * the LICENSE file that was distributed with this source code.
+ */
+
+namespace CodeIgniter\Queue\Events;
+
+use CodeIgniter\I18n\Time;
+use CodeIgniter\Queue\Entities\QueueJob;
+use Throwable;
+
+class QueueEvent
+{
+ private readonly Time $timestamp;
+
+ public function __construct(
+ private readonly string $type,
+ private readonly string $handler,
+ private readonly ?string $queue = null,
+ private readonly array $metadata = [],
+ ?Time $timestamp = null,
+ ) {
+ $this->timestamp = $timestamp ?? Time::now();
+ }
+
+ /**
+ * Get event type
+ */
+ public function getType(): string
+ {
+ return $this->type;
+ }
+
+ /**
+ * Get handler name
+ */
+ public function getHandler(): string
+ {
+ return $this->handler;
+ }
+
+ /**
+ * Get queue name
+ */
+ public function getQueue(): ?string
+ {
+ return $this->queue;
+ }
+
+ /**
+ * Get timestamp
+ */
+ public function getTimestamp(): Time
+ {
+ return $this->timestamp;
+ }
+
+ /**
+ * Get all metadata
+ */
+ public function getAllMetadata(): array
+ {
+ return $this->metadata;
+ }
+
+ /**
+ * Get metadata value by key
+ */
+ public function getMetadata(string $key, mixed $default = null): mixed
+ {
+ return $this->metadata[$key] ?? $default;
+ }
+
+ /**
+ * Check if this is a job-related event
+ */
+ public function isJobEvent(): bool
+ {
+ return str_starts_with($this->type, 'queue.job.');
+ }
+
+ /**
+ * Check if this is a worker-related event
+ */
+ public function isWorkerEvent(): bool
+ {
+ return str_starts_with($this->type, 'queue.worker.');
+ }
+
+ /**
+ * Check if this is an operation event (like queue.cleared)
+ */
+ public function isOperationEvent(): bool
+ {
+ return str_contains($this->type, 'queue.')
+ && ! $this->isJobEvent()
+ && ! $this->isWorkerEvent()
+ && ! $this->isConnectionEvent();
+ }
+
+ /**
+ * Check if this is a connection event
+ */
+ public function isConnectionEvent(): bool
+ {
+ return str_starts_with($this->type, 'queue.handler.');
+ }
+
+ // Job-related convenience methods (metadata-based)
+
+ /**
+ * Get job ID (for job events)
+ */
+ public function getJobId(): ?int
+ {
+ $job = $this->getMetadata('job');
+
+ return $job instanceof QueueJob ? $job->id : $this->getMetadata('job_id');
+ }
+
+ /**
+ * Get job priority (for job events)
+ */
+ public function getPriority(): ?string
+ {
+ $job = $this->getMetadata('job');
+
+ return $job instanceof QueueJob ? $job->priority : $this->getMetadata('priority');
+ }
+
+ /**
+ * Get number of attempts (for job events)
+ */
+ public function getAttempts(): ?int
+ {
+ $job = $this->getMetadata('job');
+
+ return $job instanceof QueueJob ? $job->attempts : $this->getMetadata('attempts');
+ }
+
+ /**
+ * Get job status (for job events)
+ */
+ public function getStatus(): ?int
+ {
+ $job = $this->getMetadata('job');
+
+ return $job instanceof QueueJob ? $job->status : $this->getMetadata('status');
+ }
+
+ /**
+ * Get job class name (for job events)
+ */
+ public function getJobClass(): ?string
+ {
+ return $this->getMetadata('job_class');
+ }
+
+ /**
+ * Get processing time in seconds (for job events)
+ */
+ public function getProcessingTime(): float
+ {
+ return (float) $this->getMetadata('processing_time', 0.0);
+ }
+
+ /**
+ * Get processing time in milliseconds (for job events)
+ */
+ public function getProcessingTimeMs(): float
+ {
+ return $this->getProcessingTime() * 1000;
+ }
+
+ /**
+ * Get exception (for failed events)
+ */
+ public function getException(): ?Throwable
+ {
+ return $this->getMetadata('exception');
+ }
+
+ /**
+ * Get exception message (for failed events)
+ */
+ public function getExceptionMessage(): ?string
+ {
+ $exception = $this->getException();
+
+ return $exception?->getMessage();
+ }
+
+ /**
+ * Check if event has failed
+ */
+ public function hasFailed(): bool
+ {
+ return $this->getException() !== null;
+ }
+
+ /**
+ * Convert to array for serialization
+ */
+ public function toArray(): array
+ {
+ return [
+ 'type' => $this->type,
+ 'handler' => $this->handler,
+ 'queue' => $this->queue,
+ 'metadata' => $this->metadata,
+ 'timestamp' => $this->timestamp->toDateTimeString(),
+ ];
+ }
+}
diff --git a/src/Events/QueueEventManager.php b/src/Events/QueueEventManager.php
new file mode 100644
index 0000000..a344a32
--- /dev/null
+++ b/src/Events/QueueEventManager.php
@@ -0,0 +1,250 @@
+
+ *
+ * For the full copyright and license information, please view
+ * the LICENSE file that was distributed with this source code.
+ */
+
+namespace CodeIgniter\Queue\Events;
+
+use CodeIgniter\Events\Events;
+use CodeIgniter\Queue\Entities\QueueJob;
+use Throwable;
+
+class QueueEventManager
+{
+ // Event names for queue operations
+ public const JOB_PUSHED = 'queue.job.pushed';
+ public const JOB_PUSH_FAILED = 'queue.job.push.failed';
+ public const JOB_PROCESSING_STARTED = 'queue.job.processing.started';
+ public const JOB_PROCESSING_COMPLETED = 'queue.job.processing.completed';
+ public const JOB_FAILED = 'queue.job.failed';
+ public const QUEUE_CLEARED = 'queue.cleared';
+ public const WORKER_STARTED = 'queue.worker.started';
+ public const WORKER_STOPPED = 'queue.worker.stopped';
+ public const HANDLER_CONNECTION_FAILED = 'queue.handler.connection.failed';
+ public const HANDLER_CONNECTION_ESTABLISHED = 'queue.handler.connection.established';
+
+ /**
+ * Emit job pushed event
+ */
+ public static function jobPushed(
+ string $handler,
+ string $queue,
+ QueueJob $job,
+ array $metadata = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::JOB_PUSHED,
+ handler: $handler,
+ queue: $queue,
+ metadata: array_merge([
+ 'job_class' => $job->payload['job'],
+ 'job' => $job,
+ ], $metadata),
+ );
+
+ Events::trigger(self::JOB_PUSHED, $event);
+ }
+
+ /**
+ * Emit job push failed event
+ */
+ public static function jobPushFailed(
+ string $handler,
+ string $queue,
+ string $jobClass,
+ Throwable $exception,
+ array $metadata = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::JOB_PUSH_FAILED,
+ handler: $handler,
+ queue: $queue,
+ metadata: array_merge([
+ 'job_class' => $jobClass,
+ 'exception' => $exception,
+ ], $metadata),
+ );
+
+ Events::trigger(self::JOB_PUSH_FAILED, $event);
+ }
+
+ /**
+ * Emit job processing started event
+ */
+ public static function jobProcessingStarted(
+ string $handler,
+ string $queue,
+ QueueJob $job,
+ array $metadata = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::JOB_PROCESSING_STARTED,
+ handler: $handler,
+ queue: $queue,
+ metadata: array_merge([
+ 'job_class' => $job->payload['job'],
+ 'job' => $job,
+ ], $metadata),
+ );
+
+ Events::trigger(self::JOB_PROCESSING_STARTED, $event);
+ }
+
+ /**
+ * Emit job processing completed event
+ */
+ public static function jobProcessingCompleted(
+ string $handler,
+ string $queue,
+ QueueJob $job,
+ float $processingTime,
+ array $metadata = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::JOB_PROCESSING_COMPLETED,
+ handler: $handler,
+ queue: $queue,
+ metadata: array_merge([
+ 'job_class' => $job->payload['job'],
+ 'job' => $job,
+ 'processing_time' => $processingTime,
+ ], $metadata),
+ );
+
+ Events::trigger(self::JOB_PROCESSING_COMPLETED, $event);
+ }
+
+ /**
+ * Emit job failed event
+ */
+ public static function jobFailed(
+ string $handler,
+ string $queue,
+ QueueJob $job,
+ Throwable $exception,
+ float $processingTime = 0.0,
+ array $metadata = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::JOB_FAILED,
+ handler: $handler,
+ queue: $queue,
+ metadata: array_merge([
+ 'job_class' => $job->payload['job'],
+ 'job' => $job,
+ 'exception' => $exception,
+ 'processing_time' => $processingTime,
+ ], $metadata),
+ );
+
+ Events::trigger(self::JOB_FAILED, $event);
+ }
+
+ /**
+ * Emit queue cleared event
+ */
+ public static function queueCleared(
+ string $handler,
+ ?string $queue = null,
+ ): void {
+ $event = new QueueEvent(
+ type: self::QUEUE_CLEARED,
+ handler: $handler,
+ queue: $queue,
+ );
+
+ Events::trigger(self::QUEUE_CLEARED, $event);
+ }
+
+ /**
+ * Emit worker started event
+ */
+ public static function workerStarted(
+ string $handler,
+ string $queue,
+ array $priorities,
+ array $config = [],
+ array $metadata = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::WORKER_STARTED,
+ handler: $handler,
+ queue: $queue,
+ metadata: array_merge([
+ 'priorities' => $priorities,
+ 'config' => $config,
+ ], $metadata),
+ );
+
+ Events::trigger(self::WORKER_STARTED, $event);
+ }
+
+ /**
+ * Emit worker stopped event
+ */
+ public static function workerStopped(
+ string $handler,
+ string $queue,
+ array $priorities,
+ float $uptime,
+ int $jobsProcessed,
+ array $metadata = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::WORKER_STOPPED,
+ handler: $handler,
+ queue: $queue,
+ metadata: array_merge([
+ 'priorities' => $priorities,
+ 'uptime_seconds' => $uptime,
+ 'jobs_processed' => $jobsProcessed,
+ ], $metadata),
+ );
+
+ Events::trigger(self::WORKER_STOPPED, $event);
+ }
+
+ /**
+ * Emit handler connection established event
+ */
+ public static function handlerConnectionEstablished(
+ string $handler,
+ array $config = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::HANDLER_CONNECTION_ESTABLISHED,
+ handler: $handler,
+ metadata: ['config' => $config],
+ );
+
+ Events::trigger(self::HANDLER_CONNECTION_ESTABLISHED, $event);
+ }
+
+ /**
+ * Emit handler connection failed event
+ */
+ public static function handlerConnectionFailed(
+ string $handler,
+ Throwable $exception,
+ array $config = [],
+ ): void {
+ $event = new QueueEvent(
+ type: self::HANDLER_CONNECTION_FAILED,
+ handler: $handler,
+ metadata: [
+ 'exception' => $exception,
+ 'config' => $config,
+ ],
+ );
+
+ Events::trigger(self::HANDLER_CONNECTION_FAILED, $event);
+ }
+}
diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php
index f364802..38c9beb 100644
--- a/src/Handlers/BaseHandler.php
+++ b/src/Handlers/BaseHandler.php
@@ -48,7 +48,7 @@ abstract public function later(QueueJob $queueJob, int $seconds): bool;
abstract public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool;
- abstract public function done(QueueJob $queueJob, bool $keepJob): bool;
+ abstract public function done(QueueJob $queueJob): bool;
abstract public function clear(?string $queue = null): bool;
diff --git a/src/Handlers/DatabaseHandler.php b/src/Handlers/DatabaseHandler.php
index 403e7cd..27eb34e 100644
--- a/src/Handlers/DatabaseHandler.php
+++ b/src/Handlers/DatabaseHandler.php
@@ -13,16 +13,19 @@
namespace CodeIgniter\Queue\Handlers;
+use CodeIgniter\Exceptions\CriticalError;
use CodeIgniter\I18n\Time;
use CodeIgniter\Queue\Config\Queue as QueueConfig;
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Enums\Status;
+use CodeIgniter\Queue\Events\QueueEventManager;
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Models\QueueJobModel;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use CodeIgniter\Queue\QueuePushResult;
use ReflectionException;
+use RuntimeException;
use Throwable;
class DatabaseHandler extends BaseHandler implements QueueInterface
@@ -31,8 +34,25 @@ class DatabaseHandler extends BaseHandler implements QueueInterface
public function __construct(protected QueueConfig $config)
{
- $connection = db_connect($config->database['dbGroup'], $config->database['getShared']);
- $this->jobModel = model(QueueJobModel::class, true, $connection);
+ try {
+ $connection = db_connect($config->database['dbGroup'], $config->database['getShared']);
+ $this->jobModel = model(QueueJobModel::class, true, $connection);
+
+ // Emit connection established event
+ QueueEventManager::handlerConnectionEstablished(
+ handler: $this->name(),
+ config: $config->database,
+ );
+ } catch (Throwable $e) {
+ // Emit connection failed event
+ QueueEventManager::handlerConnectionFailed(
+ handler: $this->name(),
+ exception: $e,
+ config: $config->database,
+ );
+
+ throw new CriticalError('Queue: Database connection failed. ' . $e->getMessage());
+ }
}
/**
@@ -64,13 +84,39 @@ public function push(string $queue, string $job, array $data, ?PayloadMetadata $
try {
$jobId = $this->jobModel->insert($queueJob);
} catch (Throwable $e) {
+ // Emit push failed event
+ QueueEventManager::jobPushFailed(
+ handler: $this->name(),
+ queue: $queue,
+ jobClass: $job,
+ exception: $e,
+ );
+
return QueuePushResult::failure($e->getMessage());
}
if ($jobId === 0) {
- return QueuePushResult::failure('Failed to insert job into the database.');
+ $err = new RuntimeException('Failed to insert job into the database.');
+ QueueEventManager::jobPushFailed(
+ handler: $this->name(),
+ queue: $queue,
+ jobClass: $job,
+ exception: $err,
+ );
+
+ return QueuePushResult::failure($err->getMessage());
}
+ // Set the job ID for the successful push event
+ $queueJob->id = $jobId;
+
+ // Emit job pushed event
+ QueueEventManager::jobPushed(
+ handler: $this->name(),
+ queue: $queue,
+ job: $queueJob,
+ );
+
return QueuePushResult::success($jobId);
}
@@ -122,16 +168,10 @@ public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
}
/**
- * Change job status to DONE od delete it.
- *
- * @throws ReflectionException
+ * Change job status to DONE or delete it.
*/
- public function done(QueueJob $queueJob, bool $keepJob): bool
+ public function done(QueueJob $queueJob): bool
{
- if ($keepJob) {
- return $this->jobModel->update($queueJob->id, ['status' => Status::DONE->value]);
- }
-
return $this->jobModel->delete($queueJob->id);
}
@@ -144,6 +184,16 @@ public function clear(?string $queue = null): bool
$this->jobModel->where('queue', $queue);
}
- return $this->jobModel->delete();
+ $result = $this->jobModel->delete();
+
+ if ($result) {
+ // Emit queue cleared event
+ QueueEventManager::queueCleared(
+ handler: $this->name(),
+ queue: $queue,
+ );
+ }
+
+ return $result;
}
}
diff --git a/src/Handlers/PredisHandler.php b/src/Handlers/PredisHandler.php
index 22be7cf..6c1695b 100644
--- a/src/Handlers/PredisHandler.php
+++ b/src/Handlers/PredisHandler.php
@@ -19,12 +19,14 @@
use CodeIgniter\Queue\Config\Queue as QueueConfig;
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Enums\Status;
+use CodeIgniter\Queue\Events\QueueEventManager;
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use CodeIgniter\Queue\QueuePushResult;
use Exception;
use Predis\Client;
+use RuntimeException;
use Throwable;
class PredisHandler extends BaseHandler implements QueueInterface
@@ -44,7 +46,20 @@ public function __construct(protected QueueConfig $config)
throw new CriticalError('Queue: LUA script for Predis is not available.');
}
$this->luaScript = file_get_contents($luaScript);
+
+ // Emit connection established event
+ QueueEventManager::handlerConnectionEstablished(
+ handler: $this->name(),
+ config: $config->predis,
+ );
} catch (Exception $e) {
+ // Emit connection failed event
+ QueueEventManager::handlerConnectionFailed(
+ handler: $this->name(),
+ exception: $e,
+ config: $config->predis,
+ );
+
throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').');
}
}
@@ -82,16 +97,38 @@ public function push(string $queue, string $job, array $data, ?PayloadMetadata $
try {
$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => $availableAt->timestamp]);
} catch (Throwable $e) {
+ // Emit push failed event
+ QueueEventManager::jobPushFailed(
+ handler: $this->name(),
+ queue: $queue,
+ jobClass: $job,
+ exception: $e,
+ );
+
return QueuePushResult::failure('Unexpected Redis error: ' . $e->getMessage());
} finally {
$this->priority = $this->delay = null;
}
- $this->priority = $this->delay = null;
+ if ($result > 0) {
+ // Emit job pushed event
+ QueueEventManager::jobPushed(
+ handler: $this->name(),
+ queue: $queue,
+ job: $queueJob,
+ );
+
+ return QueuePushResult::success($jobId);
+ }
+ $error = new RuntimeException('Job already exists in the queue.');
+ QueueEventManager::jobPushFailed(
+ handler: $this->name(),
+ queue: $queue,
+ jobClass: $job,
+ exception: $error,
+ );
- return $result > 0
- ? QueuePushResult::success($jobId)
- : QueuePushResult::failure('Job already exists in the queue.');
+ return QueuePushResult::failure($error->getMessage());
}
/**
@@ -160,13 +197,8 @@ public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
/**
* Change job status to DONE or delete it.
*/
- public function done(QueueJob $queueJob, bool $keepJob): bool
+ public function done(QueueJob $queueJob): bool
{
- if ($keepJob) {
- $queueJob->status = Status::DONE->value;
- $this->predis->lpush("queues:{$queueJob->queue}::done", [json_encode($queueJob)]);
- }
-
return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", [$queueJob->id]);
}
@@ -176,19 +208,21 @@ public function done(QueueJob $queueJob, bool $keepJob): bool
public function clear(?string $queue = null): bool
{
if ($queue !== null) {
- $keys = $this->predis->keys("queues:{$queue}:*");
- if ($keys !== []) {
- return $this->predis->del($keys) > 0;
- }
-
- return true;
+ $keys = $this->predis->keys("queues:{$queue}:*");
+ $result = $keys !== [] ? $this->predis->del($keys) > 0 : true;
+ } else {
+ $keys = $this->predis->keys('queues:*');
+ $result = $keys !== [] ? $this->predis->del($keys) > 0 : true;
}
- $keys = $this->predis->keys('queues:*');
- if ($keys !== []) {
- return $this->predis->del($keys) > 0;
+ if ($result) {
+ // Emit queue cleared event
+ QueueEventManager::queueCleared(
+ handler: $this->name(),
+ queue: $queue,
+ );
}
- return true;
+ return $result;
}
}
diff --git a/src/Handlers/RabbitMQHandler.php b/src/Handlers/RabbitMQHandler.php
index 9f67bc0..3041a27 100644
--- a/src/Handlers/RabbitMQHandler.php
+++ b/src/Handlers/RabbitMQHandler.php
@@ -18,6 +18,7 @@
use CodeIgniter\Queue\Config\Queue as QueueConfig;
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Enums\Status;
+use CodeIgniter\Queue\Events\QueueEventManager;
use CodeIgniter\Queue\Exceptions\QueueException;
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Payloads\Payload;
@@ -68,7 +69,20 @@ public function __construct(protected QueueConfig $config)
$this->channel->set_return_listener(static function ($replyCode, $replyText, $exchange, $routingKey, $properties, $body): void {
log_message('error', "RabbitMQ returned unroutable message: {$replyCode} {$replyText} exchange={$exchange} routing_key={$routingKey}");
});
+
+ // Emit connection established event
+ QueueEventManager::handlerConnectionEstablished(
+ handler: $this->name(),
+ config: $config->rabbitmq,
+ );
} catch (Throwable $e) {
+ // Emit connection failed event
+ QueueEventManager::handlerConnectionFailed(
+ handler: $this->name(),
+ exception: $e,
+ config: $config->rabbitmq,
+ );
+
throw new CriticalError('Queue: RabbitMQ connection failed. ' . $e->getMessage());
}
}
@@ -138,8 +152,23 @@ public function push(string $queue, string $job, array $data, ?PayloadMetadata $
$this->priority = $this->delay = null;
+ // Emit job pushed event
+ QueueEventManager::jobPushed(
+ handler: $this->name(),
+ queue: $queue,
+ job: $queueJob,
+ );
+
return QueuePushResult::success($jobId);
} catch (Throwable $e) {
+ // Emit push failed event
+ QueueEventManager::jobPushFailed(
+ handler: $this->name(),
+ queue: $queue,
+ jobClass: $job,
+ exception: $e,
+ );
+
return QueuePushResult::failure($e->getMessage());
}
}
@@ -222,7 +251,7 @@ public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
/**
* Mark job as completed.
*/
- public function done(QueueJob $queueJob, bool $keepJob): bool
+ public function done(QueueJob $queueJob): bool
{
try {
// Acknowledge the message to remove it from the queue
@@ -230,13 +259,6 @@ public function done(QueueJob $queueJob, bool $keepJob): bool
$this->channel->basic_ack($queueJob->amqpDeliveryTag);
}
- if ($keepJob) {
- // For RabbitMQ, we don't need to persist completed jobs anywhere
- // as the message is already acknowledged and removed from the queue
- // @TODO remove the $keepDoneJobs option entirely
- $queueJob->status = Status::DONE->value;
- }
-
return true;
} catch (Throwable $e) {
log_message('error', 'RabbitMQ done error: ' . $e->getMessage());
@@ -260,6 +282,12 @@ public function clear(?string $queue = null): bool
$this->clearQueue($queue);
}
+ // Emit queue cleared event
+ QueueEventManager::queueCleared(
+ handler: $this->name(),
+ queue: $queue,
+ );
+
return true;
} catch (Throwable $e) {
log_message('error', 'RabbitMQ clear error: ' . $e->getMessage());
diff --git a/src/Handlers/RedisHandler.php b/src/Handlers/RedisHandler.php
index 3fcd67f..4e071cc 100644
--- a/src/Handlers/RedisHandler.php
+++ b/src/Handlers/RedisHandler.php
@@ -19,12 +19,14 @@
use CodeIgniter\Queue\Config\Queue as QueueConfig;
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Enums\Status;
+use CodeIgniter\Queue\Events\QueueEventManager;
use CodeIgniter\Queue\Interfaces\QueueInterface;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadMetadata;
use CodeIgniter\Queue\QueuePushResult;
use Redis;
use RedisException;
+use RuntimeException;
use Throwable;
class RedisHandler extends BaseHandler implements QueueInterface
@@ -62,7 +64,20 @@ public function __construct(protected QueueConfig $config)
throw new CriticalError('Queue: LUA script for Redis is not available.');
}
$this->luaScript = file_get_contents($luaScript);
+
+ // Emit connection established event
+ QueueEventManager::handlerConnectionEstablished(
+ handler: $this->name(),
+ config: $config->redis,
+ );
} catch (RedisException $e) {
+ // Emit connection failed event
+ QueueEventManager::handlerConnectionFailed(
+ handler: $this->name(),
+ exception: $e,
+ config: $config->redis,
+ );
+
throw new CriticalError('Queue: RedisException occurred with message (' . $e->getMessage() . ').');
}
}
@@ -102,18 +117,50 @@ public function push(string $queue, string $job, array $data, ?PayloadMetadata $
try {
$result = $this->redis->zAdd("queues:{$queue}:{$this->priority}", $availableAt->timestamp, json_encode($queueJob));
} catch (Throwable $e) {
+ // Emit push failed event
+ QueueEventManager::jobPushFailed(
+ handler: $this->name(),
+ queue: $queue,
+ jobClass: $job,
+ exception: $e,
+ );
+
return QueuePushResult::failure('Unexpected Redis error: ' . $e->getMessage());
} finally {
$this->priority = $this->delay = null;
}
if ($result === false) {
- return QueuePushResult::failure('Failed to add job to Redis.');
+ $error = new RuntimeException('Failed to add job to Redis.');
+ QueueEventManager::jobPushFailed(
+ handler: $this->name(),
+ queue: $queue,
+ jobClass: $job,
+ exception: $error,
+ );
+
+ return QueuePushResult::failure($error->getMessage());
+ }
+
+ if ((int) $result > 0) {
+ // Emit job pushed event
+ QueueEventManager::jobPushed(
+ handler: $this->name(),
+ queue: $queue,
+ job: $queueJob,
+ );
+
+ return QueuePushResult::success($jobId);
}
+ $error = new RuntimeException('Job already exists in the queue.');
+ QueueEventManager::jobPushFailed(
+ handler: $this->name(),
+ queue: $queue,
+ jobClass: $job,
+ exception: $error,
+ );
- return (int) $result > 0
- ? QueuePushResult::success($jobId)
- : QueuePushResult::failure('Job already exists in the queue.');
+ return QueuePushResult::failure($error->getMessage());
}
/**
@@ -189,13 +236,8 @@ public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
*
* @throws RedisException
*/
- public function done(QueueJob $queueJob, bool $keepJob): bool
+ public function done(QueueJob $queueJob): bool
{
- if ($keepJob) {
- $queueJob->status = Status::DONE->value;
- $this->redis->lPush("queues:{$queueJob->queue}::done", json_encode($queueJob));
- }
-
return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", (string) $queueJob->id);
}
@@ -207,17 +249,21 @@ public function done(QueueJob $queueJob, bool $keepJob): bool
public function clear(?string $queue = null): bool
{
if ($queue !== null) {
- if ($keys = $this->redis->keys("queues:{$queue}:*")) {
- return (int) $this->redis->del($keys) > 0;
- }
-
- return true;
+ $result = ($keys = $this->redis->keys("queues:{$queue}:*")) ? (int) $this->redis->del($keys) > 0 : true;
+ } elseif ($keys = $this->redis->keys('queues:*')) {
+ $result = (int) $this->redis->del($keys) > 0;
+ } else {
+ $result = true;
}
- if ($keys = $this->redis->keys('queues:*')) {
- return (int) $this->redis->del($keys) > 0;
+ if ($result) {
+ // Emit queue cleared event
+ QueueEventManager::queueCleared(
+ handler: $this->name(),
+ queue: $queue,
+ );
}
- return true;
+ return $result;
}
}
diff --git a/src/Interfaces/QueueInterface.php b/src/Interfaces/QueueInterface.php
index 30ab6c9..1ca68fa 100644
--- a/src/Interfaces/QueueInterface.php
+++ b/src/Interfaces/QueueInterface.php
@@ -26,7 +26,7 @@ public function later(QueueJob $queueJob, int $seconds);
public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob);
- public function done(QueueJob $queueJob, bool $keepJob);
+ public function done(QueueJob $queueJob);
public function clear(?string $queue = null);
diff --git a/src/Payloads/ChainBuilder.php b/src/Payloads/ChainBuilder.php
index 772e376..26a0bb7 100644
--- a/src/Payloads/ChainBuilder.php
+++ b/src/Payloads/ChainBuilder.php
@@ -64,6 +64,7 @@ public function dispatch(): QueuePushResult
}
// Set chained jobs for the next job
+ // @phpstan-ignore greater.alwaysTrue
if ($this->payloads->count() > 0) {
$current->setChainedJobs($this->payloads);
}
diff --git a/stubs/SignalTrait.phpstub b/stubs/SignalTrait.phpstub
new file mode 100644
index 0000000..7297e45
--- /dev/null
+++ b/stubs/SignalTrait.phpstub
@@ -0,0 +1,132 @@
+ */
+ private array $registeredSignals = [];
+
+ /** @var array */
+ private array $signalMethodMap = [];
+
+ private static ?bool $isPcntlAvailable = null;
+ private static ?bool $isPosixAvailable = null;
+
+ protected function isPcntlAvailable(): bool
+ {
+ }
+
+ protected function isPosixAvailable(): bool
+ {
+ }
+
+ /**
+ * @param list $signals
+ * @param array $methodMap
+ */
+ protected function registerSignals(array $signals = [SIGTERM, SIGINT, SIGHUP, SIGQUIT], array $methodMap = []): void
+ {
+ }
+
+ protected function handleSignal(int $signal): void
+ {
+ }
+
+ protected function shouldTerminate(): bool
+ {
+ }
+
+ protected function isRunning(): bool
+ {
+ }
+
+ protected function requestTermination(): void
+ {
+ }
+
+ protected function resetState(): void
+ {
+ }
+
+ /**
+ * @template TReturn
+ * @param Closure():TReturn $operation
+ * @return TReturn
+ */
+ protected function withSignalsBlocked(Closure $operation)
+ {
+ }
+
+ protected function blockSignals(): void
+ {
+ }
+
+ protected function unblockSignals(): void
+ {
+ }
+
+ protected function signalsBlocked(): bool
+ {
+ }
+
+ protected function mapSignal(int $signal, string $method): void
+ {
+ }
+
+ protected function getSignalName(int $signal): string
+ {
+ }
+
+ protected function unregisterSignals(): void
+ {
+ }
+
+ protected function hasSignals(): bool
+ {
+ }
+
+ /**
+ * @return list
+ */
+ protected function getSignals(): array
+ {
+ }
+
+ /**
+ * @return array{
+ * pid: int,
+ * running: bool,
+ * pcntl_available: bool,
+ * registered_signals: int,
+ * registered_signals_names: array,
+ * signals_blocked: bool,
+ * explicit_mappings: int,
+ * memory_usage_mb: float,
+ * memory_peak_mb: float,
+ * session_id?: false|int,
+ * process_group?: false|int,
+ * has_controlling_terminal?: bool
+ * }
+ */
+ protected function getProcessState(): array
+ {
+ }
+}
diff --git a/tests/DatabaseHandlerTest.php b/tests/DatabaseHandlerTest.php
index f189f78..77df5be 100644
--- a/tests/DatabaseHandlerTest.php
+++ b/tests/DatabaseHandlerTest.php
@@ -403,29 +403,12 @@ public function testFailedAndDontKeepJob(): void
/**
* @throws ReflectionException
*/
- public function testDoneAndKeepJob(): void
+ public function testDone(): void
{
$handler = new DatabaseHandler($this->config);
$queueJob = $handler->pop('queue1', ['default']);
- $result = $handler->done($queueJob, true);
-
- $this->assertTrue($result);
- $this->seeInDatabase('queue_jobs', [
- 'id' => 2,
- 'status' => Status::DONE->value,
- ]);
- }
-
- /**
- * @throws ReflectionException
- */
- public function testDoneAndDontKeepJob(): void
- {
- $handler = new DatabaseHandler($this->config);
- $queueJob = $handler->pop('queue1', ['default']);
-
- $result = $handler->done($queueJob, false);
+ $result = $handler->done($queueJob);
$this->assertTrue($result);
$this->dontSeeInDatabase('queue_jobs', [
diff --git a/tests/Events/QueueEventManagerTest.php b/tests/Events/QueueEventManagerTest.php
new file mode 100644
index 0000000..3513473
--- /dev/null
+++ b/tests/Events/QueueEventManagerTest.php
@@ -0,0 +1,290 @@
+
+ *
+ * For the full copyright and license information, please view
+ * the LICENSE file that was distributed with this source code.
+ */
+
+namespace Tests\Events;
+
+use CodeIgniter\Events\Events;
+use CodeIgniter\Queue\Entities\QueueJob;
+use CodeIgniter\Queue\Events\QueueEvent;
+use CodeIgniter\Queue\Events\QueueEventManager;
+use CodeIgniter\Queue\Payloads\Payload;
+use Exception;
+use Tests\Support\TestCase;
+
+/**
+ * @internal
+ */
+final class QueueEventManagerTest extends TestCase
+{
+ private array $capturedEvents = [];
+
+ protected function setUp(): void
+ {
+ parent::setUp();
+
+ // Clear captured events
+ $this->capturedEvents = [];
+
+ // Set up event listener to capture all queue events
+ $this->setupEventCapture();
+ }
+
+ protected function tearDown(): void
+ {
+ parent::tearDown();
+
+ // Clear all event listeners
+ Events::removeAllListeners();
+ }
+
+ public function testJobPushedEvent(): void
+ {
+ $queueJob = $this->createTestQueueJob();
+
+ QueueEventManager::jobPushed('database', 'emails', $queueJob, ['extra' => 'data']);
+
+ $this->assertEventWasTriggered(QueueEventManager::JOB_PUSHED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.job.pushed', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertSame('emails', $event->getQueue());
+ $this->assertSame('App\\Jobs\\TestJob', $event->getJobClass());
+ $this->assertSame($queueJob, $event->getMetadata('job'));
+ $this->assertSame('data', $event->getMetadata('extra'));
+ }
+
+ public function testJobPushFailedEvent(): void
+ {
+ $exception = new Exception('Push failed');
+
+ QueueEventManager::jobPushFailed('redis', 'notifications', 'App\\Jobs\\EmailJob', $exception);
+
+ $this->assertEventWasTriggered(QueueEventManager::JOB_PUSH_FAILED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.job.push.failed', $event->getType());
+ $this->assertSame('redis', $event->getHandler());
+ $this->assertSame('notifications', $event->getQueue());
+ $this->assertSame('App\\Jobs\\EmailJob', $event->getJobClass());
+ $this->assertSame($exception, $event->getException());
+ }
+
+ public function testJobProcessingStartedEvent(): void
+ {
+ $queueJob = $this->createTestQueueJob();
+
+ QueueEventManager::jobProcessingStarted('database', 'emails', $queueJob, ['worker_id' => 'worker-1']);
+
+ $this->assertEventWasTriggered(QueueEventManager::JOB_PROCESSING_STARTED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.job.processing.started', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertSame('emails', $event->getQueue());
+ $this->assertSame('App\\Jobs\\TestJob', $event->getJobClass());
+ $this->assertSame($queueJob, $event->getMetadata('job'));
+ $this->assertSame('worker-1', $event->getMetadata('worker_id'));
+ }
+
+ public function testJobProcessingCompletedEvent(): void
+ {
+ $queueJob = $this->createTestQueueJob();
+ $processingTime = 1.5;
+
+ QueueEventManager::jobProcessingCompleted('database', 'emails', $queueJob, $processingTime, ['worker_id' => 'worker-1']);
+
+ $this->assertEventWasTriggered(QueueEventManager::JOB_PROCESSING_COMPLETED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.job.processing.completed', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertSame('emails', $event->getQueue());
+ $this->assertSame('App\\Jobs\\TestJob', $event->getJobClass());
+ $this->assertSame($queueJob, $event->getMetadata('job'));
+ $this->assertEqualsWithDelta(1.5, $event->getProcessingTime(), PHP_FLOAT_EPSILON);
+ $this->assertSame('worker-1', $event->getMetadata('worker_id'));
+ }
+
+ public function testJobFailedEvent(): void
+ {
+ $queueJob = $this->createTestQueueJob();
+ $exception = new Exception('Job failed');
+ $processingTime = 2.3;
+
+ QueueEventManager::jobFailed('database', 'emails', $queueJob, $exception, $processingTime, ['worker_id' => 'worker-1']);
+
+ $this->assertEventWasTriggered(QueueEventManager::JOB_FAILED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.job.failed', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertSame('emails', $event->getQueue());
+ $this->assertSame('App\\Jobs\\TestJob', $event->getJobClass());
+ $this->assertSame($queueJob, $event->getMetadata('job'));
+ $this->assertSame($exception, $event->getException());
+ $this->assertEqualsWithDelta(2.3, $event->getProcessingTime(), PHP_FLOAT_EPSILON);
+ $this->assertSame('worker-1', $event->getMetadata('worker_id'));
+ }
+
+ public function testQueueClearedEventWithSpecificQueue(): void
+ {
+ QueueEventManager::queueCleared('database', 'emails');
+
+ $this->assertEventWasTriggered(QueueEventManager::QUEUE_CLEARED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.cleared', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertSame('emails', $event->getQueue());
+ }
+
+ public function testQueueClearedEventWithoutQueue(): void
+ {
+ QueueEventManager::queueCleared('database');
+
+ $this->assertEventWasTriggered(QueueEventManager::QUEUE_CLEARED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.cleared', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertNull($event->getQueue());
+ }
+
+ public function testWorkerStartedEvent(): void
+ {
+ $priorities = ['high', 'default'];
+ $config = ['timeout' => 300];
+
+ QueueEventManager::workerStarted('database', 'emails', $priorities, $config, ['worker_id' => 'worker-1']);
+
+ $this->assertEventWasTriggered(QueueEventManager::WORKER_STARTED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.worker.started', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertSame('emails', $event->getQueue());
+ $this->assertSame($priorities, $event->getMetadata('priorities'));
+ $this->assertSame($config, $event->getMetadata('config'));
+ $this->assertSame('worker-1', $event->getMetadata('worker_id'));
+ }
+
+ public function testWorkerStoppedEvent(): void
+ {
+ $priorities = ['high', 'default'];
+ $uptime = 3600.5;
+ $jobsProcessed = 25;
+
+ QueueEventManager::workerStopped('database', 'emails', $priorities, $uptime, $jobsProcessed, ['worker_id' => 'worker-1']);
+
+ $this->assertEventWasTriggered(QueueEventManager::WORKER_STOPPED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.worker.stopped', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertSame('emails', $event->getQueue());
+ $this->assertSame($priorities, $event->getMetadata('priorities'));
+ $this->assertEqualsWithDelta(3600.5, $event->getMetadata('uptime_seconds'), PHP_FLOAT_EPSILON);
+ $this->assertSame(25, $event->getMetadata('jobs_processed'));
+ $this->assertSame('worker-1', $event->getMetadata('worker_id'));
+ }
+
+ public function testHandlerConnectionEstablishedEvent(): void
+ {
+ $config = ['host' => 'localhost', 'port' => 3306];
+
+ QueueEventManager::handlerConnectionEstablished('database', $config);
+
+ $this->assertEventWasTriggered(QueueEventManager::HANDLER_CONNECTION_ESTABLISHED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.handler.connection.established', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertNull($event->getQueue());
+ $this->assertSame($config, $event->getMetadata('config'));
+ }
+
+ public function testHandlerConnectionFailedEvent(): void
+ {
+ $exception = new Exception('Connection failed');
+ $config = ['host' => 'localhost', 'port' => 3306];
+
+ QueueEventManager::handlerConnectionFailed('database', $exception, $config);
+
+ $this->assertEventWasTriggered(QueueEventManager::HANDLER_CONNECTION_FAILED);
+ $event = $this->getLastCapturedEvent();
+
+ $this->assertSame('queue.handler.connection.failed', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertNull($event->getQueue());
+ $this->assertSame($exception, $event->getException());
+ $this->assertSame($config, $event->getMetadata('config'));
+ }
+
+ private function createTestQueueJob(): QueueJob
+ {
+ $payload = new Payload('App\\Jobs\\TestJob', ['data' => 'test']);
+
+ return new QueueJob([
+ 'id' => 123,
+ 'queue' => 'test-queue',
+ 'payload' => $payload,
+ 'priority' => 'default',
+ 'status' => 0,
+ 'attempts' => 0,
+ ]);
+ }
+
+ private function setupEventCapture(): void
+ {
+ $constants = [
+ QueueEventManager::JOB_PUSHED,
+ QueueEventManager::JOB_PUSH_FAILED,
+ QueueEventManager::JOB_PROCESSING_STARTED,
+ QueueEventManager::JOB_PROCESSING_COMPLETED,
+ QueueEventManager::JOB_FAILED,
+ QueueEventManager::QUEUE_CLEARED,
+ QueueEventManager::WORKER_STARTED,
+ QueueEventManager::WORKER_STOPPED,
+ QueueEventManager::HANDLER_CONNECTION_ESTABLISHED,
+ QueueEventManager::HANDLER_CONNECTION_FAILED,
+ ];
+
+ foreach ($constants as $eventType) {
+ Events::on($eventType, function (QueueEvent $event): void {
+ $this->capturedEvents[] = $event;
+ });
+ }
+ }
+
+ private function assertEventWasTriggered(string $eventType): void
+ {
+ $found = false;
+
+ foreach ($this->capturedEvents as $event) {
+ if ($event->getType() === $eventType) {
+ $found = true;
+ break;
+ }
+ }
+
+ $this->assertTrue($found, "Event '{$eventType}' was not triggered");
+ }
+
+ private function getLastCapturedEvent(): QueueEvent
+ {
+ $this->assertNotEmpty($this->capturedEvents, 'No events were captured');
+
+ return end($this->capturedEvents);
+ }
+}
diff --git a/tests/Events/QueueEventTest.php b/tests/Events/QueueEventTest.php
new file mode 100644
index 0000000..c50f061
--- /dev/null
+++ b/tests/Events/QueueEventTest.php
@@ -0,0 +1,275 @@
+
+ *
+ * For the full copyright and license information, please view
+ * the LICENSE file that was distributed with this source code.
+ */
+
+namespace Tests\Events;
+
+use CodeIgniter\I18n\Time;
+use CodeIgniter\Queue\Entities\QueueJob;
+use CodeIgniter\Queue\Events\QueueEvent;
+use Exception;
+use Tests\Support\TestCase;
+
+/**
+ * @internal
+ */
+final class QueueEventTest extends TestCase
+{
+ public function testConstructorWithMinimalParameters(): void
+ {
+ $event = new QueueEvent('queue.job.pushed', 'database');
+
+ $this->assertSame('queue.job.pushed', $event->getType());
+ $this->assertSame('database', $event->getHandler());
+ $this->assertNull($event->getQueue());
+ $this->assertSame([], $event->getAllMetadata());
+ $this->assertInstanceOf(Time::class, $event->getTimestamp());
+ }
+
+ public function testConstructorWithAllParameters(): void
+ {
+ $timestamp = Time::parse('2023-01-01 12:00:00');
+ $metadata = ['job_class' => 'TestJob', 'attempts' => 3];
+
+ $event = new QueueEvent(
+ type: 'queue.job.failed',
+ handler: 'redis',
+ queue: 'emails',
+ metadata: $metadata,
+ timestamp: $timestamp,
+ );
+
+ $this->assertSame('queue.job.failed', $event->getType());
+ $this->assertSame('redis', $event->getHandler());
+ $this->assertSame('emails', $event->getQueue());
+ $this->assertSame($metadata, $event->getAllMetadata());
+ $this->assertSame($timestamp, $event->getTimestamp());
+ }
+
+ public function testGetMetadataWithExistingKey(): void
+ {
+ $metadata = ['job_class' => 'TestJob', 'priority' => 'high'];
+ $event = new QueueEvent('queue.job.pushed', 'database', metadata: $metadata);
+
+ $this->assertSame('TestJob', $event->getMetadata('job_class'));
+ $this->assertSame('high', $event->getMetadata('priority'));
+ }
+
+ public function testGetMetadataWithNonExistingKey(): void
+ {
+ $event = new QueueEvent('queue.job.pushed', 'database');
+
+ $this->assertNull($event->getMetadata('non_existing'));
+ $this->assertSame('default_value', $event->getMetadata('non_existing', 'default_value'));
+ }
+
+ public function testIsJobEvent(): void
+ {
+ $jobEvent = new QueueEvent('queue.job.pushed', 'database');
+ $workerEvent = new QueueEvent('queue.worker.started', 'database');
+ $operationEvent = new QueueEvent('queue.cleared', 'database');
+
+ $this->assertTrue($jobEvent->isJobEvent());
+ $this->assertFalse($workerEvent->isJobEvent());
+ $this->assertFalse($operationEvent->isJobEvent());
+ }
+
+ public function testIsWorkerEvent(): void
+ {
+ $jobEvent = new QueueEvent('queue.job.pushed', 'database');
+ $workerEvent = new QueueEvent('queue.worker.started', 'database');
+ $operationEvent = new QueueEvent('queue.cleared', 'database');
+
+ $this->assertFalse($jobEvent->isWorkerEvent());
+ $this->assertTrue($workerEvent->isWorkerEvent());
+ $this->assertFalse($operationEvent->isWorkerEvent());
+ }
+
+ public function testIsOperationEvent(): void
+ {
+ $jobEvent = new QueueEvent('queue.job.pushed', 'database');
+ $workerEvent = new QueueEvent('queue.worker.started', 'database');
+ $operationEvent = new QueueEvent('queue.cleared', 'database');
+ $connectionEvent = new QueueEvent('queue.handler.connection.failed', 'database');
+
+ $this->assertFalse($jobEvent->isOperationEvent());
+ $this->assertFalse($workerEvent->isOperationEvent());
+ $this->assertTrue($operationEvent->isOperationEvent());
+ $this->assertFalse($connectionEvent->isOperationEvent());
+ }
+
+ public function testIsConnectionEvent(): void
+ {
+ $jobEvent = new QueueEvent('queue.job.pushed', 'database');
+ $connectionEvent = new QueueEvent('queue.handler.connection.failed', 'database');
+ $connectionEstablishedEvent = new QueueEvent('queue.handler.connection.established', 'database');
+
+ $this->assertFalse($jobEvent->isConnectionEvent());
+ $this->assertTrue($connectionEvent->isConnectionEvent());
+ $this->assertTrue($connectionEstablishedEvent->isConnectionEvent());
+ }
+
+ public function testGetJobIdFromQueueJobObject(): void
+ {
+ $queueJob = new QueueJob(['id' => 123, 'queue' => 'test', 'payload' => ['job' => 'TestJob']]);
+ $metadata = ['job' => $queueJob];
+ $event = new QueueEvent('queue.job.pushed', 'database', metadata: $metadata);
+
+ $this->assertSame(123, $event->getJobId());
+ }
+
+ public function testGetJobIdFromMetadata(): void
+ {
+ $metadata = ['job_id' => 456];
+ $event = new QueueEvent('queue.job.pushed', 'database', metadata: $metadata);
+
+ $this->assertSame(456, $event->getJobId());
+ }
+
+ public function testGetJobIdReturnsNull(): void
+ {
+ $event = new QueueEvent('queue.job.pushed', 'database');
+
+ $this->assertNull($event->getJobId());
+ }
+
+ public function testGetPriorityFromQueueJobObject(): void
+ {
+ $queueJob = new QueueJob(['priority' => 'high', 'queue' => 'test', 'payload' => ['job' => 'TestJob']]);
+ $metadata = ['job' => $queueJob];
+ $event = new QueueEvent('queue.job.pushed', 'database', metadata: $metadata);
+
+ $this->assertSame('high', $event->getPriority());
+ }
+
+ public function testGetPriorityFromMetadata(): void
+ {
+ $metadata = ['priority' => 'low'];
+ $event = new QueueEvent('queue.job.pushed', 'database', metadata: $metadata);
+
+ $this->assertSame('low', $event->getPriority());
+ }
+
+ public function testGetAttemptsFromQueueJobObject(): void
+ {
+ $queueJob = new QueueJob(['attempts' => 3, 'queue' => 'test', 'payload' => ['job' => 'TestJob']]);
+ $metadata = ['job' => $queueJob];
+ $event = new QueueEvent('queue.job.pushed', 'database', metadata: $metadata);
+
+ $this->assertSame(3, $event->getAttempts());
+ }
+
+ public function testGetStatusFromQueueJobObject(): void
+ {
+ $queueJob = new QueueJob(['status' => 1, 'queue' => 'test', 'payload' => ['job' => 'TestJob']]);
+ $metadata = ['job' => $queueJob];
+ $event = new QueueEvent('queue.job.pushed', 'database', metadata: $metadata);
+
+ $this->assertSame(1, $event->getStatus());
+ }
+
+ public function testGetJobClass(): void
+ {
+ $metadata = ['job_class' => 'App\\Jobs\\TestJob'];
+ $event = new QueueEvent('queue.job.pushed', 'database', metadata: $metadata);
+
+ $this->assertSame('App\\Jobs\\TestJob', $event->getJobClass());
+ }
+
+ public function testGetProcessingTime(): void
+ {
+ $metadata = ['processing_time' => 1.5];
+ $event = new QueueEvent('queue.job.completed', 'database', metadata: $metadata);
+
+ $this->assertEqualsWithDelta(1.5, $event->getProcessingTime(), PHP_FLOAT_EPSILON);
+ }
+
+ public function testGetProcessingTimeDefaultsToZero(): void
+ {
+ $event = new QueueEvent('queue.job.completed', 'database');
+
+ $this->assertEqualsWithDelta(0.0, $event->getProcessingTime(), PHP_FLOAT_EPSILON);
+ }
+
+ public function testGetProcessingTimeMs(): void
+ {
+ $metadata = ['processing_time' => 1.5];
+ $event = new QueueEvent('queue.job.completed', 'database', metadata: $metadata);
+
+ $this->assertEqualsWithDelta(1500.0, $event->getProcessingTimeMs(), PHP_FLOAT_EPSILON);
+ }
+
+ public function testGetExceptionAndExceptionMessage(): void
+ {
+ $exception = new Exception('Test error message');
+ $metadata = ['exception' => $exception];
+ $event = new QueueEvent('queue.job.failed', 'database', metadata: $metadata);
+
+ $this->assertSame($exception, $event->getException());
+ $this->assertSame('Test error message', $event->getExceptionMessage());
+ $this->assertTrue($event->hasFailed());
+ }
+
+ public function testGetExceptionReturnsNullWhenNoException(): void
+ {
+ $event = new QueueEvent('queue.job.completed', 'database');
+
+ $this->assertNull($event->getException());
+ $this->assertNull($event->getExceptionMessage());
+ $this->assertFalse($event->hasFailed());
+ }
+
+ public function testToArray(): void
+ {
+ $timestamp = Time::parse('2023-01-01 12:00:00');
+ $metadata = ['job_class' => 'TestJob', 'attempts' => 3];
+
+ $event = new QueueEvent(
+ type: 'queue.job.failed',
+ handler: 'redis',
+ queue: 'emails',
+ metadata: $metadata,
+ timestamp: $timestamp,
+ );
+
+ $expected = [
+ 'type' => 'queue.job.failed',
+ 'handler' => 'redis',
+ 'queue' => 'emails',
+ 'metadata' => $metadata,
+ 'timestamp' => $timestamp->toDateTimeString(),
+ ];
+
+ $this->assertSame($expected, $event->toArray());
+ }
+
+ public function testToArrayWithNullQueue(): void
+ {
+ $timestamp = Time::parse('2023-01-01 12:00:00');
+
+ $event = new QueueEvent(
+ type: 'queue.job.failed',
+ handler: 'redis',
+ timestamp: $timestamp,
+ );
+
+ $expected = [
+ 'type' => 'queue.job.failed',
+ 'handler' => 'redis',
+ 'queue' => null,
+ 'metadata' => [],
+ 'timestamp' => $timestamp->toDateTimeString(),
+ ];
+
+ $this->assertSame($expected, $event->toArray());
+ }
+}
diff --git a/tests/Models/QueueJobModelTest.php b/tests/Models/QueueJobModelTest.php
index cf829d3..4d61f7e 100644
--- a/tests/Models/QueueJobModelTest.php
+++ b/tests/Models/QueueJobModelTest.php
@@ -41,9 +41,9 @@ public function testSkipLocked(): void
if ($model->db->DBDriver === 'SQLite3') {
$this->assertSame($sql, $result);
} elseif ($model->db->DBDriver === 'SQLSRV') {
- $this->assertStringContainsString('WITH (ROWLOCK,UPDLOCK,READPAST) WHERE', $result);
+ $this->assertStringContainsString('WITH (ROWLOCK,UPDLOCK,READPAST) WHERE', (string) $result);
} else {
- $this->assertStringContainsString('FOR UPDATE SKIP LOCKED', $result);
+ $this->assertStringContainsString('FOR UPDATE SKIP LOCKED', (string) $result);
}
}
diff --git a/tests/Payloads/ChainElementTest.php b/tests/Payloads/ChainElementTest.php
index 2bb232a..466b602 100644
--- a/tests/Payloads/ChainElementTest.php
+++ b/tests/Payloads/ChainElementTest.php
@@ -11,7 +11,7 @@
* the LICENSE file that was distributed with this source code.
*/
-namespace App\ThirdParty\queue\tests\Payloads;
+namespace Tests\Payloads;
use CodeIgniter\Queue\Payloads\ChainBuilder;
use CodeIgniter\Queue\Payloads\ChainElement;
diff --git a/tests/Payloads/PayloadCollectionTest.php b/tests/Payloads/PayloadCollectionTest.php
index 13ef3d6..7e60c24 100644
--- a/tests/Payloads/PayloadCollectionTest.php
+++ b/tests/Payloads/PayloadCollectionTest.php
@@ -11,7 +11,7 @@
* the LICENSE file that was distributed with this source code.
*/
-namespace App\ThirdParty\queue\tests\Payloads;
+namespace Tests\Payloads;
use ArrayIterator;
use CodeIgniter\Queue\Payloads\Payload;
diff --git a/tests/Payloads/PayloadMetadataTest.php b/tests/Payloads/PayloadMetadataTest.php
index e0af535..8bcaff8 100644
--- a/tests/Payloads/PayloadMetadataTest.php
+++ b/tests/Payloads/PayloadMetadataTest.php
@@ -11,7 +11,7 @@
* the LICENSE file that was distributed with this source code.
*/
-namespace App\ThirdParty\queue\tests\Payloads;
+namespace Tests\Payloads;
use CodeIgniter\Queue\Payloads\Payload;
use CodeIgniter\Queue\Payloads\PayloadCollection;
@@ -195,6 +195,7 @@ public function testFromArrayWithChainedJobs(): void
$this->assertCount(2, $chainedJobs);
$job1 = $chainedJobs->shift();
+ $this->assertInstanceOf(Payload::class, $job1);
$this->assertSame('job1', $job1->getJob());
$this->assertSame(['key1' => 'value1'], $job1->getData());
$this->assertSame('queue1', $job1->getQueue());
diff --git a/tests/Payloads/PayloadTest.php b/tests/Payloads/PayloadTest.php
index 570cff2..b09507f 100644
--- a/tests/Payloads/PayloadTest.php
+++ b/tests/Payloads/PayloadTest.php
@@ -11,7 +11,7 @@
* the LICENSE file that was distributed with this source code.
*/
-namespace App\ThirdParty\queue\tests\Payloads;
+namespace Tests\Payloads;
use CodeIgniter\Queue\Exceptions\QueueException;
use CodeIgniter\Queue\Payloads\Payload;
@@ -272,8 +272,10 @@ public function testFromArrayWithChainedJobs(): void
$this->assertTrue($payload->hasChainedJobs());
$chainedJobs = $payload->getChainedJobs();
$this->assertCount(1, $chainedJobs);
+ $this->assertInstanceOf(PayloadCollection::class, $chainedJobs);
$nextJob = $chainedJobs->shift();
+ $this->assertInstanceOf(Payload::class, $nextJob);
$this->assertSame('nextJob', $nextJob->getJob());
$this->assertSame(['nextKey' => 'nextValue'], $nextJob->getData());
$this->assertSame('nextQueue', $nextJob->getQueue());
diff --git a/tests/PredisHandlerTest.php b/tests/PredisHandlerTest.php
index 05d79d0..935dd65 100644
--- a/tests/PredisHandlerTest.php
+++ b/tests/PredisHandlerTest.php
@@ -11,7 +11,7 @@
* the LICENSE file that was distributed with this source code.
*/
-namespace ThirdParty\queue\tests;
+namespace Tests;
use CodeIgniter\I18n\Time;
use CodeIgniter\Queue\Entities\QueueJob;
@@ -323,24 +323,7 @@ public function testFailedAndDontKeepJob(): void
/**
* @throws ReflectionException
*/
- public function testDoneAndKeepJob(): void
- {
- $handler = new PredisHandler($this->config);
- $queueJob = $handler->pop('queue1', ['default']);
-
- $result = $handler->done($queueJob, true);
-
- $predis = self::getPrivateProperty($handler, 'predis');
-
- $this->assertTrue($result);
- $this->assertSame(0, $predis->hexists('queues:queue1::reserved', $queueJob->id));
- $this->assertSame(1, $predis->llen('queues:queue1::done'));
- }
-
- /**
- * @throws ReflectionException
- */
- public function testDoneAndDontKeepJob(): void
+ public function testDone(): void
{
$handler = new PredisHandler($this->config);
$queueJob = $handler->pop('queue1', ['default']);
@@ -348,11 +331,10 @@ public function testDoneAndDontKeepJob(): void
$predis = self::getPrivateProperty($handler, 'predis');
$this->assertSame(0, $predis->zcard('queues:queue1:default'));
- $result = $handler->done($queueJob, false);
+ $result = $handler->done($queueJob);
$this->assertTrue($result);
$this->assertSame(0, $predis->hexists('queues:queue1::reserved', $queueJob->id));
- $this->assertSame(0, $predis->llen('queues:queue1::done'));
}
/**
diff --git a/tests/RabbitMQDelayTest.php b/tests/RabbitMQDelayTest.php
index 47232fe..b4787f2 100644
--- a/tests/RabbitMQDelayTest.php
+++ b/tests/RabbitMQDelayTest.php
@@ -17,7 +17,7 @@
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Handlers\RabbitMQHandler;
use CodeIgniter\Queue\QueuePushResult;
-use PhpAmqpLib\Connection\AMQPStreamConnection;
+use PhpAmqpLib\Connection\AMQPConnectionFactory;
use Tests\Support\Config\Queue as QueueConfig;
use Tests\Support\TestCase;
use Throwable;
@@ -83,7 +83,7 @@ public function testDelayedMessageWithRealTiming(): void
$job = $this->handler->pop('delay-test-queue', ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('immediate', $job->payload['data']['type']);
- $this->handler->done($job, false);
+ $this->handler->done($job);
// Should not get delayed job yet (within first second)
$job = $this->handler->pop('delay-test-queue', ['default']);
@@ -103,7 +103,7 @@ public function testDelayedMessageWithRealTiming(): void
$this->assertGreaterThanOrEqual($delaySeconds, $elapsedTime);
// Clean up
- $this->handler->done($job, false);
+ $this->handler->done($job);
}
public function testMultipleDelayedJobsWithDifferentDelays(): void
@@ -121,14 +121,14 @@ public function testMultipleDelayedJobsWithDifferentDelays(): void
$job = $this->handler->pop('delay-test-queue', ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('immediate', $job->payload['data']['order']);
- $this->handler->done($job, false);
+ $this->handler->done($job);
// Wait 2 seconds - should get first delayed job
sleep(2);
$job = $this->handler->pop('delay-test-queue', ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('first', $job->payload['data']['order']);
- $this->handler->done($job, false);
+ $this->handler->done($job);
// Should not get second job yet
$job = $this->handler->pop('delay-test-queue', ['default']);
@@ -139,7 +139,7 @@ public function testMultipleDelayedJobsWithDifferentDelays(): void
$job = $this->handler->pop('delay-test-queue', ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('second', $job->payload['data']['order']);
- $this->handler->done($job, false);
+ $this->handler->done($job);
}
public function testZeroDelayWorksImmediately(): void
@@ -153,7 +153,7 @@ public function testZeroDelayWorksImmediately(): void
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('zero-delay', $job->payload['data']['type']);
- $this->handler->done($job, false);
+ $this->handler->done($job);
}
/**
@@ -161,6 +161,6 @@ public function testZeroDelayWorksImmediately(): void
*/
private function isRabbitMQAvailable(): bool
{
- return class_exists(AMQPStreamConnection::class);
+ return class_exists(AMQPConnectionFactory::class);
}
}
diff --git a/tests/RabbitMQHandlerTest.php b/tests/RabbitMQHandlerTest.php
index 2836f2c..d9b26c2 100644
--- a/tests/RabbitMQHandlerTest.php
+++ b/tests/RabbitMQHandlerTest.php
@@ -15,13 +15,12 @@
use CodeIgniter\Exceptions\CriticalError;
use CodeIgniter\Queue\Entities\QueueJob;
-use CodeIgniter\Queue\Enums\Status;
use CodeIgniter\Queue\Exceptions\QueueException;
use CodeIgniter\Queue\Handlers\RabbitMQHandler;
use CodeIgniter\Queue\QueuePushResult;
use CodeIgniter\Test\ReflectionHelper;
use Exception;
-use PhpAmqpLib\Connection\AMQPStreamConnection;
+use PhpAmqpLib\Connection\AMQPConnectionFactory;
use Tests\Support\Config\Queue as QueueConfig;
use Tests\Support\TestCase;
use Throwable;
@@ -133,7 +132,7 @@ public function testPopJob(): void
$this->assertSame(['message' => 'Test Pop'], $job->payload['data']);
// Clean up - mark as done
- $this->handler->done($job, false);
+ $this->handler->done($job);
}
}
@@ -153,7 +152,7 @@ public function testPopJobWithPriorities(): void
if ($job !== null) {
$this->assertSame('high', $job->priority);
- $this->handler->done($job, false);
+ $this->handler->done($job);
}
}
@@ -245,21 +244,21 @@ public function testCustomPriorityMapping(): void
$job = $this->handler->pop('custom-priority-queue', ['urgent', 'normal', 'low']);
if ($job !== null) {
$this->assertSame('urgent', $job->payload['data']['priority']);
- $this->handler->done($job, false);
+ $this->handler->done($job);
}
// Then normal priority
$job = $this->handler->pop('custom-priority-queue', ['urgent', 'normal', 'low']);
if ($job !== null) {
$this->assertSame('normal', $job->payload['data']['priority']);
- $this->handler->done($job, false);
+ $this->handler->done($job);
}
// Finally low priority
$job = $this->handler->pop('custom-priority-queue', ['urgent', 'normal', 'low']);
if ($job !== null) {
$this->assertSame('low', $job->payload['data']['priority']);
- $this->handler->done($job, false);
+ $this->handler->done($job);
}
}
@@ -321,27 +320,14 @@ public function testFailedAndDontKeepJob(): void
]);
}
- public function testDoneAndKeepJob(): void
+ public function testDone(): void
{
$this->handler->push('test-queue', 'success', ['test' => 'data']);
$queueJob = $this->handler->pop('test-queue', ['default']);
$this->assertInstanceOf(QueueJob::class, $queueJob);
- $result = $this->handler->done($queueJob, true);
-
- $this->assertTrue($result);
- $this->assertSame(Status::DONE->value, $queueJob->status);
- }
-
- public function testDoneAndDontKeepJob(): void
- {
- $this->handler->push('test-queue', 'success', ['test' => 'data']);
- $queueJob = $this->handler->pop('test-queue', ['default']);
-
- $this->assertInstanceOf(QueueJob::class, $queueJob);
-
- $result = $this->handler->done($queueJob, false);
+ $result = $this->handler->done($queueJob);
// Job is acknowledged and removed from RabbitMQ
$this->assertTrue($result);
@@ -397,6 +383,6 @@ public function testJsonEncodeExceptionMethod(): void
*/
private function isRabbitMQAvailable(): bool
{
- return class_exists(AMQPStreamConnection::class);
+ return class_exists(AMQPConnectionFactory::class);
}
}
diff --git a/tests/RedisHandlerTest.php b/tests/RedisHandlerTest.php
index 7707cd4..ba12413 100644
--- a/tests/RedisHandlerTest.php
+++ b/tests/RedisHandlerTest.php
@@ -302,21 +302,7 @@ public function testFailedAndDontKeepJob(): void
]);
}
- public function testDoneAndKeepJob(): void
- {
- $handler = new RedisHandler($this->config);
- $queueJob = $handler->pop('queue1', ['default']);
-
- $result = $handler->done($queueJob, true);
-
- $redis = self::getPrivateProperty($handler, 'redis');
-
- $this->assertTrue($result);
- $this->assertFalse($redis->hExists('queues:queue1::reserved', (string) $queueJob->id));
- $this->assertSame(1, $redis->lLen('queues:queue1::done'));
- }
-
- public function testDoneAndDontKeepJob(): void
+ public function testDone(): void
{
$handler = new RedisHandler($this->config);
$queueJob = $handler->pop('queue1', ['default']);
@@ -324,11 +310,10 @@ public function testDoneAndDontKeepJob(): void
$redis = self::getPrivateProperty($handler, 'redis');
$this->assertSame(0, $redis->zCard('queues:queue1:default'));
- $result = $handler->done($queueJob, false);
+ $result = $handler->done($queueJob);
$this->assertTrue($result);
$this->assertFalse($redis->hExists('queues:queue1::reserved', (string) $queueJob->id));
- $this->assertSame(0, $redis->lLen('queues:queue1::done'));
}
public function testClear(): void