Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions src/Actions/CalculateQueueMetricsAction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?php

declare(strict_types=1);

namespace PHPeek\LaravelQueueMetrics\Actions;

use PHPeek\LaravelQueueMetrics\Repositories\Contracts\JobMetricsRepository;
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\QueueMetricsRepository;

/**
* Calculate aggregated queue-level metrics from job-level metrics.
*
* This action aggregates metrics across all job classes for a given queue,
* calculating weighted averages and totals for throughput, duration, and failure rates.
*/
final readonly class CalculateQueueMetricsAction
{
public function __construct(
private JobMetricsRepository $jobRepository,
private QueueMetricsRepository $queueRepository,
) {}

/**
* Calculate and store aggregated metrics for a specific queue.
*/
public function execute(string $connection, string $queue): void
{
// Get all jobs for this queue
$allJobs = $this->jobRepository->listJobs();
$queueJobs = array_filter($allJobs, fn ($job) => $job['connection'] === $connection && $job['queue'] === $queue);

if (empty($queueJobs)) {
// No jobs found for this queue - record zero metrics
$this->queueRepository->recordSnapshot($connection, $queue, [
'throughput_per_minute' => 0.0,
'avg_duration' => 0.0,
'failure_rate' => 0.0,
]);

return;
}

// Aggregate metrics across all job classes
$totalProcessed = 0;
$totalFailed = 0;
$totalDurationMs = 0.0;
$lastProcessedAt = null;

foreach ($queueJobs as $job) {
$jobClass = $job['jobClass'];
$metrics = $this->jobRepository->getMetrics($jobClass, $connection, $queue);

$totalProcessed += is_int($metrics['total_processed']) ? $metrics['total_processed'] : 0;
$totalFailed += is_int($metrics['total_failed']) ? $metrics['total_failed'] : 0;
$totalDurationMs += is_float($metrics['total_duration_ms']) || is_int($metrics['total_duration_ms'])
? (float) $metrics['total_duration_ms']
: 0.0;

if ($metrics['last_processed_at'] instanceof \Carbon\Carbon) {
if ($lastProcessedAt === null || $metrics['last_processed_at']->greaterThan($lastProcessedAt)) {
$lastProcessedAt = $metrics['last_processed_at'];
}
}
}

// Calculate aggregated metrics
$avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0;
$failureRate = ($totalProcessed + $totalFailed) > 0
? ($totalFailed / ($totalProcessed + $totalFailed)) * 100.0
: 0.0;

// Calculate throughput per minute (jobs completed in last 60 seconds)
$throughputPerMinute = 0.0;
foreach ($queueJobs as $job) {
$jobClass = $job['jobClass'];
$throughputPerMinute += $this->jobRepository->getThroughput(
$jobClass,
$connection,
$queue,
60 // last 60 seconds
);
}

// Store aggregated metrics
$this->queueRepository->recordSnapshot($connection, $queue, [
'throughput_per_minute' => $throughputPerMinute,
'avg_duration' => $avgDuration,
'failure_rate' => $failureRate,
'total_processed' => $totalProcessed,
'total_failed' => $totalFailed,
'last_processed_at' => $lastProcessedAt?->timestamp,
]);
}

/**
* Calculate metrics for all discovered queues.
*/
public function executeForAllQueues(): int
{
$queues = $this->queueRepository->listQueues();
$count = 0;

foreach ($queues as $queue) {
$this->execute($queue['connection'], $queue['queue']);
$count++;
}

return $count;
}
}
6 changes: 1 addition & 5 deletions src/Actions/RecordJobStartAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Carbon\Carbon;
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\JobMetricsRepository;
use PHPeek\LaravelQueueMetrics\Repositories\Contracts\QueueMetricsRepository;

