diff --git a/src/Actions/CalculateQueueMetricsAction.php b/src/Actions/CalculateQueueMetricsAction.php
new file mode 100644
index 0000000..c38f66d
--- /dev/null
+++ b/src/Actions/CalculateQueueMetricsAction.php
@@ -0,0 +1,110 @@
+<?php
+
+declare(strict_types=1);
+
+namespace PHPeek\LaravelQueueMetrics\Actions;
+
+use PHPeek\LaravelQueueMetrics\Repositories\Contracts\JobMetricsRepository;
+use PHPeek\LaravelQueueMetrics\Repositories\Contracts\QueueMetricsRepository;
+
+/**
+ * Aggregate job-level metrics into queue-level metric snapshots.
+ */
+class CalculateQueueMetricsAction
+{
+    public function __construct(
+        private JobMetricsRepository $jobRepository,
+        private QueueMetricsRepository $queueRepository,
+    ) {}
+
+    /**
+     * Calculate and store aggregated metrics for a single queue.
+     */
+    public function execute(string $connection, string $queue): void
+    {
+        $allJobs = $this->jobRepository->listJobs();
+        $queueJobs = array_filter($allJobs, fn ($job) => $job['connection'] === $connection && $job['queue'] === $queue);
+
+        if (empty($queueJobs)) {
+            // No jobs found for this queue - record zero metrics
+            $this->queueRepository->recordSnapshot($connection, $queue, [
+                'throughput_per_minute' => 0.0,
+                'avg_duration' => 0.0,
+                'failure_rate' => 0.0,
+            ]);
+
+            return;
+        }
+
+        // Aggregate metrics across all job classes
+        $totalProcessed = 0;
+        $totalFailed = 0;
+        $totalDurationMs = 0.0;
+        $lastProcessedAt = null;
+
+        foreach ($queueJobs as $job) {
+            $jobClass = $job['jobClass'];
+            $metrics = $this->jobRepository->getMetrics($jobClass, $connection, $queue);
+
+            $totalProcessed += is_int($metrics['total_processed']) ? $metrics['total_processed'] : 0;
+            $totalFailed += is_int($metrics['total_failed']) ? $metrics['total_failed'] : 0;
+            $totalDurationMs += is_float($metrics['total_duration_ms']) || is_int($metrics['total_duration_ms'])
+                ? (float) $metrics['total_duration_ms']
+                : 0.0;
+
+            if ($metrics['last_processed_at'] instanceof \Carbon\Carbon) {
+                if ($lastProcessedAt === null || $metrics['last_processed_at']->greaterThan($lastProcessedAt)) {
+                    $lastProcessedAt = $metrics['last_processed_at'];
+                }
+            }
+        }
+
+        // Calculate aggregated metrics
+        $avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0;
+        $failureRate = ($totalProcessed + $totalFailed) > 0
+            ? ($totalFailed / ($totalProcessed + $totalFailed)) * 100.0
+            : 0.0;
+
+        // Calculate throughput per minute (jobs completed in last 60 seconds)
+        $throughputPerMinute = 0.0;
+        foreach ($queueJobs as $job) {
+            $jobClass = $job['jobClass'];
+            $throughputPerMinute += $this->jobRepository->getThroughput(
+                $jobClass,
+                $connection,
+                $queue,
+                60 // last 60 seconds
+            );
+        }
+
+        // Store aggregated metrics
+        $this->queueRepository->recordSnapshot($connection, $queue, [
+            'throughput_per_minute' => $throughputPerMinute,
+            'avg_duration' => $avgDuration,
+            'failure_rate' => $failureRate,
+            'total_processed' => $totalProcessed,
+            'total_failed' => $totalFailed,
+            'last_processed_at' => $lastProcessedAt?->timestamp,
+        ]);
+    }
+
+    /**
+     * Calculate metrics for all discovered queues.
+     */
+    public function executeForAllQueues(): int
+    {
+        $queues = $this->queueRepository->listQueues();
+        $count = 0;
+
+        foreach ($queues as $queue) {
+            $this->execute($queue['connection'], $queue['queue']);
+            $count++;
+        }
+
+        return $count;
+    }
+}
diff --git a/src/Actions/RecordJobStartAction.php b/src/Actions/RecordJobStartAction.php
index ccbeccf..86d81c9
--- a/src/Actions/RecordJobStartAction.php
+++ b/src/Actions/RecordJobStartAction.php
@@ -6,7 +6,6 @@
 use Carbon\Carbon;
 use PHPeek\LaravelQueueMetrics\Repositories\Contracts\JobMetricsRepository;
-use PHPeek\LaravelQueueMetrics\Repositories\Contracts\QueueMetricsRepository;
 
 /**
  * Record when a job starts processing.
@@ -15,7 +14,6 @@
 {
     public function __construct(
         private JobMetricsRepository $repository,
-        private QueueMetricsRepository $queueMetricsRepository,
     ) {}
 
     public function execute(
@@ -28,9 +26,7 @@ public function execute(
             return;
         }
 
-        // Mark queue as discovered for listQueues() to find it
-        $this->queueMetricsRepository->markQueueDiscovered($connection, $queue);
-
+        // Queue discovery now happens atomically inside recordStart()
         $this->repository->recordStart(
             jobId: $jobId,
             jobClass: $jobClass,
diff --git a/src/Console/CalculateQueueMetricsCommand.php b/src/Console/CalculateQueueMetricsCommand.php
new file mode 100644
index 0000000..5a68fdc
--- /dev/null
+++ b/src/Console/CalculateQueueMetricsCommand.php
@@ -0,0 +1,67 @@
+<?php
+
+declare(strict_types=1);
+
+namespace PHPeek\LaravelQueueMetrics\Console;
+
+use Illuminate\Console\Command;
+use PHPeek\LaravelQueueMetrics\Actions\CalculateQueueMetricsAction;
+
+/**
+ * Aggregate recorded job metrics into queue-level metrics.
+ */
+class CalculateQueueMetricsCommand extends Command
+{
+    protected $signature = 'queue-metrics:calculate {--connection=} {--queue=}';
+
+    protected $description = 'Calculate aggregated queue metrics from recorded job metrics';
+
+    public function __construct(
+        private CalculateQueueMetricsAction $action,
+    ) {
+        parent::__construct();
+    }
+
+    public function handle(): int
+    {
+        $connection = $this->option('connection');
+        $queue = $this->option('queue');
+
+        // Validate options
+        if ($queue !== null && $connection === null) {
+            $this->error('The --queue option requires --connection to be specified');
+
+            return self::FAILURE;
+        }
+
+        try {
+            if ($connection !== null && $queue !== null) {
+                // Calculate for specific queue
+                $connectionStr = is_string($connection) ? $connection : '';
+                $queueStr = is_string($queue) ? $queue : '';
+                $this->info("Calculating metrics for {$connectionStr}:{$queueStr}...");
+                $this->action->execute($connectionStr, $queueStr);
+                $this->info('✓ Metrics calculated successfully');
+
+                return self::SUCCESS;
+            }
+
+            // Calculate for all queues
+            $this->info('Calculating metrics for all queues...');
+            $count = $this->action->executeForAllQueues();
+            $this->info("✓ Metrics calculated for {$count} queue(s)");
+
+            return self::SUCCESS;
+        } catch (\Exception $e) {
+            $this->error("Failed to calculate queue metrics: {$e->getMessage()}");
+
+            return self::FAILURE;
+        }
+    }
+}
diff --git a/src/LaravelQueueMetricsServiceProvider.php b/src/LaravelQueueMetricsServiceProvider.php
index 40f6d83..f0feac3
--- a/src/LaravelQueueMetricsServiceProvider.php
+++ b/src/LaravelQueueMetricsServiceProvider.php
@@ -24,6 +24,7 @@
 use PHPeek\LaravelQueueMetrics\Commands\CleanupStaleWorkersCommand;
 use PHPeek\LaravelQueueMetrics\Config\QueueMetricsConfig;
 use PHPeek\LaravelQueueMetrics\Config\StorageConfig;
+use PHPeek\LaravelQueueMetrics\Console\CalculateQueueMetricsCommand;
 use PHPeek\LaravelQueueMetrics\Console\DetectStaleWorkersCommand;
 use PHPeek\LaravelQueueMetrics\Console\RecordTrendDataCommand;
 use PHPeek\LaravelQueueMetrics\Contracts\QueueInspector;
@@ -69,6 +70,7 @@ public function configurePackage(Package $package): void
             ->hasRoute('api')
             ->hasMigration('2024_01_01_000001_create_queue_metrics_storage_tables')
             ->hasCommand(CalculateBaselinesCommand::class)
+            ->hasCommand(CalculateQueueMetricsCommand::class)
             ->hasCommand(CleanupStaleWorkersCommand::class)
             ->hasCommand(DetectStaleWorkersCommand::class)
             ->hasCommand(RecordTrendDataCommand::class);
@@ -199,6 +201,11 @@ protected function registerScheduledTasks(): void
         // Schedule adaptive baseline calculation
         $this->scheduleAdaptiveBaselineCalculation($scheduler);
 
+        // Schedule queue metrics calculation (aggregate job metrics into queue metrics)
+        $scheduler->command('queue-metrics:calculate')
+            ->everyMinute()
+            ->withoutOverlapping();
+
         // Schedule trend data recording (every minute for real-time trends)
         $scheduler->command('queue-metrics:record-trends')
             ->everyMinute()
diff --git a/src/Repositories/RedisJobMetricsRepository.php b/src/Repositories/RedisJobMetricsRepository.php
index 401fea3..d751c91
--- a/src/Repositories/RedisJobMetricsRepository.php
+++ b/src/Repositories/RedisJobMetricsRepository.php
@@ -24,27 +24,48 @@ public function recordStart( string $queue, Carbon $startedAt, ): void { - $driver = $this->redis->driver(); $metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass); $jobKey = $this->redis->key('job', $jobId); + $jobDiscoveryKey = $this->redis->key('discovery', 'jobs'); + $queueDiscoveryKey = $this->redis->key('discovery', 'queues'); $ttl = $this->redis->getTtl('raw'); + $discoveryTtl = $this->redis->getTtl('aggregated'); - // Register job in discovery set (push-based tracking) - $this->markJobDiscovered($connection, $queue, $jobClass); - - // Increment total queued counter - $driver->incrementHashField($metricsKey, 'total_queued', 1); - - // Store job start time - $driver->setHash($jobKey, [ - 'job_class' => $jobClass, - 'connection' => $connection, - 'queue' => $queue, - 'started_at' => $startedAt->timestamp, - ], $ttl); - - // Ensure TTL is set on metrics key - $driver->expire($metricsKey, $ttl); + // Use transaction to ensure atomic registration of both discovery sets with metrics + $this->redis->transaction(function ($pipe) use ( + $jobDiscoveryKey, + $queueDiscoveryKey, + $metricsKey, + $jobKey, + $connection, + $queue, + $jobClass, + $startedAt, + $ttl, + $discoveryTtl + ) { + // Register queue in discovery set (push-based tracking) + $pipe->addToSet($queueDiscoveryKey, ["{$connection}:{$queue}"]); + $pipe->expire($queueDiscoveryKey, $discoveryTtl); + + // Register job in discovery set (push-based tracking) + $pipe->addToSet($jobDiscoveryKey, ["{$connection}:{$queue}:{$jobClass}"]); + $pipe->expire($jobDiscoveryKey, $discoveryTtl); + + // Increment total queued counter + $pipe->incrementHashField($metricsKey, 'total_queued', 1); + + // Store job start time + $pipe->setHash($jobKey, [ + 'job_class' => $jobClass, + 'connection' => $connection, + 'queue' => $queue, + 'started_at' => $startedAt->timestamp, + ], $ttl); + + // Ensure TTL is set on metrics key + $pipe->expire($metricsKey, $ttl); + }); } public function recordCompletion( @@ -132,35 +153,52 @@ public function recordFailure( Carbon $failedAt, ?string $hostname = null, ): void { - $driver = $this->redis->driver(); $metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass); + $jobKey = $this->redis->key('job', $jobId); $ttl = $this->redis->getTtl('raw'); - $driver->pipeline(function ($pipe) use ($metricsKey, $exception, $failedAt, $ttl) { - $pipe->incrementHashField($metricsKey, 'total_failed', 1); - $pipe->setHash($metricsKey, [ - 'last_failed_at' => $failedAt->timestamp, - 'last_exception' => substr($exception, 0, 1000), - ]); - // Refresh TTL on metrics key - $pipe->expire($metricsKey, $ttl); - }); - - // Store hostname-scoped metrics if hostname is provided if ($hostname !== null) { - $this->recordHostnameMetrics( - $hostname, - $connection, - $queue, - $jobClass, - 0.0, - false, - $failedAt - ); + // Include hostname metrics in the same transaction + $serverKey = $this->redis->key('server_jobs', $hostname, $connection, $queue, $jobClass); + + $this->redis->transaction(function ($pipe) use ( + $metricsKey, + $serverKey, + $jobKey, + $exception, + $failedAt, + $ttl + ) { + // Job-level failure metrics + $pipe->incrementHashField($metricsKey, 'total_failed', 1); + $pipe->setHash($metricsKey, [ + 'last_failed_at' => $failedAt->timestamp, + 'last_exception' => substr($exception, 0, 1000), + ]); + $pipe->expire($metricsKey, $ttl); + + // Hostname-level failure metrics + $pipe->incrementHashField($serverKey, 'total_failed', 1); + $pipe->setHash($serverKey, ['last_updated_at' => 
$failedAt->timestamp]); + $pipe->expire($serverKey, $ttl); + + // Clean up job tracking key + $pipe->delete($jobKey); + }); + } else { + // Transaction without hostname metrics + $this->redis->transaction(function ($pipe) use ($metricsKey, $jobKey, $exception, $failedAt, $ttl) { + $pipe->incrementHashField($metricsKey, 'total_failed', 1); + $pipe->setHash($metricsKey, [ + 'last_failed_at' => $failedAt->timestamp, + 'last_exception' => substr($exception, 0, 1000), + ]); + $pipe->expire($metricsKey, $ttl); + + // Clean up job tracking key + $pipe->delete($jobKey); + }); } - - // Clean up job tracking key - $driver->delete($this->redis->key('job', $jobId)); } /** @@ -178,7 +216,8 @@ private function recordHostnameMetrics( $serverKey = $this->redis->key('server_jobs', $hostname, $connection, $queue, $jobClass); $ttl = $this->redis->getTtl('raw'); - $this->redis->driver()->pipeline(function ($pipe) use ($serverKey, $durationMs, $success, $timestamp, $ttl) { + // Use transaction instead of pipeline to ensure atomicity + $this->redis->transaction(function ($pipe) use ($serverKey, $durationMs, $success, $timestamp, $ttl) { if ($success) { $pipe->incrementHashField($serverKey, 'total_processed', 1); $pipe->incrementHashField($serverKey, 'total_duration_ms', $durationMs); @@ -256,7 +295,18 @@ public function getMetrics( } /** - * @return array + * Get duration samples for job performance analysis. + * + * Returns the most recent duration measurements, with timestamps stored as + * sorted set scores in Redis. Samples are stored in timestamp order, so + * negative indices retrieve the most recent samples first. + * + * Sample Limit Behavior: + * - If fewer than $limit samples exist, returns all available samples + * - Samples are returned in chronological order (oldest to newest) + * - Default limit of 1000 provides sufficient data for statistical analysis + * + * @return array Duration values in milliseconds, chronologically ordered */ public function getDurationSamples( string $jobClass, @@ -268,6 +318,7 @@ public function getDurationSamples( $driver = $this->redis->driver(); // Get most recent samples (reverse order, so use negative indices) + // Timestamps are used as sorted set scores for time-based querying /** @var array */ $samples = $driver->getSortedSetByRank($key, -$limit, -1); @@ -275,7 +326,17 @@ public function getDurationSamples( } /** - * @return array + * Get memory usage samples for resource analysis. + * + * Returns the most recent memory measurements, with timestamps stored as + * sorted set scores in Redis. Samples are stored in timestamp order. + * + * Sample Limit Behavior: + * - If fewer than $limit samples exist, returns all available samples + * - Samples are returned in chronological order (oldest to newest) + * - Timestamps in sorted set scores enable time-based filtering + * + * @return array Memory values in megabytes, chronologically ordered */ public function getMemorySamples( string $jobClass, @@ -286,6 +347,7 @@ public function getMemorySamples( $key = $this->redis->key('memory', $connection, $queue, $jobClass); $driver = $this->redis->driver(); + // Timestamps are used as sorted set scores for time-based querying /** @var array */ $samples = $driver->getSortedSetByRank($key, -$limit, -1); @@ -293,7 +355,17 @@ public function getMemorySamples( } /** - * @return array + * Get CPU time samples for performance analysis. + * + * Returns the most recent CPU time measurements, with timestamps stored as + * sorted set scores in Redis. Samples are stored in timestamp order. 
+ * + * Sample Limit Behavior: + * - If fewer than $limit samples exist, returns all available samples + * - Samples are returned in chronological order (oldest to newest) + * - Timestamps in sorted set scores enable time-based filtering + * + * @return array CPU time values in milliseconds, chronologically ordered */ public function getCpuTimeSamples( string $jobClass, @@ -304,6 +376,7 @@ public function getCpuTimeSamples( $key = $this->redis->key('cpu', $connection, $queue, $jobClass); $driver = $this->redis->driver(); + // Timestamps are used as sorted set scores for time-based querying /** @var array */ $samples = $driver->getSortedSetByRank($key, -$limit, -1); @@ -319,10 +392,18 @@ public function getThroughput( $key = $this->redis->key('durations', $connection, $queue, $jobClass); $driver = $this->redis->driver(); - $cutoff = Carbon::now()->subSeconds($windowSeconds)->timestamp; - - // Count samples within time window - return $driver->countSortedSetByScore($key, (string) $cutoff, '+inf'); + // Use Lua script to atomically calculate cutoff and count items + // This prevents race conditions where items could be added between + // calculating the cutoff timestamp and executing the count + $script = <<<'LUA' + local key = KEYS[1] + local windowSeconds = tonumber(ARGV[1]) + local cutoff = redis.call('TIME')[1] - windowSeconds + return redis.call('ZCOUNT', key, cutoff, '+inf') + LUA; + + /** @var int */ + return $driver->eval($script, 1, $key, $windowSeconds); } public function recordQueuedAt( @@ -347,21 +428,30 @@ public function recordRetryRequested( Carbon $retryRequestedAt, int $attemptNumber, ): void { - $driver = $this->redis->driver(); $metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass); $retryKey = $this->redis->key('retries', $connection, $queue, $jobClass); $ttl = $this->redis->getTtl('raw'); + $retryData = json_encode(['job_id' => $jobId, 'attempt' => $attemptNumber], JSON_THROW_ON_ERROR); - // Increment retry counter - $driver->incrementHashField($metricsKey, 'total_retries', 1); + // Use transaction to ensure atomic retry recording + $this->redis->transaction(function ($pipe) use ( + $metricsKey, + $retryKey, + $retryData, + $retryRequestedAt, + $ttl + ) { + // Increment retry counter + $pipe->incrementHashField($metricsKey, 'total_retries', 1); - // Store retry event for pattern analysis - $driver->addToSortedSet($retryKey, [ - json_encode(['job_id' => $jobId, 'attempt' => $attemptNumber], JSON_THROW_ON_ERROR) => (int) $retryRequestedAt->timestamp, - ], $ttl); + // Store retry event for pattern analysis + $pipe->addToSortedSet($retryKey, [ + $retryData => (int) $retryRequestedAt->timestamp, + ], $ttl); - // Refresh TTL on metrics key - $driver->expire($metricsKey, $ttl); + // Refresh TTL on metrics key + $pipe->expire($metricsKey, $ttl); + }); } public function recordTimeout( @@ -371,16 +461,18 @@ public function recordTimeout( string $queue, Carbon $timedOutAt, ): void { - $driver = $this->redis->driver(); $metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass); $ttl = $this->redis->getTtl('raw'); - // Increment timeout counter - $driver->incrementHashField($metricsKey, 'total_timeouts', 1); - $driver->setHash($metricsKey, ['last_timeout_at' => $timedOutAt->timestamp]); + // Use transaction to ensure atomic timeout recording + $this->redis->transaction(function ($pipe) use ($metricsKey, $timedOutAt, $ttl) { + // Increment timeout counter + $pipe->incrementHashField($metricsKey, 'total_timeouts', 1); + $pipe->setHash($metricsKey, 
['last_timeout_at' => $timedOutAt->timestamp]); - // Refresh TTL on metrics key - $driver->expire($metricsKey, $ttl); + // Refresh TTL on metrics key + $pipe->expire($metricsKey, $ttl); + }); } public function recordException( @@ -392,20 +484,29 @@ public function recordException( string $exceptionMessage, Carbon $occurredAt, ): void { - $driver = $this->redis->driver(); $metricsKey = $this->redis->key('jobs', $connection, $queue, $jobClass); $exceptionsKey = $this->redis->key('exceptions', $connection, $queue, $jobClass); $ttl = $this->redis->getTtl('raw'); + $aggregatedTtl = $this->redis->getTtl('aggregated'); - // Increment exception counter - $driver->incrementHashField($metricsKey, 'total_exceptions', 1); + // Use transaction to ensure atomic exception recording + $this->redis->transaction(function ($pipe) use ( + $metricsKey, + $exceptionsKey, + $exceptionClass, + $ttl, + $aggregatedTtl + ) { + // Increment exception counter + $pipe->incrementHashField($metricsKey, 'total_exceptions', 1); - // Track exception types - $driver->incrementHashField($exceptionsKey, $exceptionClass, 1); - $driver->expire($exceptionsKey, $this->redis->getTtl('aggregated')); + // Track exception types + $pipe->incrementHashField($exceptionsKey, $exceptionClass, 1); + $pipe->expire($exceptionsKey, $aggregatedTtl); - // Refresh TTL on metrics key - $driver->expire($metricsKey, $ttl); + // Refresh TTL on metrics key + $pipe->expire($metricsKey, $ttl); + }); } public function cleanup(int $olderThanSeconds): int diff --git a/src/Services/TrendAnalysisService.php b/src/Services/TrendAnalysisService.php index ef4139b..63f59e7 100644 --- a/src/Services/TrendAnalysisService.php +++ b/src/Services/TrendAnalysisService.php @@ -175,8 +175,8 @@ public function analyzeThroughputTrend( $avg = array_sum($throughputs) / $count; $totalJobs = array_sum($throughputs); - // Calculate jobs per minute - $jobsPerMinute = ($totalJobs / $periodSeconds) * 60; + // Calculate jobs per minute (with defensive division by zero check) + $jobsPerMinute = $periodSeconds > 0 ? ($totalJobs / $periodSeconds) * 60 : 0.0; // Trend analysis $trend = $this->calculateLinearTrend($values); diff --git a/src/Support/PipelineWrapper.php b/src/Support/PipelineWrapper.php index b281c37..b710cd2 100644 --- a/src/Support/PipelineWrapper.php +++ b/src/Support/PipelineWrapper.php @@ -101,7 +101,7 @@ public function removeFromSortedSet(string $key, string $member): void public function addToSet(string $key, array $members): void { if (! empty($members)) { - $this->pipe->sadd($key, $members); + $this->pipe->sadd($key, ...$members); } } @@ -116,7 +116,7 @@ public function getSetMembers(string $key): mixed public function removeFromSet(string $key, array $members): void { if (! 
empty($members)) { - $this->pipe->srem($key, $members); + $this->pipe->srem($key, ...$members); } } diff --git a/tests/Feature/CalculateQueueMetricsTest.php b/tests/Feature/CalculateQueueMetricsTest.php new file mode 100644 index 0000000..0081592 --- /dev/null +++ b/tests/Feature/CalculateQueueMetricsTest.php @@ -0,0 +1,245 @@ +markTestSkipped('Requires Redis - run with redis group'); + } + + config()->set('queue-metrics.enabled', true); + config()->set('queue-metrics.storage.driver', 'redis'); + config()->set('queue-metrics.storage.connection', 'default'); + + // Flush Redis before each test to ensure clean state + \Illuminate\Support\Facades\Redis::connection('default')->flushdb(); +}); + +it('calculates queue metrics from job metrics', function () { + $connection = 'redis'; + $queue = 'default'; + $jobClass1 = 'App\\Jobs\\ProcessOrder'; + $jobClass2 = 'App\\Jobs\\SendEmail'; + + // Setup - Record job metrics for two different job classes + $jobRepo = app(JobMetricsRepository::class); + $queueRepo = app(QueueMetricsRepository::class); + + // Mark queue as discovered + $queueRepo->markQueueDiscovered($connection, $queue); + + // Record job completions for job 1: 10 jobs, 100ms avg duration + // Note: recordStart() now handles job discovery atomically + for ($i = 0; $i < 10; $i++) { + $jobId = "job-1-{$i}"; + $jobRepo->recordStart($jobId, $jobClass1, $connection, $queue, \Carbon\Carbon::now()); + $jobRepo->recordCompletion( + jobId: $jobId, + jobClass: $jobClass1, + connection: $connection, + queue: $queue, + durationMs: 100.0, + memoryMb: 10.0, + cpuTimeMs: 50.0, + completedAt: \Carbon\Carbon::now(), + ); + } + + // Record job completions for job 2: 5 jobs, 200ms avg duration + for ($i = 0; $i < 5; $i++) { + $jobId = "job-2-{$i}"; + $jobRepo->recordStart($jobId, $jobClass2, $connection, $queue, \Carbon\Carbon::now()); + $jobRepo->recordCompletion( + jobId: $jobId, + jobClass: $jobClass2, + connection: $connection, + queue: $queue, + durationMs: 200.0, + memoryMb: 15.0, + cpuTimeMs: 75.0, + completedAt: \Carbon\Carbon::now(), + ); + } + + // Execute - Calculate queue metrics + $action = app(CalculateQueueMetricsAction::class); + $action->execute($connection, $queue); + + // Assert - Verify aggregated metrics + $metrics = $queueRepo->getLatestMetrics($connection, $queue); + + expect($metrics)->not()->toBeEmpty('Queue metrics should not be empty'); + + // Weighted average duration: (10 * 100 + 5 * 200) / 15 = 133.33ms + $expectedAvgDuration = (10 * 100.0 + 5 * 200.0) / 15; + expect($metrics['avg_duration'])->toEqualWithDelta($expectedAvgDuration, 0.01); + + // Throughput should be sum of both job classes + expect($metrics['throughput_per_minute'])->toBeGreaterThan(0.0); + + // Failure rate should be 0 (no failures recorded) + expect($metrics['failure_rate'])->toBe(0.0); +})->group('redis'); + +it('handles queue with no jobs', function () { + $connection = 'redis'; + $queue = 'empty'; + + $queueRepo = app(QueueMetricsRepository::class); + $queueRepo->markQueueDiscovered($connection, $queue); + + // Execute - Calculate metrics for empty queue + $action = app(CalculateQueueMetricsAction::class); + $action->execute($connection, $queue); + + // Assert - Should record zero metrics + $metrics = $queueRepo->getLatestMetrics($connection, $queue); + + expect($metrics)->not()->toBeEmpty(); + expect($metrics['throughput_per_minute'])->toBe(0.0); + expect($metrics['avg_duration'])->toBe(0.0); + expect($metrics['failure_rate'])->toBe(0.0); +})->group('redis'); + +it('calculates failure rate correctly', 
function () { + $connection = 'redis'; + $queue = 'default'; + $jobClass = 'App\\Jobs\\RiskyJob'; + + $jobRepo = app(JobMetricsRepository::class); + $queueRepo = app(QueueMetricsRepository::class); + + $queueRepo->markQueueDiscovered($connection, $queue); + + // Record 7 successful completions + // Note: recordStart() now handles job discovery atomically + for ($i = 0; $i < 7; $i++) { + $jobId = "job-success-{$i}"; + $jobRepo->recordStart($jobId, $jobClass, $connection, $queue, \Carbon\Carbon::now()); + $jobRepo->recordCompletion( + jobId: $jobId, + jobClass: $jobClass, + connection: $connection, + queue: $queue, + durationMs: 100.0, + memoryMb: 10.0, + cpuTimeMs: 50.0, + completedAt: \Carbon\Carbon::now(), + ); + } + + // Record 3 failures + for ($i = 0; $i < 3; $i++) { + $jobRepo->recordFailure( + jobId: "job-failure-{$i}", + jobClass: $jobClass, + connection: $connection, + queue: $queue, + exception: 'Test exception', + failedAt: \Carbon\Carbon::now(), + ); + } + + // Execute + $action = app(CalculateQueueMetricsAction::class); + $action->execute($connection, $queue); + + // Assert - Failure rate should be 3/10 = 30.0% + $metrics = $queueRepo->getLatestMetrics($connection, $queue); + expect($metrics['failure_rate'])->toBe(30.0); +})->group('redis'); + +it('command calculates all queues', function () { + $jobRepo = app(JobMetricsRepository::class); + $queueRepo = app(QueueMetricsRepository::class); + + // Setup multiple queues + $queues = [ + ['connection' => 'redis', 'queue' => 'default'], + ['connection' => 'redis', 'queue' => 'high'], + ['connection' => 'redis', 'queue' => 'low'], + ]; + + foreach ($queues as $q) { + $queueRepo->markQueueDiscovered($q['connection'], $q['queue']); + $jobClass = 'App\\Jobs\\TestJob'.ucfirst($q['queue']); + + // Record some completions + // Note: recordStart() now handles job discovery atomically + $jobId = "job-{$q['queue']}-1"; + $jobRepo->recordStart($jobId, $jobClass, $q['connection'], $q['queue'], \Carbon\Carbon::now()); + $jobRepo->recordCompletion( + jobId: $jobId, + jobClass: $jobClass, + connection: $q['connection'], + queue: $q['queue'], + durationMs: 150.0, + memoryMb: 12.0, + cpuTimeMs: 60.0, + completedAt: \Carbon\Carbon::now(), + ); + } + + // Execute command + $exitCode = Artisan::call('queue-metrics:calculate'); + + expect($exitCode)->toBe(0); + + // Verify all queues have metrics + foreach ($queues as $q) { + $metrics = $queueRepo->getLatestMetrics($q['connection'], $q['queue']); + expect($metrics)->not()->toBeEmpty(); + expect($metrics['avg_duration'])->toEqualWithDelta(150.0, 0.01); + } +})->group('redis'); + +it('command calculates specific queue', function () { + $connection = 'redis'; + $queue = 'specific'; + $jobClass = 'App\\Jobs\\SpecificJob'; + + $jobRepo = app(JobMetricsRepository::class); + $queueRepo = app(QueueMetricsRepository::class); + + $queueRepo->markQueueDiscovered($connection, $queue); + + // Record job completion + // Note: recordStart() now handles job discovery atomically + $jobId = 'job-specific-1'; + $jobRepo->recordStart($jobId, $jobClass, $connection, $queue, \Carbon\Carbon::now()); + $jobRepo->recordCompletion( + jobId: $jobId, + jobClass: $jobClass, + connection: $connection, + queue: $queue, + durationMs: 250.0, + memoryMb: 20.0, + cpuTimeMs: 100.0, + completedAt: \Carbon\Carbon::now(), + ); + + // Execute command for specific queue + $exitCode = Artisan::call('queue-metrics:calculate', [ + '--connection' => $connection, + '--queue' => $queue, + ]); + + expect($exitCode)->toBe(0); + + $metrics = 
$queueRepo->getLatestMetrics($connection, $queue); + expect($metrics['avg_duration'])->toEqualWithDelta(250.0, 0.01); +})->group('redis'); + +it('command fails when queue specified without connection', function () { + $exitCode = Artisan::call('queue-metrics:calculate', [ + '--queue' => 'test', + ]); + + expect($exitCode)->toBe(1); +})->group('redis'); diff --git a/tests/Unit/Actions/RecordJobStartActionTest.php b/tests/Unit/Actions/RecordJobStartActionTest.php index 5238191..931ba45 100644 --- a/tests/Unit/Actions/RecordJobStartActionTest.php +++ b/tests/Unit/Actions/RecordJobStartActionTest.php @@ -22,10 +22,7 @@ }); it('records job start with all parameters', function () { - $this->queueRepository->shouldReceive('markQueueDiscovered') - ->once() - ->with('redis', 'default'); - + // Queue discovery now happens atomically inside recordStart() $this->repository->shouldReceive('recordStart') ->once() ->with( @@ -58,10 +55,7 @@ })->group('functional'); it('handles different queue connections', function () { - $this->queueRepository->shouldReceive('markQueueDiscovered') - ->once() - ->with('database', 'emails'); - + // Queue discovery now happens atomically inside recordStart() $this->repository->shouldReceive('recordStart') ->once() ->with( @@ -83,10 +77,7 @@ it('records start time at execution moment', function () { Carbon::setTestNow('2024-01-15 14:45:30'); - $this->queueRepository->shouldReceive('markQueueDiscovered') - ->once() - ->with('redis', 'reports'); - + // Queue discovery now happens atomically inside recordStart() $this->repository->shouldReceive('recordStart') ->once() ->with( @@ -106,10 +97,7 @@ })->group('functional'); it('handles job IDs with special characters', function () { - $this->queueRepository->shouldReceive('markQueueDiscovered') - ->once() - ->with('redis', 'default'); - + // Queue discovery now happens atomically inside recordStart() $this->repository->shouldReceive('recordStart') ->once() ->with(
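
Note on the aggregation math: the new action derives the queue snapshot from per-job-class totals, and the feature tests assert the same numbers. The sketch below restates that calculation in isolation; it is illustrative only — the $jobMetrics array and the echo output format are hypothetical stand-ins for what JobMetricsRepository::getMetrics() returns per job class.

<?php

declare(strict_types=1);

// Hypothetical per-job-class totals, shaped like JobMetricsRepository::getMetrics() output
// for the two job classes used in the first feature test.
$jobMetrics = [
    'App\Jobs\ProcessOrder' => ['total_processed' => 10, 'total_failed' => 0, 'total_duration_ms' => 10 * 100.0],
    'App\Jobs\SendEmail'    => ['total_processed' => 5,  'total_failed' => 0, 'total_duration_ms' => 5 * 200.0],
];

$totalProcessed  = array_sum(array_column($jobMetrics, 'total_processed'));   // 15
$totalFailed     = array_sum(array_column($jobMetrics, 'total_failed'));      // 0
$totalDurationMs = array_sum(array_column($jobMetrics, 'total_duration_ms')); // 2000.0

// Weighted average duration: total time spent divided by jobs completed.
$avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0; // (10*100 + 5*200) / 15 ≈ 133.33

// Failure rate as a percentage of all attempts (processed + failed);
// with 7 successes and 3 failures this same formula yields 30.0, as the second test asserts.
$attempts = $totalProcessed + $totalFailed;
$failureRate = $attempts > 0 ? ($totalFailed / $attempts) * 100.0 : 0.0;

echo sprintf("avg_duration=%.2fms failure_rate=%.1f%%\n", $avgDuration, $failureRate);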