From dcbb0d981dee34ea7feb1dcc1c7eed2e574d2602 Mon Sep 17 00:00:00 2001 From: dencoded Date: Tue, 21 Aug 2018 23:53:04 -0400 Subject: [PATCH 1/2] per API partitions option added --- middleware.go | 85 +++++++++++++++------- mw_api_rate_limit.go | 1 + mw_organisation_activity.go | 2 + mw_rate_limiting.go | 1 + policy.go | 4 +- session_manager.go | 137 +++++++++++++++++++++++++++++------- user/policy.go | 1 + user/session.go | 14 +++- 8 files changed, 192 insertions(+), 53 deletions(-) diff --git a/middleware.go b/middleware.go index 3ab7b15bac7..4dbb11f69f8 100644 --- a/middleware.go +++ b/middleware.go @@ -207,6 +207,7 @@ func (t BaseMiddleware) ApplyPolicies(key string, session *user.SessionState) er rights := session.AccessRights tags := make(map[string]bool) didQuota, didRateLimit, didACL := false, false, false + didPerAPI := make(map[string]bool) policies := session.PolicyIDs() for i, polID := range policies { policiesMu.RLock() @@ -227,31 +228,67 @@ func (t BaseMiddleware) ApplyPolicies(key string, session *user.SessionState) er if policy.Partitions.Quota || policy.Partitions.RateLimit || policy.Partitions.Acl { // This is a partitioned policy, only apply what is active - if policy.Partitions.Quota { - if didQuota { - err := fmt.Errorf("cannot apply multiple quota policies") - log.Error(err) - return err + if policy.Partitions.PerAPI { + // new logic when you can specify quota or rate in more than one policy but for different APIs + // this new option will be applied to session ONLY when policy.Partitions.Acl == true + // as this is where we merging access rights from all policies into session + for apiID, accessRights := range policy.AccessRights { + if didPerAPI[apiID] { + err := fmt.Errorf("cannot apply multiple policies for API: %s", apiID) + log.Error(err) + return err + } + didPerAPI[apiID] = true + + // check if we already have limit on API level specified when policy was created + if accessRights.Limit != nil { + continue + } + + // limit was not specified on API level so we will populate it from policy + apiLimitFromPolicy := &user.APILimit{ + QuotaMax: -1, + SetByPolicy: true, + } + if policy.Partitions.Quota { + apiLimitFromPolicy.QuotaMax = policy.QuotaMax + apiLimitFromPolicy.QuotaRenewalRate = policy.QuotaRenewalRate + } + if policy.Partitions.RateLimit { + apiLimitFromPolicy.Rate = policy.Rate + apiLimitFromPolicy.Per = policy.Per + } + accessRights.Limit = apiLimitFromPolicy + policy.AccessRights[apiID] = accessRights } - didQuota = true - // Quotas - session.QuotaMax = policy.QuotaMax - session.QuotaRenewalRate = policy.QuotaRenewalRate - } - - if policy.Partitions.RateLimit { - if didRateLimit { - err := fmt.Errorf("cannot apply multiple rate limit policies") - log.Error(err) - return err + } else { + // legacy logic when you can specify quota or rate only in no more than one policy + if policy.Partitions.Quota { + if didQuota { + err := fmt.Errorf("cannot apply multiple quota policies") + log.Error(err) + return err + } + didQuota = true + // Quotas + session.QuotaMax = policy.QuotaMax + session.QuotaRenewalRate = policy.QuotaRenewalRate } - didRateLimit = true - // Rate limting - session.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged - session.Rate = policy.Rate - session.Per = policy.Per - if policy.LastUpdated != "" { - session.LastUpdated = policy.LastUpdated + + if policy.Partitions.RateLimit { + if didRateLimit { + err := fmt.Errorf("cannot apply multiple rate limit policies") + log.Error(err) + return err + } + didRateLimit = true + // Rate limiting + session.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged + session.Rate = policy.Rate + session.Per = policy.Per + if policy.LastUpdated != "" { + session.LastUpdated = policy.LastUpdated + } } } @@ -279,7 +316,7 @@ func (t BaseMiddleware) ApplyPolicies(key string, session *user.SessionState) er session.QuotaMax = policy.QuotaMax session.QuotaRenewalRate = policy.QuotaRenewalRate - // Rate limting + // Rate limiting session.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged session.Rate = policy.Rate session.Per = policy.Per diff --git a/mw_api_rate_limit.go b/mw_api_rate_limit.go index a312a3e177d..95345edcc7f 100644 --- a/mw_api_rate_limit.go +++ b/mw_api_rate_limit.go @@ -70,6 +70,7 @@ func (k *RateLimitForAPI) ProcessRequest(w http.ResponseWriter, r *http.Request, true, false, &k.Spec.GlobalConfig, + k.Spec.APIID, ) if reason == sessionFailRateLimit { diff --git a/mw_organisation_activity.go b/mw_organisation_activity.go index 8b58323f5f6..a72f653933b 100644 --- a/mw_organisation_activity.go +++ b/mw_organisation_activity.go @@ -71,6 +71,7 @@ func (k *OrganizationMonitor) ProcessRequestLive(r *http.Request) (error, int) { session.Per > 0 && session.Rate > 0, true, &k.Spec.GlobalConfig, + k.Spec.APIID, ) k.Spec.OrgSessionManager.UpdateSession(k.Spec.OrgID, &session, session.Lifetime(k.Spec.SessionLifetime), false) @@ -196,6 +197,7 @@ func (k *OrganizationMonitor) AllowAccessNext( session.Per > 0 && session.Rate > 0, true, &k.Spec.GlobalConfig, + k.Spec.APIID, ) k.Spec.OrgSessionManager.UpdateSession(k.Spec.OrgID, &session, session.Lifetime(k.Spec.SessionLifetime), false) diff --git a/mw_rate_limiting.go b/mw_rate_limiting.go index 99e204e3ef2..bbd01e606be 100644 --- a/mw_rate_limiting.go +++ b/mw_rate_limiting.go @@ -74,6 +74,7 @@ func (k *RateLimitAndQuotaCheck) ProcessRequest(w http.ResponseWriter, r *http.R !k.Spec.DisableRateLimit, !k.Spec.DisableQuota, &k.Spec.GlobalConfig, + k.Spec.APIID, ) switch reason { diff --git a/policy.go b/policy.go index 2b5280297f0..db6c195088a 100644 --- a/policy.go +++ b/policy.go @@ -17,7 +17,8 @@ type DBAccessDefinition struct { APIName string `json:"apiname"` APIID string `json:"apiid"` Versions []string `json:"versions"` - AllowedURLs []user.AccessSpec `bson:"allowed_urls" json:"allowed_urls"` // mapped string MUST be a valid regex + AllowedURLs []user.AccessSpec `bson:"allowed_urls" json:"allowed_urls"` // mapped string MUST be a valid regex + Limit *user.APILimit `json:"limit"` } func (d *DBAccessDefinition) ToRegularAD() user.AccessDefinition { @@ -26,6 +27,7 @@ func (d *DBAccessDefinition) ToRegularAD() user.AccessDefinition { APIID: d.APIID, Versions: d.Versions, AllowedURLs: d.AllowedURLs, + Limit: d.Limit, } } diff --git a/session_manager.go b/session_manager.go index 05336ef27cf..be009665de7 100644 --- a/session_manager.go +++ b/session_manager.go @@ -34,11 +34,26 @@ type SessionLimiter struct { bucketStore leakybucket.Storage } -func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string, currentSession *user.SessionState, store storage.Handler, globalConf *config.Config) bool { +func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string, + currentSession *user.SessionState, + store storage.Handler, + globalConf *config.Config, + apiLimit *user.APILimit) bool { + + var per, rate float64 + + if apiLimit != nil { // respect limit on API level + per = apiLimit.Per + rate = apiLimit.Rate + } else { + per = currentSession.Per + rate = currentSession.Rate + } + log.Debug("[RATELIMIT] Inbound raw key is: ", key) log.Debug("[RATELIMIT] Rate limiter key is: ", rateLimiterKey) pipeline := globalConf.EnableNonTransactionalRateLimiter - ratePerPeriodNow, _ := store.SetRollingWindow(rateLimiterKey, int64(currentSession.Per), "-1", pipeline) + ratePerPeriodNow, _ := store.SetRollingWindow(rateLimiterKey, int64(per), "-1", pipeline) //log.Info("Num Requests: ", ratePerPeriodNow) @@ -51,10 +66,10 @@ func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSe // The test TestRateLimitForAPIAndRateLimitAndQuotaCheck // will only work with ththese two lines here //log.Info("break: ", (int(currentSession.Rate) - subtractor)) - if ratePerPeriodNow > int(currentSession.Rate)-subtractor { + if ratePerPeriodNow > int(rate)-subtractor { // Set a sentinel value with expire if globalConf.EnableSentinelRateLImiter { - store.SetRawKey(rateLimiterSentinelKey, "1", int64(currentSession.Per)) + store.SetRawKey(rateLimiterSentinelKey, "1", int64(per)) } return true } @@ -74,13 +89,28 @@ const ( // sessionFailReason if session limits have been exceeded. // Key values to manage rate are Rate and Per, e.g. Rate of 10 messages // Per 10 seconds -func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, key string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config) sessionFailReason { +func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, key string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config, apiID string) sessionFailReason { if enableRL { + // check for limit on API level (set to session by ApplyPolicies) + var apiLimit *user.APILimit + if len(currentSession.AccessRights) > 0 { + if rights, ok := currentSession.AccessRights[apiID]; !ok { + log.WithField("apiID", apiID).Debug("[RATE] unexpected apiID") + return sessionFailRateLimit + } else { + apiLimit = rights.Limit + } + } + if globalConf.EnableSentinelRateLImiter { rateLimiterKey := RateLimitKeyPrefix + currentSession.KeyHash() rateLimiterSentinelKey := RateLimitKeyPrefix + currentSession.KeyHash() + ".BLOCKED" + if apiLimit != nil { + rateLimiterKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + rateLimiterSentinelKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + ".BLOCKED" + } - go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf) + go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit) // Check sentinel _, sentinelActive := store.GetRawKey(rateLimiterSentinelKey) @@ -91,8 +121,12 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se } else if globalConf.EnableRedisRollingLimiter { rateLimiterKey := RateLimitKeyPrefix + currentSession.KeyHash() rateLimiterSentinelKey := RateLimitKeyPrefix + currentSession.KeyHash() + ".BLOCKED" + if apiLimit != nil { + rateLimiterKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + rateLimiterSentinelKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + ".BLOCKED" + } - if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf) { + if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit) { return sessionFailRateLimit } } else { @@ -101,19 +135,30 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se l.bucketStore = memorycache.New() } - // If a token has been updated, we must ensure we dont use + // If a token has been updated, we must ensure we don't use // an old bucket an let the cache deal with it - bucketKey := key + ":" + currentSession.LastUpdated + bucketKey := "" + var currRate float64 + var per float64 + if apiLimit == nil { + bucketKey = key + ":" + currentSession.LastUpdated + currRate = currentSession.Rate + per = currentSession.Per + } else { // respect limit on API level + bucketKey = apiID + ":" + key + ":" + currentSession.LastUpdated + currRate = apiLimit.Rate + per = apiLimit.Per + } // DRL will always overflow with more servers on low rates - rate := uint(currentSession.Rate * float64(DRLManager.RequestTokenValue)) + rate := uint(currRate * float64(DRLManager.RequestTokenValue)) if rate < uint(DRLManager.CurrentTokenValue) { rate = uint(DRLManager.CurrentTokenValue) } userBucket, err := l.bucketStore.Create(bucketKey, rate, - time.Duration(currentSession.Per)*time.Second) + time.Duration(per)*time.Second) if err != nil { log.Error("Failed to create bucket!") return sessionFailRateLimit @@ -132,7 +177,7 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se currentSession.Allowance-- } - if l.RedisQuotaExceeded(r, currentSession, key, store) { + if l.RedisQuotaExceeded(r, currentSession, key, store, apiID) { return sessionFailQuota } } @@ -141,26 +186,57 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se } -func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, key string, store storage.Handler) bool { +func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, key string, store storage.Handler, apiID string) bool { + log.Debug("[QUOTA] Inbound raw key is: ", key) + + // check for limit on API level (set to session by ApplyPolicies) + var apiLimit *user.APILimit + if len(currentSession.AccessRights) > 0 { + if rights, ok := currentSession.AccessRights[apiID]; !ok { + log.WithField("apiID", apiID).Debug("[QUOTA] unexpected apiID") + return false + } else { + apiLimit = rights.Limit + } + } + // Are they unlimited? - if currentSession.QuotaMax == -1 { + if apiLimit == nil { + if currentSession.QuotaMax == -1 { + // No quota set + return false + } + } else if apiLimit.QuotaMax == -1 { // No quota set return false } - // Create the key - log.Debug("[QUOTA] Inbound raw key is: ", key) - rawKey := QuotaKeyPrefix + currentSession.KeyHash() + rawKey := "" + var quotaRenewalRate int64 + var quotaRenews int64 + var quotaMax int64 + if apiLimit == nil { + rawKey = QuotaKeyPrefix + currentSession.KeyHash() + quotaRenewalRate = currentSession.QuotaRenewalRate + quotaRenews = currentSession.QuotaRenews + quotaMax = currentSession.QuotaMax + } else { + rawKey = QuotaKeyPrefix + apiID + "-" + currentSession.KeyHash() + quotaRenewalRate = apiLimit.QuotaRenewalRate + quotaRenews = apiLimit.QuotaRenews + quotaMax = apiLimit.QuotaMax + } + log.Debug("[QUOTA] Quota limiter key is: ", rawKey) - log.Debug("Renewing with TTL: ", currentSession.QuotaRenewalRate) + log.Debug("Renewing with TTL: ", quotaRenewalRate) // INCR the key (If it equals 1 - set EXPIRE) - qInt := store.IncrememntWithExpire(rawKey, currentSession.QuotaRenewalRate) + qInt := store.IncrememntWithExpire(rawKey, quotaRenewalRate) // if the returned val is >= quota: block - if qInt-1 >= currentSession.QuotaMax { - renewalDate := time.Unix(currentSession.QuotaRenews, 0) + if qInt-1 >= quotaMax { + renewalDate := time.Unix(quotaRenews, 0) log.Debug("Renewal Date is: ", renewalDate) - log.Debug("As epoch: ", currentSession.QuotaRenews) + log.Debug("As epoch: ", quotaRenews) log.Debug("Session: ", currentSession) log.Debug("Now:", time.Now()) if time.Now().After(renewalDate) { @@ -179,17 +255,24 @@ func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *use // If this is a new Quota period, ensure we let the end user know if qInt == 1 { current := time.Now().Unix() - currentSession.QuotaRenews = current + currentSession.QuotaRenewalRate + if apiLimit == nil { + currentSession.QuotaRenews = current + quotaRenewalRate + } else { + apiLimit.QuotaRenews = current + quotaRenewalRate + } ctxScheduleSessionUpdate(r) } // If not, pass and set the values of the session to quotamax - counter - remaining := currentSession.QuotaMax - qInt - + remaining := quotaMax - qInt if remaining < 0 { - currentSession.QuotaRemaining = 0 - } else { + remaining = 0 + } + + if apiLimit == nil { currentSession.QuotaRemaining = remaining + } else { + apiLimit.QuotaRemaining = remaining } return false diff --git a/user/policy.go b/user/policy.go index a3b151cade6..f9f43bb2112 100644 --- a/user/policy.go +++ b/user/policy.go @@ -24,4 +24,5 @@ type PolicyPartitions struct { Quota bool `bson:"quota" json:"quota"` RateLimit bool `bson:"rate_limit" json:"rate_limit"` Acl bool `bson:"acl" json:"acl"` + PerAPI bool `bson:"per_api" json:"per_api"` } diff --git a/user/session.go b/user/session.go index bcd6d67eb4e..9a24b87abf5 100644 --- a/user/session.go +++ b/user/session.go @@ -20,12 +20,24 @@ type AccessSpec struct { Methods []string `json:"methods" msg:"methods"` } +// APILimit stores quota and rate limit on ACL level (per API) +type APILimit struct { + Rate float64 `json:"rate" msg:"rate"` + Per float64 `json:"per" msg:"per"` + QuotaMax int64 `json:"quota_max" msg:"quota_max"` + QuotaRenews int64 `json:"quota_renews" msg:"quota_renews"` + QuotaRemaining int64 `json:"quota_remaining" msg:"quota_remaining"` + QuotaRenewalRate int64 `json:"quota_renewal_rate" msg:"quota_renewal_rate"` + SetByPolicy bool `json:"set_by_policy" msg:"set_by_policy"` +} + // AccessDefinition defines which versions of an API a key has access to type AccessDefinition struct { APIName string `json:"api_name" msg:"api_name"` APIID string `json:"api_id" msg:"api_id"` Versions []string `json:"versions" msg:"versions"` - AllowedURLs []AccessSpec `bson:"allowed_urls" json:"allowed_urls" msg:"allowed_urls"` // mapped string MUST be a valid regex + AllowedURLs []AccessSpec `bson:"allowed_urls" json:"allowed_urls" msg:"allowed_urls"` // mapped string MUST be a valid regex + Limit *APILimit `json:"limit" msg:"limit"` } // SessionState objects represent a current API session, mainly used for rate limiting. From 42bc797c4c88441abd3513044a795c35614d10cf Mon Sep 17 00:00:00 2001 From: dencoded Date: Wed, 22 Aug 2018 00:26:46 -0400 Subject: [PATCH 2/2] gofmt fix --- api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api.go b/api.go index 1f4fc3d135d..7c506e3e788 100644 --- a/api.go +++ b/api.go @@ -340,7 +340,7 @@ func handleGetDetail(sessionKey, apiID string, byHash bool) (interface{}, int) { log.WithFields(logrus.Fields{ "prefix": "api", "key": obfuscateKey(sessionKey), - "error": err, + "error": err, "status": "ok", }).Info("Can't retrieve key quota") }