Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-11927] [poc]implement ratelimit smoothing #6250

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type RateLimit struct {

// Controls which algorthm to use as a fallback when your distributed rate limiter can't be used.
DRLEnableSentinelRateLimiter bool `json:"drl_enable_sentinel_rate_limiter"`

// EnableRateLimitSmoothing enables rate limit smoothing.
EnableRateLimitSmoothing bool `json:"enable_rate_limit_smoothing"`
}

// String returns a readable setting for the rate limiter in effect.
Expand Down
1 change: 1 addition & 0 deletions gateway/event_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
EventTokenCreated apidef.TykEvent = "TokenCreated"
EventTokenUpdated apidef.TykEvent = "TokenUpdated"
EventTokenDeleted apidef.TykEvent = "TokenDeleted"
EventRateLimitingSmoothed apidef.TykEvent = "RateLimitSmoothing"
)

// EventMetaDefault is a standard embedded struct to be used with custom event metadata types, gives an interface for
Expand Down
6 changes: 6 additions & 0 deletions gateway/ldap_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gateway
import (
"errors"
"strings"
"time"

"github.com/mavricknz/ldap"
)
Expand Down Expand Up @@ -240,3 +241,8 @@ func (l LDAPStorageHandler) Exists(keyName string) (bool, error) {
log.Error("Not implemented")
return false, nil
}

func (l LDAPStorageHandler) Lock(string, time.Duration) (bool, error) {
log.Error("Not implemented")
return false, nil
}
5 changes: 5 additions & 0 deletions gateway/rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,3 +1173,8 @@ func (r *RPCStorageHandler) Exists(keyName string) (bool, error) {
log.Error("Not implemented")
return false, nil
}

func (r *RPCStorageHandler) Lock(key string, timeout time.Duration) (bool, error) {
log.Error("Not implemented")
return false, nil
}
119 changes: 104 additions & 15 deletions gateway/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"github.com/TykTechnologies/drl"
"github.com/TykTechnologies/leakybucket"
"github.com/TykTechnologies/leakybucket/memorycache"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/internal/rate"
"github.com/TykTechnologies/tyk/internal/redis"
Expand Down Expand Up @@ -79,23 +78,43 @@
return l.ctx
}

func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string,
func (l *SessionLimiter) doRollingWindowWrite(r *http.Request, key, rateLimiterKey, rateLimiterSentinelKey string,
currentSession *user.SessionState,
store storage.Handler,
globalConf *config.Config,
apiLimit *user.APILimit, dryRun bool) bool {

ctx := l.Context()

var per, cost float64
var per, cost, maxCost float64

if apiLimit != nil { // respect limit on API level
per = apiLimit.Per
cost = apiLimit.Rate
maxCost = apiLimit.Rate
} else {
per = currentSession.Per
cost = currentSession.Rate
maxCost = currentSession.Rate
}

smoothingEnabled := globalConf.EnableRateLimitSmoothing
var smoothedAllowance float64
var smoothingConfig *user.RateLimitSmoothing
if smoothingEnabled && apiLimit != nil && apiLimit.Smoothing != nil {
smoothedAllowance = float64(getAllowanceFromSmoothing(apiLimit.Smoothing))
smoothingConfig = apiLimit.Smoothing
} else if smoothingEnabled && currentSession.Smoothing != nil {
smoothedAllowance = float64(getAllowanceFromSmoothing(currentSession.Smoothing))
smoothingConfig = currentSession.Smoothing
}
log.Debugf("before cost=%v, smoothedAllowance=%v", cost, smoothedAllowance)

if smoothedAllowance > 0 && smoothedAllowance <= cost {
cost = smoothedAllowance
}

log.Debugf("after cost=%v, smoothedAllowance=%v", cost, smoothedAllowance)

log.Debug("[RATELIMIT] Inbound raw key is: ", key)
log.Debug("[RATELIMIT] Rate limiter key is: ", rateLimiterKey)
Expand All @@ -118,6 +137,10 @@
log.WithError(err).Error("error writing sliding log")
}

