Skip to content

Commit

Permalink
implement ratelimit smoothing
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffy-mathew committed Apr 27, 2024
1 parent 8ff2add commit 15e4911
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 15 deletions.
3 changes: 3 additions & 0 deletions config/rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type RateLimit struct {

// Controls which algorthm to use as a fallback when your distributed rate limiter can't be used.
DRLEnableSentinelRateLimiter bool `json:"drl_enable_sentinel_rate_limiter"`

// EnableRateLimitSmoothing enables rate limit smoothing.
EnableRateLimitSmoothing bool `json:"enable_rate_limit_smoothing"`
}

// String returns a readable setting for the rate limiter in effect.
Expand Down
1 change: 1 addition & 0 deletions gateway/event_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
EventTokenCreated apidef.TykEvent = "TokenCreated"
EventTokenUpdated apidef.TykEvent = "TokenUpdated"
EventTokenDeleted apidef.TykEvent = "TokenDeleted"
EventRateLimitingSmoothed apidef.TykEvent = "RateLimitSmoothing"
)

// EventMetaDefault is a standard embedded struct to be used with custom event metadata types, gives an interface for
Expand Down
6 changes: 6 additions & 0 deletions gateway/ldap_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gateway
import (
"errors"
"strings"
"time"

"github.com/mavricknz/ldap"
)
Expand Down Expand Up @@ -240,3 +241,8 @@ func (l LDAPStorageHandler) Exists(keyName string) (bool, error) {
log.Error("Not implemented")
return false, nil
}

func (l LDAPStorageHandler) Lock(string, time.Duration) (bool, error) {
log.Error("Not implemented")
return false, nil
}
5 changes: 5 additions & 0 deletions gateway/rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,3 +1173,8 @@ func (r *RPCStorageHandler) Exists(keyName string) (bool, error) {
log.Error("Not implemented")
return false, nil
}

func (r *RPCStorageHandler) Lock(key string, timeout time.Duration) (bool, error) {
log.Error("Not implemented")
return false, nil
}
119 changes: 104 additions & 15 deletions gateway/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/TykTechnologies/drl"
"github.com/TykTechnologies/leakybucket"
"github.com/TykTechnologies/leakybucket/memorycache"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/internal/rate"
"github.com/TykTechnologies/tyk/internal/redis"
Expand Down Expand Up @@ -79,23 +78,43 @@ func (l *SessionLimiter) Context() context.Context {
return l.ctx
}

func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string,
func (l *SessionLimiter) doRollingWindowWrite(r *http.Request, key, rateLimiterKey, rateLimiterSentinelKey string,
currentSession *user.SessionState,
store storage.Handler,
globalConf *config.Config,
apiLimit *user.APILimit, dryRun bool) bool {

ctx := l.Context()

var per, cost float64
var per, cost, maxCost float64

if apiLimit != nil { // respect limit on API level
per = apiLimit.Per
cost = apiLimit.Rate
maxCost = apiLimit.Rate
} else {
per = currentSession.Per
cost = currentSession.Rate
maxCost = currentSession.Rate
}

smoothingEnabled := globalConf.EnableRateLimitSmoothing
var smoothedAllowance float64
var smoothingConfig *user.RateLimitSmoothing
if smoothingEnabled && apiLimit != nil && apiLimit.Smoothing != nil {
smoothedAllowance = float64(getAllowanceFromSmoothing(apiLimit.Smoothing))
smoothingConfig = apiLimit.Smoothing
} else if smoothingEnabled && currentSession.Smoothing != nil {
smoothedAllowance = float64(getAllowanceFromSmoothing(currentSession.Smoothing))
smoothingConfig = currentSession.Smoothing
}
log.Debugf("before cost=%v, smoothedAllowance=%v", cost, smoothedAllowance)

if smoothedAllowance > 0 && smoothedAllowance <= cost {
cost = smoothedAllowance
}

log.Debugf("after cost=%v, smoothedAllowance=%v", cost, smoothedAllowance)

log.Debug("[RATELIMIT] Inbound raw key is: ", key)
log.Debug("[RATELIMIT] Rate limiter key is: ", rateLimiterKey)
Expand All @@ -118,6 +137,10 @@ func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSe
log.WithError(err).Error("error writing sliding log")
}

