From 405dbfb9affb38d7677aaae9cb9b6e035550d479 Mon Sep 17 00:00:00 2001 From: Sergei Bronnikov Date: Tue, 21 Apr 2026 09:45:17 +0100 Subject: [PATCH 1/3] wip --- internal/manager/reporting.go | 4 +-- internal/storage/postgresql/event.go | 42 +++++++++++++--------------- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/internal/manager/reporting.go b/internal/manager/reporting.go index 2560a37..ffe2578 100644 --- a/internal/manager/reporting.go +++ b/internal/manager/reporting.go @@ -171,11 +171,11 @@ func (rm *ReportingManager) GetSpentKeyReporting(r *event.SpentKeyReportingReque func (rm *ReportingManager) GetUsageReporting(r *event.UsageReportingRequest) (*event.UsageReportingResponse, error) { if r == nil { - return nil, internal_errors.NewValidationError("key reporting requst cannot be nil") + return nil, internal_errors.NewValidationError("key reporting request cannot be nil") } for _, tag := range r.Tags { if len(tag) == 0 { - return nil, internal_errors.NewValidationError("key reporting requst tag cannot be empty") + return nil, internal_errors.NewValidationError("key reporting request tag cannot be empty") } } usage, err := rm.es.GetUsageData(r.Tags) diff --git a/internal/storage/postgresql/event.go b/internal/storage/postgresql/event.go index 274d19b..5b56a19 100644 --- a/internal/storage/postgresql/event.go +++ b/internal/storage/postgresql/event.go @@ -10,6 +10,7 @@ import ( "strings" "time" + internal_errors "github.com/bricks-cloud/bricksllm/internal/errors" "github.com/bricks-cloud/bricksllm/internal/event" "github.com/lib/pq" ) @@ -576,35 +577,30 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s } func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) { + if len(tags) == 0 { + return nil, internal_errors.NewValidationError("key reporting request tag cannot be empty") + } + condition := "tags @> $1" nowTime := time.Now() dayAgo := nowTime.Add(-24 * time.Hour).Unix() weekAgo := nowTime.Add(-7 * 24 * time.Hour).Unix() monthAgo := nowTime.Add(-30 * 24 * time.Hour).Unix() - args := []any{} - condition := "" - - index := 1 - if len(tags) > 0 { - condition += fmt.Sprintf(" tags @> $%d", index) - - args = append(args, pq.Array(tags)) - index++ - } + args := []any{pq.Array(tags), dayAgo, weekAgo, monthAgo} query := fmt.Sprintf(` - SELECT - COALESCE(SUM(cost_in_usd), 0) AS total_cost_in_usd, - COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_day, - COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_week, - COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_month, - COALESCE(SUM(1), 0) AS total_requests, - COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_day, - COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_week, - COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_month - FROM events - WHERE %s - `, dayAgo, weekAgo, monthAgo, dayAgo, weekAgo, monthAgo, condition) + SELECT + COALESCE(SUM(cost_in_usd), 0) AS total_cost_in_usd, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $2), 0) AS total_cost_in_usd_last_day, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $3), 0) AS total_cost_in_usd_last_week, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $4), 0) AS total_cost_in_usd_last_month, + COALESCE(COUNT(*), 0) AS total_requests, + COALESCE(COUNT(*) FILTER (WHERE created_at > $2), 0) AS total_requests_last_day, + COALESCE(COUNT(*) FILTER (WHERE created_at > $3), 0) AS total_requests_last_week, + COALESCE(COUNT(*) FILTER (WHERE created_at > $4), 0) AS total_requests_last_month + FROM events + WHERE %s + `, condition) ctx, cancel := context.WithTimeout(context.Background(), s.rt) defer cancel() @@ -620,7 +616,7 @@ func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) { &data.LastWeekUsageRequests, &data.LastMonthUsageRequests, ); err != nil { - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return nil, nil } return nil, err From 4fe82f768199e1742f1de0c46f9c95349e3861bf Mon Sep 17 00:00:00 2001 From: Sergei Bronnikov Date: Tue, 21 Apr 2026 12:29:29 +0100 Subject: [PATCH 2/3] optimize query. six month --- internal/event/key_reporting.go | 18 ++++++++++-------- internal/storage/postgresql/event.go | 21 +++++++++++---------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/internal/event/key_reporting.go b/internal/event/key_reporting.go index 6ebf3f4..bc8de8b 100644 --- a/internal/event/key_reporting.go +++ b/internal/event/key_reporting.go @@ -63,14 +63,16 @@ type UsageReportingRequest struct { } type UsageData struct { - LastDayUsage float64 `json:"lastDayUsage"` - LastWeekUsage float64 `json:"lastWeekUsage"` - LastMonthUsage float64 `json:"lastMonthUsage"` - TotalUsage float64 `json:"totalUsage"` - LastDayUsageRequests int `json:"lastDayUsageRequests"` - LastWeekUsageRequests int `json:"lastWeekUsageRequests"` - LastMonthUsageRequests int `json:"lastMonthUsageRequests"` - TotalUsageRequests int `json:"totalUsageRequests"` + LastDayUsage float64 `json:"lastDayUsage"` + LastWeekUsage float64 `json:"lastWeekUsage"` + LastMonthUsage float64 `json:"lastMonthUsage"` + LastSixMonthUsage float64 `json:"lastSixMonthUsage"` + TotalUsage float64 `json:"totalUsage"` + LastDayUsageRequests int `json:"lastDayUsageRequests"` + LastWeekUsageRequests int `json:"lastWeekUsageRequests"` + LastMonthUsageRequests int `json:"lastMonthUsageRequests"` + LastSixMonthUsageRequests int `json:"lastSixMonthUsageRequests"` + TotalUsageRequests int `json:"totalUsageRequests"` } type UsageReportingResponse struct { diff --git a/internal/storage/postgresql/event.go b/internal/storage/postgresql/event.go index 5b56a19..b0fcfc1 100644 --- a/internal/storage/postgresql/event.go +++ b/internal/storage/postgresql/event.go @@ -580,24 +580,25 @@ func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) { if len(tags) == 0 { return nil, internal_errors.NewValidationError("key reporting request tag cannot be empty") } - condition := "tags @> $1" + condition := "tags @> $1 AND created_at > $2" nowTime := time.Now() + sixMonthsAgo := nowTime.Add(-6 * 30 * 24 * time.Hour).Unix() dayAgo := nowTime.Add(-24 * time.Hour).Unix() weekAgo := nowTime.Add(-7 * 24 * time.Hour).Unix() monthAgo := nowTime.Add(-30 * 24 * time.Hour).Unix() - args := []any{pq.Array(tags), dayAgo, weekAgo, monthAgo} + args := []any{pq.Array(tags), sixMonthsAgo, dayAgo, weekAgo, monthAgo} query := fmt.Sprintf(` SELECT COALESCE(SUM(cost_in_usd), 0) AS total_cost_in_usd, - COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $2), 0) AS total_cost_in_usd_last_day, - COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $3), 0) AS total_cost_in_usd_last_week, - COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $4), 0) AS total_cost_in_usd_last_month, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $3), 0) AS total_cost_in_usd_last_day, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $4), 0) AS total_cost_in_usd_last_week, + COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $5), 0) AS total_cost_in_usd_last_month, COALESCE(COUNT(*), 0) AS total_requests, - COALESCE(COUNT(*) FILTER (WHERE created_at > $2), 0) AS total_requests_last_day, - COALESCE(COUNT(*) FILTER (WHERE created_at > $3), 0) AS total_requests_last_week, - COALESCE(COUNT(*) FILTER (WHERE created_at > $4), 0) AS total_requests_last_month + COALESCE(COUNT(*) FILTER (WHERE created_at > $3), 0) AS total_requests_last_day, + COALESCE(COUNT(*) FILTER (WHERE created_at > $4), 0) AS total_requests_last_week, + COALESCE(COUNT(*) FILTER (WHERE created_at > $5), 0) AS total_requests_last_month FROM events WHERE %s `, condition) @@ -607,11 +608,11 @@ func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) { data := &event.UsageData{} if err := s.db.QueryRowContext(ctx, query, args...).Scan( - &data.TotalUsage, + &data.LastSixMonthUsage, &data.LastDayUsage, &data.LastWeekUsage, &data.LastMonthUsage, - &data.TotalUsageRequests, + &data.LastSixMonthUsageRequests, &data.LastDayUsageRequests, &data.LastWeekUsageRequests, &data.LastMonthUsageRequests, From 458c9c22b6bb0b28d67e45dee2080086efd76f56 Mon Sep 17 00:00:00 2001 From: Sergei Bronnikov Date: Tue, 21 Apr 2026 14:22:27 +0100 Subject: [PATCH 3/3] indexes --- cmd/bricksllm/main.go | 2 ++ internal/event/key_reporting.go | 18 +++++----- internal/storage/postgresql/event.go | 52 +++++++++++++++++++++------- 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/cmd/bricksllm/main.go b/cmd/bricksllm/main.go index 0bdd4b3..a95a273 100644 --- a/cmd/bricksllm/main.go +++ b/cmd/bricksllm/main.go @@ -162,6 +162,8 @@ func main() { log.Sugar().Fatalf("error creating user id for users table: %v", err) } + go store.PrepareEventsIndexes(log) + cpMemStore, err := memdb.NewCustomProvidersMemDb(store, log, cfg.InMemoryDbUpdateInterval) if err != nil { log.Sugar().Fatalf("cannot initialize custom providers memdb: %v", err) diff --git a/internal/event/key_reporting.go b/internal/event/key_reporting.go index bc8de8b..6ebf3f4 100644 --- a/internal/event/key_reporting.go +++ b/internal/event/key_reporting.go @@ -63,16 +63,14 @@ type UsageReportingRequest struct { } type UsageData struct { - LastDayUsage float64 `json:"lastDayUsage"` - LastWeekUsage float64 `json:"lastWeekUsage"` - LastMonthUsage float64 `json:"lastMonthUsage"` - LastSixMonthUsage float64 `json:"lastSixMonthUsage"` - TotalUsage float64 `json:"totalUsage"` - LastDayUsageRequests int `json:"lastDayUsageRequests"` - LastWeekUsageRequests int `json:"lastWeekUsageRequests"` - LastMonthUsageRequests int `json:"lastMonthUsageRequests"` - LastSixMonthUsageRequests int `json:"lastSixMonthUsageRequests"` - TotalUsageRequests int `json:"totalUsageRequests"` + LastDayUsage float64 `json:"lastDayUsage"` + LastWeekUsage float64 `json:"lastWeekUsage"` + LastMonthUsage float64 `json:"lastMonthUsage"` + TotalUsage float64 `json:"totalUsage"` + LastDayUsageRequests int `json:"lastDayUsageRequests"` + LastWeekUsageRequests int `json:"lastWeekUsageRequests"` + LastMonthUsageRequests int `json:"lastMonthUsageRequests"` + TotalUsageRequests int `json:"totalUsageRequests"` } type UsageReportingResponse struct { diff --git a/internal/storage/postgresql/event.go b/internal/storage/postgresql/event.go index b0fcfc1..c3877e6 100644 --- a/internal/storage/postgresql/event.go +++ b/internal/storage/postgresql/event.go @@ -13,6 +13,7 @@ import ( internal_errors "github.com/bricks-cloud/bricksllm/internal/errors" "github.com/bricks-cloud/bricksllm/internal/event" "github.com/lib/pq" + "go.uber.org/zap" ) var allowedTopBy = []string{"total_cost_in_usd", "total_requests"} @@ -98,6 +99,31 @@ func (s *Store) CreateKeyIdIndexForEventsTable() error { return nil } +func (s *Store) PrepareEventsIndexes(logger *zap.Logger) error { + queries := []string{ + `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_tags ON events USING GIN(tags);`, + `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_created_at_brin ON events USING BRIN(created_at);`, + `CREATE EXTENSION IF NOT EXISTS btree_gin;`, + `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_tags_created_at_gin ON events USING GIN (tags, created_at);`, + } + + indexTimeout := 15 * time.Minute + ctxTimeout, cancel := context.WithTimeout(context.Background(), indexTimeout) + defer cancel() + + for _, query := range queries { + start := time.Now() + _, err := s.db.ExecContext(ctxTimeout, query) + if err != nil { + logger.Sugar().Errorf("error preparing events indexes: %v, with query: %s", err, query) + } + execT := time.Since(start).Milliseconds() + logger.Sugar().Infof("Exec query: %s. time: %d ms", query, execT) + } + + return nil +} + func (s *Store) CreateEventsTable() error { createTableQuery := ` CREATE TABLE IF NOT EXISTS events ( @@ -218,7 +244,7 @@ func (s *Store) GetLatencyPercentiles(start, end int64, tags, keyIds []string) ( eventSelectionBlock := ` WITH events_table AS ( - SELECT * FROM events + SELECT * FROM events ` conditionBlock := fmt.Sprintf("WHERE created_at >= %d AND created_at <= %d ", start, end) @@ -403,9 +429,9 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord WITH keys_table AS ( SELECT key_id FROM keys WHERE created_at >= %d AND created_at < %d %s - ),top_keys_table AS + ),top_keys_table AS ( - SELECT + SELECT events.key_id, SUM(cost_in_usd) AS "CostInUsd" FROM events @@ -417,12 +443,12 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord SELECT CASE WHEN top_keys_table.key_id IS NOT NULL THEN top_keys_table.key_id ELSE keys_table.key_id - END + END AS key_id , COALESCE(top_keys_table."CostInUsd", 0) AS cost_in_usd FROM keys_table FULL JOIN top_keys_table - ON top_keys_table.key_id = keys_table.key_id + ON top_keys_table.key_id = keys_table.key_id `, start, end, condition, start, end, condition2) @@ -432,7 +458,7 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord } query += fmt.Sprintf(` - ORDER BY cost_in_usd %s + ORDER BY cost_in_usd %s `, qorder) if limit != 0 { @@ -508,9 +534,9 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s WITH keys_table AS ( SELECT key_id, key_ring FROM keys WHERE created_at >= %d AND created_at < %d %s - ),top_keys_table AS + ),top_keys_table AS ( - SELECT + SELECT key_ring, SUM(cost_in_usd) AS total_cost_in_usd, COUNT(*) AS total_requests @@ -532,7 +558,7 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s qtopBy = topBy } query += fmt.Sprintf(` - ORDER BY %s %s + ORDER BY %s %s `, qtopBy, qorder) if limit != 0 { @@ -608,11 +634,11 @@ func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) { data := &event.UsageData{} if err := s.db.QueryRowContext(ctx, query, args...).Scan( - &data.LastSixMonthUsage, + &data.TotalUsage, &data.LastDayUsage, &data.LastWeekUsage, &data.LastMonthUsage, - &data.LastSixMonthUsageRequests, + &data.TotalUsageRequests, &data.LastDayUsageRequests, &data.LastWeekUsageRequests, &data.LastMonthUsageRequests, @@ -719,7 +745,7 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds, cu %s FROM time_series_table LEFT JOIN events_table - ON events_table.created_at >= time_series_table.series + ON events_table.created_at >= time_series_table.series AND events_table.created_at < time_series_table.series + %d %s ORDER BY time_series_table.series; @@ -730,7 +756,7 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds, cu eventSelectionBlock := ` WITH events_table AS ( - SELECT * FROM events + SELECT * FROM events ` conditionBlock := fmt.Sprintf("WHERE created_at >= %d AND created_at < %d ", start, end)