Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions cmd/asb-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,34 @@ package main

import (
"context"
"errors"
"flag"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/evalops/asb/internal/bootstrap"
"github.com/evalops/asb/internal/worker"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

var (
interval time.Duration
limit int
once bool
interval time.Duration
limit int
once bool
metricsAddr string
)
flag.DurationVar(&interval, "interval", 30*time.Second, "cleanup interval")
flag.IntVar(&limit, "limit", 100, "maximum expired items processed per cleanup pass")
flag.BoolVar(&once, "once", false, "run a single cleanup pass and exit")
flag.StringVar(&metricsAddr, "metrics-addr", os.Getenv("ASB_WORKER_METRICS_ADDR"), "optional metrics listen address")
flag.Parse()

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
Expand All @@ -35,13 +41,43 @@ func main() {
}
defer cleanup()

workerMetrics, err := worker.NewMetrics("asb", worker.MetricsOptions{
Registerer: prometheus.DefaultRegisterer,
})
if err != nil {
logger.Error("configure worker metrics", "error", err)
os.Exit(1)
}

runner := worker.NewRunner(worker.Config{
Service: svc,
Interval: interval,
Limit: limit,
Logger: logger,
Metrics: workerMetrics,
})

var metricsServer *http.Server
if metricsAddr != "" {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
metricsServer = &http.Server{
Addr: metricsAddr,
Handler: mux,
}
go func() {
if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Error("worker metrics server exited", "error", err)
}
}()
defer func() {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = metricsServer.Shutdown(shutdownCtx)
}()
logger.Info("worker metrics server listening", "addr", metricsAddr)
}

if once {
if _, err := runner.RunOnce(ctx); err != nil {
logger.Error("worker cleanup pass failed", "error", err)
Expand Down
137 changes: 137 additions & 0 deletions internal/worker/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package worker

import (
"fmt"
"strings"
"time"
"unicode"

"github.com/evalops/asb/internal/app"
"github.com/prometheus/client_golang/prometheus"
)

type MetricsOptions struct {
Registerer prometheus.Registerer
DurationBuckets []float64
}

type Metrics struct {
processed *prometheus.CounterVec
duration prometheus.Histogram
}

func NewMetrics(serviceName string, opts MetricsOptions) (*Metrics, error) {
if opts.Registerer == nil {
opts.Registerer = prometheus.DefaultRegisterer
}
if len(opts.DurationBuckets) == 0 {
opts.DurationBuckets = prometheus.DefBuckets
}

prefix := metricsPrefix(serviceName)
processed, err := registerCounterVec(
opts.Registerer,
prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: prefix + "_cleanup_processed_total",
Help: "Count of ASB cleanup items processed by type.",
},
[]string{"item_type"},
),
)
if err != nil {
return nil, err
}

duration, err := registerHistogram(
opts.Registerer,
prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: prefix + "_cleanup_pass_seconds",
Help: "Duration of ASB cleanup passes in seconds.",
Buckets: opts.DurationBuckets,
},
),
)
if err != nil {
return nil, err
}

return &Metrics{
processed: processed,
duration: duration,
}, nil
}

func (m *Metrics) recordCleanupPass(stats *app.CleanupStats, duration time.Duration) {
if m == nil {
return
}
m.duration.Observe(duration.Seconds())
if stats == nil {
return
}
m.processed.WithLabelValues("approvals").Add(float64(stats.ApprovalsExpired))
m.processed.WithLabelValues("sessions").Add(float64(stats.SessionsExpired))
m.processed.WithLabelValues("grants").Add(float64(stats.GrantsExpired))
m.processed.WithLabelValues("artifacts").Add(float64(stats.ArtifactsExpired))
}

func metricsPrefix(serviceName string) string {
serviceName = strings.TrimSpace(serviceName)
if serviceName == "" {
return "service"
}

var builder strings.Builder
for index, runeValue := range serviceName {
switch {
case unicode.IsLetter(runeValue), unicode.IsDigit(runeValue):
builder.WriteRune(unicode.ToLower(runeValue))
default:
builder.WriteByte('_')
}
if index == 0 && unicode.IsDigit(runeValue) {
builder.WriteByte('_')
}
}

prefix := strings.Trim(builder.String(), "_")
if prefix == "" {
return "service"
}
if prefix[0] >= '0' && prefix[0] <= '9' {
return "service_" + prefix
}
return prefix
}

func registerCounterVec(registerer prometheus.Registerer, collector *prometheus.CounterVec) (*prometheus.CounterVec, error) {
if err := registerer.Register(collector); err != nil {
alreadyRegistered, ok := err.(prometheus.AlreadyRegisteredError)
if !ok {
return nil, err
}
existing, ok := alreadyRegistered.ExistingCollector.(*prometheus.CounterVec)
if !ok {
return nil, err
}
return existing, nil
}
return collector, nil
}

