Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/bricksllm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func main() {
log.Sugar().Fatalf("error creating user id for users table: %v", err)
}

go store.PrepareEventsIndexes(log)

cpMemStore, err := memdb.NewCustomProvidersMemDb(store, log, cfg.InMemoryDbUpdateInterval)
if err != nil {
log.Sugar().Fatalf("cannot initialize custom providers memdb: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/manager/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ func (rm *ReportingManager) GetSpentKeyReporting(r *event.SpentKeyReportingReque

func (rm *ReportingManager) GetUsageReporting(r *event.UsageReportingRequest) (*event.UsageReportingResponse, error) {
if r == nil {
return nil, internal_errors.NewValidationError("key reporting requst cannot be nil")
return nil, internal_errors.NewValidationError("key reporting request cannot be nil")
}
for _, tag := range r.Tags {
if len(tag) == 0 {
return nil, internal_errors.NewValidationError("key reporting requst tag cannot be empty")
return nil, internal_errors.NewValidationError("key reporting request tag cannot be empty")
}
}
usage, err := rm.es.GetUsageData(r.Tags)
Expand Down
91 changes: 57 additions & 34 deletions internal/storage/postgresql/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"strings"
"time"

internal_errors "github.com/bricks-cloud/bricksllm/internal/errors"
"github.com/bricks-cloud/bricksllm/internal/event"
"github.com/lib/pq"
"go.uber.org/zap"
)

var allowedTopBy = []string{"total_cost_in_usd", "total_requests"}
Expand Down Expand Up @@ -97,6 +99,31 @@ func (s *Store) CreateKeyIdIndexForEventsTable() error {
return nil
}

func (s *Store) PrepareEventsIndexes(logger *zap.Logger) error {
queries := []string{
`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_tags ON events USING GIN(tags);`,
`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_created_at_brin ON events USING BRIN(created_at);`,
`CREATE EXTENSION IF NOT EXISTS btree_gin;`,
`CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_events_tags_created_at_gin ON events USING GIN (tags, created_at);`,
}

indexTimeout := 15 * time.Minute
ctxTimeout, cancel := context.WithTimeout(context.Background(), indexTimeout)
Comment thread
sergei-bronnikov marked this conversation as resolved.
defer cancel()

for _, query := range queries {
start := time.Now()
_, err := s.db.ExecContext(ctxTimeout, query)
if err != nil {
logger.Sugar().Errorf("error preparing events indexes: %v, with query: %s", err, query)
}
execT := time.Since(start).Milliseconds()
logger.Sugar().Infof("Exec query: %s. time: %d ms", query, execT)
}

return nil
}

func (s *Store) CreateEventsTable() error {
createTableQuery := `
CREATE TABLE IF NOT EXISTS events (
Expand Down Expand Up @@ -217,7 +244,7 @@ func (s *Store) GetLatencyPercentiles(start, end int64, tags, keyIds []string) (
eventSelectionBlock := `
WITH events_table AS
(
SELECT * FROM events
SELECT * FROM events
`

conditionBlock := fmt.Sprintf("WHERE created_at >= %d AND created_at <= %d ", start, end)
Expand Down Expand Up @@ -402,9 +429,9 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord
WITH keys_table AS
(
SELECT key_id FROM keys WHERE created_at >= %d AND created_at < %d %s
),top_keys_table AS
),top_keys_table AS
(
SELECT
SELECT
events.key_id,
SUM(cost_in_usd) AS "CostInUsd"
FROM events
Expand All @@ -416,12 +443,12 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord
SELECT CASE
WHEN top_keys_table.key_id IS NOT NULL THEN top_keys_table.key_id
ELSE keys_table.key_id
END
END
AS key_id
, COALESCE(top_keys_table."CostInUsd", 0) AS cost_in_usd
FROM keys_table
FULL JOIN top_keys_table
ON top_keys_table.key_id = keys_table.key_id
ON top_keys_table.key_id = keys_table.key_id

`, start, end, condition, start, end, condition2)

Expand All @@ -431,7 +458,7 @@ func (s *Store) GetTopKeyDataPoints(start, end int64, tags, keyIds []string, ord
}

query += fmt.Sprintf(`
ORDER BY cost_in_usd %s
ORDER BY cost_in_usd %s
`, qorder)

if limit != 0 {
Expand Down Expand Up @@ -507,9 +534,9 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s
WITH keys_table AS
(
SELECT key_id, key_ring FROM keys WHERE created_at >= %d AND created_at < %d %s
),top_keys_table AS
),top_keys_table AS
(
SELECT
SELECT
key_ring,
SUM(cost_in_usd) AS total_cost_in_usd,
COUNT(*) AS total_requests
Expand All @@ -531,7 +558,7 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s
qtopBy = topBy
}
query += fmt.Sprintf(`
ORDER BY %s %s
ORDER BY %s %s
`, qtopBy, qorder)

if limit != 0 {
Expand Down Expand Up @@ -576,35 +603,31 @@ func (s *Store) GetTopKeyRingDataPoints(start, end int64, tags []string, order s
}

func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) {
if len(tags) == 0 {
return nil, internal_errors.NewValidationError("key reporting request tag cannot be empty")
}
condition := "tags @> $1 AND created_at > $2"
nowTime := time.Now()
sixMonthsAgo := nowTime.Add(-6 * 30 * 24 * time.Hour).Unix()
dayAgo := nowTime.Add(-24 * time.Hour).Unix()
weekAgo := nowTime.Add(-7 * 24 * time.Hour).Unix()
monthAgo := nowTime.Add(-30 * 24 * time.Hour).Unix()

args := []any{}
condition := ""

index := 1
if len(tags) > 0 {
condition += fmt.Sprintf(" tags @> $%d", index)

args = append(args, pq.Array(tags))
index++
}
args := []any{pq.Array(tags), sixMonthsAgo, dayAgo, weekAgo, monthAgo}

query := fmt.Sprintf(`
SELECT
COALESCE(SUM(cost_in_usd), 0) AS total_cost_in_usd,
COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_day,
COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_week,
COALESCE(SUM(CASE WHEN created_at > %d THEN cost_in_usd ELSE 0 END), 0) AS total_cost_in_usd_last_month,
COALESCE(SUM(1), 0) AS total_requests,
COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_day,
COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_week,
COALESCE(SUM(CASE WHEN created_at > %d THEN 1 ELSE 0 END), 0) AS total_requests_last_month
FROM events
WHERE %s
`, dayAgo, weekAgo, monthAgo, dayAgo, weekAgo, monthAgo, condition)
SELECT
COALESCE(SUM(cost_in_usd), 0) AS total_cost_in_usd,
COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $3), 0) AS total_cost_in_usd_last_day,
COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $4), 0) AS total_cost_in_usd_last_week,
COALESCE(SUM(cost_in_usd) FILTER (WHERE created_at > $5), 0) AS total_cost_in_usd_last_month,
COALESCE(COUNT(*), 0) AS total_requests,
COALESCE(COUNT(*) FILTER (WHERE created_at > $3), 0) AS total_requests_last_day,
COALESCE(COUNT(*) FILTER (WHERE created_at > $4), 0) AS total_requests_last_week,
COALESCE(COUNT(*) FILTER (WHERE created_at > $5), 0) AS total_requests_last_month
FROM events
WHERE %s
`, condition)

ctx, cancel := context.WithTimeout(context.Background(), s.rt)
defer cancel()
Expand All @@ -620,7 +643,7 @@ func (s *Store) GetUsageData(tags []string) (*event.UsageData, error) {
&data.LastWeekUsageRequests,
&data.LastMonthUsageRequests,
); err != nil {
if err == sql.ErrNoRows {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, err
Expand Down Expand Up @@ -722,7 +745,7 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds, cu
%s
FROM time_series_table
LEFT JOIN events_table
ON events_table.created_at >= time_series_table.series
ON events_table.created_at >= time_series_table.series
AND events_table.created_at < time_series_table.series + %d
%s
ORDER BY time_series_table.series;
Expand All @@ -733,7 +756,7 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds, cu
eventSelectionBlock := `
WITH events_table AS
(
SELECT * FROM events
SELECT * FROM events
`

conditionBlock := fmt.Sprintf("WHERE created_at >= %d AND created_at < %d ", start, end)
Expand Down
Loading