if !dryRun && smoothingEnabled && doSmoothing(key, ratePerPeriodNow, smoothingConfig, store, maxCost) {
ctxScheduleSessionUpdate(r)
}

// Subtract by 1 because of the delayed add in the window
var subtractor int64 = 1
if globalConf.EnableSentinelRateLimiter || globalConf.DRLEnableSentinelRateLimiter {
Expand All @@ -140,6 +163,67 @@ func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSe
return false
}

func getAllowanceFromSmoothing(smoothing *user.RateLimitSmoothing) int64 {
if smoothing.CurrentAllowance == 0 {
smoothing.CurrentAllowance = smoothing.Threshold
return smoothing.Threshold
}

return smoothing.CurrentAllowance
}

func doSmoothing(key string, currentRate int64,
smoothing *user.RateLimitSmoothing,
store storage.Handler, maxAllowedRate float64) bool {

if smoothing == nil {
return false
}

var newAllowance int64
if float64(currentRate) >= smoothing.Trigger*float64(smoothing.CurrentAllowance) {
// step up
newAllowance = smoothing.CurrentAllowance + smoothing.Rate
}

if float64(currentRate) <= smoothing.Trigger*float64(smoothing.CurrentAllowance-smoothing.Rate) {
// step down
newAllowance = smoothing.CurrentAllowance - smoothing.Rate
}

if newAllowance == 0 {
log.Debug("no smoothing opportunity, skipping")
return false
}

// boundary values
if newAllowance > int64(maxAllowedRate) {
log.Debugf("skipping smoothing, new allowance=(%d) over allowed rate=(%d)", newAllowance, maxAllowedRate)

Check failure on line 201 in gateway/session_manager.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

(*github.com/sirupsen/logrus.Logger).Debugf format %d has arg maxAllowedRate of wrong type float64
return false
}

if newAllowance < smoothing.Threshold {
log.Debugf("skipping smoothing, new allowance=(%d) less than threshold=(%d)", newAllowance, smoothing.Threshold)
return false
}

// add next resetAt
lockAcquired, err := store.Lock(fmt.Sprintf("reset-smoothed-%s", key), time.Second*time.Duration(smoothing.Interval))
if err != nil {
log.Info("skipping smoothing with lock db error", err)
return false
}

if !lockAcquired {
log.Info("skipping smoothing: failed to acquire lock")
return false
}

smoothing.CurrentAllowance = newAllowance
log.Infof("smoothing done, updated rate to %d", newAllowance)
return true
}

type sessionFailReason uint

