diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go
index 61fe365..7da63f2 100644
--- a/internal/jobscheduler/jobs.go
+++ b/internal/jobscheduler/jobs.go
@@ -3,6 +3,7 @@ package jobscheduler
 
 import (
 	"context"
+	"log/slog"
 	"runtime"
 	"strings"
 	"sync"
@@ -214,27 +215,54 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
 			if !ok {
 				continue
 			}
-			jobAttrs := attribute.String("job.type", jobType(job.id))
-			start := time.Now()
-			logger.InfoContext(ctx, "Starting job", "job", job)
-			err := job.run(ctx)
-			elapsed := time.Since(start)
-			status := "success"
-			if err != nil {
-				status = "error"
-				logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed)
-			} else {
-				logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed)
-			}
-			statusAttr := attribute.String("status", status)
-			q.metrics.jobsTotal.Add(ctx, 1, metric.WithAttributes(jobAttrs, statusAttr))
-			q.metrics.jobDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(jobAttrs, statusAttr))
-			q.markQueueInactive(job.queue)
-			q.workAvailable <- true
+			q.runJob(ctx, logger, job)
 		}
 	}
 }
 
+// runJob executes a single job, logs its outcome, and records the job count
+// and duration metrics labelled with the job type and status. A panic raised
+// by job.run is recovered and reported as a job error.
+//
+// On return, including when a later step panics, the job's queue is marked
+// inactive and a wake-up is then sent on q.workAvailable.
+func (q *RootScheduler) runJob(ctx context.Context, logger *slog.Logger, job queueJob) {
+	// Deferred calls run last-in-first-out, so the wake-up is registered first
+	// to make it fire after the queue has been marked inactive. This keeps the
+	// ordering the inline code in worker used.
+	defer func() { q.workAvailable <- true }()
+	defer q.markQueueInactive(job.queue)
+
+	jobAttrs := attribute.String("job.type", jobType(job.id))
+	start := time.Now()
+	logger.InfoContext(ctx, "Starting job", "job", job)
+
+	var err error
+	func() {
+		defer func() {
+			if r := recover(); r != nil {
+				// runtime.Stack truncates the trace to the buffer size.
+				stack := make([]byte, 4096)
+				stack = stack[:runtime.Stack(stack, false)]
+				err = errors.Errorf("panic: %v\n%s", r, stack)
+			}
+		}()
+		err = job.run(ctx)
+	}()
+
+	elapsed := time.Since(start)
+	status := "success"
+	if err != nil {
+		status = "error"
+		logger.ErrorContext(ctx, "Job failed", "job", job, "error", err, "elapsed", elapsed)
+	} else {
+		logger.InfoContext(ctx, "Job completed", "job", job, "elapsed", elapsed)
+	}
+	statusAttr := attribute.String("status", status)
+	q.metrics.jobsTotal.Add(ctx, 1, metric.WithAttributes(jobAttrs, statusAttr))
+	q.metrics.jobDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(jobAttrs, statusAttr))
+}
+
 // jobType extracts a normalised job type from the job ID for metric labels.
 func jobType(id string) string {
 	switch {