diff --git a/README.md b/README.md index b2d8dd4..3451e9b 100644 --- a/README.md +++ b/README.md @@ -311,7 +311,7 @@ The JSON API now applies per-IP rate limiting through the shared `service-runtim ### Metrics -ASB now exposes Prometheus metrics on `/metrics` through the shared `service-runtime/observability` package. The initial slice covers HTTP request counters and request-latency histograms across the broker entrypoints. +ASB now exposes Prometheus metrics on `/metrics` through the shared `service-runtime/observability` package. The current slice covers HTTP request counters and request-latency histograms across the broker entrypoints, plus Postgres pool gauges when `ASB_POSTGRES_DSN` is configured. ### Shared approval notifications diff --git a/cmd/asb-api/main.go b/cmd/asb-api/main.go index 1b754d1..799c438 100644 --- a/cmd/asb-api/main.go +++ b/cmd/asb-api/main.go @@ -14,6 +14,7 @@ import ( "github.com/evalops/asb/internal/bootstrap" "github.com/evalops/service-runtime/observability" "github.com/evalops/service-runtime/ratelimit" + "github.com/prometheus/client_golang/prometheus" ) func main() { @@ -39,6 +40,10 @@ func main() { logger.Error("initialize metrics", "error", err) os.Exit(1) } + if err := registerRuntimeMetrics(runtime, prometheus.DefaultRegisterer); err != nil { + logger.Error("register runtime metrics", "error", err) + os.Exit(1) + } limiter := ratelimit.New(ratelimit.Config{ RequestsPerSecond: cfg.rateLimitRPS, diff --git a/cmd/asb-api/observability.go b/cmd/asb-api/observability.go index 2be5c20..47f37ca 100644 --- a/cmd/asb-api/observability.go +++ b/cmd/asb-api/observability.go @@ -4,8 +4,10 @@ import ( "log/slog" "net/http" + "github.com/evalops/asb/internal/bootstrap" "github.com/evalops/service-runtime/httpkit" "github.com/evalops/service-runtime/observability" + "github.com/prometheus/client_golang/prometheus" ) func newObservedHandler(logger *slog.Logger, metrics *observability.Metrics, next http.Handler) http.Handler { @@ -23,3 +25,12 @@ func newObservedHandler(logger *slog.Logger, metrics *observability.Metrics, nex observed.ServeHTTP(w, r) }) } + +func registerRuntimeMetrics(runtime *bootstrap.ServiceRuntime, registerer prometheus.Registerer) error { + if runtime == nil || runtime.DBStats == nil { + return nil + } + return observability.RegisterDBStats("asb", runtime.DBStats, observability.DBStatsOptions{ + Registerer: registerer, + }) +} diff --git a/cmd/asb-api/observability_test.go b/cmd/asb-api/observability_test.go index 227a103..29f8da1 100644 --- a/cmd/asb-api/observability_test.go +++ b/cmd/asb-api/observability_test.go @@ -1,6 +1,7 @@ package main import ( + "database/sql" "io" "log/slog" "net/http" @@ -8,6 +9,7 @@ import ( "strings" "testing" + "github.com/evalops/asb/internal/bootstrap" "github.com/evalops/service-runtime/observability" "github.com/prometheus/client_golang/prometheus" ) @@ -88,6 +90,49 @@ func TestNewObservedHandlerRecordsRequestMetrics(t *testing.T) { } } +func TestRegisterRuntimeMetricsRegistersDBStats(t *testing.T) { + t.Parallel() + + registry := prometheus.NewRegistry() + runtime := &bootstrap.ServiceRuntime{ + DBStats: func() sql.DBStats { + return sql.DBStats{ + MaxOpenConnections: 8, + OpenConnections: 5, + InUse: 3, + Idle: 2, + } + }, + } + + if err := registerRuntimeMetrics(runtime, registry); err != nil { + t.Fatalf("registerRuntimeMetrics() error = %v", err) + } + + metrics, err := observability.NewMetrics("asb", observability.MetricsOptions{ + Registerer: registry, + Gatherer: registry, + }) + if err != nil { + t.Fatalf("NewMetrics() error = %v", err) + } + + handler := newObservedHandler(discardLogger(), metrics, http.NewServeMux()) + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/metrics", nil)) + + body := recorder.Body.String() + if !strings.Contains(body, "asb_db_open_connections 5") { + t.Fatalf("metrics body = %q, want open connection gauge", body) + } + if !strings.Contains(body, "asb_db_in_use_connections 3") { + t.Fatalf("metrics body = %q, want in-use connection gauge", body) + } + if !strings.Contains(body, "asb_db_idle_connections 2") { + t.Fatalf("metrics body = %q, want idle connection gauge", body) + } +} + func discardLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } diff --git a/internal/bootstrap/service.go b/internal/bootstrap/service.go index d09de88..910a601 100644 --- a/internal/bootstrap/service.go +++ b/internal/bootstrap/service.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "crypto/rsa" "crypto/x509" + "database/sql" "encoding/json" "encoding/pem" "fmt" @@ -50,6 +51,7 @@ type ServiceRuntime struct { Service *app.Service Cleanup func() Health *HealthChecker + DBStats func() sql.DBStats } type HealthChecker struct { @@ -96,7 +98,7 @@ func NewServiceRuntime(ctx context.Context, logger *slog.Logger, options ...Serv if err != nil { return nil, err } - repository, cleanupRepository, postgresProbe, err := newRepository(ctx) + repository, cleanupRepository, postgresProbe, dbStats, err := newRepository(ctx) if err != nil { return nil, err } @@ -266,6 +268,7 @@ func NewServiceRuntime(ctx context.Context, logger *slog.Logger, options ...Serv redisProbe: redisProbe, sessionTokensReady: sessionTokens != nil, }, + DBStats: dbStats, }, nil } @@ -446,15 +449,15 @@ func newApprovalNotifier() (core.ApprovalNotifier, error) { }) } -func newRepository(ctx context.Context) (core.Repository, func(), readinessProbe, error) { +func newRepository(ctx context.Context) (core.Repository, func(), readinessProbe, func() sql.DBStats, error) { if dsn := os.Getenv("ASB_POSTGRES_DSN"); dsn != "" { pool, err := pgxpool.New(ctx, dsn) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } - return postgresstore.NewRepository(pool), pool.Close, pool.Ping, nil + return postgresstore.NewRepository(pool), pool.Close, pool.Ping, pgxPoolDBStats(pool), nil } - return memstore.NewRepository(), func() {}, nil, nil + return memstore.NewRepository(), func() {}, nil, nil, nil } func newRuntimeStore(ctx context.Context) (core.RuntimeStore, func(), readinessProbe, error) { @@ -474,6 +477,21 @@ func newRuntimeStore(ctx context.Context) (core.RuntimeStore, func(), readinessP return memstore.NewRuntimeStore(), func() {}, nil, nil } +func pgxPoolDBStats(pool *pgxpool.Pool) func() sql.DBStats { + if pool == nil { + return nil + } + return func() sql.DBStats { + stats := pool.Stat() + return sql.DBStats{ + MaxOpenConnections: int(stats.MaxConns()), + OpenConnections: int(stats.TotalConns()), + InUse: int(stats.AcquiredConns()), + Idle: int(stats.IdleConns()), + } + } +} + func loadPublicKey(path string) (any, error) { contents, err := os.ReadFile(path) if err != nil {