Skip to content

Commit

Permalink
#46 Added latency filed to langModels + protected movingAverage by mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
roma-glushko committed Jan 13, 2024
1 parent b5a8fe6 commit 47f7a50
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 25 deletions.
10 changes: 8 additions & 2 deletions pkg/providers/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package providers
import (
"context"
"errors"

"glide/pkg/providers/clients"
"glide/pkg/routers/health"
"glide/pkg/routers/latency"

"glide/pkg/api/schemas"
)
Expand All @@ -16,12 +16,13 @@ type LangModelProvider interface {
Chat(ctx context.Context, request *schemas.UnifiedChatRequest) (*schemas.UnifiedChatResponse, error)
}

// LangModel
// LangModel wraps provider client and expend it with health & latency tracking
type LangModel struct {
modelID string
client LangModelProvider
rateLimit *health.RateLimitTracker
errorBudget *health.TokenBucket // TODO: centralize provider API health tracking in the registry
latency *latency.MovingAverage
}

func NewLangModel(modelID string, client LangModelProvider, budget health.ErrorBudget) *LangModel {
Expand All @@ -30,6 +31,7 @@ func NewLangModel(modelID string, client LangModelProvider, budget health.ErrorB
client: client,
rateLimit: health.NewRateLimitTracker(),
errorBudget: health.NewTokenBucket(budget.TimePerTokenMicro(), budget.Budget()),
latency: latency.NewMovingAverage(0.05, 3), // TODO: set from configs
}
}

Expand All @@ -41,6 +43,10 @@ func (m *LangModel) Provider() string {
return m.client.Provider()
}

func (m *LangModel) Latency() *latency.MovingAverage {
return m.latency
}

func (m *LangModel) Healthy() bool {
return !m.rateLimit.Limited() && m.errorBudget.HasTokens()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package routing
package latency

import "sync"

// MovingAverage represents the exponentially weighted moving average of a series of numbers
type MovingAverage struct {
mu sync.RWMutex
// The multiplier factor by which the previous samples decay
decay float64
// The current value of the average
Expand All @@ -14,6 +17,7 @@ type MovingAverage struct {

func NewMovingAverage(decay float64, warmupSamples uint8) *MovingAverage {
return &MovingAverage{
mu: sync.RWMutex{},
decay: decay,
warmupSamples: warmupSamples,
count: 0,
Expand All @@ -23,6 +27,9 @@ func NewMovingAverage(decay float64, warmupSamples uint8) *MovingAverage {

// Add a value to the series and updates the moving average
func (e *MovingAverage) Add(value float64) {
e.mu.Lock()
defer e.mu.Unlock()

switch {
case e.count < e.warmupSamples:
e.count++
Expand All @@ -37,12 +44,18 @@ func (e *MovingAverage) Add(value float64) {
}

func (e *MovingAverage) WarmedUp() bool {
e.mu.RLock()
defer e.mu.RUnlock()

return e.count > e.warmupSamples
}

// Value returns the current value of the average, or 0.0 if the series hasn't
// warmed up yet
func (e *MovingAverage) Value() float64 {
e.mu.RLock()
defer e.mu.RUnlock()

if !e.WarmedUp() {
return 0.0
}
Expand All @@ -52,7 +65,9 @@ func (e *MovingAverage) Value() float64 {

// Set sets the moving average value
func (e *MovingAverage) Set(value float64) {
e.mu.Lock()
e.value = value
e.mu.Unlock()

if !e.WarmedUp() {
e.count = e.warmupSamples + 1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package routing
package latency

import (
"testing"
Expand Down
42 changes: 21 additions & 21 deletions pkg/routers/routing/least_latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,24 @@ func NewLeastLatencyRouting(models []*providers.LangModel) *LeastLatencyRouting
}
}

//func (r *LeastLatencyRouting) Iterator() LangModelIterator {
// return r
//}

//func (r LeastLatencyRouting) Next() (*providers.LangModel, error) {
// models := r.models
//
// for _, model := range models {
// if !model.Healthy() {
// // cannot do much with unavailable model
// continue
// }
//
// return model, nil
// }
//
// // responseTime := time.Since(startTime)
// // h.avgResponseTime = lb.alpha*float64(responseTime/time.Millisecond) + (1.0-lb.alpha)*h.avgResponseTime
//
// //return nil, ErrNoHealthyModels
//}
func (r *LeastLatencyRouting) Iterator() LangModelIterator {
return r
}

func (r LeastLatencyRouting) Next() (*providers.LangModel, error) {
models := r.models

for _, model := range models {
if !model.Healthy() {
// cannot do much with unavailable model
continue
}

return model, nil
}

// responseTime := time.Since(startTime)
// h.avgResponseTime = lb.alpha*float64(responseTime/time.Millisecond) + (1.0-lb.alpha)*h.avgResponseTime

//return nil, ErrNoHealthyModels
}

Check failure on line 44 in pkg/routers/routing/least_latency.go

View workflow job for this annotation

GitHub Actions / Vulnerability Check

missing return

Check failure on line 44 in pkg/routers/routing/least_latency.go

View workflow job for this annotation

GitHub Actions / Build

missing return

Check failure on line 44 in pkg/routers/routing/least_latency.go

View workflow job for this annotation

GitHub Actions / Tests

missing return

0 comments on commit 47f7a50

Please sign in to comment.