Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ The JSON API now applies per-IP rate limiting through the shared `service-runtim

### Metrics

ASB now exposes Prometheus metrics on `/metrics` through the shared `service-runtime/observability` package. The initial slice covers HTTP request counters and request-latency histograms across the broker entrypoints.
ASB now exposes Prometheus metrics on `/metrics` through the shared `service-runtime/observability` package. The current slice covers HTTP request counters and request-latency histograms across the broker entrypoints, plus Postgres pool gauges when `ASB_POSTGRES_DSN` is configured.

### Shared approval notifications

Expand Down
5 changes: 5 additions & 0 deletions cmd/asb-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/evalops/asb/internal/bootstrap"
"github.com/evalops/service-runtime/observability"
"github.com/evalops/service-runtime/ratelimit"
"github.com/prometheus/client_golang/prometheus"
)

func main() {
Expand All @@ -39,6 +40,10 @@ func main() {
logger.Error("initialize metrics", "error", err)
os.Exit(1)
}
if err := registerRuntimeMetrics(runtime, prometheus.DefaultRegisterer); err != nil {
logger.Error("register runtime metrics", "error", err)
os.Exit(1)
}

limiter := ratelimit.New(ratelimit.Config{
RequestsPerSecond: cfg.rateLimitRPS,
Expand Down
11 changes: 11 additions & 0 deletions cmd/asb-api/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"log/slog"
"net/http"

"github.com/evalops/asb/internal/bootstrap"
"github.com/evalops/service-runtime/httpkit"
"github.com/evalops/service-runtime/observability"
"github.com/prometheus/client_golang/prometheus"
)

func newObservedHandler(logger *slog.Logger, metrics *observability.Metrics, next http.Handler) http.Handler {
Expand All @@ -23,3 +25,12 @@ func newObservedHandler(logger *slog.Logger, metrics *observability.Metrics, nex
observed.ServeHTTP(w, r)
})
}

func registerRuntimeMetrics(runtime *bootstrap.ServiceRuntime, registerer prometheus.Registerer) error {
if runtime == nil || runtime.DBStats == nil {
return nil
}
return observability.RegisterDBStats("asb", runtime.DBStats, observability.DBStatsOptions{
Registerer: registerer,
})
}
45 changes: 45 additions & 0 deletions cmd/asb-api/observability_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package main

import (
"database/sql"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/evalops/asb/internal/bootstrap"
"github.com/evalops/service-runtime/observability"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -88,6 +90,49 @@ func TestNewObservedHandlerRecordsRequestMetrics(t *testing.T) {
}
}

func TestRegisterRuntimeMetricsRegistersDBStats(t *testing.T) {
t.Parallel()

registry := prometheus.NewRegistry()
runtime := &bootstrap.ServiceRuntime{
DBStats: func() sql.DBStats {
return sql.DBStats{
MaxOpenConnections: 8,
OpenConnections: 5,
InUse: 3,
Idle: 2,
}
},
}

if err := registerRuntimeMetrics(runtime, registry); err != nil {
t.Fatalf("registerRuntimeMetrics() error = %v", err)
}

metrics, err := observability.NewMetrics("asb", observability.MetricsOptions{
Registerer: registry,
Gatherer: registry,
})
if err != nil {
t.Fatalf("NewMetrics() error = %v", err)
}

handler := newObservedHandler(discardLogger(), metrics, http.NewServeMux())
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/metrics", nil))

body := recorder.Body.String()
if !strings.Contains(body, "asb_db_open_connections 5") {
t.Fatalf("metrics body = %q, want open connection gauge", body)
}
if !strings.Contains(body, "asb_db_in_use_connections 3") {
t.Fatalf("metrics body = %q, want in-use connection gauge", body)
}
if !strings.Contains(body, "asb_db_idle_connections 2") {
t.Fatalf("metrics body = %q, want idle connection gauge", body)
}
}

func discardLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(io.Discard, nil))
}
28 changes: 23 additions & 5 deletions internal/bootstrap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"database/sql"
"encoding/json"
"encoding/pem"
"fmt"
Expand Down Expand Up @@ -50,6 +51,7 @@ type ServiceRuntime struct {
Service *app.Service
Cleanup func()
Health *HealthChecker
DBStats func() sql.DBStats
}

type HealthChecker struct {
Expand Down Expand Up @@ -96,7 +98,7 @@ func NewServiceRuntime(ctx context.Context, logger *slog.Logger, options ...Serv
if err != nil {
return nil, err
}
repository, cleanupRepository, postgresProbe, err := newRepository(ctx)
repository, cleanupRepository, postgresProbe, dbStats, err := newRepository(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -266,6 +268,7 @@ func NewServiceRuntime(ctx context.Context, logger *slog.Logger, options ...Serv
redisProbe: redisProbe,
sessionTokensReady: sessionTokens != nil,
},
DBStats: dbStats,
}, nil
}

Expand Down Expand Up @@ -446,15 +449,15 @@ func newApprovalNotifier() (core.ApprovalNotifier, error) {
})
}

func newRepository(ctx context.Context) (core.Repository, func(), readinessProbe, error) {
func newRepository(ctx context.Context) (core.Repository, func(), readinessProbe, func() sql.DBStats, error) {
if dsn := os.Getenv("ASB_POSTGRES_DSN"); dsn != "" {
pool, err := pgxpool.New(ctx, dsn)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
return postgresstore.NewRepository(pool), pool.Close, pool.Ping, nil
return postgresstore.NewRepository(pool), pool.Close, pool.Ping, pgxPoolDBStats(pool), nil
}
return memstore.NewRepository(), func() {}, nil, nil
return memstore.NewRepository(), func() {}, nil, nil, nil
}

func newRuntimeStore(ctx context.Context) (core.RuntimeStore, func(), readinessProbe, error) {
Expand All @@ -474,6 +477,21 @@ func newRuntimeStore(ctx context.Context) (core.RuntimeStore, func(), readinessP
return memstore.NewRuntimeStore(), func() {}, nil, nil
}

func pgxPoolDBStats(pool *pgxpool.Pool) func() sql.DBStats {
if pool == nil {
return nil
}
return func() sql.DBStats {
stats := pool.Stat()
return sql.DBStats{
MaxOpenConnections: int(stats.MaxConns()),
OpenConnections: int(stats.TotalConns()),
InUse: int(stats.AcquiredConns()),
Idle: int(stats.IdleConns()),
}
}
}

func loadPublicKey(path string) (any, error) {
contents, err := os.ReadFile(path)
if err != nil {
Expand Down
Loading