diff --git a/config/rate_limit.go b/config/rate_limit.go index df0530d938f..d1559ca6a52 100644 --- a/config/rate_limit.go +++ b/config/rate_limit.go @@ -30,6 +30,9 @@ type RateLimit struct { // Controls which algorthm to use as a fallback when your distributed rate limiter can't be used. DRLEnableSentinelRateLimiter bool `json:"drl_enable_sentinel_rate_limiter"` + + // EnableRateLimitSmoothing enables rate limit smoothing. + EnableRateLimitSmoothing bool `json:"enable_rate_limit_smoothing"` } // String returns a readable setting for the rate limiter in effect. diff --git a/gateway/event_system.go b/gateway/event_system.go index 9a2947b1fd3..baa46d5b8a7 100644 --- a/gateway/event_system.go +++ b/gateway/event_system.go @@ -36,6 +36,7 @@ const ( EventTokenCreated apidef.TykEvent = "TokenCreated" EventTokenUpdated apidef.TykEvent = "TokenUpdated" EventTokenDeleted apidef.TykEvent = "TokenDeleted" + EventRateLimitingSmoothed apidef.TykEvent = "RateLimitSmoothing" ) // EventMetaDefault is a standard embedded struct to be used with custom event metadata types, gives an interface for diff --git a/gateway/ldap_auth_handler.go b/gateway/ldap_auth_handler.go index 7be982a3952..13c52e6f231 100644 --- a/gateway/ldap_auth_handler.go +++ b/gateway/ldap_auth_handler.go @@ -3,6 +3,7 @@ package gateway import ( "errors" "strings" + "time" "github.com/mavricknz/ldap" ) @@ -240,3 +241,8 @@ func (l LDAPStorageHandler) Exists(keyName string) (bool, error) { log.Error("Not implemented") return false, nil } + +func (l LDAPStorageHandler) Lock(string, time.Duration) (bool, error) { + log.Error("Not implemented") + return false, nil +} diff --git a/gateway/rpc_storage_handler.go b/gateway/rpc_storage_handler.go index 2e57aaec893..59d861c87fd 100644 --- a/gateway/rpc_storage_handler.go +++ b/gateway/rpc_storage_handler.go @@ -1173,3 +1173,8 @@ func (r *RPCStorageHandler) Exists(keyName string) (bool, error) { log.Error("Not implemented") return false, nil } + +func (r *RPCStorageHandler) Lock(key string, timeout time.Duration) (bool, error) { + log.Error("Not implemented") + return false, nil +} diff --git a/gateway/session_manager.go b/gateway/session_manager.go index dd90eee0a79..7c6d251385d 100644 --- a/gateway/session_manager.go +++ b/gateway/session_manager.go @@ -10,7 +10,6 @@ import ( "github.com/TykTechnologies/drl" "github.com/TykTechnologies/leakybucket" "github.com/TykTechnologies/leakybucket/memorycache" - "github.com/TykTechnologies/tyk/config" "github.com/TykTechnologies/tyk/internal/rate" "github.com/TykTechnologies/tyk/internal/redis" @@ -79,7 +78,7 @@ func (l *SessionLimiter) Context() context.Context { return l.ctx } -func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string, +func (l *SessionLimiter) doRollingWindowWrite(r *http.Request, key, rateLimiterKey, rateLimiterSentinelKey string, currentSession *user.SessionState, store storage.Handler, globalConf *config.Config, @@ -87,15 +86,35 @@ func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSe ctx := l.Context() - var per, cost float64 + var per, cost, maxCost float64 if apiLimit != nil { // respect limit on API level per = apiLimit.Per cost = apiLimit.Rate + maxCost = apiLimit.Rate } else { per = currentSession.Per cost = currentSession.Rate + maxCost = currentSession.Rate + } + + smoothingEnabled := globalConf.EnableRateLimitSmoothing + var smoothedAllowance float64 + var smoothingConfig *user.RateLimitSmoothing + if smoothingEnabled && apiLimit != nil && apiLimit.Smoothing != nil { + smoothedAllowance = float64(getAllowanceFromSmoothing(apiLimit.Smoothing)) + smoothingConfig = apiLimit.Smoothing + } else if smoothingEnabled && currentSession.Smoothing != nil { + smoothedAllowance = float64(getAllowanceFromSmoothing(currentSession.Smoothing)) + smoothingConfig = currentSession.Smoothing } + log.Debugf("before cost=%v, smoothedAllowance=%v", cost, smoothedAllowance) + + if smoothedAllowance > 0 && smoothedAllowance <= cost { + cost = smoothedAllowance + } + + log.Debugf("after cost=%v, smoothedAllowance=%v", cost, smoothedAllowance) log.Debug("[RATELIMIT] Inbound raw key is: ", key) log.Debug("[RATELIMIT] Rate limiter key is: ", rateLimiterKey) @@ -118,6 +137,10 @@ func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSe log.WithError(err).Error("error writing sliding log") } + if !dryRun && smoothingEnabled && doSmoothing(key, ratePerPeriodNow, smoothingConfig, store, maxCost) { + ctxScheduleSessionUpdate(r) + } + // Subtract by 1 because of the delayed add in the window var subtractor int64 = 1 if globalConf.EnableSentinelRateLimiter || globalConf.DRLEnableSentinelRateLimiter { @@ -140,6 +163,67 @@ func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSe return false } +func getAllowanceFromSmoothing(smoothing *user.RateLimitSmoothing) int64 { + if smoothing.CurrentAllowance == 0 { + smoothing.CurrentAllowance = smoothing.Threshold + return smoothing.Threshold + } + + return smoothing.CurrentAllowance +} + +func doSmoothing(key string, currentRate int64, + smoothing *user.RateLimitSmoothing, + store storage.Handler, maxAllowedRate float64) bool { + + if smoothing == nil { + return false + } + + var newAllowance int64 + if float64(currentRate) >= smoothing.Trigger*float64(smoothing.CurrentAllowance) { + // step up + newAllowance = smoothing.CurrentAllowance + smoothing.Rate + } + + if float64(currentRate) <= smoothing.Trigger*float64(smoothing.CurrentAllowance-smoothing.Rate) { + // step down + newAllowance = smoothing.CurrentAllowance - smoothing.Rate + } + + if newAllowance == 0 { + log.Debug("no smoothing opportunity, skipping") + return false + } + + // boundary values + if newAllowance > int64(maxAllowedRate) { + log.Debugf("skipping smoothing, new allowance=(%d) over allowed rate=(%d)", newAllowance, maxAllowedRate) + return false + } + + if newAllowance < smoothing.Threshold { + log.Debugf("skipping smoothing, new allowance=(%d) less than threshold=(%d)", newAllowance, smoothing.Threshold) + return false + } + + // add next resetAt + lockAcquired, err := store.Lock(fmt.Sprintf("reset-smoothed-%s", key), time.Second*time.Duration(smoothing.Interval)) + if err != nil { + log.Info("skipping smoothing with lock db error", err) + return false + } + + if !lockAcquired { + log.Info("skipping smoothing: failed to acquire lock") + return false + } + + smoothing.CurrentAllowance = newAllowance + log.Infof("smoothing done, updated rate to %d", newAllowance) + return true +} + type sessionFailReason uint const ( @@ -149,7 +233,7 @@ const ( sessionFailInternalServerError ) -func (l *SessionLimiter) limitSentinel(currentSession *user.SessionState, key string, rateScope string, useCustomKey bool, +func (l *SessionLimiter) limitSentinel(r *http.Request, currentSession *user.SessionState, key string, rateScope string, useCustomKey bool, store storage.Handler, globalConf *config.Config, apiLimit *user.APILimit, dryRun bool) bool { rateLimiterKey := RateLimitKeyPrefix + rateScope + currentSession.KeyHash() @@ -161,7 +245,8 @@ func (l *SessionLimiter) limitSentinel(currentSession *user.SessionState, key st } defer func() { - go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun) + go l.doRollingWindowWrite(r, key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, + apiLimit, dryRun) }() // Check sentinel @@ -174,7 +259,7 @@ func (l *SessionLimiter) limitSentinel(currentSession *user.SessionState, key st return false } -func (l *SessionLimiter) limitRedis(currentSession *user.SessionState, key string, rateScope string, useCustomKey bool, +func (l *SessionLimiter) limitRedis(r *http.Request, currentSession *user.SessionState, key string, rateScope string, useCustomKey bool, store storage.Handler, globalConf *config.Config, apiLimit *user.APILimit, dryRun bool) bool { rateLimiterKey := RateLimitKeyPrefix + rateScope + currentSession.KeyHash() @@ -185,10 +270,7 @@ func (l *SessionLimiter) limitRedis(currentSession *user.SessionState, key strin rateLimiterSentinelKey = RateLimitKeyPrefix + rateScope + key + SentinelRateLimitKeyPostfix } - if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun) { - return true - } - return false + return l.doRollingWindowWrite(r, key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun) } func (l *SessionLimiter) limitDRL(currentSession *user.SessionState, key string, rateScope string, @@ -248,7 +330,9 @@ func (sfr sessionFailReason) String() string { // sessionFailReason if session limits have been exceeded. // Key values to manage rate are Rate and Per, e.g. Rate of 10 messages // Per 10 seconds -func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason { +func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, + rateLimitKey string, quotaKey string, store storage.Handler, enableRL, enableQ bool, + globalConf *config.Config, api *APISpec, dryRun bool) sessionFailReason { // check for limit on API level (set to session by ApplyPolicies) accessDef, allowanceScope, err := GetAccessDefinitionByAPIIDOrSession(currentSession, api) if err != nil { @@ -280,11 +364,13 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se switch { case globalConf.EnableSentinelRateLimiter: - if l.limitSentinel(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) { + if l.limitSentinel(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, + &accessDef.Limit, dryRun) { return sessionFailRateLimit } case globalConf.EnableRedisRollingLimiter: - if l.limitRedis(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) { + if l.limitRedis(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, + &accessDef.Limit, dryRun) { return sessionFailRateLimit } default: @@ -306,7 +392,8 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se return sessionFailRateLimit } } else { - if l.limitRedis(currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, &accessDef.Limit, dryRun) { + if l.limitRedis(r, currentSession, rateLimitKey, rateScope, useCustomRateLimitKey, store, globalConf, + &accessDef.Limit, dryRun) { return sessionFailRateLimit } } @@ -327,7 +414,8 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se } -func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, quotaKey, scope string, limit *user.APILimit, store storage.Handler, hashKeys bool) bool { +func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, quotaKey, scope string, + limit *user.APILimit, store storage.Handler, hashKeys bool) bool { // Unlimited? if limit.QuotaMax == -1 || limit.QuotaMax == 0 { // No quota set @@ -442,6 +530,7 @@ func GetAccessDefinitionByAPIIDOrSession(currentSession *user.SessionState, api ThrottleInterval: currentSession.ThrottleInterval, ThrottleRetryLimit: currentSession.ThrottleRetryLimit, MaxQueryDepth: currentSession.MaxQueryDepth, + Smoothing: currentSession.Smoothing, } } diff --git a/storage/mdcb_storage.go b/storage/mdcb_storage.go index b135bfcdec4..344363299f9 100644 --- a/storage/mdcb_storage.go +++ b/storage/mdcb_storage.go @@ -3,6 +3,7 @@ package storage import ( "errors" "strings" + "time" "github.com/sirupsen/logrus" ) @@ -254,3 +255,7 @@ func (m MdcbStorage) Exists(key string) (bool, error) { return foundLocal && foundRpc, nil } + +func (m MdcbStorage) Lock(key string, timeout time.Duration) (bool, error) { + return m.local.Lock(key, timeout) +} diff --git a/storage/storage.go b/storage/storage.go index 4da4d8dec66..b23fdb21f25 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -8,6 +8,7 @@ import ( "fmt" "hash" "strings" + "time" "github.com/buger/jsonparser" @@ -61,6 +62,7 @@ type Handler interface { RemoveFromList(string, string) error AppendToSet(string, string) Exists(string) (bool, error) + Lock(string, time.Duration) (bool, error) } type AnalyticsHandler interface { diff --git a/user/session.go b/user/session.go index 611cb287e1f..0990c6c1a80 100644 --- a/user/session.go +++ b/user/session.go @@ -49,6 +49,17 @@ type APILimit struct { QuotaRemaining int64 `json:"quota_remaining" msg:"quota_remaining"` QuotaRenewalRate int64 `json:"quota_renewal_rate" msg:"quota_renewal_rate"` SetBy string `json:"-" msg:"-"` + + // smoothing related configuration + Smoothing *RateLimitSmoothing `json:"smoothing,omitempty" msg:"smoothing,omitempty"` +} + +type RateLimitSmoothing struct { + Rate int64 `json:"rate"` + Interval int64 `json:"interval"` + Threshold int64 `json:"threshold"` + Trigger float64 `json:"trigger"` + CurrentAllowance int64 `json:"currentAllowance"` } // AccessDefinition defines which versions of an API a key has access to @@ -147,6 +158,9 @@ type SessionState struct { // Used to store token hash keyHash string KeyID string `json:"-"` + + // smoothing related configuration + Smoothing *RateLimitSmoothing `json:"smoothing,omitempty" msg:"smoothing,omitempty"` } func NewSessionState() *SessionState {