func registerHistogram(registerer prometheus.Registerer, collector prometheus.Histogram) (prometheus.Histogram, error) {
if err := registerer.Register(collector); err != nil {
alreadyRegistered, ok := err.(prometheus.AlreadyRegisteredError)
if !ok {
return nil, err
}
existing, ok := alreadyRegistered.ExistingCollector.(prometheus.Histogram)
if !ok {
return nil, fmt.Errorf("register histogram: existing collector has unexpected type %T", alreadyRegistered.ExistingCollector)
}
return existing, nil
}
return collector, nil
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three helper functions fully duplicated across packages

Low Severity

metricsPrefix, registerCounterVec, and registerHistogram are verbatim copies of the identical unexported functions already in internal/app/metrics.go. Any future bug fix to one copy risks being missed in the other. These could live in a shared internal package (e.g. internal/promutil) to eliminate the duplication.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit ef4f60c. Configure here.

5 changes: 5 additions & 0 deletions internal/worker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ type Config struct {
Interval time.Duration
Limit int
Logger *slog.Logger
Metrics *Metrics
}

type Runner struct {
service CleanupService
interval time.Duration
limit int
logger *slog.Logger
metrics *Metrics
}

func NewRunner(cfg Config) *Runner {
Expand All @@ -42,17 +44,20 @@ func NewRunner(cfg Config) *Runner {
interval: cfg.Interval,
limit: cfg.Limit,
logger: cfg.Logger,
metrics: cfg.Metrics,
}
}

func (r *Runner) RunOnce(ctx context.Context) (*app.CleanupStats, error) {
if r.service == nil {
return nil, fmt.Errorf("cleanup service is required")
}
startedAt := time.Now()
stats, err := r.service.RunCleanupOnce(ctx, r.limit)
if err != nil {
return nil, err
}
r.metrics.recordCleanupPass(stats, time.Since(startedAt))
r.logger.Info("worker cleanup complete",
"approvals_expired", stats.ApprovalsExpired,
"sessions_expired", stats.SessionsExpired,
Expand Down
110 changes: 110 additions & 0 deletions internal/worker/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/evalops/asb/internal/app"
"github.com/evalops/asb/internal/worker"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

func TestRunner_RunOnce(t *testing.T) {
Expand Down Expand Up @@ -41,6 +43,54 @@ func TestRunner_RunOnce(t *testing.T) {
}
}

func TestRunner_RunOnceRecordsMetrics(t *testing.T) {
t.Parallel()

registry := prometheus.NewRegistry()
metrics, err := worker.NewMetrics("asb", worker.MetricsOptions{
Registerer: registry,
})
if err != nil {
t.Fatalf("NewMetrics() error = %v", err)
}

service := &fakeCleanupService{
stats: &app.CleanupStats{
ApprovalsExpired: 1,
SessionsExpired: 2,
GrantsExpired: 3,
ArtifactsExpired: 4,
},
}
runner := worker.NewRunner(worker.Config{
Service: service,
Limit: 50,
Logger: slog.New(slog.NewTextHandler(testWriter{t}, nil)),
Metrics: metrics,
})

if _, err := runner.RunOnce(context.Background()); err != nil {
t.Fatalf("RunOnce() error = %v", err)
}

families := mustGatherMetrics(t, registry)
if got := metricValueWithLabels(families, "asb_cleanup_processed_total", map[string]string{"item_type": "approvals"}); got != 1 {
t.Fatalf("approval cleanup count = %v, want 1", got)
}
if got := metricValueWithLabels(families, "asb_cleanup_processed_total", map[string]string{"item_type": "sessions"}); got != 2 {
t.Fatalf("session cleanup count = %v, want 2", got)
}
if got := metricValueWithLabels(families, "asb_cleanup_processed_total", map[string]string{"item_type": "grants"}); got != 3 {
t.Fatalf("grant cleanup count = %v, want 3", got)
}
if got := metricValueWithLabels(families, "asb_cleanup_processed_total", map[string]string{"item_type": "artifacts"}); got != 4 {
t.Fatalf("artifact cleanup count = %v, want 4", got)
}
if got := histogramCountWithLabels(families, "asb_cleanup_pass_seconds", nil); got != 1 {
t.Fatalf("cleanup pass histogram count = %d, want 1", got)
}
}

func TestRunner_RunPropagatesErrors(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -138,3 +188,63 @@ func (w testWriter) Write(p []byte) (int, error) {
w.t.Log(string(p))
return len(p), nil
}

func mustGatherMetrics(t *testing.T, gatherer prometheus.Gatherer) []*dto.MetricFamily {
t.Helper()

families, err := gatherer.Gather()
if err != nil {
t.Fatalf("Gather() error = %v", err)
}
return families
}

func metricValueWithLabels(metricFamilies []*dto.MetricFamily, name string, labels map[string]string) float64 {
for _, family := range metricFamilies {
if family.GetName() != name {
continue
}
for _, metric := range family.Metric {
if !metricMatchesLabels(metric, labels) {
continue
}
switch family.GetType() {
case dto.MetricType_COUNTER:
return metric.GetCounter().GetValue()
case dto.MetricType_GAUGE:
return metric.GetGauge().GetValue()
}
}
}
return 0
}

func histogramCountWithLabels(metricFamilies []*dto.MetricFamily, name string, labels map[string]string) uint64 {
for _, family := range metricFamilies {
if family.GetName() != name || family.GetType() != dto.MetricType_HISTOGRAM {
continue
}
for _, metric := range family.Metric {
if metricMatchesLabels(metric, labels) {
return metric.GetHistogram().GetSampleCount()
}
}
}
return 0
}

func metricMatchesLabels(metric *dto.Metric, labels map[string]string) bool {
if len(labels) == 0 {
return true
}
values := make(map[string]string, len(metric.Label))
for _, label := range metric.Label {
values[label.GetName()] = label.GetValue()
}
for key, want := range labels {
if values[key] != want {
return false
}
}
return true
}
Loading