Skip to content

Commit

Permalink
Merge pull request #43 from bricks-cloud/v1.8.1
Browse files Browse the repository at this point in the history
[V1.8.2] Added new querying options for metrics and events API
  • Loading branch information
spikelu2016 committed Feb 1, 2024
2 parents af2e014 + 33ac595 commit 4f0518a
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 59 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,9 @@ This endpoint is retrieving aggregated metrics given an array of key ids and tag
> | Field | required | type | example | description |
> |---------------|-----------------------------------|-|-|-|
> | keyIds | required | `[]string` | `["key-1", "key-2", "key-3" ]` | Array of ids that specify the keys that you want to aggregate stats from. |
> | tags | required | `[]string` | `["tag-1", "tag-2"]` | Array of tags that specify the keys that you want to aggregate stats from. |
> | tags | required | `[]string` | `["tag-1", "tag-2"]` | Array of tags that specify the key tags that you want to aggregate stats from. |
> | customIds | required | `[]string` | `["customId-1", "customId-2"]` | A list of custom IDs that you want to aggregate stats from. |
> | filters | required | `[]string` | `["model", "keyId"]` | Group data points by one or more filters (`model`, `keyId`, or `customId`). |
> | start | required | `int64` | `1699933571` | Start timestamp for the requested timeseries data. |
> | end | required | `int64` | `1699933571` | End timestamp for the requested timeseries data. |
> | increment | required | `int` | `60` | This field is the increment in seconds for the requested timeseries data. |
Expand Down Expand Up @@ -546,7 +548,10 @@ This endpoint is for getting events.
##### Query Parameters
> | name | type | data type | description |
> |--------|------------|----------------|------------------------------------------------------|
> | `customId` | optional | string | Custom identifier attached to an event |
> | `customId` | optional | `string` | Custom identifier attached to an event. |
> | `keyIds` | optional | `[]string` | A list of key IDs. |
> | `start` | required if `keyIds` is specified | `int64` | Start timestamp. |
> | `end` | required if `keyIds` is specified | `int64` | End timestamp. |
##### Error Response
> | http code | content-type |
Expand Down
2 changes: 2 additions & 0 deletions internal/event/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type DataPoint struct {
SuccessCount int `json:"successCount"`
Model string `json:"model"`
KeyId string `json:"keyId"`
CustomId string `json:"customId"`
}