if !dryRun && smoothingEnabled && doSmoothing(key, ratePerPeriodNow, smoothingConfig, store, maxCost) {
ctxScheduleSessionUpdate(r)
}

// Subtract by 1 because of the delayed add in the window
var subtractor int64 = 1
if globalConf.EnableSentinelRateLimiter || globalConf.DRLEnableSentinelRateLimiter {
Expand All @@ -140,6 +163,67 @@
return false
}

func getAllowanceFromSmoothing(smoothing *user.RateLimitSmoothing) int64 {
if smoothing.CurrentAllowance == 0 {
smoothing.CurrentAllowance = smoothing.Threshold
return smoothing.Threshold
}

return smoothing.CurrentAllowance
}

func doSmoothing(key string, currentRate int64,
smoothing *user.RateLimitSmoothing,
store storage.Handler, maxAllowedRate float64) bool {

if smoothing == nil {
return false
}

var newAllowance int64
if float64(currentRate) >= smoothing.Trigger*float64(smoothing.CurrentAllowance) {
// step up
newAllowance = smoothing.CurrentAllowance + smoothing.Rate
}

if float64(currentRate) <= smoothing.Trigger*float64(smoothing.CurrentAllowance-smoothing.Rate) {
// step down
newAllowance = smoothing.CurrentAllowance - smoothing.Rate
}

if newAllowance == 0 {
log.Debug("no smoothing opportunity, skipping")
return false
}

// boundary values
if newAllowance > int64(maxAllowedRate) {
log.Debugf("skipping smoothing, new allowance=(%d) over allowed rate=(%d)", newAllowance, maxAllowedRate)

Check failure on line 201 in gateway/session_manager.go

View workflow job for this annotation

GitHub Actions / Analyze (go)

(*github.com/sirupsen/logrus.Logger).Debugf format %d has arg maxAllowedRate of wrong type float64
return false
}

if newAllowance < smoothing.Threshold {
log.Debugf("skipping smoothing, new allowance=(%d) less than threshold=(%d)", newAllowance, smoothing.Threshold)
return false
}

// add next resetAt
lockAcquired, err := store.Lock(fmt.Sprintf("reset-smoothed-%s", key), time.Second*time.Duration(smoothing.Interval))
if err != nil {
log.Info("skipping smoothing with lock db error", err)
return false
}

if !lockAcquired {
log.Info("skipping smoothing: failed to acquire lock")
return false
}

smoothing.CurrentAllowance = newAllowance
log.Infof("smoothing done, updated rate to %d", newAllowance)
return true
}

type sessionFailReason uint

const (
Expand All @@ -149,7 +233,7 @@
sessionFailInternalServerError
)

func (l *SessionLimiter) limitSentinel(currentSession *user.SessionState, key string, rateScope string, useCustomKey bool,
func (l *SessionLimiter) limitSentinel(r *http.Request, currentSession *user.SessionState, key string, rateScope string, useCustomKey bool,
store storage.Handler, globalConf *config.Config, apiLimit *user.APILimit, dryRun bool) bool {

rateLimiterKey := RateLimitKeyPrefix + rateScope + currentSession.KeyHash()
Expand All @@ -161,7 +245,8 @@
}

defer func() {
go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun)
go l.doRollingWindowWrite(r, key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf,
apiLimit, dryRun)
}()

// Check sentinel
Expand All @@ -174,7 +259,7 @@
return false
}

func (l *SessionLimiter) limitRedis(currentSession *user.SessionState, key string, rateScope string, useCustomKey bool,
func (l *SessionLimiter) limitRedis(r *http.Request, currentSession *user.SessionState, key string, rateScope string, useCustomKey bool,
store storage.Handler, globalConf *config.Config, apiLimit *user.APILimit, dryRun bool) bool {

rateLimiterKey := RateLimitKeyPrefix + rateScope + currentSession.KeyHash()
Expand All @@ -185,10 +270,7 @@
rateLimiterSentinelKey = RateLimitKeyPrefix + rateScope + key + SentinelRateLimitKeyPostfix
}

if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun) {
return true
}
return false
return l.doRollingWindowWrite(r, key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun)
}

