Skip to content

Commit

Permalink
Polish data-source helper for rules and rename package of freq_params…
Browse files Browse the repository at this point in the history
…_traffic module (alibaba#157)

* package rename: freq_params_traffic -> hotspot
* Also polish code of rule managers
  • Loading branch information
louyuting authored and gorexlv committed Jun 18, 2020
1 parent 79298a6 commit 63445e2
Show file tree
Hide file tree
Showing 32 changed files with 814 additions and 311 deletions.
6 changes: 3 additions & 3 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/freq_params_traffic"
"github.com/alibaba/sentinel-golang/core/hotspot"
"github.com/alibaba/sentinel-golang/core/log"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/core/system"
Expand All @@ -31,10 +31,10 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddRuleCheckSlotLast(&system.SystemAdaptiveSlot{})
sc.AddRuleCheckSlotLast(&flow.FlowSlot{})
sc.AddRuleCheckSlotLast(&circuitbreaker.CircuitBreakerSlot{})
sc.AddRuleCheckSlotLast(&freq_params_traffic.FreqPramsTrafficSlot{})
sc.AddRuleCheckSlotLast(&hotspot.FreqPramsTrafficSlot{})
sc.AddStatSlotLast(&stat.StatisticSlot{})
sc.AddStatSlotLast(&log.LogSlot{})
sc.AddStatSlotLast(&circuitbreaker.MetricStatSlot{})
sc.AddStatSlotLast(&freq_params_traffic.ConcurrencyStatSlot{})
sc.AddStatSlotLast(&hotspot.ConcurrencyStatSlot{})
return sc
}
8 changes: 4 additions & 4 deletions core/circuitbreaker/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (b *RuleBase) IsStatReusable(r Rule) bool {

func (b *RuleBase) String() string {
// fallback string
return fmt.Sprintf("RuleBase{id=%s,resource=%s, strategy=%+v, RetryTimeoutMs=%d, MinRequestAmount=%d}",
return fmt.Sprintf("{id=%s,resource=%s, strategy=%+v, RetryTimeoutMs=%d, MinRequestAmount=%d}",
b.Id, b.Resource, b.Strategy, b.RetryTimeoutMs, b.MinRequestAmount)
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func (r *slowRtRule) IsApplicable() error {
}

func (r *slowRtRule) String() string {
return fmt.Sprintf("slowRtRule{RuleBase:%s, MaxAllowedRt=%d, MaxSlowRequestRatio=%f}", r.RuleBase.String(), r.MaxAllowedRt, r.MaxSlowRequestRatio)
return fmt.Sprintf("{slowRtRule{RuleBase:%s, MaxAllowedRt=%d, MaxSlowRequestRatio=%f}", r.RuleBase.String(), r.MaxAllowedRt, r.MaxSlowRequestRatio)
}

// Error ratio circuit breaker rule
Expand All @@ -164,7 +164,7 @@ func NewErrorRatioRule(resource string, intervalMs uint32, retryTimeoutMs uint32
}

func (r *errorRatioRule) String() string {
return fmt.Sprintf("errorRatioRule{RuleBase:%s, Threshold=%f}", r.RuleBase.String(), r.Threshold)
return fmt.Sprintf("{errorRatioRule{RuleBase:%s, Threshold=%f}", r.RuleBase.String(), r.Threshold)
}

func (r *errorRatioRule) IsEqualsTo(newRule Rule) bool {
Expand Down Expand Up @@ -207,7 +207,7 @@ func NewErrorCountRule(resource string, intervalMs uint32, retryTimeoutMs uint32
}

func (r *errorCountRule) String() string {
return fmt.Sprintf("errorCountRule{RuleBase:%s, Threshold=%d}", r.RuleBase.String(), r.Threshold)
return fmt.Sprintf("{errorCountRule{RuleBase:%s, Threshold=%d}", r.RuleBase.String(), r.Threshold)
}

func (r *errorCountRule) IsEqualsTo(newRule Rule) bool {
Expand Down
98 changes: 56 additions & 42 deletions core/circuitbreaker/rule_manager.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package circuitbreaker

import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/alibaba/sentinel-golang/logging"
Expand Down Expand Up @@ -82,6 +82,12 @@ func GetResRules(resource string) []Rule {
return ret
}

// ClearRules clear all the previous rules
func ClearRules() error {
_, err := LoadRules(nil)
return err
}

// LoadRules replaces old rules with the given circuit breaking rules.
//
// return value:
Expand All @@ -106,6 +112,42 @@ func getResBreakers(resource string) []CircuitBreaker {
return ret
}

func calculateReuseIndexFor(r Rule, oldResCbs []CircuitBreaker) (equalIdx, reuseStatIdx int) {
// the index of equivalent rule in old circuit breaker slice
equalIdx = -1
// the index of statistic reusable rule in old circuit breaker slice
reuseStatIdx = -1

for idx, oldTc := range oldResCbs {
oldRule := oldTc.BoundRule()
if oldRule.IsEqualsTo(r) {
// break if there is equivalent rule
equalIdx = idx
break
}
// find the index of first StatReusable rule
if !oldRule.IsStatReusable(r) {
continue
}
if reuseStatIdx >= 0 {
// had find reuse rule.
continue
}
reuseStatIdx = idx
}
return equalIdx, reuseStatIdx
}

func insertCbToCbMap(cb CircuitBreaker, res string, m map[string][]CircuitBreaker) {
cbsOfRes, exists := m[res]
if !exists {
cbsOfRes = make([]CircuitBreaker, 0, 1)
m[res] = append(cbsOfRes, cb)
} else {
m[res] = append(cbsOfRes, cb)
}
}

// Concurrent safe to update rules
func onRuleUpdate(rules []Rule) (err error) {
defer func() {
Expand Down Expand Up @@ -145,43 +187,19 @@ func onRuleUpdate(rules []Rule) (err error) {
for res, resRules := range newBreakerRules {
emptyCircuitBreakerList := make([]CircuitBreaker, 0, 0)
for _, r := range resRules {
// TODO: rearrange the code here.
oldResCbs := breakers[res]
if oldResCbs == nil {
oldResCbs = emptyCircuitBreakerList
}
equalsIdx := -1
reuseStatIdx := -1
for idx, cb := range oldResCbs {
oldRule := cb.BoundRule()
if oldRule.IsEqualsTo(r) {
equalsIdx = idx
break
}
if !oldRule.IsStatReusable(r) {
continue
}
if reuseStatIdx >= 0 {
// had find reuse rule.
continue
}
reuseStatIdx = idx
}
equalIdx, reuseStatIdx := calculateReuseIndexFor(r, oldResCbs)

// First check equals scenario
if equalsIdx >= 0 {
if equalIdx >= 0 {
// reuse the old cb
reuseOldCb := oldResCbs[equalsIdx]
cbsOfRes, ok := newBreakers[res]
if !ok {
cbsOfRes = make([]CircuitBreaker, 0, 1)
newBreakers[res] = append(cbsOfRes, reuseOldCb)
} else {
newBreakers[res] = append(cbsOfRes, reuseOldCb)
}
equalOldCb := oldResCbs[equalIdx]
insertCbToCbMap(equalOldCb, res, newBreakers)
// remove old cb from oldResCbs
oldResCbs = append(oldResCbs[:equalsIdx], oldResCbs[equalsIdx+1:]...)
breakers[res] = oldResCbs
breakers[res] = append(oldResCbs[:equalIdx], oldResCbs[equalIdx+1:]...)
continue
}

Expand All @@ -205,13 +223,7 @@ func onRuleUpdate(rules []Rule) (err error) {
if reuseStatIdx >= 0 {
breakers[res] = append(oldResCbs[:reuseStatIdx], oldResCbs[reuseStatIdx+1:]...)
}
cbsOfRes, ok := newBreakers[res]
if !ok {
cbsOfRes = make([]CircuitBreaker, 0, 1)
newBreakers[res] = append(cbsOfRes, cb)
} else {
newBreakers[res] = append(cbsOfRes, cb)
}
insertCbToCbMap(cb, res, newBreakers)
}
}

Expand Down Expand Up @@ -241,12 +253,14 @@ func rulesFrom(rm map[string][]Rule) []Rule {
}

func logRuleUpdate(rules map[string][]Rule) {
s, err := json.Marshal(rules)
if err != nil {
logger.Info("Circuit breaking rules loaded")
} else {
logger.Infof("Circuit breaking rules loaded: %s", s)
sb := strings.Builder{}
sb.WriteString("Circuit breaking rules loaded:[")

for _, r := range rulesFrom(rules) {
sb.WriteString(r.String() + ",")
}
sb.WriteString("]")
logger.Info(sb.String())
}

func RegisterStateChangeListeners(listeners ...StateChangeListener) {
Expand Down
24 changes: 12 additions & 12 deletions core/flow/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,24 @@ type ClusterRuleConfig struct {
// FlowRule describes the strategy of flow control.
type FlowRule struct {
// ID represents the unique ID of the rule (optional).
ID uint64 `json:"id,omitempty"`
ID uint64

// Resource represents the resource name.
Resource string `json:"resource"`
Resource string
// LimitOrigin represents the target origin (reserved field).
LimitOrigin string `json:"limitApp"`
MetricType MetricType `json:"grade"`
LimitOrigin string
MetricType MetricType
// Count represents the threshold.
Count float64 `json:"count"`
RelationStrategy RelationStrategy `json:"strategy"`
ControlBehavior ControlBehavior `json:"controlBehavior"`
Count float64
RelationStrategy RelationStrategy
ControlBehavior ControlBehavior

RefResource string `json:"refResource,omitempty"`
WarmUpPeriodSec uint32 `json:"warmUpPeriodSec"`
MaxQueueingTimeMs uint32 `json:"maxQueueingTimeMs"`
RefResource string
WarmUpPeriodSec uint32
MaxQueueingTimeMs uint32
// ClusterMode indicates whether the rule is for cluster flow control or local.
ClusterMode bool `json:"clusterMode"`
ClusterConfig ClusterRuleConfig `json:"clusterConfig"`
ClusterMode bool
ClusterConfig ClusterRuleConfig
}

func (f *FlowRule) String() string {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package freq_params_traffic
package hotspot

import (
"github.com/alibaba/sentinel-golang/core/base"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package freq_params_traffic
package hotspot

import "github.com/alibaba/sentinel-golang/core/freq_params_traffic/cache"
import "github.com/alibaba/sentinel-golang/core/hotspot/cache"

const (
ConcurrencyMaxCount = 4000
Expand Down
20 changes: 10 additions & 10 deletions core/freq_params_traffic/rule.go → core/hotspot/rule.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package freq_params_traffic
package hotspot

import (
"fmt"
Expand Down Expand Up @@ -50,19 +50,19 @@ func (t MetricType) String() string {
type ParamKind int

const (
kindInt ParamKind = iota
kindString
KindInt ParamKind = iota
KindString
KindBool
KindFloat64
KindSum
)

func (t ParamKind) String() string {
switch t {
case kindInt:
return "kindInt"
case kindString:
return "kindString"
case KindInt:
return "KindInt"
case KindString:
return "KindString"
case KindBool:
return "KindBool"
case KindFloat64:
Expand Down Expand Up @@ -102,7 +102,7 @@ type Rule struct {
}

func (r *Rule) String() string {
return fmt.Sprintf("{Id:%s, Resource:%s, MetricType:%+v, Behavior:%+v, ParamIndex:%d, Threshold:%f, MaxQueueingTimeMs:%d, BurstCount:%d, DurationInSec:%d, ParamsMaxCapacity:%d, SpecificItems:%+v},",
return fmt.Sprintf("{Id:%s, Resource:%s, MetricType:%+v, Behavior:%+v, ParamIndex:%d, Threshold:%f, MaxQueueingTimeMs:%d, BurstCount:%d, DurationInSec:%d, ParamsMaxCapacity:%d, SpecificItems:%+v}",
r.Id, r.Resource, r.MetricType, r.Behavior, r.ParamIndex, r.Threshold, r.MaxQueueingTimeMs, r.BurstCount, r.DurationInSec, r.ParamsMaxCapacity, r.SpecificItems)
}
func (r *Rule) ResourceName() string {
Expand Down Expand Up @@ -137,15 +137,15 @@ func parseSpecificItems(source map[SpecificValue]int64) map[interface{}]int64 {
}
for k, v := range source {
switch k.ValKind {
case kindInt:
case KindInt:
realVal, err := strconv.Atoi(k.ValStr)
if err != nil {
logger.Errorf("Fail to parse value for int specific item. paramKind: %+v, value: %s, err: %+v", k.ValKind, k.ValStr, err)
continue
}
ret[realVal] = v

case kindString:
case KindString:
ret[k.ValStr] = v

case KindBool:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package freq_params_traffic
package hotspot

import (
"encoding/json"
"fmt"
"strings"
"sync"

"github.com/alibaba/sentinel-golang/logging"
Expand Down Expand Up @@ -70,12 +70,16 @@ func LoadRules(rules []*Rule) (bool, error) {
return true, err
}

// GetRules return the whole of rules
func GetRules() []*Rule {
// GetRules return the res's rules
func GetRules(res string) []*Rule {
tcMux.RLock()
defer tcMux.RUnlock()

return rulesFrom(tcMap)
resTcs := tcMap[res]
ret := make([]*Rule, 0, len(resTcs))
for _, tc := range resTcs {
ret = append(ret, tc.BoundRule())
}
return ret
}

// ClearRules clears all rules in frequency parameters flow control components
Expand Down Expand Up @@ -106,12 +110,14 @@ func onRuleUpdate(rules []*Rule) (err error) {
}

func logRuleUpdate(m trafficControllerMap) {
s, err := json.Marshal(m)
if err != nil {
logger.Info("Frequency parameters flow control rules loaded")
} else {
logger.Infof("Frequency parameters flow control rules loaded: %s", s)
sb := strings.Builder{}
sb.WriteString("Frequency parameters flow control rules loaded:[")

for _, r := range rulesFrom(m) {
sb.WriteString(r.String() + ",")
}
sb.WriteString("]")
logger.Info(sb.String())
}

func rulesFrom(m trafficControllerMap) []*Rule {
Expand Down
Loading

0 comments on commit 63445e2

Please sign in to comment.