hybrid drl backport changes (#2704)
Backport of #2674 for 2.8
gernest authored and buger committed Nov 28, 2019
1 parent eb18a1f commit 0750612
Showing 3 changed files with 115 additions and 55 deletions.
3 changes: 3 additions & 0 deletions cli/linter/schema.json
@@ -267,6 +267,9 @@
"drl_notification_frequency": {
"type": "integer"
},
"drl_threshold": {
"type": "number"
},
"enable_analytics": {
"type": "boolean"
},
1 change: 1 addition & 0 deletions config/config.go
@@ -272,6 +272,7 @@ type Config struct {
PublicKeyPath string `json:"public_key_path"`
CloseIdleConnections bool `json:"close_idle_connections"`
DRLNotificationFrequency int `json:"drl_notification_frequency"`
DRLThreshold float64 `json:"drl_threshold"`
GlobalSessionLifetime int64 `bson:"global_session_lifetime" json:"global_session_lifetime"`
ForceGlobalSessionLifetime bool `bson:"force_global_session_lifetime" json:"force_global_session_lifetime"`
BundleBaseURL string `bson:"bundle_base_url" json:"bundle_base_url"`
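The new DRLThreshold option is declared as a JSON "number" in the linter schema and as a plain float64 here, with no default of its own; the session_manager.go diff below swaps a zero value for 5 at the point of use. A minimal sketch of how the field round-trips from JSON and how that fallback behaves, using a hypothetical stand-in struct and helper rather than the real config.Config:

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for the relevant slice of config.Config; the field names and JSON
// tags are taken from the diff, everything else is omitted.
type gatewayConfig struct {
	DRLNotificationFrequency int     `json:"drl_notification_frequency"`
	DRLThreshold             float64 `json:"drl_threshold"`
}

// effectiveDRLThreshold mirrors the fallback used in session_manager.go:
// an unset (zero) threshold is treated as 5.
func effectiveDRLThreshold(c float64) float64 {
	if c == 0 {
		return 5
	}
	return c
}

func main() {
	raw := []byte(`{"drl_notification_frequency": 10, "drl_threshold": 2.5}`)

	var cfg gatewayConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	// The schema declares drl_threshold as "number", so fractional values are valid.
	fmt.Println(effectiveDRLThreshold(cfg.DRLThreshold)) // 2.5

	// Omitting the key leaves the zero value, which falls back to 5.
	var unset gatewayConfig
	fmt.Println(effectiveDRLThreshold(unset.DRLThreshold)) // 5
}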
166 changes: 111 additions & 55 deletions session_manager.go
@@ -93,6 +93,101 @@ const (
sessionFailQuota
)

func (l *SessionLimiter) limitSentinel(
currentSession *user.SessionState,
key string,
apiID string,
store storage.Handler,
globalConf *config.Config,
apiLimit *user.APILimit,
dryRun bool,
) bool {
rateLimiterKey := RateLimitKeyPrefix + currentSession.KeyHash()
rateLimiterSentinelKey := RateLimitKeyPrefix + currentSession.KeyHash() + ".BLOCKED"
if apiLimit != nil {
rateLimiterKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash()
rateLimiterSentinelKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + ".BLOCKED"
}

go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun)

// Check sentinel
_, sentinelActive := store.GetRawKey(rateLimiterSentinelKey)
return sentinelActive == nil
}
func (l *SessionLimiter) limitRedis(
currentSession *user.SessionState,
key string,
apiID string,
store storage.Handler,
globalConf *config.Config,
apiLimit *user.APILimit,
dryRun bool,
) bool {
rateLimiterKey := RateLimitKeyPrefix + currentSession.KeyHash()
rateLimiterSentinelKey := RateLimitKeyPrefix + currentSession.KeyHash() + ".BLOCKED"
if apiLimit != nil {
rateLimiterKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash()
rateLimiterSentinelKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + ".BLOCKED"
}

if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun) {
return true
}
return false
}
func (l *SessionLimiter) limitDRL(
currentSession *user.SessionState,
key string,
apiID string,
apiLimit *user.APILimit,
dryRun bool,
) bool {
// In-memory limiter
if l.bucketStore == nil {
l.bucketStore = memorycache.New()
}

// If a token has been updated, we must ensure we don't use
// an old bucket and let the cache deal with it
bucketKey := ""
var currRate float64
var per float64
if apiLimit == nil {
bucketKey = key + ":" + currentSession.LastUpdated
currRate = currentSession.Rate
per = currentSession.Per
} else { // respect limit on API level
bucketKey = apiID + ":" + key + ":" + currentSession.LastUpdated
currRate = apiLimit.Rate
per = apiLimit.Per
}

// DRL will always overflow with more servers on low rates
rate := uint(currRate * float64(DRLManager.RequestTokenValue))
if rate < uint(DRLManager.CurrentTokenValue) {
rate = uint(DRLManager.CurrentTokenValue)
}

userBucket, err := l.bucketStore.Create(bucketKey, rate, time.Duration(per)*time.Second)
if err != nil {
log.Error("Failed to create bucket!")
return true
}
if dryRun {
// if userBucket is empty and not expired.
if userBucket.Remaining() == 0 && time.Now().Before(userBucket.Reset()) {
return true
}
} else {
_, errF := userBucket.Add(uint(DRLManager.CurrentTokenValue))
if errF != nil {
return true
}
}
return false
}

// ForwardMessage will enforce rate limiting, returning a non-zero
// sessionFailReason if session limits have been exceeded.
// Key values to manage rate are Rate and Per, e.g. Rate of 10 messages
@@ -111,73 +206,34 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se
 }
 
 if globalConf.EnableSentinelRateLimiter {
-rateLimiterKey := RateLimitKeyPrefix + currentSession.KeyHash()
-rateLimiterSentinelKey := RateLimitKeyPrefix + currentSession.KeyHash() + ".BLOCKED"
-if apiLimit != nil {
-rateLimiterKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash()
-rateLimiterSentinelKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + ".BLOCKED"
-}
-
-go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun)
-
-// Check sentinel
-_, sentinelActive := store.GetRawKey(rateLimiterSentinelKey)
-if sentinelActive == nil {
-// Sentinel is set, fail
+if l.limitSentinel(currentSession, key, apiID, store, globalConf, apiLimit, dryRun) {
 return sessionFailRateLimit
 }
-} else if globalConf.EnableRedisRollingLimiter {
-rateLimiterKey := RateLimitKeyPrefix + currentSession.KeyHash()
-rateLimiterSentinelKey := RateLimitKeyPrefix + currentSession.KeyHash() + ".BLOCKED"
-if apiLimit != nil {
-rateLimiterKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash()
-rateLimiterSentinelKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + ".BLOCKED"
-}
-
-if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit, dryRun) {
+} else if globalConf.EnableRedisRollingLimiter {
+if l.limitRedis(currentSession, key, apiID, store, globalConf, apiLimit, dryRun) {
 return sessionFailRateLimit
 }
 } else {
-// In-memory limiter
-if l.bucketStore == nil {
-l.bucketStore = memorycache.New()
+var n float64
+if DRLManager.Servers != nil {
+n = float64(DRLManager.Servers.Count())
 }
 
-// If a token has been updated, we must ensure we don't use
-// an old bucket an let the cache deal with it
-bucketKey := ""
-var currRate float64
-var per float64
-if apiLimit == nil {
-bucketKey = key + ":" + currentSession.LastUpdated
-currRate = currentSession.Rate
-per = currentSession.Per
-} else { // respect limit on API level
-bucketKey = apiID + ":" + key + ":" + currentSession.LastUpdated
-currRate = apiLimit.Rate
-per = apiLimit.Per
-}
-
-// DRL will always overflow with more servers on low rates
-rate := uint(currRate * float64(DRLManager.RequestTokenValue))
-if rate < uint(DRLManager.CurrentTokenValue) {
-rate = uint(DRLManager.CurrentTokenValue)
-}
-
-userBucket, err := l.bucketStore.Create(bucketKey, rate, time.Duration(per)*time.Second)
-if err != nil {
-log.Error("Failed to create bucket!")
-return sessionFailRateLimit
+rate := apiLimit.Rate / apiLimit.Per
+c := globalConf.DRLThreshold
+if c == 0 {
+// defaults to 5
+c = 5
 }
 
-if dryRun {
-// if userBucket is empty and not expired.
-if userBucket.Remaining() == 0 && time.Now().Before(userBucket.Reset()) {
+if n <= 1 || n*c > rate {
+// If we have 1 server, there is no need to strain redis at all the leaky
+// bucket algorithm will suffice.
+if l.limitDRL(currentSession, key, apiID, apiLimit, dryRun) {
 return sessionFailRateLimit
 }
 } else {
-_, errF := userBucket.Add(uint(DRLManager.CurrentTokenValue))
-if errF != nil {
+if l.limitRedis(currentSession, key, apiID, store, globalConf, apiLimit, dryRun) {
 return sessionFailRateLimit
 }
 }
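The heart of the hybrid behaviour is the branch added to ForwardMessage above: with a single gateway, or whenever servers multiplied by drl_threshold exceeds the configured requests per second, the request is handled by the in-memory DRL leaky bucket; otherwise it falls through to the Redis rolling-window limiter. A self-contained restatement of that branch, for illustration only, with plain float arguments and a made-up function name standing in for DRLManager, globalConf and the session/limit objects:

package main

import "fmt"

// useLocalDRL restates the branch added to ForwardMessage in this diff:
//   servers   - gateway count reported by DRLManager.Servers.Count()
//   rate      - requests per second, i.e. apiLimit.Rate / apiLimit.Per
//   threshold - drl_threshold from the config; 0 means "unset"
// It returns true when the in-memory DRL bucket is used and false when the
// request falls through to the Redis rolling-window limiter.
func useLocalDRL(servers, rate, threshold float64) bool {
	if threshold == 0 {
		threshold = 5 // same default as in the diff
	}
	// Single gateway: no need to involve Redis, the leaky bucket suffices.
	// Otherwise compare servers*threshold against the per-second rate,
	// exactly as the diff above does.
	return servers <= 1 || servers*threshold > rate
}

func main() {
	fmt.Println(useLocalDRL(1, 100, 0)) // true:  single gateway
	fmt.Println(useLocalDRL(4, 10, 0))  // true:  4*5 = 20 > 10
	fmt.Println(useLocalDRL(4, 100, 0)) // false: 4*5 = 20 <= 100, Redis rolling limiter
}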