/**
* Record when a job starts processing.
Expand All @@ -15,7 +14,6 @@
{
public function __construct(
private JobMetricsRepository $repository,
private QueueMetricsRepository $queueMetricsRepository,
) {}

public function execute(
Expand All @@ -28,9 +26,7 @@ public function execute(
return;
}

// Mark queue as discovered for listQueues() to find it
$this->queueMetricsRepository->markQueueDiscovered($connection, $queue);

// Queue discovery now happens atomically inside recordStart()
$this->repository->recordStart(
jobId: $jobId,
jobClass: $jobClass,
Expand Down
67 changes: 67 additions & 0 deletions src/Console/CalculateQueueMetricsCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

declare(strict_types=1);

namespace PHPeek\LaravelQueueMetrics\Console;

use Illuminate\Console\Command;
use PHPeek\LaravelQueueMetrics\Actions\CalculateQueueMetricsAction;

/**
* Calculate and update queue-level aggregated metrics.
*
* This command aggregates job-level metrics into queue-level metrics including
* throughput_per_minute, avg_duration, and failure_rate. These metrics are
* essential for auto-scaling calculations and queue health monitoring.
*/
final class CalculateQueueMetricsCommand extends Command
{
protected $signature = 'queue-metrics:calculate
{--connection= : Calculate metrics only for this connection}
{--queue= : Calculate metrics only for this queue (requires --connection)}';

protected $description = 'Calculate aggregated queue-level metrics from job metrics';

public function __construct(
private readonly CalculateQueueMetricsAction $action,
) {
parent::__construct();
}

public function handle(): int
{
$connection = $this->option('connection');
$queue = $this->option('queue');

// Validate options
if ($queue !== null && $connection === null) {
$this->error('The --queue option requires --connection to be specified');

return self::FAILURE;
}

try {
if ($connection !== null && $queue !== null) {
// Calculate for specific queue
$connectionStr = is_string($connection) ? $connection : '';
$queueStr = is_string($queue) ? $queue : '';
$this->info("Calculating metrics for {$connectionStr}:{$queueStr}...");
$this->action->execute($connectionStr, $queueStr);
$this->info('✓ Metrics calculated successfully');

return self::SUCCESS;
}

// Calculate for all queues
$this->info('Calculating metrics for all queues...');
$count = $this->action->executeForAllQueues();
$this->info("✓ Metrics calculated for {$count} queue(s)");

return self::SUCCESS;
} catch (\Exception $e) {
$this->error("Failed to calculate queue metrics: {$e->getMessage()}");

return self::FAILURE;
}
}
}
7 changes: 7 additions & 0 deletions src/LaravelQueueMetricsServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use PHPeek\LaravelQueueMetrics\Commands\CleanupStaleWorkersCommand;
use PHPeek\LaravelQueueMetrics\Config\QueueMetricsConfig;
use PHPeek\LaravelQueueMetrics\Config\StorageConfig;
use PHPeek\LaravelQueueMetrics\Console\CalculateQueueMetricsCommand;
use PHPeek\LaravelQueueMetrics\Console\DetectStaleWorkersCommand;
use PHPeek\LaravelQueueMetrics\Console\RecordTrendDataCommand;
use PHPeek\LaravelQueueMetrics\Contracts\QueueInspector;
Expand Down Expand Up @@ -69,6 +70,7 @@ public function configurePackage(Package $package): void
->hasRoute('api')
->hasMigration('2024_01_01_000001_create_queue_metrics_storage_tables')
->hasCommand(CalculateBaselinesCommand::class)
->hasCommand(CalculateQueueMetricsCommand::class)
->hasCommand(CleanupStaleWorkersCommand::class)
->hasCommand(DetectStaleWorkersCommand::class)
->hasCommand(RecordTrendDataCommand::class);
Expand Down Expand Up @@ -199,6 +201,11 @@ protected function registerScheduledTasks(): void
// Schedule adaptive baseline calculation
$this->scheduleAdaptiveBaselineCalculation($scheduler);

// Schedule queue metrics calculation (aggregate job metrics into queue metrics)
$scheduler->command('queue-metrics:calculate')
->everyMinute()
->withoutOverlapping();

// Schedule trend data recording (every minute for real-time trends)
$scheduler->command('queue-metrics:record-trends')
->everyMinute()
Expand Down
Loading