Skip to content

Commit

Permalink
Merge 42bc797 into 04972c3
Browse files Browse the repository at this point in the history
  • Loading branch information
dencoded committed Aug 22, 2018
2 parents 04972c3 + 42bc797 commit 5c3df87
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 53 deletions.
85 changes: 61 additions & 24 deletions middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func (t BaseMiddleware) ApplyPolicies(key string, session *user.SessionState) er
rights := session.AccessRights
tags := make(map[string]bool)
didQuota, didRateLimit, didACL := false, false, false
didPerAPI := make(map[string]bool)
policies := session.PolicyIDs()
for i, polID := range policies {
policiesMu.RLock()
Expand All @@ -226,31 +227,67 @@ func (t BaseMiddleware) ApplyPolicies(key string, session *user.SessionState) er

if policy.Partitions.Quota || policy.Partitions.RateLimit || policy.Partitions.Acl {
// This is a partitioned policy, only apply what is active
if policy.Partitions.Quota {
if didQuota {
err := fmt.Errorf("cannot apply multiple quota policies")
log.Error(err)
return err
if policy.Partitions.PerAPI {
// new logic when you can specify quota or rate in more than one policy but for different APIs
// this new option will be applied to session ONLY when policy.Partitions.Acl == true
// as this is where we merging access rights from all policies into session
for apiID, accessRights := range policy.AccessRights {
if didPerAPI[apiID] {
err := fmt.Errorf("cannot apply multiple policies for API: %s", apiID)
log.Error(err)
return err
}
didPerAPI[apiID] = true

// check if we already have limit on API level specified when policy was created
if accessRights.Limit != nil {
continue
}

// limit was not specified on API level so we will populate it from policy
apiLimitFromPolicy := &user.APILimit{
QuotaMax: -1,
SetByPolicy: true,
}
if policy.Partitions.Quota {
apiLimitFromPolicy.QuotaMax = policy.QuotaMax
apiLimitFromPolicy.QuotaRenewalRate = policy.QuotaRenewalRate
}
if policy.Partitions.RateLimit {
apiLimitFromPolicy.Rate = policy.Rate
apiLimitFromPolicy.Per = policy.Per
}
accessRights.Limit = apiLimitFromPolicy
policy.AccessRights[apiID] = accessRights
}
didQuota = true
// Quotas
session.QuotaMax = policy.QuotaMax
session.QuotaRenewalRate = policy.QuotaRenewalRate
}

if policy.Partitions.RateLimit {
if didRateLimit {
err := fmt.Errorf("cannot apply multiple rate limit policies")
log.Error(err)
return err
} else {
// legacy logic when you can specify quota or rate only in no more than one policy
if policy.Partitions.Quota {
if didQuota {
err := fmt.Errorf("cannot apply multiple quota policies")
log.Error(err)
return err
}
didQuota = true
// Quotas
session.QuotaMax = policy.QuotaMax
session.QuotaRenewalRate = policy.QuotaRenewalRate
}
didRateLimit = true
// Rate limting
session.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged
session.Rate = policy.Rate
session.Per = policy.Per
if policy.LastUpdated != "" {
session.LastUpdated = policy.LastUpdated

if policy.Partitions.RateLimit {
if didRateLimit {
err := fmt.Errorf("cannot apply multiple rate limit policies")
log.Error(err)
return err
}
didRateLimit = true
// Rate limiting
session.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged
session.Rate = policy.Rate
session.Per = policy.Per
if policy.LastUpdated != "" {
session.LastUpdated = policy.LastUpdated
}
}
}

Expand Down Expand Up @@ -278,7 +315,7 @@ func (t BaseMiddleware) ApplyPolicies(key string, session *user.SessionState) er
session.QuotaMax = policy.QuotaMax
session.QuotaRenewalRate = policy.QuotaRenewalRate

// Rate limting
// Rate limiting
session.Allowance = policy.Rate // This is a legacy thing, merely to make sure output is consistent. Needs to be purged
session.Rate = policy.Rate
session.Per = policy.Per
Expand Down
1 change: 1 addition & 0 deletions mw_api_rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (k *RateLimitForAPI) ProcessRequest(w http.ResponseWriter, r *http.Request,
true,
false,
&k.Spec.GlobalConfig,
k.Spec.APIID,
)

if reason == sessionFailRateLimit {
Expand Down
2 changes: 2 additions & 0 deletions mw_organisation_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (k *OrganizationMonitor) ProcessRequestLive(r *http.Request, orgSession use
orgSession.Per > 0 && orgSession.Rate > 0,
true,
&k.Spec.GlobalConfig,
k.Spec.APIID,
)

sessionLifeTime := orgSession.Lifetime(k.Spec.SessionLifetime)
Expand Down Expand Up @@ -235,6 +236,7 @@ func (k *OrganizationMonitor) AllowAccessNext(
session.Per > 0 && session.Rate > 0,
true,
&k.Spec.GlobalConfig,
k.Spec.APIID,
)

sessionLifeTime := session.Lifetime(k.Spec.SessionLifetime)
Expand Down
1 change: 1 addition & 0 deletions mw_rate_limiting.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (k *RateLimitAndQuotaCheck) ProcessRequest(w http.ResponseWriter, r *http.R
!k.Spec.DisableRateLimit,
!k.Spec.DisableQuota,
&k.Spec.GlobalConfig,
k.Spec.APIID,
)

switch reason {
Expand Down
4 changes: 3 additions & 1 deletion policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type DBAccessDefinition struct {
APIName string `json:"apiname"`
APIID string `json:"apiid"`
Versions []string `json:"versions"`
AllowedURLs []user.AccessSpec `bson:"allowed_urls" json:"allowed_urls"` // mapped string MUST be a valid regex
AllowedURLs []user.AccessSpec `bson:"allowed_urls" json:"allowed_urls"` // mapped string MUST be a valid regex
Limit *user.APILimit `json:"limit"`
}

func (d *DBAccessDefinition) ToRegularAD() user.AccessDefinition {
Expand All @@ -26,6 +27,7 @@ func (d *DBAccessDefinition) ToRegularAD() user.AccessDefinition {
APIID: d.APIID,
Versions: d.Versions,
AllowedURLs: d.AllowedURLs,
Limit: d.Limit,
}
}

Expand Down
137 changes: 110 additions & 27 deletions session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,26 @@ type SessionLimiter struct {
bucketStore leakybucket.Storage
}

func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string, currentSession *user.SessionState, store storage.Handler, globalConf *config.Config) bool {
func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string,
currentSession *user.SessionState,
store storage.Handler,
globalConf *config.Config,
apiLimit *user.APILimit) bool {

var per, rate float64

if apiLimit != nil { // respect limit on API level
per = apiLimit.Per
rate = apiLimit.Rate
} else {
per = currentSession.Per
rate = currentSession.Rate
}

log.Debug("[RATELIMIT] Inbound raw key is: ", key)
log.Debug("[RATELIMIT] Rate limiter key is: ", rateLimiterKey)
pipeline := globalConf.EnableNonTransactionalRateLimiter
ratePerPeriodNow, _ := store.SetRollingWindow(rateLimiterKey, int64(currentSession.Per), "-1", pipeline)
ratePerPeriodNow, _ := store.SetRollingWindow(rateLimiterKey, int64(per), "-1", pipeline)

//log.Info("Num Requests: ", ratePerPeriodNow)

Expand All @@ -51,10 +66,10 @@ func (l *SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSe
// The test TestRateLimitForAPIAndRateLimitAndQuotaCheck
// will only work with ththese two lines here
//log.Info("break: ", (int(currentSession.Rate) - subtractor))
if ratePerPeriodNow > int(currentSession.Rate)-subtractor {
if ratePerPeriodNow > int(rate)-subtractor {
// Set a sentinel value with expire
if globalConf.EnableSentinelRateLImiter {
store.SetRawKey(rateLimiterSentinelKey, "1", int64(currentSession.Per))
store.SetRawKey(rateLimiterSentinelKey, "1", int64(per))
}
return true
}
Expand All @@ -74,13 +89,28 @@ const (
// sessionFailReason if session limits have been exceeded.
// Key values to manage rate are Rate and Per, e.g. Rate of 10 messages
// Per 10 seconds
func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, key string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config) sessionFailReason {
func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.SessionState, key string, store storage.Handler, enableRL, enableQ bool, globalConf *config.Config, apiID string) sessionFailReason {
if enableRL {
// check for limit on API level (set to session by ApplyPolicies)
var apiLimit *user.APILimit
if len(currentSession.AccessRights) > 0 {
if rights, ok := currentSession.AccessRights[apiID]; !ok {
log.WithField("apiID", apiID).Debug("[RATE] unexpected apiID")
return sessionFailRateLimit
} else {
apiLimit = rights.Limit
}
}

if globalConf.EnableSentinelRateLImiter {
rateLimiterKey := RateLimitKeyPrefix + currentSession.KeyHash()
rateLimiterSentinelKey := RateLimitKeyPrefix + currentSession.KeyHash() + ".BLOCKED"
if apiLimit != nil {
rateLimiterKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash()
rateLimiterSentinelKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + ".BLOCKED"
}

go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf)
go l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit)

// Check sentinel
_, sentinelActive := store.GetRawKey(rateLimiterSentinelKey)
Expand All @@ -91,8 +121,12 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se
} else if globalConf.EnableRedisRollingLimiter {
rateLimiterKey := RateLimitKeyPrefix + currentSession.KeyHash()
rateLimiterSentinelKey := RateLimitKeyPrefix + currentSession.KeyHash() + ".BLOCKED"
if apiLimit != nil {
rateLimiterKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash()
rateLimiterSentinelKey = RateLimitKeyPrefix + apiID + "-" + currentSession.KeyHash() + ".BLOCKED"
}

if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf) {
if l.doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey, currentSession, store, globalConf, apiLimit) {
return sessionFailRateLimit
}
} else {
Expand All @@ -101,19 +135,30 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se
l.bucketStore = memorycache.New()
}

// If a token has been updated, we must ensure we dont use
// If a token has been updated, we must ensure we don't use
// an old bucket an let the cache deal with it
bucketKey := key + ":" + currentSession.LastUpdated
bucketKey := ""
var currRate float64
var per float64
if apiLimit == nil {
bucketKey = key + ":" + currentSession.LastUpdated
currRate = currentSession.Rate
per = currentSession.Per
} else { // respect limit on API level
bucketKey = apiID + ":" + key + ":" + currentSession.LastUpdated
currRate = apiLimit.Rate
per = apiLimit.Per
}

// DRL will always overflow with more servers on low rates
rate := uint(currentSession.Rate * float64(DRLManager.RequestTokenValue))
rate := uint(currRate * float64(DRLManager.RequestTokenValue))
if rate < uint(DRLManager.CurrentTokenValue) {
rate = uint(DRLManager.CurrentTokenValue)
}

userBucket, err := l.bucketStore.Create(bucketKey,
rate,
time.Duration(currentSession.Per)*time.Second)
time.Duration(per)*time.Second)
if err != nil {
log.Error("Failed to create bucket!")
return sessionFailRateLimit
Expand All @@ -132,7 +177,7 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se
currentSession.Allowance--
}

if l.RedisQuotaExceeded(r, currentSession, key, store) {
if l.RedisQuotaExceeded(r, currentSession, key, store, apiID) {
return sessionFailQuota
}
}
Expand All @@ -141,26 +186,57 @@ func (l *SessionLimiter) ForwardMessage(r *http.Request, currentSession *user.Se

}

func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, key string, store storage.Handler) bool {
func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *user.SessionState, key string, store storage.Handler, apiID string) bool {
log.Debug("[QUOTA] Inbound raw key is: ", key)

// check for limit on API level (set to session by ApplyPolicies)
var apiLimit *user.APILimit
if len(currentSession.AccessRights) > 0 {
if rights, ok := currentSession.AccessRights[apiID]; !ok {
log.WithField("apiID", apiID).Debug("[QUOTA] unexpected apiID")
return false
} else {
apiLimit = rights.Limit
}
}

// Are they unlimited?
if currentSession.QuotaMax == -1 {
if apiLimit == nil {
if currentSession.QuotaMax == -1 {
// No quota set
return false
}
} else if apiLimit.QuotaMax == -1 {
// No quota set
return false
}

// Create the key
log.Debug("[QUOTA] Inbound raw key is: ", key)
rawKey := QuotaKeyPrefix + currentSession.KeyHash()
rawKey := ""
var quotaRenewalRate int64
var quotaRenews int64
var quotaMax int64
if apiLimit == nil {
rawKey = QuotaKeyPrefix + currentSession.KeyHash()
quotaRenewalRate = currentSession.QuotaRenewalRate
quotaRenews = currentSession.QuotaRenews
quotaMax = currentSession.QuotaMax
} else {
rawKey = QuotaKeyPrefix + apiID + "-" + currentSession.KeyHash()
quotaRenewalRate = apiLimit.QuotaRenewalRate
quotaRenews = apiLimit.QuotaRenews
quotaMax = apiLimit.QuotaMax
}

log.Debug("[QUOTA] Quota limiter key is: ", rawKey)
log.Debug("Renewing with TTL: ", currentSession.QuotaRenewalRate)
log.Debug("Renewing with TTL: ", quotaRenewalRate)
// INCR the key (If it equals 1 - set EXPIRE)
qInt := store.IncrememntWithExpire(rawKey, currentSession.QuotaRenewalRate)
qInt := store.IncrememntWithExpire(rawKey, quotaRenewalRate)

// if the returned val is >= quota: block
if qInt-1 >= currentSession.QuotaMax {
renewalDate := time.Unix(currentSession.QuotaRenews, 0)
if qInt-1 >= quotaMax {
renewalDate := time.Unix(quotaRenews, 0)
log.Debug("Renewal Date is: ", renewalDate)
log.Debug("As epoch: ", currentSession.QuotaRenews)
log.Debug("As epoch: ", quotaRenews)
log.Debug("Session: ", currentSession)
log.Debug("Now:", time.Now())
if time.Now().After(renewalDate) {
Expand All @@ -179,17 +255,24 @@ func (l *SessionLimiter) RedisQuotaExceeded(r *http.Request, currentSession *use
// If this is a new Quota period, ensure we let the end user know
if qInt == 1 {
current := time.Now().Unix()
currentSession.QuotaRenews = current + currentSession.QuotaRenewalRate
if apiLimit == nil {
currentSession.QuotaRenews = current + quotaRenewalRate
} else {
apiLimit.QuotaRenews = current + quotaRenewalRate
}
ctxScheduleSessionUpdate(r)
}

// If not, pass and set the values of the session to quotamax - counter
remaining := currentSession.QuotaMax - qInt

remaining := quotaMax - qInt
if remaining < 0 {
currentSession.QuotaRemaining = 0
} else {
remaining = 0
}

if apiLimit == nil {
currentSession.QuotaRemaining = remaining
} else {
apiLimit.QuotaRemaining = remaining
}

return false
Expand Down
1 change: 1 addition & 0 deletions user/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ type PolicyPartitions struct {
Quota bool `bson:"quota" json:"quota"`
RateLimit bool `bson:"rate_limit" json:"rate_limit"`
Acl bool `bson:"acl" json:"acl"`
PerAPI bool `bson:"per_api" json:"per_api"`
}
Loading

0 comments on commit 5c3df87

Please sign in to comment.