type ReportingResponse struct {
Expand All @@ -21,6 +22,7 @@ type ReportingResponse struct {
type ReportingRequest struct {
KeyIds []string `json:"keyIds"`
Tags []string `json:"tags"`
CustomIds []string `json:"customIds"`
Start int64 `json:"start"`
End int64 `json:"end"`
Increment int64 `json:"increment"`
Expand Down
30 changes: 5 additions & 25 deletions internal/manager/reporting.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package manager

import (
"errors"
"fmt"

internal_errors "github.com/bricks-cloud/bricksllm/internal/errors"
"github.com/bricks-cloud/bricksllm/internal/event"
"github.com/bricks-cloud/bricksllm/internal/key"
Expand All @@ -18,8 +15,8 @@ type keyStorage interface {
}

type eventStorage interface {
GetEvents(customId string) ([]*event.Event, error)
GetEventDataPoints(start, end, increment int64, tags, keyIds []string, filters []string) ([]*event.DataPoint, error)
GetEvents(customId string, keyIds []string, start, end int64) ([]*event.Event, error)
GetEventDataPoints(start, end, increment int64, tags, keyIds, customIds []string, filters []string) ([]*event.DataPoint, error)
GetLatencyPercentiles(start, end int64, tags, keyIds []string) ([]float64, error)
}

Expand All @@ -38,7 +35,7 @@ func NewReportingManager(cs costStorage, ks keyStorage, es eventStorage) *Report
}

func (rm *ReportingManager) GetEventReporting(e *event.ReportingRequest) (*event.ReportingResponse, error) {
dataPoints, err := rm.es.GetEventDataPoints(e.Start, e.End, e.Increment, e.Tags, e.KeyIds, e.Filters)
dataPoints, err := rm.es.GetEventDataPoints(e.Start, e.End, e.Increment, e.Tags, e.KeyIds, e.CustomIds, e.Filters)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -80,25 +77,8 @@ func (rm *ReportingManager) GetKeyReporting(keyId string) (*key.KeyReporting, er
}, err
}

func (rm *ReportingManager) GetEvent(customId string) (*event.Event, error) {
if len(customId) == 0 {
return nil, errors.New("customId cannot be empty")
}

events, err := rm.es.GetEvents(customId)
if err != nil {
return nil, err
}

if len(events) >= 1 {
return events[0], nil
}

return nil, internal_errors.NewNotFoundError(fmt.Sprintf("event is not found for customId: %s", customId))
}

func (rm *ReportingManager) GetEvents(customId string) ([]*event.Event, error) {
events, err := rm.es.GetEvents(customId)
func (rm *ReportingManager) GetEvents(customId string, keyIds []string, start, end int64) ([]*event.Event, error) {
events, err := rm.es.GetEvents(customId, keyIds, start, end)
if err != nil {
return nil, err
}
Expand Down
22 changes: 5 additions & 17 deletions internal/provider/openai/cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
var OpenAiPerThousandTokenCost = map[string]map[string]float64{
"prompt": {
"gpt-4-1106-preview": 0.01,
"gpt-4-0125-preview": 0.01,
"gpt-4-1106-vision-preview": 0.01,
"gpt-4": 0.03,
"gpt-4-0314": 0.03,
Expand Down Expand Up @@ -50,27 +51,14 @@ var OpenAiPerThousandTokenCost = map[string]map[string]float64{
"ada": 0.0004,
},
"embeddings": {
"text-embedding-ada-002": 0.0001,
"text-similarity-ada-001": 0.004,
"text-search-ada-doc-001": 0.004,
"text-search-ada-query-001": 0.004,
"code-search-ada-code-001": 0.004,
"code-search-ada-text-001": 0.004,
"code-search-babbage-code-001": 0.005,
"code-search-babbage-text-001": 0.005,
"text-similarity-babbage-001": 0.005,
"text-search-babbage-doc-001": 0.005,
"text-search-babbage-query-001": 0.005,
"text-similarity-curie-001": 0.02,
"text-search-curie-doc-001": 0.02,
"text-search-curie-query-001": 0.02,
"text-search-davinci-doc-001": 0.2,
"text-search-davinci-query-001": 0.2,
"text-similarity-davinci-001": 0.2,
"text-embedding-ada-002": 0.0001,
"text-embedding-3-small": 0.00002,
"text-embedding-3-large": 0.00013,
},
"completion": {
"gpt-3.5-turbo-1106": 0.002,
"gpt-4-1106-preview": 0.03,
"gpt-4-0125-preview": 0.03,
"gpt-4-1106-vision-preview": 0.03,
"gpt-4": 0.06,
"gpt-4-0314": 0.06,
Expand Down
85 changes: 74 additions & 11 deletions internal/server/web/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"time"

"github.com/bricks-cloud/bricksllm/internal/event"
Expand Down Expand Up @@ -37,7 +38,7 @@ type KeyManager interface {

type KeyReportingManager interface {
GetKeyReporting(keyId string) (*key.KeyReporting, error)
GetEvent(customId string) (*event.Event, error)
GetEvents(customId string, keyIds []string, start int64, end int64) ([]*event.Event, error)
GetEventReporting(e *event.ReportingRequest) (*event.ReportingResponse, error)
}

Expand Down Expand Up @@ -819,27 +820,89 @@ func getGetEventsHandler(m KeyReportingManager, log *zap.Logger, prod bool) gin.
}

cid := c.GetString(correlationId)
customId, ok := c.GetQuery("customId")
if !ok {
customId, ciok := c.GetQuery("customId")
keyIds, kiok := c.GetQueryArray("keyIds")
if !ciok && !kiok {
c.JSON(http.StatusBadRequest, &ErrorResponse{
Type: "/errors/custom-id-empty",
Title: "custom id is empty",
Type: "/errors/no-filters-empty",
Title: "neither customId nor keyIds are specified",
Status: http.StatusBadRequest,
Detail: "query param customId is empty. it is required for retrieving an event.",
Detail: "both query params customId and keyIds are empty. either of them is required for retrieving events.",
Instance: path,
})

return
}

ev, err := m.GetEvent(customId)
var qstart int64 = 0
var qend int64 = 0

if kiok {
startstr, sok := c.GetQuery("start")
if !sok {
c.JSON(http.StatusBadRequest, &ErrorResponse{
Type: "/errors/query-param-start-missing",
Title: "query param start is missing",
Status: http.StatusBadRequest,
Detail: "start query param is not provided",
Instance: path,
})

return
}

parsedStart, err := strconv.ParseInt(startstr, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, &ErrorResponse{
Type: "/errors/bad-start-query-param",
Title: "start query cannot be parsed",
Status: http.StatusBadRequest,
Detail: "start query param must be int64",
Instance: path,
})

return
}

qstart = parsedStart

endstr, eoi := c.GetQuery("end")
if !eoi {
c.JSON(http.StatusBadRequest, &ErrorResponse{
Type: "/errors/query-param-end-missing",
Title: "query param end is missing",
Status: http.StatusBadRequest,
Detail: "end query param is not provided",
Instance: path,
})

return
}

parsedEnd, err := strconv.ParseInt(endstr, 10, 64)
if err != nil {
c.JSON(http.StatusBadRequest, &ErrorResponse{
Type: "/errors/bad-end-query-param",
Title: "end query cannot be parsed",
Status: http.StatusBadRequest,
Detail: "end query param must be int64",
Instance: path,
})

return
}

qend = parsedEnd
}

evs, err := m.GetEvents(customId, keyIds, qstart, qend)
if err != nil {
stats.Incr("bricksllm.admin.get_get_events_handler.get_event_error", nil, 1)
stats.Incr("bricksllm.admin.get_get_events_handler.get_events_error", nil, 1)

logError(log, "error when getting an event", prod, cid, err)
logError(log, "error when getting events", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/event-manager",
Title: "getting an event error",
Title: "getting events error",
Status: http.StatusInternalServerError,
Detail: err.Error(),
Instance: path,
Expand All @@ -849,7 +912,7 @@ func getGetEventsHandler(m KeyReportingManager, log *zap.Logger, prod bool) gin.

stats.Incr("bricksllm.admin.get_get_events_handler.success", nil, 1)

c.JSON(http.StatusOK, []*event.Event{ev})
c.JSON(http.StatusOK, evs)
}
}

Expand Down
43 changes: 39 additions & 4 deletions internal/storage/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,16 +241,36 @@ func (s *Store) InsertEvent(e *event.Event) error {
return nil
}

func (s *Store) GetEvents(customId string) ([]*event.Event, error) {
func (s *Store) GetEvents(customId string, keyIds []string, start int64, end int64) ([]*event.Event, error) {
if len(customId) == 0 && len(keyIds) == 0 {
return nil, errors.New("neither customId nor keyIds are specified")
}

if len(keyIds) == 0 && (start == 0 || end == 0) {
return nil, errors.New("keyIds are provided but either start or end is not specified")
}

query := `
SELECT * FROM events WHERE $1 = custom_id
SELECT * FROM events WHERE
`

if len(customId) != 0 {
query += fmt.Sprintf(" custom_id = '%s'", customId)
}

if len(customId) != 0 && len(keyIds) != 0 {
query += " AND"
}

if len(keyIds) != 0 {
query += fmt.Sprintf(" key_id = ANY('%s') AND created_at >= %d AND created_at <= %d", sliceToSqlStringArray(keyIds), start, end)
}

ctxTimeout, cancel := context.WithTimeout(context.Background(), s.rt)
defer cancel()

events := []*event.Event{}
rows, err := s.db.QueryContext(ctxTimeout, query, customId)
rows, err := s.db.QueryContext(ctxTimeout, query)
if err != nil {
if err == sql.ErrNoRows {
return events, nil
Expand Down Expand Up @@ -356,7 +376,7 @@ func (s *Store) GetLatencyPercentiles(start, end int64, tags, keyIds []string) (
return data, nil
}

func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds []string, filters []string) ([]*event.DataPoint, error) {
func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds, customIds []string, filters []string) ([]*event.DataPoint, error) {
groupByQuery := "GROUP BY time_series_table.series"
selectQuery := "SELECT series AS time_stamp, COALESCE(COUNT(events_table.event_id),0) AS num_of_requests, COALESCE(SUM(events_table.cost_in_usd),0) AS cost_in_usd, COALESCE(SUM(events_table.latency_in_ms),0) AS latency_in_ms, COALESCE(SUM(events_table.prompt_token_count),0) AS prompt_token_count, COALESCE(SUM(events_table.completion_token_count),0) AS completion_token_count, COALESCE(SUM(CASE WHEN status_code = 200 THEN 1 END),0) AS success_count"

Expand All @@ -371,6 +391,11 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds []s
groupByQuery += ",events_table.key_id"
selectQuery += ",events_table.key_id as keyId"
}

if filter == "customId" {
groupByQuery += ",events_table.custom_id"
selectQuery += ",events_table.custom_id as customId"
}
}
}

Expand Down Expand Up @@ -406,6 +431,10 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds []s
conditionBlock += fmt.Sprintf("AND key_id = ANY('%s')", sliceToSqlStringArray(keyIds))
}

if len(customIds) != 0 {
conditionBlock += fmt.Sprintf("AND custom_id = ANY('%s')", sliceToSqlStringArray(customIds))
}

eventSelectionBlock += conditionBlock
eventSelectionBlock += ")"

Expand All @@ -425,6 +454,7 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds []s
var e event.DataPoint
var model sql.NullString
var keyId sql.NullString
var customId sql.NullString

additional := []any{
&e.TimeStamp,
Expand All @@ -445,6 +475,10 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds []s
if filter == "keyId" {
additional = append(additional, &keyId)
}

if filter == "customId" {
additional = append(additional, &customId)
}
}
}

Expand All @@ -457,6 +491,7 @@ func (s *Store) GetEventDataPoints(start, end, increment int64, tags, keyIds []s
pe := &e
pe.Model = model.String
pe.KeyId = keyId.String
pe.CustomId = customId.String

data = append(data, pe)
}
Expand Down

0 comments on commit 4f0518a

Please sign in to comment.