diff --git a/cmd/asb-worker/main.go b/cmd/asb-worker/main.go index a2013b5..14b8662 100644 --- a/cmd/asb-worker/main.go +++ b/cmd/asb-worker/main.go @@ -2,8 +2,10 @@ package main import ( "context" + "errors" "flag" "log/slog" + "net/http" "os" "os/signal" "syscall" @@ -11,19 +13,23 @@ import ( "github.com/evalops/asb/internal/bootstrap" "github.com/evalops/asb/internal/worker" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" ) func main() { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) var ( - interval time.Duration - limit int - once bool + interval time.Duration + limit int + once bool + metricsAddr string ) flag.DurationVar(&interval, "interval", 30*time.Second, "cleanup interval") flag.IntVar(&limit, "limit", 100, "maximum expired items processed per cleanup pass") flag.BoolVar(&once, "once", false, "run a single cleanup pass and exit") + flag.StringVar(&metricsAddr, "metrics-addr", os.Getenv("ASB_WORKER_METRICS_ADDR"), "optional metrics listen address") flag.Parse() ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) @@ -35,13 +41,43 @@ func main() { } defer cleanup() + workerMetrics, err := worker.NewMetrics("asb", worker.MetricsOptions{ + Registerer: prometheus.DefaultRegisterer, + }) + if err != nil { + logger.Error("configure worker metrics", "error", err) + os.Exit(1) + } + runner := worker.NewRunner(worker.Config{ Service: svc, Interval: interval, Limit: limit, Logger: logger, + Metrics: workerMetrics, }) + var metricsServer *http.Server + if metricsAddr != "" { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + metricsServer = &http.Server{ + Addr: metricsAddr, + Handler: mux, + } + go func() { + if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Error("worker metrics server exited", "error", err) + } + }() + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = metricsServer.Shutdown(shutdownCtx) + }() + logger.Info("worker metrics server listening", "addr", metricsAddr) + } + if once { if _, err := runner.RunOnce(ctx); err != nil { logger.Error("worker cleanup pass failed", "error", err) diff --git a/internal/worker/metrics.go b/internal/worker/metrics.go new file mode 100644 index 0000000..230e530 --- /dev/null +++ b/internal/worker/metrics.go @@ -0,0 +1,137 @@ +package worker + +import ( + "fmt" + "strings" + "time" + "unicode" + + "github.com/evalops/asb/internal/app" + "github.com/prometheus/client_golang/prometheus" +) + +type MetricsOptions struct { + Registerer prometheus.Registerer + DurationBuckets []float64 +} + +type Metrics struct { + processed *prometheus.CounterVec + duration prometheus.Histogram +} + +func NewMetrics(serviceName string, opts MetricsOptions) (*Metrics, error) { + if opts.Registerer == nil { + opts.Registerer = prometheus.DefaultRegisterer + } + if len(opts.DurationBuckets) == 0 { + opts.DurationBuckets = prometheus.DefBuckets + } + + prefix := metricsPrefix(serviceName) + processed, err := registerCounterVec( + opts.Registerer, + prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: prefix + "_cleanup_processed_total", + Help: "Count of ASB cleanup items processed by type.", + }, + []string{"item_type"}, + ), + ) + if err != nil { + return nil, err + } + + duration, err := registerHistogram( + opts.Registerer, + prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: prefix + "_cleanup_pass_seconds", + Help: "Duration of ASB cleanup passes in seconds.", + Buckets: opts.DurationBuckets, + }, + ), + ) + if err != nil { + return nil, err + } + + return &Metrics{ + processed: processed, + duration: duration, + }, nil +} + +func (m *Metrics) recordCleanupPass(stats *app.CleanupStats, duration time.Duration) { + if m == nil { + return + } + m.duration.Observe(duration.Seconds()) + if stats == nil { + return + } + m.processed.WithLabelValues("approvals").Add(float64(stats.ApprovalsExpired)) + m.processed.WithLabelValues("sessions").Add(float64(stats.SessionsExpired)) + m.processed.WithLabelValues("grants").Add(float64(stats.GrantsExpired)) + m.processed.WithLabelValues("artifacts").Add(float64(stats.ArtifactsExpired)) +} + +func metricsPrefix(serviceName string) string { + serviceName = strings.TrimSpace(serviceName) + if serviceName == "" { + return "service" + } + + var builder strings.Builder + for index, runeValue := range serviceName { + switch { + case unicode.IsLetter(runeValue), unicode.IsDigit(runeValue): + builder.WriteRune(unicode.ToLower(runeValue)) + default: + builder.WriteByte('_') + } + if index == 0 && unicode.IsDigit(runeValue) { + builder.WriteByte('_') + } + } + + prefix := strings.Trim(builder.String(), "_") + if prefix == "" { + return "service" + } + if prefix[0] >= '0' && prefix[0] <= '9' { + return "service_" + prefix + } + return prefix +} + +func registerCounterVec(registerer prometheus.Registerer, collector *prometheus.CounterVec) (*prometheus.CounterVec, error) { + if err := registerer.Register(collector); err != nil { + alreadyRegistered, ok := err.(prometheus.AlreadyRegisteredError) + if !ok { + return nil, err + } + existing, ok := alreadyRegistered.ExistingCollector.(*prometheus.CounterVec) + if !ok { + return nil, err + } + return existing, nil + } + return collector, nil +} + +func registerHistogram(registerer prometheus.Registerer, collector prometheus.Histogram) (prometheus.Histogram, error) { + if err := registerer.Register(collector); err != nil { + alreadyRegistered, ok := err.(prometheus.AlreadyRegisteredError) + if !ok { + return nil, err + } + existing, ok := alreadyRegistered.ExistingCollector.(prometheus.Histogram) + if !ok { + return nil, fmt.Errorf("register histogram: existing collector has unexpected type %T", alreadyRegistered.ExistingCollector) + } + return existing, nil + } + return collector, nil +} diff --git a/internal/worker/runner.go b/internal/worker/runner.go index d1ed354..60b7c4e 100644 --- a/internal/worker/runner.go +++ b/internal/worker/runner.go @@ -18,6 +18,7 @@ type Config struct { Interval time.Duration Limit int Logger *slog.Logger + Metrics *Metrics } type Runner struct { @@ -25,6 +26,7 @@ type Runner struct { interval time.Duration limit int logger *slog.Logger + metrics *Metrics } func NewRunner(cfg Config) *Runner { @@ -42,6 +44,7 @@ func NewRunner(cfg Config) *Runner { interval: cfg.Interval, limit: cfg.Limit, logger: cfg.Logger, + metrics: cfg.Metrics, } } @@ -49,10 +52,12 @@ func (r *Runner) RunOnce(ctx context.Context) (*app.CleanupStats, error) { if r.service == nil { return nil, fmt.Errorf("cleanup service is required") } + startedAt := time.Now() stats, err := r.service.RunCleanupOnce(ctx, r.limit) if err != nil { return nil, err } + r.metrics.recordCleanupPass(stats, time.Since(startedAt)) r.logger.Info("worker cleanup complete", "approvals_expired", stats.ApprovalsExpired, "sessions_expired", stats.SessionsExpired, diff --git a/internal/worker/runner_test.go b/internal/worker/runner_test.go index 3440907..27236aa 100644 --- a/internal/worker/runner_test.go +++ b/internal/worker/runner_test.go @@ -10,6 +10,8 @@ import ( "github.com/evalops/asb/internal/app" "github.com/evalops/asb/internal/worker" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" ) func TestRunner_RunOnce(t *testing.T) { @@ -41,6 +43,54 @@ func TestRunner_RunOnce(t *testing.T) { } } +func TestRunner_RunOnceRecordsMetrics(t *testing.T) { + t.Parallel() + + registry := prometheus.NewRegistry() + metrics, err := worker.NewMetrics("asb", worker.MetricsOptions{ + Registerer: registry, + }) + if err != nil { + t.Fatalf("NewMetrics() error = %v", err) + } + + service := &fakeCleanupService{ + stats: &app.CleanupStats{ + ApprovalsExpired: 1, + SessionsExpired: 2, + GrantsExpired: 3, + ArtifactsExpired: 4, + }, + } + runner := worker.NewRunner(worker.Config{ + Service: service, + Limit: 50, + Logger: slog.New(slog.NewTextHandler(testWriter{t}, nil)), + Metrics: metrics, + }) + + if _, err := runner.RunOnce(context.Background()); err != nil { + t.Fatalf("RunOnce() error = %v", err) + } + + families := mustGatherMetrics(t, registry) + if got := metricValueWithLabels(families, "asb_cleanup_processed_total", map[string]string{"item_type": "approvals"}); got != 1 { + t.Fatalf("approval cleanup count = %v, want 1", got) + } + if got := metricValueWithLabels(families, "asb_cleanup_processed_total", map[string]string{"item_type": "sessions"}); got != 2 { + t.Fatalf("session cleanup count = %v, want 2", got) + } + if got := metricValueWithLabels(families, "asb_cleanup_processed_total", map[string]string{"item_type": "grants"}); got != 3 { + t.Fatalf("grant cleanup count = %v, want 3", got) + } + if got := metricValueWithLabels(families, "asb_cleanup_processed_total", map[string]string{"item_type": "artifacts"}); got != 4 { + t.Fatalf("artifact cleanup count = %v, want 4", got) + } + if got := histogramCountWithLabels(families, "asb_cleanup_pass_seconds", nil); got != 1 { + t.Fatalf("cleanup pass histogram count = %d, want 1", got) + } +} + func TestRunner_RunPropagatesErrors(t *testing.T) { t.Parallel() @@ -138,3 +188,63 @@ func (w testWriter) Write(p []byte) (int, error) { w.t.Log(string(p)) return len(p), nil } + +func mustGatherMetrics(t *testing.T, gatherer prometheus.Gatherer) []*dto.MetricFamily { + t.Helper() + + families, err := gatherer.Gather() + if err != nil { + t.Fatalf("Gather() error = %v", err) + } + return families +} + +func metricValueWithLabels(metricFamilies []*dto.MetricFamily, name string, labels map[string]string) float64 { + for _, family := range metricFamilies { + if family.GetName() != name { + continue + } + for _, metric := range family.Metric { + if !metricMatchesLabels(metric, labels) { + continue + } + switch family.GetType() { + case dto.MetricType_COUNTER: + return metric.GetCounter().GetValue() + case dto.MetricType_GAUGE: + return metric.GetGauge().GetValue() + } + } + } + return 0 +} + +func histogramCountWithLabels(metricFamilies []*dto.MetricFamily, name string, labels map[string]string) uint64 { + for _, family := range metricFamilies { + if family.GetName() != name || family.GetType() != dto.MetricType_HISTOGRAM { + continue + } + for _, metric := range family.Metric { + if metricMatchesLabels(metric, labels) { + return metric.GetHistogram().GetSampleCount() + } + } + } + return 0 +} + +func metricMatchesLabels(metric *dto.Metric, labels map[string]string) bool { + if len(labels) == 0 { + return true + } + values := make(map[string]string, len(metric.Label)) + for _, label := range metric.Label { + values[label.GetName()] = label.GetValue() + } + for key, want := range labels { + if values[key] != want { + return false + } + } + return true +}