const (
Expand All @@ -149,7 +233,7 @@ const (
sessionFailInternalServerError
)

func (l *SessionLimiter) limitSentinel(currentSession *user.SessionState, key string, rateScope string, useCustomKey bool,
func (l *SessionLimiter) limitSentinel(r *http.Request, currentSession *user.SessionState, key string, rateScope string, useCustomKey bool,
store storage.Handler, globalConf *config.Config, apiLimit *user.APILimit, dryRun bool) bool {

rateLimiterKey := RateLimitKeyPrefix + rateScope + currentSession.KeyHash()
Expand All @@ -161,7 +245,8 @@ func (l *SessionLimiter) limitSentinel(currentSession *user.SessionState, key st
}

defer func() {
go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun)
go l.doRollingWindowWrite(r, key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf,
apiLimit, dryRun)
}()

// Check sentinel
Expand All @@ -174,7 +259,7 @@ func (l *SessionLimiter) limitSentinel(currentSession *user.SessionState, key st
return false
}

func (l *SessionLimiter) limitRedis(currentSession *user.SessionState, key string, rateScope string, useCustomKey bool,
func (l *SessionLimiter) limitRedis(r *http.Request, currentSession *user.SessionState, key string, rateScope string, useCustomKey bool,
store storage.Handler, globalConf *config.Config, apiLimit *user.APILimit, dryRun bool) bool {

rateLimiterKey := RateLimitKeyPrefix + rateScope + currentSession.KeyHash()
Expand All @@ -185,10 +270,7 @@ func (l *SessionLimiter) limitRedis(currentSession *user.SessionState, key strin
rateLimiterSentinelKey = RateLimitKeyPrefix + rateScope + key + SentinelRateLimitKeyPostfix
}

if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun) {
return true
}
return false
return l.doRollingWindowWrite(r, key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun)
}

func (l *SessionLimiter) limitDRL(currentSession *user.SessionState, key string, rateScope string,
Expand Down Expand Up @@ -248,7 +330,9 @@ func (sfr sessionFailReason) String() string {
// sessionFailReason if session limits have been exceeded.
// Key values to manage rate are Rate and Per, e.g. Rate of 10 messages
// Per 10 seconds
func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason {
func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState,
rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool,
globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason {
// check for limit on API level (set to session by ApplyPolicies)
accessDef, allowanceScope, err := GetAccessDefinitionByAPIIDOrSession(currentSession, api)
if err != nil {
Expand Down Expand Up @@ -280,11 +364,13 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se

switch {
case globalConf.EnableSentinelRateLimiter:
if l.limitSentinel(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) {
if l.limitSentinel(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf,
&accessDef.Limit, dryRun) {
return sessionFailRateLimit
}
case globalConf.EnableRedisRollingLimiter:
if l.limitRedis(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) {
if l.limitRedis(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf,
&accessDef.Limit, dryRun) {
return sessionFailRateLimit
}
default:
Expand All @@ -306,7 +392,8 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se
return sessionFailRateLimit
}
} else {
if l.limitRedis(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) {
if l.limitRedis(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf,
&accessDef.Limit, dryRun) {
return sessionFailRateLimit
}
}
Expand All @@ -327,7 +414,8 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se

}

func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, quotaKey, scope string, limit *user.APILimit, store storage.Handler, hashKeys bool) bool {
func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, quotaKey, scope string,
limit *user.APILimit, store storage.Handler, hashKeys bool) bool {
// Unlimited?
if limit.QuotaMax == -1 || limit.QuotaMax == 0 {
// No quota set
Expand Down Expand Up @@ -442,6 +530,7 @@ func GetAccessDefinitionByAPIIDOrSession(currentSession *user.SessionState, api
ThrottleInterval: currentSession.ThrottleInterval,
ThrottleRetryLimit: currentSession.ThrottleRetryLimit,
MaxQueryDepth: currentSession.MaxQueryDepth,
Smoothing: currentSession.Smoothing,
}
}

Expand Down
5 changes: 5 additions & 0 deletions storage/mdcb_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"errors"
"strings"
"time"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -254,3 +255,7 @@ func (m MdcbStorage) Exists(key string) (bool, error) {

return foundLocal && foundRpc, nil
}

func (m MdcbStorage) Lock(key string, timeout time.Duration) (bool, error) {
return m.local.Lock(key, timeout)
}
2 changes: 2 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"hash"
"strings"
"time"

"github.com/buger/jsonparser"

Expand Down Expand Up @@ -61,6 +62,7 @@ type Handler interface {
RemoveFromList(string, string) error
AppendToSet(string, string)
Exists(string) (bool, error)
Lock(string, time.Duration) (bool, error)
}

type AnalyticsHandler interface {
Expand Down
14 changes: 14 additions & 0 deletions user/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ type APILimit struct {
QuotaRemaining int64 `json:"quota_remaining" msg:"quota_remaining"`
QuotaRenewalRate int64 `json:"quota_renewal_rate" msg:"quota_renewal_rate"`
SetBy string `json:"-" msg:"-"`

// smoothing related configuration
Smoothing *RateLimitSmoothing `json:"smoothing,omitempty" msg:"smoothing,omitempty"`
}

type RateLimitSmoothing struct {
Rate int64 `json:"rate"`
Interval int64 `json:"interval"`
Threshold int64 `json:"threshold"`
Trigger float64 `json:"trigger"`
CurrentAllowance int64 `json:"currentAllowance"`
}

// AccessDefinition defines which versions of an API a key has access to
Expand Down Expand Up @@ -147,6 +158,9 @@ type SessionState struct {
// Used to store token hash
keyHash string
KeyID string `json:"-"`

// smoothing related configuration
Smoothing *RateLimitSmoothing `json:"smoothing,omitempty" msg:"smoothing,omitempty"`
}

func NewSessionState() *SessionState {
Expand Down

0 comments on commit 15e4911

Please sign in to comment.