func (l *SessionLimiter) limitDRL(currentSession *user.SessionState, key string, rateScope string,
Expand Down Expand Up @@ -248,7 +330,9 @@
// sessionFailReason if session limits have been exceeded.
// Key values to manage rate are Rate and Per, e.g. Rate of 10 messages
// Per 10 seconds
func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason {
func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState,
rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool,
globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason {
// check for limit on API level (set to session by ApplyPolicies)
accessDef, allowanceScope, err := GetAccessDefinitionByAPIIDOrSession(currentSession, api)
if err != nil {
Expand Down Expand Up @@ -280,11 +364,13 @@

switch {
case globalConf.EnableSentinelRateLimiter:
if l.limitSentinel(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) {
if l.limitSentinel(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf,
&accessDef.Limit, dryRun) {
return sessionFailRateLimit
}
case globalConf.EnableRedisRollingLimiter:
if l.limitRedis(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) {
if l.limitRedis(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf,
&accessDef.Limit, dryRun) {
return sessionFailRateLimit
}
default:
Expand All @@ -306,7 +392,8 @@
return sessionFailRateLimit
}
} else {
if l.limitRedis(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) {
if l.limitRedis(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf,
&accessDef.Limit, dryRun) {
return sessionFailRateLimit
}
}
Expand All @@ -327,7 +414,8 @@

}

func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, quotaKey, scope string, limit *user.APILimit, store storage.Handler, hashKeys bool) bool {
func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, quotaKey, scope string,
limit *user.APILimit, store storage.Handler, hashKeys bool) bool {
// Unlimited?
if limit.QuotaMax == -1 || limit.QuotaMax == 0 {
// No quota set
Expand Down Expand Up @@ -442,6 +530,7 @@
ThrottleInterval: currentSession.ThrottleInterval,
ThrottleRetryLimit: currentSession.ThrottleRetryLimit,
MaxQueryDepth: currentSession.MaxQueryDepth,
Smoothing: currentSession.Smoothing,
}
}

Expand Down
5 changes: 5 additions & 0 deletions storage/mdcb_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"errors"
"strings"
"time"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -254,3 +255,7 @@ func (m MdcbStorage) Exists(key string) (bool, error) {

return foundLocal && foundRpc, nil
}

func (m MdcbStorage) Lock(key string, timeout time.Duration) (bool, error) {
return m.local.Lock(key, timeout)
}
2 changes: 2 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"hash"
"strings"
"time"

"github.com/buger/jsonparser"

Expand Down Expand Up @@ -61,6 +62,7 @@ type Handler interface {
RemoveFromList(string, string) error
AppendToSet(string, string)
Exists(string) (bool, error)
Lock(string, time.Duration) (bool, error)
}

type AnalyticsHandler interface {
Expand Down
14 changes: 14 additions & 0 deletions user/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ type APILimit struct {
QuotaRemaining int64 `json:"quota_remaining" msg:"quota_remaining"`
QuotaRenewalRate int64 `json:"quota_renewal_rate" msg:"quota_renewal_rate"`
SetBy string `json:"-" msg:"-"`

// smoothing related configuration
Smoothing *RateLimitSmoothing `json:"smoothing,omitempty" msg:"smoothing,omitempty"`
}

type RateLimitSmoothing struct {
Rate int64 `json:"rate"`
Interval int64 `json:"interval"`
Threshold int64 `json:"threshold"`
Trigger float64 `json:"trigger"`
CurrentAllowance int64 `json:"currentAllowance"`
}

// AccessDefinition defines which versions of an API a key has access to
Expand Down Expand Up @@ -147,6 +158,9 @@ type SessionState struct {
// Used to store token hash
keyHash string
KeyID string `json:"-"`

// smoothing related configuration
Smoothing *RateLimitSmoothing `json:"smoothing,omitempty" msg:"smoothing,omitempty"`
}

func NewSessionState() *SessionState {
Expand Down
Loading