From 5d192bfc0cdcbfeb607d9d137634a413e54091c2 Mon Sep 17 00:00:00 2001 From: ytlou Date: Sun, 16 Jun 2019 21:11:20 +0800 Subject: [PATCH 01/12] Issues 1: init project and implement raw single-node local flow control --- .gitignore | 1 + core/node/metric_node.go | 11 + core/node/node.go | 34 +++ core/property/property.go | 8 + core/public/entry.go | 14 + core/slots/base/base.go | 25 ++ core/slots/base/default_node.go | 93 ++++++ core/slots/chain/slot.go | 14 + core/slots/chain/slot_chain.go | 90 ++++++ core/slots/flow/flow_control.go | 38 +++ core/slots/flow/flow_rule.go | 210 ++++++++++++++ core/slots/flow/flow_slot.go | 64 +++++ core/slots/statistic/data/leap_array.go | 265 ++++++++++++++++++ core/slots/statistic/data/metric_bucket.go | 93 ++++++ core/slots/statistic/data/unary_leap_array.go | 15 + core/slots/statistic/statistic_slot.go | 25 ++ core/sphu.go | 43 +++ example/main.go | 31 ++ go.mod | 3 + 19 files changed, 1077 insertions(+) create mode 100644 .gitignore create mode 100644 core/node/metric_node.go create mode 100644 core/node/node.go create mode 100644 core/property/property.go create mode 100644 core/public/entry.go create mode 100644 core/slots/base/base.go create mode 100644 core/slots/base/default_node.go create mode 100644 core/slots/chain/slot.go create mode 100644 core/slots/chain/slot_chain.go create mode 100644 core/slots/flow/flow_control.go create mode 100644 core/slots/flow/flow_rule.go create mode 100644 core/slots/flow/flow_slot.go create mode 100644 core/slots/statistic/data/leap_array.go create mode 100644 core/slots/statistic/data/metric_bucket.go create mode 100644 core/slots/statistic/data/unary_leap_array.go create mode 100644 core/slots/statistic/statistic_slot.go create mode 100644 core/sphu.go create mode 100644 example/main.go create mode 100644 go.mod diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..62c893550 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea/ \ No newline at end of file diff --git a/core/node/metric_node.go b/core/node/metric_node.go new file mode 100644 index 000000000..aa76b8fb7 --- /dev/null +++ b/core/node/metric_node.go @@ -0,0 +1,11 @@ +package node + +type MetricNode struct { + Timestamp uint64 + PassQps uint64 + BlockQps uint64 + SuccessQps uint64 + ErrorQps uint64 + Rt uint64 + Resource string +} diff --git a/core/node/node.go b/core/node/node.go new file mode 100644 index 000000000..d55c27741 --- /dev/null +++ b/core/node/node.go @@ -0,0 +1,34 @@ +package node + +type Node interface { + //TotalRequest() uint64 + //TotalPass() uint64 + TotalSuccess() uint64 + //BlockRequest() uint64 + //TotalError() uint64 + //PassQps() uint64 + //BlockQps() uint64 + //TotalQps() uint64 + //SuccessQps() uint64 + //MaxSuccessQps() uint64 + //ErrorQps() uint64 + //AvgRt() float32 + //MinRt() float32 + //CurGoroutineNum() uint64 + + //PreviousBlockQps() uint64 + //PreviousPassQps() uint64 + // + //Metrics() map[uint64]*metrics.MetricNode + + AddPassRequest(count uint32) + //AddRtAndSuccess(rt uint64, success uint32) + // + //IncreaseBlockQps(count uint32) + //IncreaseErrorQps(count uint32) + // + //IncreaseGoroutineNum() + //DecreaseGoroutineNum() + + //Reset() +} diff --git a/core/property/property.go b/core/property/property.go new file mode 100644 index 000000000..9079f3b30 --- /dev/null +++ b/core/property/property.go @@ -0,0 +1,8 @@ +package property + +//PropertyListener +type PropertyListener interface { + ConfigUpdate(value interface{}) + + ConfigLoad(value interface{}) +} diff --git a/core/public/entry.go b/core/public/entry.go new file mode 100644 index 000000000..c583962f5 --- /dev/null +++ b/core/public/entry.go @@ -0,0 +1,14 @@ +package public + +import ( + "github.com/sentinel-group/sentinel-golang/core/node" + "github.com/sentinel-group/sentinel-golang/core/slots/base" +) + +type Entry struct { + createTime uint64 + originNode node.Node + currentNode node.Node + resourceWrap *base.ResourceWrapper + err error +} diff --git a/core/slots/base/base.go b/core/slots/base/base.go new file mode 100644 index 000000000..c1cf6095c --- /dev/null +++ b/core/slots/base/base.go @@ -0,0 +1,25 @@ +package base + +const ( + INBOUND = iota + OUTBOUND +) + +type ResourceWrapper struct { + // unique resource name + ResourceName string + // + ResourceType int +} + +type SlotResultStatus int8 + +const ( + ResultStatusOk = iota + ResultStatusBlocked +) + +type SlotResult struct { + Status SlotResultStatus + BlockedReason string +} diff --git a/core/slots/base/default_node.go b/core/slots/base/default_node.go new file mode 100644 index 000000000..6c79e91fd --- /dev/null +++ b/core/slots/base/default_node.go @@ -0,0 +1,93 @@ +package base + +import ( + "github.com/sentinel-group/sentinel-golang/core/node" + "github.com/sentinel-group/sentinel-golang/core/slots/statistic/data" + "sync/atomic" + "time" +) + +const ( + windowLengthImMs_ uint32 = 200 + sampleCount_ uint32 = 5 + intervalInMs_ uint32 = 1000 +) + +type DefaultNode struct { + rollingCounterInSecond *data.SlidingWindow + rollingCounterInMinute *data.SlidingWindow + currentGoroutineNum uint32 + lastFetchTime uint64 + resourceWrapper *ResourceWrapper +} + +func NewDefaultNode(wrapper *ResourceWrapper) *DefaultNode { + return &DefaultNode{ + rollingCounterInSecond: data.NewSlidingWindow(), + rollingCounterInMinute: data.NewSlidingWindow(), + currentGoroutineNum: 0, + lastFetchTime: uint64(time.Now().Nanosecond() / (1e6)), + resourceWrapper: wrapper, + } +} + +func (dn *DefaultNode) AddPass(count uint64) { + dn.rollingCounterInSecond.AddCount(data.MetricEventPass, count) +} + +func (dn *DefaultNode) AddGoroutineNum(count uint32) { + atomic.AddUint32(&dn.currentGoroutineNum, count) +} + +func (dn *DefaultNode) TotalRequest() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventPass) + dn.rollingCounterInSecond.Count(data.MetricEventBlock) +} +func (dn *DefaultNode) TotalPass() uint64 { + return dn.rollingCounterInMinute.Count(data.MetricEventPass) +} +func (dn *DefaultNode) TotalSuccess() uint64 { + return dn.rollingCounterInMinute.Count(data.MetricEventSuccess) +} +func (dn *DefaultNode) BlockRequest() uint64 { + return dn.rollingCounterInMinute.Count(data.MetricEventBlock) +} +func (dn *DefaultNode) TotalError() uint64 { + return dn.rollingCounterInMinute.Count(data.MetricEventError) +} +func (dn *DefaultNode) PassQps() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventPass) / uint64(intervalInMs_) +} +func (dn *DefaultNode) BlockQps() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventBlock) / uint64(intervalInMs_) +} +func (dn *DefaultNode) TotalQps() uint64 { + return dn.PassQps() + dn.BlockQps() +} +func (dn *DefaultNode) SuccessQps() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventSuccess) / uint64(intervalInMs_) +} +func (dn *DefaultNode) MaxSuccessQps() uint64 { + return dn.rollingCounterInSecond.MaxSuccess() * uint64(sampleCount_) +} +func (dn *DefaultNode) ErrorQps() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventError) / uint64(intervalInMs_) +} + +func (dn *DefaultNode) AvgRt() float32 { + return 0 +} +func (dn *DefaultNode) MinRt() float32 { + return 0 +} +func (dn *DefaultNode) CurGoroutineNum() uint64 { + return 0 +} +func (dn *DefaultNode) PreviousBlockQps() uint64 { + return 0 +} +func (dn *DefaultNode) PreviousPassQps() uint64 { + return 0 +} +func (dn *DefaultNode) Metrics() map[uint64]*node.MetricNode { + return nil +} diff --git a/core/slots/chain/slot.go b/core/slots/chain/slot.go new file mode 100644 index 000000000..37b811cb6 --- /dev/null +++ b/core/slots/chain/slot.go @@ -0,0 +1,14 @@ +package chain + +import ( + "context" + "github.com/sentinel-group/sentinel-golang/core/slots/base" +) + +type Slot interface { + IsContinue(lastResult base.SlotResult, ctx context.Context) bool + + Entry(ctx context.Context, resourceWrap *base.ResourceWrapper, node *base.DefaultNode, count uint32) base.SlotResult + + Exit(ctx context.Context, resourceWrap *base.ResourceWrapper, count uint32) +} diff --git a/core/slots/chain/slot_chain.go b/core/slots/chain/slot_chain.go new file mode 100644 index 000000000..5eb7c4738 --- /dev/null +++ b/core/slots/chain/slot_chain.go @@ -0,0 +1,90 @@ +package chain + +import ( + "container/list" + "context" + "fmt" + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/slots/flow" + "github.com/sentinel-group/sentinel-golang/core/slots/statistic" +) + +type SlotChain interface { + Slot + AddFirst(slot Slot) + AddLast(slot Slot) +} + +type DefaultSlotChain struct { + slots *list.List +} + +type SlotChainBuilder interface { + Build() SlotChain +} + +func NewDefaultSlotChain() *DefaultSlotChain { + defaultSlotChain := &DefaultSlotChain{ + slots: list.New(), + } + defaultSlotChain.AddLast(&flow.FlowSlot{ + RuleManager: flow.NewRuleManager(), + }) + defaultSlotChain.AddLast(&statistic.StatisticSlot{}) + return defaultSlotChain +} + +func (dsc *DefaultSlotChain) AddFirst(slot Slot) { + dsc.slots.PushFront(slot) +} + +func (dsc *DefaultSlotChain) AddLast(slot Slot) { + dsc.slots.PushBack(slot) +} + +func (dsc *DefaultSlotChain) IsContinue(lastResult base.SlotResult, ctx context.Context) bool { + return true +} + +func (dsc *DefaultSlotChain) Entry(ctx context.Context, resourceWrap *base.ResourceWrapper, node *base.DefaultNode, count uint32) base.SlotResult { + slotResult := base.SlotResult{ + Status: base.ResultStatusOk, + } + for e := dsc.slots.Front(); e != nil; e = e.Next() { + slot := e.Value + switch slot_ := slot.(type) { + case *flow.FlowSlot: + if slot_.IsContinue(slotResult, ctx) { + slotResult = slot_.Entry(ctx, resourceWrap, node, count) + } + break + case *statistic.StatisticSlot: + if slot_.IsContinue(slotResult, ctx) { + slotResult = slot_.Entry(ctx, resourceWrap, node, count) + } + break + default: + slotResult = base.SlotResult{ + Status: base.ResultStatusBlocked, + BlockedReason: "Unknown Slot", + } + } + } + return slotResult +} + +func (dsc *DefaultSlotChain) Exit(ctx context.Context, resourceWrap *base.ResourceWrapper, count uint32) { + for e := dsc.slots.Front(); e != nil; e = e.Next() { + slot := e.Value + switch slot_ := slot.(type) { + case *flow.FlowSlot: + slot_.Exit(ctx, resourceWrap, count) + break + case *statistic.StatisticSlot: + slot_.Exit(ctx, resourceWrap, count) + break + default: + fmt.Println("DefaultSlotChain Exit error!") + } + } +} diff --git a/core/slots/flow/flow_control.go b/core/slots/flow/flow_control.go new file mode 100644 index 000000000..42d7e0a02 --- /dev/null +++ b/core/slots/flow/flow_control.go @@ -0,0 +1,38 @@ +package flow + +import ( + "context" + "github.com/sentinel-group/sentinel-golang/core/slots/base" +) + +type TrafficShapingController interface { + CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool +} + +type WarmUpController struct { +} + +func (wpc WarmUpController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { + return true +} + +type RateLimiterController struct { +} + +func (wpc RateLimiterController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { + return true +} + +type WarmUpRateLimiterController struct { +} + +func (wpc WarmUpRateLimiterController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { + return true +} + +type DefaultController struct { +} + +func (wpc DefaultController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { + return true +} diff --git a/core/slots/flow/flow_rule.go b/core/slots/flow/flow_rule.go new file mode 100644 index 000000000..e1b4add7c --- /dev/null +++ b/core/slots/flow/flow_rule.go @@ -0,0 +1,210 @@ +package flow + +import ( + "context" + "fmt" + "github.com/sentinel-group/sentinel-golang/core/node" +) + +const ( + LimitAppDefault = "default" + LIMIT_APP_OTHER = "other" + ResourceNameDefault = "default" +) + +type StrategyType int8 + +const ( + StrategyDirect StrategyType = iota + StrategyRelate + StrategyChain +) + +type ControlBehaviorType int8 + +const ( + ControlBehaviorDefault ControlBehaviorType = iota + ControlBehaviorWarmUp + ControlBehaviorRateLimiter + ControlBehaviorWarmUpRateLimiter +) + +type FlowGradeType int8 + +const ( + FlowGradeThread FlowGradeType = iota + FlowGradeQps +) + +type RuleChecker interface { + passCheck(ctx context.Context, node *node.Node, count int) bool +} + +type rule struct { + resource_ string + limitApp_ string + grade_ FlowGradeType + // Flow control threshold count. + count_ uint64 + strategy_ StrategyType + refResource_ string + controlBehavior_ ControlBehaviorType + warmUpPeriodSec_ int32 + /** + * Max queueing time in rate limiter behavior. + */ + maxQueueingTimeMs_ int32 + controller_ TrafficShapingController +} + +func (r *rule) PassCheck(ctx context.Context, node node.Node, count int) bool { + + return true +} + +func (r *rule) validate() error { + return nil +} + +func newRuleBuilder() *rule { + return &rule{ + resource_: ResourceNameDefault, + limitApp_: LimitAppDefault, + grade_: FlowGradeQps, + count_: 100, + strategy_: StrategyDirect, + refResource_: "", + controlBehavior_: ControlBehaviorRateLimiter, + warmUpPeriodSec_: 10, + maxQueueingTimeMs_: 500, + } +} + +func (r *rule) resource(resource string) *rule { + r.resource_ = resource + return r +} +func (r *rule) limitApp(limitApp string) *rule { + r.limitApp_ = limitApp + return r +} +func (r *rule) grade(grade_ FlowGradeType) *rule { + r.grade_ = grade_ + return r +} +func (r *rule) count(count uint64) *rule { + r.count_ = count + return r +} +func (r *rule) strategy(strategy StrategyType) *rule { + r.strategy_ = strategy + return r +} +func (r *rule) refResource(refResource string) *rule { + r.refResource_ = refResource + return r +} +func (r *rule) controlBehavior(controlBehavior ControlBehaviorType) *rule { + r.controlBehavior_ = controlBehavior + return r +} +func (r *rule) warmUpPeriodSec(warmUpPeriodSec int32) *rule { + r.warmUpPeriodSec_ = warmUpPeriodSec + return r +} +func (r *rule) maxQueueingTimeMs(maxQueueingTimeMs int32) *rule { + r.maxQueueingTimeMs_ = maxQueueingTimeMs + return r +} +func (r *rule) controller(controller TrafficShapingController) *rule { + r.controller_ = controller + return r +} + +type RuleManager struct { + flowRules map[string][]*rule +} + +func NewRuleManager() *RuleManager { + return &RuleManager{ + flowRules: nil, + } +} + +func LoadRules(rm *RuleManager, rules []*rule) { + if len(rules) == 0 { + println("RuleManager| load empty rule ") + return + } + rm.flowRules = buildFlowRuleMap(rules) +} + +// Get a copy of the rules. +func (rm *RuleManager) getAllRule() []*rule { + + ret := make([]*rule, 0) + for _, value := range rm.flowRules { + ret = append(ret, value...) + } + return ret +} + +func (rm *RuleManager) getRuleMap() map[string][]*rule { + return rm.flowRules +} + +func (rm *RuleManager) getRuleBySource(resource string) []*rule { + rules := rm.flowRules[resource] + if len(rules) == 0 { + return nil + } + return rules +} + +func buildFlowRuleMap(rules []*rule) map[string][]*rule { + ret := make(map[string][]*rule) + + for _, r := range rules { + err := r.validate() + if err != nil { + fmt.Printf("validate rule fail, the reason is %s ", err.Error()) + continue + } + r.controller_ = generateFlowControl(r) + srcName := r.resource_ + var slc []*rule = ret[srcName] + if slc == nil { + slc = make([]*rule, 0) + } + slc = append(slc, r) + ret[srcName] = slc + } + return ret +} + +func generateFlowControl(r *rule) TrafficShapingController { + if r.grade_ == FlowGradeQps { + switch r.controlBehavior_ { + case ControlBehaviorWarmUp: + return WarmUpController{} + case ControlBehaviorRateLimiter: + return RateLimiterController{} + case ControlBehaviorWarmUpRateLimiter: + return WarmUpRateLimiterController{} + default: + } + } + return DefaultController{} +} + +// +//type FlowPropertyListener struct { +//} +// +//func (fpl *FlowPropertyListener) ConfigUpdate(value interface{}) { +// +//} +//func (fpl *FlowPropertyListener) ConfigLoad(value interface{}) { +// +//} +// diff --git a/core/slots/flow/flow_slot.go b/core/slots/flow/flow_slot.go new file mode 100644 index 000000000..2b1acadc5 --- /dev/null +++ b/core/slots/flow/flow_slot.go @@ -0,0 +1,64 @@ +package flow + +import ( + "context" + "fmt" + "github.com/sentinel-group/sentinel-golang/core/slots/base" +) + +type FlowSlot struct { + RuleManager *RuleManager +} + +func (fs *FlowSlot) IsContinue(lastResult base.SlotResult, ctx context.Context) bool { + + if lastResult.Status == base.ResultStatusOk { + return true + } + return false +} + +func (fs *FlowSlot) Entry(ctx context.Context, resourceWrap *base.ResourceWrapper, node *base.DefaultNode, count uint32) base.SlotResult { + fmt.Println("flowSlot request number is ", node.TotalRequest()) + if fs.RuleManager == nil { + return base.SlotResult{ + Status: base.ResultStatusOk, + } + } + rules := fs.RuleManager.getRuleBySource(resourceWrap.ResourceName) + if len(rules) == 0 { + return base.SlotResult{ + Status: base.ResultStatusOk, + } + } + success := checkFlow(ctx, resourceWrap, rules, node, count) + if success { + return base.SlotResult{ + Status: base.ResultStatusOk, + } + } else { + return base.SlotResult{ + Status: base.ResultStatusBlocked, + } + } +} + +func (fs *FlowSlot) Exit(ctx context.Context, resourceWrap *base.ResourceWrapper, count uint32) { + +} + +func checkFlow(ctx context.Context, resourceWrap *base.ResourceWrapper, rules []*rule, node *base.DefaultNode, count uint32) bool { + if rules == nil { + return true + } + for _, rule := range rules { + if !canPass(ctx, resourceWrap, rule, node, count) { + return false + } + } + return true +} + +func canPass(ctx context.Context, resourceWrap *base.ResourceWrapper, rule *rule, node *base.DefaultNode, count uint32) bool { + return rule.controller_.CanPass(ctx, node, count) +} diff --git a/core/slots/statistic/data/leap_array.go b/core/slots/statistic/data/leap_array.go new file mode 100644 index 000000000..8dbd00319 --- /dev/null +++ b/core/slots/statistic/data/leap_array.go @@ -0,0 +1,265 @@ +package data + +import ( + "errors" + "fmt" + "math" + "sync" + "time" +) + +type WindowWrap struct { + windowLengthInMs uint32 + WindowStart uint64 + Value interface{} +} + +func (ww *WindowWrap) resetTo(startTime uint64) { + ww.WindowStart = startTime +} + +func (ww *WindowWrap) isTimeInWindow(timeMillis uint64) bool { + return ww.WindowStart <= timeMillis && timeMillis < ww.WindowStart+uint64(ww.windowLengthInMs) +} + +type LeapArray struct { + windowLengthInMs uint32 + sampleCount uint32 + intervalInMs uint32 + array []*WindowWrap //实际保存的数据 + + mux sync.Mutex // lock +} + +func (la *LeapArray) CurrentWindow(sw BucketGenerator) (*WindowWrap, error) { + return la.CurrentWindowWithTime(uint64(time.Now().UnixNano())/1e6, sw) +} + +func (la *LeapArray) CurrentWindowWithTime(timeMillis uint64, sw BucketGenerator) (*WindowWrap, error) { + if timeMillis < 0 { + return nil, errors.New("timeMillion is less than 0") + } + + idx := la.calculateTimeIdx(timeMillis) + windowStart := la.calculateStartTime(timeMillis) + + for { + old := la.array[idx] + if old == nil { + newWrap := &WindowWrap{ + windowLengthInMs: la.windowLengthInMs, + WindowStart: windowStart, + Value: sw.newEmptyBucket(windowStart), + } + la.mux.Lock() + la.array[idx] = newWrap + la.mux.Unlock() + return la.array[idx], nil + } else if windowStart == old.WindowStart { + return old, nil + } else if windowStart > old.WindowStart { + // reset WindowWrap + la.mux.Lock() + old, _ = sw.resetWindowTo(old, windowStart) + la.mux.Unlock() + return old, nil + } else if windowStart < old.WindowStart { + // Should not go through here, + return nil, errors.New(fmt.Sprintf("provided time timeMillis=%d is already behind old.WindowStart=%d", windowStart, old.WindowStart)) + } + } +} + +func (la *LeapArray) calculateTimeIdx(timeMillis uint64) uint32 { + timeId := (int)(timeMillis / uint64(la.windowLengthInMs)) + return uint32(timeId % len(la.array)) +} + +func (la *LeapArray) calculateStartTime(timeMillis uint64) uint64 { + return timeMillis - (timeMillis % uint64(la.windowLengthInMs)) +} + +// Get all the bucket in sliding window for current time; +func (la *LeapArray) Values() []*WindowWrap { + return la.valuesWithTime(uint64(time.Now().UnixNano()) / 1e6) +} + +func (la *LeapArray) valuesWithTime(timeMillis uint64) []*WindowWrap { + if timeMillis <= 0 { + return nil + } + wwp := make([]*WindowWrap, 0) + for _, wwp_ := range la.array { + if wwp_ == nil { + //fmt.Printf("current bucket is nil, index is %d \n", idx) + wwp_ = &WindowWrap{ + windowLengthInMs: 200, + WindowStart: uint64(time.Now().Nanosecond() / 1e6), + Value: newEmptyMetricBucket(), + } + wwp = append(wwp, wwp_) + continue + } + ww := &WindowWrap{ + windowLengthInMs: wwp_.windowLengthInMs, + WindowStart: wwp_.WindowStart, + Value: wwp_.Value, + } + wwp = append(wwp, ww) + } + return wwp +} + +type BucketGenerator interface { + // 根据开始时间,创建一个新的统计bucket, bucket的具体数据结构可以有多个 + newEmptyBucket(startTime uint64) interface{} + + // 将窗口ww重置startTime和空的统计bucket + resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) +} + +/** + * The implement of sliding window based on struct MetricBucket + */ +type SlidingWindow struct { + data *LeapArray + BucketType string +} + +func NewSlidingWindow() *SlidingWindow { + array_ := make([]*WindowWrap, 5) + return &SlidingWindow{ + data: &LeapArray{ + windowLengthInMs: 200, + sampleCount: 5, + intervalInMs: 1000, + array: array_, + }, + BucketType: "metrics", + } +} + +func (sw *SlidingWindow) newEmptyBucket(startTime uint64) interface{} { + return newEmptyMetricBucket() +} + +func (sw *SlidingWindow) resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) { + ww.WindowStart = startTime + ww.Value = newEmptyMetricBucket() + return ww, nil +} + +func (sw *SlidingWindow) Count(eventType MetricEventType) uint64 { + _, err := sw.data.CurrentWindow(sw) + if err != nil { + fmt.Println("sliding window fail to record success") + } + count := uint64(0) + for _, ww := range sw.data.Values() { + mb, ok := ww.Value.(MetricBucket) + if !ok { + fmt.Println("assert fail") + continue + } + cn := uint64(0) + var ce error + switch eventType { + case MetricEventSuccess: + cn, ce = mb.Get(MetricEventSuccess) + case MetricEventPass: + cn, ce = mb.Get(MetricEventPass) + case MetricEventError: + cn, ce = mb.Get(MetricEventError) + case MetricEventBlock: + cn, ce = mb.Get(MetricEventBlock) + case MetricEventRt: + cn, ce = mb.Get(MetricEventRt) + default: + ce = errors.New("unknown metric type! ") + } + if ce != nil { + fmt.Println("fail to count, reason: ", ce) + } + count += cn + } + return count +} + +func (sw *SlidingWindow) AddCount(eventType MetricEventType, count uint64) { + curWindow, err := sw.data.CurrentWindow(sw) + if err != nil || curWindow == nil || curWindow.Value == nil { + fmt.Println("sliding window fail to record success") + return + } + + mb, ok := curWindow.Value.(MetricBucket) + if !ok { + fmt.Println("assert fail") + return + } + + var ae error + switch eventType { + case MetricEventSuccess: + ae = mb.Add(MetricEventSuccess, count) + case MetricEventPass: + ae = mb.Add(MetricEventPass, count) + case MetricEventError: + ae = mb.Add(MetricEventError, count) + case MetricEventBlock: + ae = mb.Add(MetricEventBlock, count) + case MetricEventRt: + ae = mb.Add(MetricEventRt, count) + default: + ae = errors.New("unknown metric type ") + } + if ae != nil { + fmt.Println("add success counter fail, reason: ", ae) + } +} + +func (sw *SlidingWindow) MaxSuccess() uint64 { + + _, err := sw.data.CurrentWindow(sw) + if err != nil { + fmt.Println("sliding window fail to record success") + } + + succ := uint64(0) + for _, ww := range sw.data.Values() { + mb, ok := ww.Value.(MetricBucket) + if !ok { + fmt.Println("assert fail") + continue + } + s, err := mb.Get(MetricEventSuccess) + if err != nil { + fmt.Println("get success counter fail, reason: ", err) + } + succ = uint64(math.Max(float64(succ), float64(s))) + } + return succ +} + +func (sw *SlidingWindow) MinSuccess() uint64 { + + _, err := sw.data.CurrentWindow(sw) + if err != nil { + fmt.Println("sliding window fail to record success") + } + + succ := uint64(0) + for _, ww := range sw.data.Values() { + mb, ok := ww.Value.(MetricBucket) + if !ok { + fmt.Println("assert fail") + continue + } + s, err := mb.Get(MetricEventSuccess) + if err != nil { + fmt.Println("get success counter fail, reason: ", err) + } + succ = uint64(math.Min(float64(succ), float64(s))) + } + return succ +} diff --git a/core/slots/statistic/data/metric_bucket.go b/core/slots/statistic/data/metric_bucket.go new file mode 100644 index 000000000..b1d837e57 --- /dev/null +++ b/core/slots/statistic/data/metric_bucket.go @@ -0,0 +1,93 @@ +package data + +import ( + "errors" + "math" +) + +type MetricEventType int8 + +const ( + MetricEventPass MetricEventType = iota + MetricEventBlock + MetricEventSuccess + MetricEventError + MetricEventRt +) +const metricEventNum = 5 + +/** +MetricBucket store the metric statistic of each event +(MetricEventPass、MetricEventBlock、MetricEventError、MetricEventSuccess、MetricEventRt) +*/ +type MetricBucket struct { + counter []uint64 + minRt uint64 +} + +func (mb *MetricBucket) MetricEvents() []MetricEventType { + met := make([]MetricEventType, 0, metricEventNum) + met = append(met, MetricEventPass) + met = append(met, MetricEventBlock) + met = append(met, MetricEventError) + met = append(met, MetricEventSuccess) + met = append(met, MetricEventRt) + return met +} + +func newEmptyMetricBucket() MetricBucket { + return MetricBucket{ + counter: make([]uint64, metricEventNum, metricEventNum), + minRt: math.MaxUint64, + } +} + +func (mb *MetricBucket) Add(event MetricEventType, count uint64) error { + switch event { + case MetricEventPass: + mb.counter[0] += count + case MetricEventBlock: + mb.counter[1] += count + case MetricEventError: + mb.counter[2] += count + case MetricEventSuccess: + mb.counter[3] += count + case MetricEventRt: + mb.counter[4] += count + default: + return errors.New("unknown metric event type, " + string(event)) + } + return nil +} + +func (mb *MetricBucket) Get(event MetricEventType) (uint64, error) { + switch event { + case MetricEventPass: + return mb.counter[0], nil + case MetricEventBlock: + return mb.counter[1], nil + case MetricEventError: + return mb.counter[2], nil + case MetricEventSuccess: + return mb.counter[3], nil + case MetricEventRt: + return mb.counter[4], nil + default: + return 0, errors.New("unknown metric event type, " + string(event)) + } +} + +func (mb *MetricBucket) AddRt(rt uint64) error { + err := mb.Add(MetricEventRt, rt) + if rt < mb.minRt { + mb.minRt = rt + } + return err +} + +func (mb *MetricBucket) Reset() { + for i := 0; i < metricEventNum; i++ { + mb.counter[i] = 0 + } + mb.minRt = math.MaxUint64 +} diff --git a/core/slots/statistic/data/unary_leap_array.go b/core/slots/statistic/data/unary_leap_array.go new file mode 100644 index 000000000..4878ea8ca --- /dev/null +++ b/core/slots/statistic/data/unary_leap_array.go @@ -0,0 +1,15 @@ +package data + +type UnaryLeapArray struct { + LeapArray +} + +func (uls *UnaryLeapArray) newEmptyBucket(startTime uint64) interface{} { + return uint64(0) +} + +func (uls *UnaryLeapArray) resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) { + ww.WindowStart = startTime + ww.Value = uint64(0) + return ww, nil +} diff --git a/core/slots/statistic/statistic_slot.go b/core/slots/statistic/statistic_slot.go new file mode 100644 index 000000000..f7a1ddd89 --- /dev/null +++ b/core/slots/statistic/statistic_slot.go @@ -0,0 +1,25 @@ +package statistic + +import ( + "context" + "github.com/sentinel-group/sentinel-golang/core/slots/base" +) + +type StatisticSlot struct { +} + +func (ss *StatisticSlot) IsContinue(lastResult base.SlotResult, ctx context.Context) bool { + return true +} + +func (ss *StatisticSlot) Entry(ctx context.Context, resourceWrap *base.ResourceWrapper, node *base.DefaultNode, count uint32) base.SlotResult { + node.AddGoroutineNum(count) + node.AddPass(uint64(count)) + return base.SlotResult{ + Status: base.ResultStatusOk, + } +} + +func (ss *StatisticSlot) Exit(ctx context.Context, resourceWrap *base.ResourceWrapper, count uint32) { + +} diff --git a/core/sphu.go b/core/sphu.go new file mode 100644 index 000000000..b4529a0a1 --- /dev/null +++ b/core/sphu.go @@ -0,0 +1,43 @@ +package core + +import ( + "fmt" + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/slots/chain" + "sync" +) + +var defaultChain *chain.DefaultSlotChain +var defaultNode *base.DefaultNode +var resourceWrap *base.ResourceWrapper +var lock sync.Mutex + +func Entry(resource string) error { + lock.Lock() + if resourceWrap == nil { + fmt.Println("default resource chain is nil, init default chain") + resourceWrap = &base.ResourceWrapper{ + ResourceName: resource, + ResourceType: base.INBOUND, + } + } + if defaultChain == nil { + fmt.Println("default chain is nil, init default chain") + defaultChain = chain.NewDefaultSlotChain() + } + if defaultNode == nil { + fmt.Println("default node is nil, init default node") + defaultNode = base.NewDefaultNode(resourceWrap) + } + lock.Unlock() + defaultChain.Entry(nil, resourceWrap, defaultNode, 1) + return nil +} + +func Exit(resource string) { + resourceWrap := &base.ResourceWrapper{ + ResourceName: resource, + ResourceType: base.INBOUND, + } + defaultChain.Exit(nil, resourceWrap, 1) +} diff --git a/example/main.go b/example/main.go new file mode 100644 index 000000000..7d8957738 --- /dev/null +++ b/example/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "github.com/sentinel-group/sentinel-golang/core" + "math/rand" + "sync" + "time" +) + +func main() { + fmt.Println("=================start=================") + wg := &sync.WaitGroup{} + wg.Add(10) + + for i := 0; i < 10; i++ { + test(wg) + } + wg.Wait() + fmt.Println("=================end=================") +} + +func test(wg *sync.WaitGroup) { + rand.Seed(1000) + r := rand.Int63() % 10 + time.Sleep(time.Duration(r) * time.Millisecond) + _ = core.Entry("test") + time.Sleep(time.Duration(r) * time.Millisecond) + core.Exit("test") + wg.Done() +} diff --git a/go.mod b/go.mod new file mode 100644 index 000000000..abda04c93 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/sentinel-group/sentinel-golang + +go 1.12 From cf77de35a550d34bdab5eb9e0e6a366700554b03 Mon Sep 17 00:00:00 2001 From: ytlou Date: Tue, 25 Jun 2019 01:14:28 +0800 Subject: [PATCH 02/12] Issues 1: add flow control --- core/slots/base/default_node.go | 6 +- core/slots/flow/flow_control.go | 64 +++++- core/slots/statistic/data/leap_array.go | 80 +++---- core/slots/statistic/data/leap_array_test.go | 205 ++++++++++++++++++ core/slots/statistic/data/metric_bucket.go | 43 ++-- core/slots/statistic/data/unary_leap_array.go | 4 +- 6 files changed, 329 insertions(+), 73 deletions(-) create mode 100644 core/slots/statistic/data/leap_array_test.go diff --git a/core/slots/base/default_node.go b/core/slots/base/default_node.go index 6c79e91fd..0ee9a45ff 100644 --- a/core/slots/base/default_node.go +++ b/core/slots/base/default_node.go @@ -23,10 +23,10 @@ type DefaultNode struct { func NewDefaultNode(wrapper *ResourceWrapper) *DefaultNode { return &DefaultNode{ - rollingCounterInSecond: data.NewSlidingWindow(), - rollingCounterInMinute: data.NewSlidingWindow(), + rollingCounterInSecond: data.NewSlidingWindow(sampleCount_, intervalInMs_), + rollingCounterInMinute: data.NewSlidingWindow(sampleCount_, intervalInMs_), currentGoroutineNum: 0, - lastFetchTime: uint64(time.Now().Nanosecond() / (1e6)), + lastFetchTime: uint64(time.Now().UnixNano() / (1e6)), resourceWrapper: wrapper, } } diff --git a/core/slots/flow/flow_control.go b/core/slots/flow/flow_control.go index 42d7e0a02..d46b45ca7 100644 --- a/core/slots/flow/flow_control.go +++ b/core/slots/flow/flow_control.go @@ -3,36 +3,84 @@ package flow import ( "context" "github.com/sentinel-group/sentinel-golang/core/slots/base" + "math" + "sync/atomic" + "time" ) type TrafficShapingController interface { CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool } -type WarmUpController struct { +type DefaultController struct { + grade FlowGradeType + count uint64 } -func (wpc WarmUpController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { +func (dc *DefaultController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { + curCount := dc.avgUsedTokens(node) + if (curCount + uint64(acquire)) > dc.count { + return false + } return true } +func (dc *DefaultController) avgUsedTokens(node *base.DefaultNode) uint64 { + if node == nil { + return 0 + } + if dc.grade == FlowGradeThread { + return node.CurGoroutineNum() + } + return node.PassQps() +} + type RateLimiterController struct { + count uint64 + maxQueueingTimeMs int64 + latestPassedTime int64 } -func (wpc RateLimiterController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { - return true +func (rlc *RateLimiterController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { + if acquire < 0 { + return true + } + // Reject when count is less or equal than 0. + // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. + currentTime := int64(time.Now().UnixNano()) / 1e6 + // Calculate the interval between every two requests. + costTime := int64(math.Round(float64(uint64(acquire)/rlc.count) * 1000)) + // Expected pass time of this request. + expectedTime := costTime + atomic.LoadInt64(&rlc.latestPassedTime) + + if expectedTime < currentTime { + // Contention may exist here, but it's okay. + atomic.CompareAndSwapInt64(&rlc.latestPassedTime, rlc.latestPassedTime, currentTime) + return true + } else { + // Calculate the time to wait. + waitTime := costTime + atomic.LoadInt64(&rlc.latestPassedTime) - int64(time.Now().UnixNano())/1e6 + if waitTime > rlc.maxQueueingTimeMs { + atomic.AddInt64(&rlc.latestPassedTime, -costTime) + return false + } + if waitTime > 0 { + time.Sleep(time.Duration(waitTime) * time.Millisecond) + } + return true + } } -type WarmUpRateLimiterController struct { +type WarmUpController struct { } -func (wpc WarmUpRateLimiterController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { +func (wpc WarmUpController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { return true } -type DefaultController struct { +type WarmUpRateLimiterController struct { } -func (wpc DefaultController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { +func (wpc WarmUpRateLimiterController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { return true } diff --git a/core/slots/statistic/data/leap_array.go b/core/slots/statistic/data/leap_array.go index 8dbd00319..a23f5abfc 100644 --- a/core/slots/statistic/data/leap_array.go +++ b/core/slots/statistic/data/leap_array.go @@ -10,16 +10,16 @@ import ( type WindowWrap struct { windowLengthInMs uint32 - WindowStart uint64 - Value interface{} + windowStart uint64 + value interface{} } func (ww *WindowWrap) resetTo(startTime uint64) { - ww.WindowStart = startTime + ww.windowStart = startTime } func (ww *WindowWrap) isTimeInWindow(timeMillis uint64) bool { - return ww.WindowStart <= timeMillis && timeMillis < ww.WindowStart+uint64(ww.windowLengthInMs) + return ww.windowStart <= timeMillis && timeMillis < ww.windowStart+uint64(ww.windowLengthInMs) } type LeapArray struct { @@ -48,24 +48,24 @@ func (la *LeapArray) CurrentWindowWithTime(timeMillis uint64, sw BucketGenerator if old == nil { newWrap := &WindowWrap{ windowLengthInMs: la.windowLengthInMs, - WindowStart: windowStart, - Value: sw.newEmptyBucket(windowStart), + windowStart: windowStart, + value: sw.newEmptyBucket(windowStart), } la.mux.Lock() la.array[idx] = newWrap la.mux.Unlock() return la.array[idx], nil - } else if windowStart == old.WindowStart { + } else if windowStart == old.windowStart { return old, nil - } else if windowStart > old.WindowStart { + } else if windowStart > old.windowStart { // reset WindowWrap la.mux.Lock() old, _ = sw.resetWindowTo(old, windowStart) la.mux.Unlock() return old, nil - } else if windowStart < old.WindowStart { + } else if windowStart < old.windowStart { // Should not go through here, - return nil, errors.New(fmt.Sprintf("provided time timeMillis=%d is already behind old.WindowStart=%d", windowStart, old.WindowStart)) + return nil, errors.New(fmt.Sprintf("provided time timeMillis=%d is already behind old.windowStart=%d", windowStart, old.windowStart)) } } } @@ -94,16 +94,16 @@ func (la *LeapArray) valuesWithTime(timeMillis uint64) []*WindowWrap { //fmt.Printf("current bucket is nil, index is %d \n", idx) wwp_ = &WindowWrap{ windowLengthInMs: 200, - WindowStart: uint64(time.Now().Nanosecond() / 1e6), - Value: newEmptyMetricBucket(), + windowStart: uint64(time.Now().UnixNano() / 1e6), + value: newEmptyMetricBucket(), } wwp = append(wwp, wwp_) continue } ww := &WindowWrap{ windowLengthInMs: wwp_.windowLengthInMs, - WindowStart: wwp_.WindowStart, - Value: wwp_.Value, + windowStart: wwp_.windowStart, + value: wwp_.value, } wwp = append(wwp, ww) } @@ -126,13 +126,17 @@ type SlidingWindow struct { BucketType string } -func NewSlidingWindow() *SlidingWindow { +func NewSlidingWindow(sampleCount uint32, intervalInMs uint32) *SlidingWindow { + if intervalInMs%sampleCount != 0 { + panic(fmt.Sprintf("invalid parameters, intervalInMs is %d, sampleCount is %d.", intervalInMs, sampleCount)) + } + winLengthInMs := intervalInMs / sampleCount array_ := make([]*WindowWrap, 5) return &SlidingWindow{ data: &LeapArray{ - windowLengthInMs: 200, - sampleCount: 5, - intervalInMs: 1000, + windowLengthInMs: winLengthInMs, + sampleCount: sampleCount, + intervalInMs: intervalInMs, array: array_, }, BucketType: "metrics", @@ -144,8 +148,8 @@ func (sw *SlidingWindow) newEmptyBucket(startTime uint64) interface{} { } func (sw *SlidingWindow) resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) { - ww.WindowStart = startTime - ww.Value = newEmptyMetricBucket() + ww.windowStart = startTime + ww.value = newEmptyMetricBucket() return ww, nil } @@ -156,7 +160,7 @@ func (sw *SlidingWindow) Count(eventType MetricEventType) uint64 { } count := uint64(0) for _, ww := range sw.data.Values() { - mb, ok := ww.Value.(MetricBucket) + mb, ok := ww.value.(MetricBucket) if !ok { fmt.Println("assert fail") continue @@ -165,15 +169,15 @@ func (sw *SlidingWindow) Count(eventType MetricEventType) uint64 { var ce error switch eventType { case MetricEventSuccess: - cn, ce = mb.Get(MetricEventSuccess) + cn = mb.Get(MetricEventSuccess) case MetricEventPass: - cn, ce = mb.Get(MetricEventPass) + cn = mb.Get(MetricEventPass) case MetricEventError: - cn, ce = mb.Get(MetricEventError) + cn = mb.Get(MetricEventError) case MetricEventBlock: - cn, ce = mb.Get(MetricEventBlock) + cn = mb.Get(MetricEventBlock) case MetricEventRt: - cn, ce = mb.Get(MetricEventRt) + cn = mb.Get(MetricEventRt) default: ce = errors.New("unknown metric type! ") } @@ -187,12 +191,12 @@ func (sw *SlidingWindow) Count(eventType MetricEventType) uint64 { func (sw *SlidingWindow) AddCount(eventType MetricEventType, count uint64) { curWindow, err := sw.data.CurrentWindow(sw) - if err != nil || curWindow == nil || curWindow.Value == nil { + if err != nil || curWindow == nil || curWindow.value == nil { fmt.Println("sliding window fail to record success") return } - mb, ok := curWindow.Value.(MetricBucket) + mb, ok := curWindow.value.(MetricBucket) if !ok { fmt.Println("assert fail") return @@ -201,17 +205,17 @@ func (sw *SlidingWindow) AddCount(eventType MetricEventType, count uint64) { var ae error switch eventType { case MetricEventSuccess: - ae = mb.Add(MetricEventSuccess, count) + mb.Add(MetricEventSuccess, count) case MetricEventPass: - ae = mb.Add(MetricEventPass, count) + mb.Add(MetricEventPass, count) case MetricEventError: - ae = mb.Add(MetricEventError, count) + mb.Add(MetricEventError, count) case MetricEventBlock: - ae = mb.Add(MetricEventBlock, count) + mb.Add(MetricEventBlock, count) case MetricEventRt: - ae = mb.Add(MetricEventRt, count) + mb.Add(MetricEventRt, count) default: - ae = errors.New("unknown metric type ") + errors.New("unknown metric type ") } if ae != nil { fmt.Println("add success counter fail, reason: ", ae) @@ -227,12 +231,12 @@ func (sw *SlidingWindow) MaxSuccess() uint64 { succ := uint64(0) for _, ww := range sw.data.Values() { - mb, ok := ww.Value.(MetricBucket) + mb, ok := ww.value.(MetricBucket) if !ok { fmt.Println("assert fail") continue } - s, err := mb.Get(MetricEventSuccess) + s := mb.Get(MetricEventSuccess) if err != nil { fmt.Println("get success counter fail, reason: ", err) } @@ -250,12 +254,12 @@ func (sw *SlidingWindow) MinSuccess() uint64 { succ := uint64(0) for _, ww := range sw.data.Values() { - mb, ok := ww.Value.(MetricBucket) + mb, ok := ww.value.(MetricBucket) if !ok { fmt.Println("assert fail") continue } - s, err := mb.Get(MetricEventSuccess) + s := mb.Get(MetricEventSuccess) if err != nil { fmt.Println("get success counter fail, reason: ", err) } diff --git a/core/slots/statistic/data/leap_array_test.go b/core/slots/statistic/data/leap_array_test.go new file mode 100644 index 000000000..88a7ad93c --- /dev/null +++ b/core/slots/statistic/data/leap_array_test.go @@ -0,0 +1,205 @@ +package data + +import ( + "sync" + "sync/atomic" + "testing" + time2 "time" +) + +const ( + windowLengthImMs_ uint32 = 200 + sampleCount_ uint32 = 5 + intervalInMs_ uint32 = 1000 +) + +func TestNewWindow(t *testing.T) { + slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) + time := uint64(time2.Now().UnixNano() / 1e6) + + wr, err := slidingWindow.data.CurrentWindowWithTime(time, slidingWindow) + if wr == nil { + t.Errorf("Unexcepted error") + } + if err != nil { + t.Errorf("Unexcepted error") + } + if wr.windowLengthInMs != windowLengthImMs_ { + t.Errorf("Unexcepted error, winlength is not same") + } + if wr.windowStart != (time - time%uint64(windowLengthImMs_)) { + t.Errorf("Unexcepted error, winlength is not same") + } + if wr.value == nil { + t.Errorf("Unexcepted error, value is nil") + } + if slidingWindow.Count(MetricEventPass) != 0 { + t.Errorf("Unexcepted error, pass value is invalid") + } +} + +func TestLeapArrayWindowStart(t *testing.T) { + slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) + firstTime := uint64(time2.Now().UnixNano() / 1e6) + previousWindowStart := firstTime - firstTime%uint64(windowLengthImMs_) + + wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + if wr.windowLengthInMs != windowLengthImMs_ { + t.Errorf("Unexpected error, winLength is not same") + } + if wr.windowStart != previousWindowStart { + t.Errorf("Unexpected error, winStart is not same") + } +} + +func TestWindowAfterOneInterval(t *testing.T) { + slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) + firstTime := uint64(time2.Now().UnixNano() / 1e6) + previousWindowStart := firstTime - firstTime%uint64(windowLengthImMs_) + + wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + if wr.windowLengthInMs != windowLengthImMs_ { + t.Errorf("Unexpected error, winLength is not same") + } + if wr.windowStart != previousWindowStart { + t.Errorf("Unexpected error, winStart is not same") + } + if wr.value == nil { + t.Errorf("Unexcepted error") + } + mb, ok := wr.value.(MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + mb.Add(MetricEventPass, 1) + mb.Add(MetricEventBlock, 1) + mb.Add(MetricEventSuccess, 1) + mb.Add(MetricEventError, 1) + + if mb.Get(MetricEventPass) != 1 { + t.Errorf("Unexcepted error") + } + if mb.Get(MetricEventBlock) != 1 { + t.Errorf("Unexcepted error") + } + if mb.Get(MetricEventSuccess) != 1 { + t.Errorf("Unexcepted error") + } + if mb.Get(MetricEventError) != 1 { + t.Errorf("Unexcepted error") + } + + middleTime := previousWindowStart + uint64(windowLengthImMs_)/2 + wr2, err := slidingWindow.data.CurrentWindowWithTime(middleTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + if wr2.windowStart != previousWindowStart { + t.Errorf("Unexpected error, winStart is not same") + } + mb2, ok := wr2.value.(MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + if wr != wr2 { + t.Errorf("Unexcepted error") + } + mb2.Add(MetricEventPass, 1) + if mb.Get(MetricEventPass) != 2 { + t.Errorf("Unexcepted error") + } + if mb.Get(MetricEventBlock) != 1 { + t.Errorf("Unexcepted error") + } + + lastTime := middleTime + uint64(windowLengthImMs_)/2 + wr3, err := slidingWindow.data.CurrentWindowWithTime(lastTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + if wr3.windowLengthInMs != windowLengthImMs_ { + t.Errorf("Unexpected error") + } + if (wr3.windowStart - uint64(windowLengthImMs_)) != previousWindowStart { + t.Errorf("Unexpected error") + } + mb3, ok := wr3.value.(MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + if &mb3 == nil { + t.Errorf("Unexcepted error") + } + + if mb3.Get(MetricEventPass) != 0 { + t.Errorf("Unexcepted error") + } + if mb3.Get(MetricEventBlock) != 0 { + t.Errorf("Unexcepted error") + } +} + +func TestNTimeMultiGoroutineUpdateEmptyWindow(t *testing.T) { + for i := 0; i < 10; i++ { + nTestMultiGoroutineUpdateEmptyWindow(t) + } +} + +func task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testing.T, ct *uint64, lock *sync.Mutex) { + wr, err := slidingWindow.data.CurrentWindowWithTime(ti, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + lock.Lock() + mb, ok := wr.value.(MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + mb.Add(MetricEventPass, 1) + mb.Add(MetricEventBlock, 1) + mb.Add(MetricEventSuccess, 1) + mb.Add(MetricEventError, 1) + atomic.AddUint64(ct, 1) + lock.Unlock() + wg.Done() +} + +func nTestMultiGoroutineUpdateEmptyWindow(t *testing.T) { + slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) + firstTime := uint64(time2.Now().UnixNano() / 1e6) + + const GoroutineNum = 10000 + wg := &sync.WaitGroup{} + lock := &sync.Mutex{} + wg.Add(GoroutineNum) + st := time2.Now().UnixNano() + var ct = uint64(0) + for i := 0; i < GoroutineNum; i++ { + go task(wg, slidingWindow, firstTime, t, &ct, lock) + } + wg.Wait() + t.Logf("finish goroutines: %d", atomic.LoadUint64(&ct)) + et := time2.Now().UnixNano() + dif := et - st + t.Logf("finish all goroutines, cost time is %d", dif) + wr2, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + mb2, ok := wr2.value.(MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + if mb2.Get(MetricEventPass) != GoroutineNum { + t.Errorf("Unexcepted error, infact, %d", mb2.Get(MetricEventPass)) + } + if mb2.Get(MetricEventBlock) != GoroutineNum { + t.Errorf("Unexcepted error, infact, %d", mb2.Get(MetricEventBlock)) + } +} diff --git a/core/slots/statistic/data/metric_bucket.go b/core/slots/statistic/data/metric_bucket.go index b1d837e57..c941ea72b 100644 --- a/core/slots/statistic/data/metric_bucket.go +++ b/core/slots/statistic/data/metric_bucket.go @@ -1,8 +1,8 @@ package data import ( - "errors" "math" + "sync/atomic" ) type MetricEventType int8 @@ -25,7 +25,7 @@ type MetricBucket struct { minRt uint64 } -func (mb *MetricBucket) MetricEvents() []MetricEventType { +func (mb *MetricBucket) metricEvents() []MetricEventType { met := make([]MetricEventType, 0, metricEventNum) met = append(met, MetricEventPass) met = append(met, MetricEventBlock) @@ -36,58 +36,57 @@ func (mb *MetricBucket) MetricEvents() []MetricEventType { } func newEmptyMetricBucket() MetricBucket { - return MetricBucket{ + mb := MetricBucket{ counter: make([]uint64, metricEventNum, metricEventNum), minRt: math.MaxUint64, } + return mb } -func (mb *MetricBucket) Add(event MetricEventType, count uint64) error { +func (mb *MetricBucket) Add(event MetricEventType, count uint64) { switch event { case MetricEventPass: - mb.counter[0] += count + atomic.AddUint64(&mb.counter[0], count) case MetricEventBlock: - mb.counter[1] += count + atomic.AddUint64(&mb.counter[1], count) case MetricEventError: - mb.counter[2] += count + atomic.AddUint64(&mb.counter[2], count) case MetricEventSuccess: - mb.counter[3] += count + atomic.AddUint64(&mb.counter[3], count) case MetricEventRt: - mb.counter[4] += count + atomic.AddUint64(&mb.counter[4], count) default: - return errors.New("unknown metric event type, " + string(event)) + panic("unknown metric event type, " + string(event)) } - return nil } -func (mb *MetricBucket) Get(event MetricEventType) (uint64, error) { +func (mb *MetricBucket) Get(event MetricEventType) uint64 { switch event { case MetricEventPass: - return mb.counter[0], nil + return atomic.LoadUint64(&mb.counter[0]) case MetricEventBlock: - return mb.counter[1], nil + return atomic.LoadUint64(&mb.counter[1]) case MetricEventError: - return mb.counter[2], nil + return atomic.LoadUint64(&mb.counter[2]) case MetricEventSuccess: - return mb.counter[3], nil + return atomic.LoadUint64(&mb.counter[3]) case MetricEventRt: - return mb.counter[4], nil + return atomic.LoadUint64(&mb.counter[4]) default: - return 0, errors.New("unknown metric event type, " + string(event)) + panic("unknown metric event type, " + string(event)) } } -func (mb *MetricBucket) AddRt(rt uint64) error { - err := mb.Add(MetricEventRt, rt) +func (mb *MetricBucket) AddRt(rt uint64) { + mb.Add(MetricEventRt, rt) if rt < mb.minRt { mb.minRt = rt } - return err } func (mb *MetricBucket) Reset() { for i := 0; i < metricEventNum; i++ { - mb.counter[i] = 0 + atomic.StoreUint64(&mb.counter[i], 0) } mb.minRt = math.MaxUint64 } diff --git a/core/slots/statistic/data/unary_leap_array.go b/core/slots/statistic/data/unary_leap_array.go index 4878ea8ca..7828de680 100644 --- a/core/slots/statistic/data/unary_leap_array.go +++ b/core/slots/statistic/data/unary_leap_array.go @@ -9,7 +9,7 @@ func (uls *UnaryLeapArray) newEmptyBucket(startTime uint64) interface{} { } func (uls *UnaryLeapArray) resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) { - ww.WindowStart = startTime - ww.Value = uint64(0) + ww.windowStart = startTime + ww.value = uint64(0) return ww, nil } From 946a93829df407a849042fe1a1b9978943c00879 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E6=B5=B7=E6=B4=8B?= Date: Thu, 11 Jul 2019 14:45:12 +0800 Subject: [PATCH 03/12] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E8=B5=8B=E5=80=BC?= =?UTF-8?q?=E6=8C=87=E9=92=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/slots/flow/flow_rule.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/slots/flow/flow_rule.go b/core/slots/flow/flow_rule.go index e1b4add7c..539f1fdfe 100644 --- a/core/slots/flow/flow_rule.go +++ b/core/slots/flow/flow_rule.go @@ -186,15 +186,15 @@ func generateFlowControl(r *rule) TrafficShapingController { if r.grade_ == FlowGradeQps { switch r.controlBehavior_ { case ControlBehaviorWarmUp: - return WarmUpController{} + return new(WarmUpController) case ControlBehaviorRateLimiter: - return RateLimiterController{} + return new(RateLimiterController) case ControlBehaviorWarmUpRateLimiter: - return WarmUpRateLimiterController{} + return new(WarmUpRateLimiterController) default: } } - return DefaultController{} + return new(DefaultController) } // From 96e5060af8e7bd9eaf496d10de8ef2a56fe0ea57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E6=B5=B7=E6=B4=8B?= Date: Thu, 11 Jul 2019 15:39:20 +0800 Subject: [PATCH 04/12] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E8=B5=8B=E5=80=BC?= =?UTF-8?q?=E6=8C=87=E9=92=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/slots/flow/flow_rule.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/slots/flow/flow_rule.go b/core/slots/flow/flow_rule.go index 539f1fdfe..77392f2dd 100644 --- a/core/slots/flow/flow_rule.go +++ b/core/slots/flow/flow_rule.go @@ -172,7 +172,7 @@ func buildFlowRuleMap(rules []*rule) map[string][]*rule { } r.controller_ = generateFlowControl(r) srcName := r.resource_ - var slc []*rule = ret[srcName] + var slc = ret[srcName] if slc == nil { slc = make([]*rule, 0) } From 080ff812f767e1879e4823dff3bdd8a35c52625b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E6=B5=B7=E6=B4=8B?= Date: Thu, 11 Jul 2019 22:05:35 +0800 Subject: [PATCH 05/12] =?UTF-8?q?=E8=81=8C=E8=B4=A3=E9=93=BE=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E4=BA=86=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95=20slot?= =?UTF-8?q?=20=E5=8F=AA=E9=9C=80=E8=A6=81=E7=BB=A7=E6=89=BF=20LinkedSlot?= =?UTF-8?q?=20=E5=AE=9E=E7=8E=B0=20Entry()=20Exit()=20=E6=96=B9=E6=B3=95?= =?UTF-8?q?=E5=B0=B1=E5=A5=BD=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 ++ core/slots/base/base.go | 19 ---- core/slots/base/context.go | 13 +++ core/{public => slots/base}/entry.go | 5 +- core/slots/base/result.go | 46 +++++++++ core/slots/chain/slot.go | 60 ++++++++++- core/slots/chain/slot_chain.go | 103 +++++++------------ core/slots/chain/slot_chain_test.go | 132 +++++++++++++++++++++++++ core/slots/flow/flow_control.go | 11 +-- core/slots/flow/flow_slot.go | 46 +++------ core/slots/statistic/statistic_slot.go | 37 ++++--- core/sphu.go | 59 ++++++----- example/main.go | 17 +++- 13 files changed, 386 insertions(+), 168 deletions(-) create mode 100644 README.md create mode 100644 core/slots/base/context.go rename core/{public => slots/base}/entry.go (61%) create mode 100644 core/slots/base/result.go create mode 100644 core/slots/chain/slot_chain_test.go diff --git a/README.md b/README.md new file mode 100644 index 000000000..50e62b180 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# 项目简介 +## 目录划分: + public: 是公共部分也是外部直接使用部分。 + slots : 职责链相关 + + diff --git a/core/slots/base/base.go b/core/slots/base/base.go index c1cf6095c..426dbaa08 100644 --- a/core/slots/base/base.go +++ b/core/slots/base/base.go @@ -4,22 +4,3 @@ const ( INBOUND = iota OUTBOUND ) - -type ResourceWrapper struct { - // unique resource name - ResourceName string - // - ResourceType int -} - -type SlotResultStatus int8 - -const ( - ResultStatusOk = iota - ResultStatusBlocked -) - -type SlotResult struct { - Status SlotResultStatus - BlockedReason string -} diff --git a/core/slots/base/context.go b/core/slots/base/context.go new file mode 100644 index 000000000..51eb5cb3c --- /dev/null +++ b/core/slots/base/context.go @@ -0,0 +1,13 @@ +package base + +import ( + "context" +) + +type Context struct { + name string + entranceNode DefaultNode + curEntry Entry + origin string + context context.Context +} diff --git a/core/public/entry.go b/core/slots/base/entry.go similarity index 61% rename from core/public/entry.go rename to core/slots/base/entry.go index c583962f5..66ada10ff 100644 --- a/core/public/entry.go +++ b/core/slots/base/entry.go @@ -1,14 +1,13 @@ -package public +package base import ( "github.com/sentinel-group/sentinel-golang/core/node" - "github.com/sentinel-group/sentinel-golang/core/slots/base" ) type Entry struct { createTime uint64 originNode node.Node currentNode node.Node - resourceWrap *base.ResourceWrapper + resourceWrap *ResourceWrapper err error } diff --git a/core/slots/base/result.go b/core/slots/base/result.go new file mode 100644 index 000000000..9b39ffbe8 --- /dev/null +++ b/core/slots/base/result.go @@ -0,0 +1,46 @@ +/** + * @description: + * + * @author: helloworld + * @date:2019-07-11 + */ +package base + +type ResourceWrapper struct { + // unique resource name + ResourceName string + // + ResourceType int +} + +type SlotResultStatus int8 + +const ( + ResultStatusPass = iota + ResultStatusBlocked + ResultStatusWait + ResultStatusError +) + +type TokenResult struct { + Status SlotResultStatus + BlockedReason string + WaitMs uint64 + ErrorMsg string +} + +func NewSlotResultPass() *TokenResult { + return &TokenResult{Status: ResultStatusPass} +} + +func NewSlotResultBlock(blockedReason string) *TokenResult { + return &TokenResult{Status: ResultStatusBlocked, BlockedReason: blockedReason} +} + +func NewSlotResultWait(waitMs uint64) *TokenResult { + return &TokenResult{Status: ResultStatusWait, WaitMs: waitMs} +} + +func NewSlotResultError(errorMsg string) *TokenResult { + return &TokenResult{Status: ResultStatusError, ErrorMsg: errorMsg} +} diff --git a/core/slots/chain/slot.go b/core/slots/chain/slot.go index 37b811cb6..7c33c4916 100644 --- a/core/slots/chain/slot.go +++ b/core/slots/chain/slot.go @@ -1,14 +1,66 @@ package chain import ( - "context" "github.com/sentinel-group/sentinel-golang/core/slots/base" ) +// a solt type Slot interface { - IsContinue(lastResult base.SlotResult, ctx context.Context) bool + /** + * Entrance of this slots. + */ + Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) - Entry(ctx context.Context, resourceWrap *base.ResourceWrapper, node *base.DefaultNode, count uint32) base.SlotResult + Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error - Exit(ctx context.Context, resourceWrap *base.ResourceWrapper, count uint32) + // 传递进入 + FireEntry(context *base.Context, resourceWrapper *base.ResourceWrapper, defaultNode *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) + + // 传递退出 + FireExit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error + + GetNext() Slot + + SetNext(next Slot) +} + +// a slot can make slot compose linked +type LinkedSlot struct { + // next linkedSlot + next Slot +} + +// 传递退出 +func (s *LinkedSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + return s.FireEntry(ctx, resWrapper, node, count, prioritized) +} + +// 传递进入 +func (s *LinkedSlot) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + return s.FireExit(context, resourceWrapper, count) +} + +// 传递进入, 没有下一个就返回 ResultStatusPass +func (s *LinkedSlot) FireEntry(context *base.Context, resourceWrapper *base.ResourceWrapper, defaultNode *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + if s.next != nil { + return s.next.Entry(context, resourceWrapper, defaultNode, count, prioritized) + } + return base.NewSlotResultPass(), nil +} + +// 传递退出,没有下一个就返回 +func (s *LinkedSlot) FireExit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + if s.next != nil { + return s.next.Exit(context, resourceWrapper, count) + } else { + return nil + } +} + +func (s *LinkedSlot) GetNext() Slot { + return s.next +} + +func (s *LinkedSlot) SetNext(next Slot) { + s.next = next } diff --git a/core/slots/chain/slot_chain.go b/core/slots/chain/slot_chain.go index 5eb7c4738..1489e6699 100644 --- a/core/slots/chain/slot_chain.go +++ b/core/slots/chain/slot_chain.go @@ -1,90 +1,63 @@ package chain import ( - "container/list" - "context" - "fmt" "github.com/sentinel-group/sentinel-golang/core/slots/base" - "github.com/sentinel-group/sentinel-golang/core/slots/flow" - "github.com/sentinel-group/sentinel-golang/core/slots/statistic" ) type SlotChain interface { - Slot + /** + * Add a processor to the head of this slots slotchain. + * + * @param protocolProcessor processor to be added. + */ AddFirst(slot Slot) + + /** + * Add a processor to the tail of this slots slotchain. + * + * @param protocolProcessor processor to be added. + */ AddLast(slot Slot) + + // fire to next slot + Entry(context *base.Context, resourceWrapper *base.ResourceWrapper, defaultNode *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) + // fire to next slot + Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error } -type DefaultSlotChain struct { - slots *list.List +// implent SlotChain +type LinkedSlotChain struct { + first Slot + end Slot } -type SlotChainBuilder interface { - Build() SlotChain +func NewLinkedSlotChain() *LinkedSlotChain { + fs := new(LinkedSlot) + return &LinkedSlotChain{first: fs, end: fs} } -func NewDefaultSlotChain() *DefaultSlotChain { - defaultSlotChain := &DefaultSlotChain{ - slots: list.New(), +func (lsc *LinkedSlotChain) AddFirst(slot Slot) { + slot.SetNext(lsc.first.GetNext()) + lsc.first.SetNext(slot) + if lsc.end == lsc.first { + lsc.end = slot } - defaultSlotChain.AddLast(&flow.FlowSlot{ - RuleManager: flow.NewRuleManager(), - }) - defaultSlotChain.AddLast(&statistic.StatisticSlot{}) - return defaultSlotChain } -func (dsc *DefaultSlotChain) AddFirst(slot Slot) { - dsc.slots.PushFront(slot) +func (lsc *LinkedSlotChain) AddLast(slot Slot) { + lsc.end.SetNext(slot) + lsc.end = slot } -func (dsc *DefaultSlotChain) AddLast(slot Slot) { - dsc.slots.PushBack(slot) +func (lsc *LinkedSlotChain) Entry(context *base.Context, resourceWrapper *base.ResourceWrapper, defaultNode *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + return lsc.first.Entry(context, resourceWrapper, defaultNode, count, prioritized) } -func (dsc *DefaultSlotChain) IsContinue(lastResult base.SlotResult, ctx context.Context) bool { - return true +// 传递进入 +func (lsc *LinkedSlotChain) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + return lsc.first.Exit(context, resourceWrapper, count) } -func (dsc *DefaultSlotChain) Entry(ctx context.Context, resourceWrap *base.ResourceWrapper, node *base.DefaultNode, count uint32) base.SlotResult { - slotResult := base.SlotResult{ - Status: base.ResultStatusOk, - } - for e := dsc.slots.Front(); e != nil; e = e.Next() { - slot := e.Value - switch slot_ := slot.(type) { - case *flow.FlowSlot: - if slot_.IsContinue(slotResult, ctx) { - slotResult = slot_.Entry(ctx, resourceWrap, node, count) - } - break - case *statistic.StatisticSlot: - if slot_.IsContinue(slotResult, ctx) { - slotResult = slot_.Entry(ctx, resourceWrap, node, count) - } - break - default: - slotResult = base.SlotResult{ - Status: base.ResultStatusBlocked, - BlockedReason: "Unknown Slot", - } - } - } - return slotResult -} - -func (dsc *DefaultSlotChain) Exit(ctx context.Context, resourceWrap *base.ResourceWrapper, count uint32) { - for e := dsc.slots.Front(); e != nil; e = e.Next() { - slot := e.Value - switch slot_ := slot.(type) { - case *flow.FlowSlot: - slot_.Exit(ctx, resourceWrap, count) - break - case *statistic.StatisticSlot: - slot_.Exit(ctx, resourceWrap, count) - break - default: - fmt.Println("DefaultSlotChain Exit error!") - } - } +type SlotChainBuilder interface { + Build() SlotChain } diff --git a/core/slots/chain/slot_chain_test.go b/core/slots/chain/slot_chain_test.go new file mode 100644 index 000000000..f49b64c31 --- /dev/null +++ b/core/slots/chain/slot_chain_test.go @@ -0,0 +1,132 @@ +/** + * @description: + * + * @author: helloworld + * @date:2019-07-11 + */ +package chain + +import ( + "errors" + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "testing" +) + +// implent slot for unit test + +// 继承 LinkedProessorSlot 并完全实现 Slot +type IncrSlot struct { + LinkedSlot +} + +func (s *IncrSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + count++ + return s.FireEntry(ctx, resWrapper, node, count, prioritized) +} + +func (s *IncrSlot) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + count++ + return s.FireExit(context, resourceWrapper, count) +} + +// 继承 LinkedProessorSlot 并完全实现 Slot +type DecrSlot struct { + LinkedSlot +} + +func (s *DecrSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + count-- + return s.FireEntry(ctx, resWrapper, node, count, prioritized) +} + +func (s *DecrSlot) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + count-- + return s.FireExit(context, resourceWrapper, count) +} + +// 继承 LinkedProessorSlot 并完全实现 Slot +type GreaterZeroPassSlot struct { + num int + LinkedSlot +} + +func (s *GreaterZeroPassSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + if count > s.num { + return s.FireEntry(ctx, resWrapper, node, count, prioritized) + } else { + return base.NewSlotResultBlock("GreaterZeroPassSlot"), nil + } +} + +func (s *GreaterZeroPassSlot) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + if count <= 0 { + return errors.New("GreaterZeroPassSlot") + } + return s.FireExit(context, resourceWrapper, count) +} + +func TestLinkedSlotChain_AddFirst_Pass(t *testing.T) { + newChain := NewLinkedSlotChain() + newChain.AddFirst(new(GreaterZeroPassSlot)) + newChain.AddFirst(new(IncrSlot)) + + result, _ := newChain.Entry(nil, nil, nil, 0, false) + if result.Status != base.ResultStatusPass { + t.Fatal("TestLinkedSlotChain_AddFirst_Block") + } + err := newChain.Exit(nil, nil, 0) + if err != nil { + t.Fatal(err) + } +} + +func TestLinkedSlotChain_AddFirst_Block(t *testing.T) { + newChain := NewLinkedSlotChain() + newChain.AddFirst(new(IncrSlot)) + newChain.AddFirst(new(GreaterZeroPassSlot)) + newChain.AddFirst(new(DecrSlot)) + + result, _ := newChain.Entry(nil, nil, nil, 0, false) + if result.Status != base.ResultStatusBlocked { + t.Fatal("TestLinkedSlotChain_AddFirst_Block") + } + err := newChain.Exit(nil, nil, 0) + if err == nil { + t.Fatal("should has error") + } +} + +func TestLinkedSlotChain_AddLast_Pass(t *testing.T) { + newChain := NewLinkedSlotChain() + newChain.AddLast(new(IncrSlot)) + newChain.AddLast(new(IncrSlot)) + newChain.AddLast(new(DecrSlot)) + newChain.AddLast(new(GreaterZeroPassSlot)) + result, _ := newChain.Entry(nil, nil, nil, 0, false) + if result.Status != base.ResultStatusPass { + t.Fatal("TestLinkedSlotChain_AddLast_Pass") + } + + err := newChain.Exit(nil, nil, 0) + if err != nil { + t.Fatal(err) + } +} + +func TestLinkedSlotChain_AddLast_Block(t *testing.T) { + newChain := NewLinkedSlotChain() + newChain.AddLast(new(IncrSlot)) + newChain.AddLast(new(DecrSlot)) + newChain.AddLast(new(DecrSlot)) + newChain.AddLast(new(GreaterZeroPassSlot)) + + result, _ := newChain.Entry(nil, nil, nil, 0, false) + if result.Status != base.ResultStatusBlocked { + t.Fatal("TestLinkedSlotChain_AddLast_Block") + } + + err := newChain.Exit(nil, nil, 0) + if err == nil { + t.Fatal("should has error") + } +} diff --git a/core/slots/flow/flow_control.go b/core/slots/flow/flow_control.go index d46b45ca7..43040f31c 100644 --- a/core/slots/flow/flow_control.go +++ b/core/slots/flow/flow_control.go @@ -1,7 +1,6 @@ package flow import ( - "context" "github.com/sentinel-group/sentinel-golang/core/slots/base" "math" "sync/atomic" @@ -9,7 +8,7 @@ import ( ) type TrafficShapingController interface { - CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool + CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool } type DefaultController struct { @@ -17,7 +16,7 @@ type DefaultController struct { count uint64 } -func (dc *DefaultController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { +func (dc *DefaultController) CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool { curCount := dc.avgUsedTokens(node) if (curCount + uint64(acquire)) > dc.count { return false @@ -41,7 +40,7 @@ type RateLimiterController struct { latestPassedTime int64 } -func (rlc *RateLimiterController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { +func (rlc *RateLimiterController) CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool { if acquire < 0 { return true } @@ -74,13 +73,13 @@ func (rlc *RateLimiterController) CanPass(ctx context.Context, node *base.Defaul type WarmUpController struct { } -func (wpc WarmUpController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { +func (wpc WarmUpController) CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool { return true } type WarmUpRateLimiterController struct { } -func (wpc WarmUpRateLimiterController) CanPass(ctx context.Context, node *base.DefaultNode, acquire uint32) bool { +func (wpc WarmUpRateLimiterController) CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool { return true } diff --git a/core/slots/flow/flow_slot.go b/core/slots/flow/flow_slot.go index 2b1acadc5..7f65a37e5 100644 --- a/core/slots/flow/flow_slot.go +++ b/core/slots/flow/flow_slot.go @@ -1,64 +1,48 @@ package flow import ( - "context" - "fmt" "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/slots/chain" ) type FlowSlot struct { + chain.LinkedSlot RuleManager *RuleManager } -func (fs *FlowSlot) IsContinue(lastResult base.SlotResult, ctx context.Context) bool { - - if lastResult.Status == base.ResultStatusOk { - return true - } - return false -} - -func (fs *FlowSlot) Entry(ctx context.Context, resourceWrap *base.ResourceWrapper, node *base.DefaultNode, count uint32) base.SlotResult { - fmt.Println("flowSlot request number is ", node.TotalRequest()) +func (fs *FlowSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + // no rule return pass if fs.RuleManager == nil { - return base.SlotResult{ - Status: base.ResultStatusOk, - } + return fs.FireEntry(ctx, resWrapper, node, count, false) } - rules := fs.RuleManager.getRuleBySource(resourceWrap.ResourceName) + rules := fs.RuleManager.getRuleBySource(resWrapper.ResourceName) if len(rules) == 0 { - return base.SlotResult{ - Status: base.ResultStatusOk, - } + return fs.FireEntry(ctx, resWrapper, node, count, false) } - success := checkFlow(ctx, resourceWrap, rules, node, count) + success := checkFlow(ctx, resWrapper, rules, node, count) if success { - return base.SlotResult{ - Status: base.ResultStatusOk, - } + return fs.FireEntry(ctx, resWrapper, node, count, false) } else { - return base.SlotResult{ - Status: base.ResultStatusBlocked, - } + return base.NewSlotResultBlock("FlowSlot"), nil } } -func (fs *FlowSlot) Exit(ctx context.Context, resourceWrap *base.ResourceWrapper, count uint32) { - +func (fs *FlowSlot) Exit(ctx *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + return fs.FireExit(ctx, resourceWrapper, count) } -func checkFlow(ctx context.Context, resourceWrap *base.ResourceWrapper, rules []*rule, node *base.DefaultNode, count uint32) bool { +func checkFlow(ctx *base.Context, resourceWrap *base.ResourceWrapper, rules []*rule, node *base.DefaultNode, count int) bool { if rules == nil { return true } for _, rule := range rules { - if !canPass(ctx, resourceWrap, rule, node, count) { + if !canPass(ctx, resourceWrap, rule, node, uint32(count)) { return false } } return true } -func canPass(ctx context.Context, resourceWrap *base.ResourceWrapper, rule *rule, node *base.DefaultNode, count uint32) bool { +func canPass(ctx *base.Context, resourceWrap *base.ResourceWrapper, rule *rule, node *base.DefaultNode, count uint32) bool { return rule.controller_.CanPass(ctx, node, count) } diff --git a/core/slots/statistic/statistic_slot.go b/core/slots/statistic/statistic_slot.go index f7a1ddd89..a75dcba27 100644 --- a/core/slots/statistic/statistic_slot.go +++ b/core/slots/statistic/statistic_slot.go @@ -1,25 +1,40 @@ package statistic import ( - "context" + "errors" "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/slots/chain" ) type StatisticSlot struct { + chain.LinkedSlot } -func (ss *StatisticSlot) IsContinue(lastResult base.SlotResult, ctx context.Context) bool { - return true -} +func (fs *StatisticSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + var r *base.TokenResult + var err error + defer func() { + if e := recover(); e != nil { + r = base.NewSlotResultError("StatisticSlot") + err = errors.New("panic occur") + } + }() + // fire next slot + result, err := fs.FireEntry(ctx, resWrapper, node, count, prioritized) -func (ss *StatisticSlot) Entry(ctx context.Context, resourceWrap *base.ResourceWrapper, node *base.DefaultNode, count uint32) base.SlotResult { - node.AddGoroutineNum(count) - node.AddPass(uint64(count)) - return base.SlotResult{ - Status: base.ResultStatusOk, + if err != nil { + // TO DO } -} + if result.Status == base.ResultStatusError { + // TO DO + } + if result.Status == base.ResultStatusPass { + node.AddPass(1) -func (ss *StatisticSlot) Exit(ctx context.Context, resourceWrap *base.ResourceWrapper, count uint32) { + } + return result, err +} +func (fs *StatisticSlot) Exit(ctx *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + return fs.FireExit(ctx, resourceWrapper, count) } diff --git a/core/sphu.go b/core/sphu.go index b4529a0a1..661156e04 100644 --- a/core/sphu.go +++ b/core/sphu.go @@ -1,43 +1,48 @@ package core import ( - "fmt" "github.com/sentinel-group/sentinel-golang/core/slots/base" "github.com/sentinel-group/sentinel-golang/core/slots/chain" - "sync" + "github.com/sentinel-group/sentinel-golang/core/slots/flow" + "github.com/sentinel-group/sentinel-golang/core/slots/statistic" ) -var defaultChain *chain.DefaultSlotChain +type DefaultSlotChainBuilder struct { +} + +func (dsc *DefaultSlotChainBuilder) Build() chain.SlotChain { + linkedChain := chain.NewLinkedSlotChain() + linkedChain.AddFirst(new(flow.FlowSlot)) + linkedChain.AddFirst(new(statistic.StatisticSlot)) + // add all slot + return linkedChain +} + +func NewDefaultSlotChainBuilder() *DefaultSlotChainBuilder { + return &DefaultSlotChainBuilder{} +} + +var defaultChain chain.SlotChain var defaultNode *base.DefaultNode -var resourceWrap *base.ResourceWrapper -var lock sync.Mutex - -func Entry(resource string) error { - lock.Lock() - if resourceWrap == nil { - fmt.Println("default resource chain is nil, init default chain") - resourceWrap = &base.ResourceWrapper{ - ResourceName: resource, - ResourceType: base.INBOUND, - } - } - if defaultChain == nil { - fmt.Println("default chain is nil, init default chain") - defaultChain = chain.NewDefaultSlotChain() - } - if defaultNode == nil { - fmt.Println("default node is nil, init default node") - defaultNode = base.NewDefaultNode(resourceWrap) + +func init() { + defaultChain = NewDefaultSlotChainBuilder().Build() + defaultNode = base.NewDefaultNode(nil) +} + +func Entry(resource string) (*base.TokenResult, error) { + resourceWrap := &base.ResourceWrapper{ + ResourceName: resource, + ResourceType: base.INBOUND, } - lock.Unlock() - defaultChain.Entry(nil, resourceWrap, defaultNode, 1) - return nil + + return defaultChain.Entry(nil, resourceWrap, defaultNode, 0, false) } -func Exit(resource string) { +func Exit(resource string) error { resourceWrap := &base.ResourceWrapper{ ResourceName: resource, ResourceType: base.INBOUND, } - defaultChain.Exit(nil, resourceWrap, 1) + return defaultChain.Exit(nil, resourceWrap, 1) } diff --git a/example/main.go b/example/main.go index 7d8957738..28dd40b3a 100644 --- a/example/main.go +++ b/example/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "github.com/sentinel-group/sentinel-golang/core" + "github.com/sentinel-group/sentinel-golang/core/slots/base" "math/rand" "sync" "time" @@ -24,8 +25,20 @@ func test(wg *sync.WaitGroup) { rand.Seed(1000) r := rand.Int63() % 10 time.Sleep(time.Duration(r) * time.Millisecond) - _ = core.Entry("test") + result, e := core.Entry("test") + if e != nil { + fmt.Println(e.Error()) + return + } + if result.Status == base.ResultStatusBlocked { + fmt.Println("reason:", result.BlockedReason) + } + if result.Status == base.ResultStatusError { + fmt.Println("reason:", result.ErrorMsg) + } + if result.Status == base.ResultStatusPass { + _ = core.Exit("test") + } time.Sleep(time.Duration(r) * time.Millisecond) - core.Exit("test") wg.Done() } From 60d24ad14d6bf122e1f05e6d66209d496a67eeab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E6=B5=B7=E6=B4=8B?= Date: Thu, 11 Jul 2019 22:08:05 +0800 Subject: [PATCH 06/12] =?UTF-8?q?=20=E4=BF=AE=E6=94=B9=E4=BA=86=20readme.m?= =?UTF-8?q?d?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 50e62b180..8fc0e3423 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # 项目简介 ## 目录划分: - public: 是公共部分也是外部直接使用部分。 - slots : 职责链相关 + core : 核心实现 + example : 示例代码 From 178d32f16d68d9175c5b4174e53b0db8a7d0db7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E6=B5=B7=E6=B4=8B?= Date: Fri, 12 Jul 2019 15:34:58 +0800 Subject: [PATCH 07/12] =?UTF-8?q?metric=5Fbucket=20=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E5=9B=BA=E5=AE=9A=E6=95=B0=E7=BB=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/slots/statistic/data/metric_bucket.go | 106 ++++++++++++--------- 1 file changed, 59 insertions(+), 47 deletions(-) diff --git a/core/slots/statistic/data/metric_bucket.go b/core/slots/statistic/data/metric_bucket.go index c941ea72b..00219453c 100644 --- a/core/slots/statistic/data/metric_bucket.go +++ b/core/slots/statistic/data/metric_bucket.go @@ -13,80 +13,92 @@ const ( MetricEventSuccess MetricEventError MetricEventRt + // hack for getting length of enum + metricEventNum ) -const metricEventNum = 5 /** MetricBucket store the metric statistic of each event (MetricEventPass、MetricEventBlock、MetricEventError、MetricEventSuccess、MetricEventRt) */ type MetricBucket struct { - counter []uint64 - minRt uint64 + // value of statistic + counters [metricEventNum]uint64 + minRt uint64 } -func (mb *MetricBucket) metricEvents() []MetricEventType { - met := make([]MetricEventType, 0, metricEventNum) - met = append(met, MetricEventPass) - met = append(met, MetricEventBlock) - met = append(met, MetricEventError) - met = append(met, MetricEventSuccess) - met = append(met, MetricEventRt) - return met -} - -func newEmptyMetricBucket() MetricBucket { +func newMetricBucket() MetricBucket { mb := MetricBucket{ - counter: make([]uint64, metricEventNum, metricEventNum), - minRt: math.MaxUint64, + minRt: math.MaxUint64, } return mb } func (mb *MetricBucket) Add(event MetricEventType, count uint64) { - switch event { - case MetricEventPass: - atomic.AddUint64(&mb.counter[0], count) - case MetricEventBlock: - atomic.AddUint64(&mb.counter[1], count) - case MetricEventError: - atomic.AddUint64(&mb.counter[2], count) - case MetricEventSuccess: - atomic.AddUint64(&mb.counter[3], count) - case MetricEventRt: - atomic.AddUint64(&mb.counter[4], count) - default: - panic("unknown metric event type, " + string(event)) + if event > metricEventNum { + panic("event is bigger then metricEventNum") } + atomic.AddUint64(&mb.counters[event], count) } func (mb *MetricBucket) Get(event MetricEventType) uint64 { - switch event { - case MetricEventPass: - return atomic.LoadUint64(&mb.counter[0]) - case MetricEventBlock: - return atomic.LoadUint64(&mb.counter[1]) - case MetricEventError: - return atomic.LoadUint64(&mb.counter[2]) - case MetricEventSuccess: - return atomic.LoadUint64(&mb.counter[3]) - case MetricEventRt: - return atomic.LoadUint64(&mb.counter[4]) - default: - panic("unknown metric event type, " + string(event)) + if event > metricEventNum { + panic("event is bigger then metricEventNum") } + return mb.counters[event] +} + +func (mb *MetricBucket) MinRt() uint64 { + return mb.minRt +} + +func (mb *MetricBucket) Reset() { + for i := 0; i < int(metricEventNum); i++ { + atomic.StoreUint64(&mb.counters[i], 0) + } + atomic.StoreUint64(&mb.minRt, math.MaxUint64) +} + +func (mb *MetricBucket) AddPass(n uint64) { + mb.Add(MetricEventPass, n) +} + +func (mb *MetricBucket) Pass() uint64 { + return mb.Get(MetricEventPass) +} + +func (mb *MetricBucket) AddBlock(n uint64) { + mb.Add(MetricEventBlock, n) +} + +func (mb *MetricBucket) Block() uint64 { + return mb.Get(MetricEventBlock) +} + +func (mb *MetricBucket) AddSuccess(n uint64) { + mb.Add(MetricEventSuccess, n) +} + +func (mb *MetricBucket) Success() uint64 { + return mb.Get(MetricEventSuccess) +} + +func (mb *MetricBucket) AddError(n uint64) { + mb.Add(MetricEventError, n) +} + +func (mb *MetricBucket) Error() uint64 { + return mb.Get(MetricEventError) } func (mb *MetricBucket) AddRt(rt uint64) { mb.Add(MetricEventRt, rt) + // Not thread-safe, but it's okay. if rt < mb.minRt { mb.minRt = rt } } -func (mb *MetricBucket) Reset() { - for i := 0; i < metricEventNum; i++ { - atomic.StoreUint64(&mb.counter[i], 0) - } - mb.minRt = math.MaxUint64 +func (mb *MetricBucket) Rt() uint64 { + return mb.Get(MetricEventRt) } From a0f636a790c236a6561ffa6dee331997d85b1a81 Mon Sep 17 00:00:00 2001 From: ytlou Date: Fri, 12 Jul 2019 23:41:04 +0800 Subject: [PATCH 08/12] Issues 1: Fix sliding windows bug --- core/slots/statistic/data/leap_array.go | 32 ++++++---- core/slots/statistic/data/leap_array_test.go | 20 +++---- core/slots/statistic/data/metric_bucket.go | 7 ++- core/util/triablemutx.go | 17 ++++++ core/util/triablemutx_test.go | 61 ++++++++++++++++++++ 5 files changed, 112 insertions(+), 25 deletions(-) create mode 100644 core/util/triablemutx.go create mode 100644 core/util/triablemutx_test.go diff --git a/core/slots/statistic/data/leap_array.go b/core/slots/statistic/data/leap_array.go index a23f5abfc..df02c2372 100644 --- a/core/slots/statistic/data/leap_array.go +++ b/core/slots/statistic/data/leap_array.go @@ -3,8 +3,9 @@ package data import ( "errors" "fmt" + "github.com/sentinel-group/sentinel-golang/core/util" "math" - "sync" + "runtime" "time" ) @@ -22,13 +23,14 @@ func (ww *WindowWrap) isTimeInWindow(timeMillis uint64) bool { return ww.windowStart <= timeMillis && timeMillis < ww.windowStart+uint64(ww.windowLengthInMs) } +// The basic data structure of sliding windows +// type LeapArray struct { windowLengthInMs uint32 sampleCount uint32 intervalInMs uint32 - array []*WindowWrap //实际保存的数据 - - mux sync.Mutex // lock + array []*WindowWrap //实际保存的数据 + mux util.TriableMutex // lock } func (la *LeapArray) CurrentWindow(sw BucketGenerator) (*WindowWrap, error) { @@ -51,18 +53,26 @@ func (la *LeapArray) CurrentWindowWithTime(timeMillis uint64, sw BucketGenerator windowStart: windowStart, value: sw.newEmptyBucket(windowStart), } + // must be thread safe, + // some extreme condition,may newer override old empty WindowWrap + // 使用cas, 确保la.array[idx]更新前是nil la.mux.Lock() - la.array[idx] = newWrap + if la.array[idx] == nil { + la.array[idx] = newWrap + } la.mux.Unlock() return la.array[idx], nil } else if windowStart == old.windowStart { return old, nil } else if windowStart > old.windowStart { // reset WindowWrap - la.mux.Lock() - old, _ = sw.resetWindowTo(old, windowStart) - la.mux.Unlock() - return old, nil + if la.mux.TryLock() { + old, _ = sw.resetWindowTo(old, windowStart) + la.mux.Unlock() + return old, nil + } else { + runtime.Gosched() + } } else if windowStart < old.windowStart { // Should not go through here, return nil, errors.New(fmt.Sprintf("provided time timeMillis=%d is already behind old.windowStart=%d", windowStart, old.windowStart)) @@ -118,9 +128,7 @@ type BucketGenerator interface { resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) } -/** - * The implement of sliding window based on struct MetricBucket - */ +// The implement of sliding window based on struct LeapArray type SlidingWindow struct { data *LeapArray BucketType string diff --git a/core/slots/statistic/data/leap_array_test.go b/core/slots/statistic/data/leap_array_test.go index 88a7ad93c..aef15c264 100644 --- a/core/slots/statistic/data/leap_array_test.go +++ b/core/slots/statistic/data/leap_array_test.go @@ -13,6 +13,7 @@ const ( intervalInMs_ uint32 = 1000 ) +//Test sliding windows create windows func TestNewWindow(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) time := uint64(time2.Now().UnixNano() / 1e6) @@ -38,6 +39,7 @@ func TestNewWindow(t *testing.T) { } } +// Test the logic get window start time. func TestLeapArrayWindowStart(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) firstTime := uint64(time2.Now().UnixNano() / 1e6) @@ -55,6 +57,7 @@ func TestLeapArrayWindowStart(t *testing.T) { } } +// test sliding window has multi windows func TestWindowAfterOneInterval(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) firstTime := uint64(time2.Now().UnixNano() / 1e6) @@ -146,17 +149,16 @@ func TestWindowAfterOneInterval(t *testing.T) { } func TestNTimeMultiGoroutineUpdateEmptyWindow(t *testing.T) { - for i := 0; i < 10; i++ { - nTestMultiGoroutineUpdateEmptyWindow(t) + for i := 0; i < 1000; i++ { + _nTestMultiGoroutineUpdateEmptyWindow(t) } } -func task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testing.T, ct *uint64, lock *sync.Mutex) { +func _task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testing.T, ct *uint64) { wr, err := slidingWindow.data.CurrentWindowWithTime(ti, slidingWindow) if err != nil { t.Errorf("Unexcepted error") } - lock.Lock() mb, ok := wr.value.(MetricBucket) if !ok { t.Errorf("Unexcepted error") @@ -166,25 +168,23 @@ func task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testin mb.Add(MetricEventSuccess, 1) mb.Add(MetricEventError, 1) atomic.AddUint64(ct, 1) - lock.Unlock() wg.Done() } -func nTestMultiGoroutineUpdateEmptyWindow(t *testing.T) { +func _nTestMultiGoroutineUpdateEmptyWindow(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) firstTime := uint64(time2.Now().UnixNano() / 1e6) const GoroutineNum = 10000 wg := &sync.WaitGroup{} - lock := &sync.Mutex{} wg.Add(GoroutineNum) st := time2.Now().UnixNano() - var ct = uint64(0) + var cnt = uint64(0) for i := 0; i < GoroutineNum; i++ { - go task(wg, slidingWindow, firstTime, t, &ct, lock) + go _task(wg, slidingWindow, firstTime, t, &cnt) } wg.Wait() - t.Logf("finish goroutines: %d", atomic.LoadUint64(&ct)) + t.Logf("finish goroutines: %d", atomic.LoadUint64(&cnt)) et := time2.Now().UnixNano() dif := et - st t.Logf("finish all goroutines, cost time is %d", dif) diff --git a/core/slots/statistic/data/metric_bucket.go b/core/slots/statistic/data/metric_bucket.go index 00219453c..ca4fedfe3 100644 --- a/core/slots/statistic/data/metric_bucket.go +++ b/core/slots/statistic/data/metric_bucket.go @@ -23,13 +23,14 @@ MetricBucket store the metric statistic of each event */ type MetricBucket struct { // value of statistic - counters [metricEventNum]uint64 + counters []uint64 minRt uint64 } -func newMetricBucket() MetricBucket { +func newEmptyMetricBucket() MetricBucket { mb := MetricBucket{ - minRt: math.MaxUint64, + minRt: math.MaxUint64, + counters: make([]uint64, metricEventNum, metricEventNum), } return mb } diff --git a/core/util/triablemutx.go b/core/util/triablemutx.go new file mode 100644 index 000000000..61d3340ad --- /dev/null +++ b/core/util/triablemutx.go @@ -0,0 +1,17 @@ +package util + +import ( + "sync" + "sync/atomic" + "unsafe" +) + +const mutexLocked = 1 << iota + +type TriableMutex struct { + sync.Mutex +} + +func (tmux *TriableMutex) TryLock() bool { + return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&tmux.Mutex)), 0, mutexLocked) +} diff --git a/core/util/triablemutx_test.go b/core/util/triablemutx_test.go new file mode 100644 index 000000000..8b36dcf82 --- /dev/null +++ b/core/util/triablemutx_test.go @@ -0,0 +1,61 @@ +package util + +import ( + "fmt" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestTriableMutex_TryLock(t *testing.T) { + var m TriableMutex + m.Lock() + time.Sleep(time.Second) + fmt.Printf("TryLock: %t\n", m.TryLock()) //false + fmt.Printf("TryLock: %t\n", m.TryLock()) // false + m.Unlock() + fmt.Printf("TryLock: %t\n", m.TryLock()) //true + fmt.Printf("TryLock: %t\n", m.TryLock()) //false + m.Unlock() + fmt.Printf("TryLock: %t\n", m.TryLock()) //true + m.Unlock() +} + +func iiiTestConcurrent100() { + var m TriableMutex + cnt := int32(0) + wg := &sync.WaitGroup{} + wg.Add(1000) + for i := 0; i < 1000; i++ { + ci := i + go func(tm TriableMutex, wg_ *sync.WaitGroup, i_ int, cntPtr *int32) { + for { + if tm.TryLock() { + atomic.AddInt32(cntPtr, 1) + tm.Unlock() + wg_.Done() + break + } else { + runtime.Gosched() + } + } + }(m, wg, ci, &cnt) + } + wg.Wait() + //fmt.Println("count=", cnt) + if cnt != 1000 { + fmt.Println("count error") + } +} + +func TestTriableMutex_TryLock_Concurrent1000(t *testing.T) { + iiiTestConcurrent100() +} + +func BenchmarkTriableMutex_TryLock(b *testing.B) { + for n := 0; n < b.N; n++ { + iiiTestConcurrent100() + } +} From 40fa06f87795d2a757059d8d7fcc023a75f96256 Mon Sep 17 00:00:00 2001 From: ytlou Date: Sat, 13 Jul 2019 11:20:34 +0800 Subject: [PATCH 09/12] Issues 1: refactor sliding window add and count logic --- core/slots/statistic/data/leap_array.go | 45 +++---------------------- 1 file changed, 4 insertions(+), 41 deletions(-) diff --git a/core/slots/statistic/data/leap_array.go b/core/slots/statistic/data/leap_array.go index df02c2372..75bc2c2b7 100644 --- a/core/slots/statistic/data/leap_array.go +++ b/core/slots/statistic/data/leap_array.go @@ -161,7 +161,7 @@ func (sw *SlidingWindow) resetWindowTo(ww *WindowWrap, startTime uint64) (*Windo return ww, nil } -func (sw *SlidingWindow) Count(eventType MetricEventType) uint64 { +func (sw *SlidingWindow) Count(event MetricEventType) uint64 { _, err := sw.data.CurrentWindow(sw) if err != nil { fmt.Println("sliding window fail to record success") @@ -173,31 +173,12 @@ func (sw *SlidingWindow) Count(eventType MetricEventType) uint64 { fmt.Println("assert fail") continue } - cn := uint64(0) - var ce error - switch eventType { - case MetricEventSuccess: - cn = mb.Get(MetricEventSuccess) - case MetricEventPass: - cn = mb.Get(MetricEventPass) - case MetricEventError: - cn = mb.Get(MetricEventError) - case MetricEventBlock: - cn = mb.Get(MetricEventBlock) - case MetricEventRt: - cn = mb.Get(MetricEventRt) - default: - ce = errors.New("unknown metric type! ") - } - if ce != nil { - fmt.Println("fail to count, reason: ", ce) - } - count += cn + count += mb.Get(event) } return count } -func (sw *SlidingWindow) AddCount(eventType MetricEventType, count uint64) { +func (sw *SlidingWindow) AddCount(event MetricEventType, count uint64) { curWindow, err := sw.data.CurrentWindow(sw) if err != nil || curWindow == nil || curWindow.value == nil { fmt.Println("sliding window fail to record success") @@ -209,25 +190,7 @@ func (sw *SlidingWindow) AddCount(eventType MetricEventType, count uint64) { fmt.Println("assert fail") return } - - var ae error - switch eventType { - case MetricEventSuccess: - mb.Add(MetricEventSuccess, count) - case MetricEventPass: - mb.Add(MetricEventPass, count) - case MetricEventError: - mb.Add(MetricEventError, count) - case MetricEventBlock: - mb.Add(MetricEventBlock, count) - case MetricEventRt: - mb.Add(MetricEventRt, count) - default: - errors.New("unknown metric type ") - } - if ae != nil { - fmt.Println("add success counter fail, reason: ", ae) - } + mb.Add(event, count) } func (sw *SlidingWindow) MaxSuccess() uint64 { From 25c29818d64876f75701e0b386606a573b888b2b Mon Sep 17 00:00:00 2001 From: ytlou Date: Sat, 13 Jul 2019 11:36:22 +0800 Subject: [PATCH 10/12] Issues 1: refactor get milli time --- core/slots/base/default_node.go | 4 ++-- core/slots/flow/flow_control.go | 5 +++-- core/slots/statistic/data/leap_array.go | 7 +++---- core/slots/statistic/data/leap_array_test.go | 9 +++++---- core/util/time.go | 17 +++++++++++++++++ 5 files changed, 30 insertions(+), 12 deletions(-) create mode 100644 core/util/time.go diff --git a/core/slots/base/default_node.go b/core/slots/base/default_node.go index 0ee9a45ff..ce977d891 100644 --- a/core/slots/base/default_node.go +++ b/core/slots/base/default_node.go @@ -3,8 +3,8 @@ package base import ( "github.com/sentinel-group/sentinel-golang/core/node" "github.com/sentinel-group/sentinel-golang/core/slots/statistic/data" + "github.com/sentinel-group/sentinel-golang/core/util" "sync/atomic" - "time" ) const ( @@ -26,7 +26,7 @@ func NewDefaultNode(wrapper *ResourceWrapper) *DefaultNode { rollingCounterInSecond: data.NewSlidingWindow(sampleCount_, intervalInMs_), rollingCounterInMinute: data.NewSlidingWindow(sampleCount_, intervalInMs_), currentGoroutineNum: 0, - lastFetchTime: uint64(time.Now().UnixNano() / (1e6)), + lastFetchTime: util.GetTimeMilli(), resourceWrapper: wrapper, } } diff --git a/core/slots/flow/flow_control.go b/core/slots/flow/flow_control.go index 43040f31c..b2a3ad86b 100644 --- a/core/slots/flow/flow_control.go +++ b/core/slots/flow/flow_control.go @@ -2,6 +2,7 @@ package flow import ( "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/util" "math" "sync/atomic" "time" @@ -46,7 +47,7 @@ func (rlc *RateLimiterController) CanPass(ctx *base.Context, node *base.DefaultN } // Reject when count is less or equal than 0. // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. - currentTime := int64(time.Now().UnixNano()) / 1e6 + currentTime := int64(util.GetTimeMilli()) // Calculate the interval between every two requests. costTime := int64(math.Round(float64(uint64(acquire)/rlc.count) * 1000)) // Expected pass time of this request. @@ -58,7 +59,7 @@ func (rlc *RateLimiterController) CanPass(ctx *base.Context, node *base.DefaultN return true } else { // Calculate the time to wait. - waitTime := costTime + atomic.LoadInt64(&rlc.latestPassedTime) - int64(time.Now().UnixNano())/1e6 + waitTime := costTime + atomic.LoadInt64(&rlc.latestPassedTime) - int64(util.GetTimeMilli()) if waitTime > rlc.maxQueueingTimeMs { atomic.AddInt64(&rlc.latestPassedTime, -costTime) return false diff --git a/core/slots/statistic/data/leap_array.go b/core/slots/statistic/data/leap_array.go index 75bc2c2b7..998ea5a87 100644 --- a/core/slots/statistic/data/leap_array.go +++ b/core/slots/statistic/data/leap_array.go @@ -6,7 +6,6 @@ import ( "github.com/sentinel-group/sentinel-golang/core/util" "math" "runtime" - "time" ) type WindowWrap struct { @@ -34,7 +33,7 @@ type LeapArray struct { } func (la *LeapArray) CurrentWindow(sw BucketGenerator) (*WindowWrap, error) { - return la.CurrentWindowWithTime(uint64(time.Now().UnixNano())/1e6, sw) + return la.CurrentWindowWithTime(util.GetTimeMilli(), sw) } func (la *LeapArray) CurrentWindowWithTime(timeMillis uint64, sw BucketGenerator) (*WindowWrap, error) { @@ -91,7 +90,7 @@ func (la *LeapArray) calculateStartTime(timeMillis uint64) uint64 { // Get all the bucket in sliding window for current time; func (la *LeapArray) Values() []*WindowWrap { - return la.valuesWithTime(uint64(time.Now().UnixNano()) / 1e6) + return la.valuesWithTime(util.GetTimeMilli()) } func (la *LeapArray) valuesWithTime(timeMillis uint64) []*WindowWrap { @@ -104,7 +103,7 @@ func (la *LeapArray) valuesWithTime(timeMillis uint64) []*WindowWrap { //fmt.Printf("current bucket is nil, index is %d \n", idx) wwp_ = &WindowWrap{ windowLengthInMs: 200, - windowStart: uint64(time.Now().UnixNano() / 1e6), + windowStart: util.GetTimeMilli(), value: newEmptyMetricBucket(), } wwp = append(wwp, wwp_) diff --git a/core/slots/statistic/data/leap_array_test.go b/core/slots/statistic/data/leap_array_test.go index aef15c264..0e40a1e55 100644 --- a/core/slots/statistic/data/leap_array_test.go +++ b/core/slots/statistic/data/leap_array_test.go @@ -1,6 +1,7 @@ package data import ( + "github.com/sentinel-group/sentinel-golang/core/util" "sync" "sync/atomic" "testing" @@ -16,7 +17,7 @@ const ( //Test sliding windows create windows func TestNewWindow(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) - time := uint64(time2.Now().UnixNano() / 1e6) + time := util.GetTimeMilli() wr, err := slidingWindow.data.CurrentWindowWithTime(time, slidingWindow) if wr == nil { @@ -42,7 +43,7 @@ func TestNewWindow(t *testing.T) { // Test the logic get window start time. func TestLeapArrayWindowStart(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) - firstTime := uint64(time2.Now().UnixNano() / 1e6) + firstTime := util.GetTimeMilli() previousWindowStart := firstTime - firstTime%uint64(windowLengthImMs_) wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) @@ -60,7 +61,7 @@ func TestLeapArrayWindowStart(t *testing.T) { // test sliding window has multi windows func TestWindowAfterOneInterval(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) - firstTime := uint64(time2.Now().UnixNano() / 1e6) + firstTime := util.GetTimeMilli() previousWindowStart := firstTime - firstTime%uint64(windowLengthImMs_) wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) @@ -173,7 +174,7 @@ func _task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testi func _nTestMultiGoroutineUpdateEmptyWindow(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) - firstTime := uint64(time2.Now().UnixNano() / 1e6) + firstTime := util.GetTimeMilli() const GoroutineNum = 10000 wg := &sync.WaitGroup{} diff --git a/core/util/time.go b/core/util/time.go new file mode 100644 index 000000000..36d145b33 --- /dev/null +++ b/core/util/time.go @@ -0,0 +1,17 @@ +package util + +import "time" + +func GetTimeMilli() uint64 { + return uint64(time.Now().UnixNano() / 1e6) +} + +//noinspection GoUnusedExportedFunction +func GetTimeNano() uint64 { + return uint64(time.Now().UnixNano()) +} + +//noinspection GoUnusedExportedFunction +func GetTimeSecond() uint64 { + return uint64(time.Now().Unix()) +} From 5726a2de17538306f7aab5677ebc2064f9c659a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8E=E6=B5=B7=E6=B4=8B?= Date: Sat, 13 Jul 2019 14:37:32 +0800 Subject: [PATCH 11/12] =?UTF-8?q?MetricBucket=20=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=E6=95=B0=E7=BB=84=E5=AD=98=E6=94=BE=E6=95=B0=E6=8D=AE=E3=80=82?= =?UTF-8?q?newMetricBucket()=20=E8=BF=94=E5=9B=9E=E6=8C=87=E9=92=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/slots/statistic/data/leap_array.go | 8 +++--- core/slots/statistic/data/leap_array_test.go | 28 ++++++++++---------- core/slots/statistic/data/metric_bucket.go | 9 +++---- core/util/time.go | 23 ++++++++++++++++ 4 files changed, 45 insertions(+), 23 deletions(-) create mode 100644 core/util/time.go diff --git a/core/slots/statistic/data/leap_array.go b/core/slots/statistic/data/leap_array.go index df02c2372..c0752867f 100644 --- a/core/slots/statistic/data/leap_array.go +++ b/core/slots/statistic/data/leap_array.go @@ -168,7 +168,7 @@ func (sw *SlidingWindow) Count(eventType MetricEventType) uint64 { } count := uint64(0) for _, ww := range sw.data.Values() { - mb, ok := ww.value.(MetricBucket) + mb, ok := ww.value.(*MetricBucket) if !ok { fmt.Println("assert fail") continue @@ -204,7 +204,7 @@ func (sw *SlidingWindow) AddCount(eventType MetricEventType, count uint64) { return } - mb, ok := curWindow.value.(MetricBucket) + mb, ok := curWindow.value.(*MetricBucket) if !ok { fmt.Println("assert fail") return @@ -239,7 +239,7 @@ func (sw *SlidingWindow) MaxSuccess() uint64 { succ := uint64(0) for _, ww := range sw.data.Values() { - mb, ok := ww.value.(MetricBucket) + mb, ok := ww.value.(*MetricBucket) if !ok { fmt.Println("assert fail") continue @@ -262,7 +262,7 @@ func (sw *SlidingWindow) MinSuccess() uint64 { succ := uint64(0) for _, ww := range sw.data.Values() { - mb, ok := ww.value.(MetricBucket) + mb, ok := ww.value.(*MetricBucket) if !ok { fmt.Println("assert fail") continue diff --git a/core/slots/statistic/data/leap_array_test.go b/core/slots/statistic/data/leap_array_test.go index aef15c264..517380938 100644 --- a/core/slots/statistic/data/leap_array_test.go +++ b/core/slots/statistic/data/leap_array_test.go @@ -1,10 +1,10 @@ package data import ( + "github.com/sentinel-group/sentinel-golang/core/util" "sync" "sync/atomic" "testing" - time2 "time" ) const ( @@ -16,7 +16,7 @@ const ( //Test sliding windows create windows func TestNewWindow(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) - time := uint64(time2.Now().UnixNano() / 1e6) + time := uint64(util.GetTimeMilli()) wr, err := slidingWindow.data.CurrentWindowWithTime(time, slidingWindow) if wr == nil { @@ -42,7 +42,7 @@ func TestNewWindow(t *testing.T) { // Test the logic get window start time. func TestLeapArrayWindowStart(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) - firstTime := uint64(time2.Now().UnixNano() / 1e6) + firstTime := uint64(util.GetTimeMilli()) previousWindowStart := firstTime - firstTime%uint64(windowLengthImMs_) wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) @@ -60,7 +60,7 @@ func TestLeapArrayWindowStart(t *testing.T) { // test sliding window has multi windows func TestWindowAfterOneInterval(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) - firstTime := uint64(time2.Now().UnixNano() / 1e6) + firstTime := util.GetTimeMilli() previousWindowStart := firstTime - firstTime%uint64(windowLengthImMs_) wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) @@ -76,7 +76,7 @@ func TestWindowAfterOneInterval(t *testing.T) { if wr.value == nil { t.Errorf("Unexcepted error") } - mb, ok := wr.value.(MetricBucket) + mb, ok := wr.value.(*MetricBucket) if !ok { t.Errorf("Unexcepted error") } @@ -106,7 +106,7 @@ func TestWindowAfterOneInterval(t *testing.T) { if wr2.windowStart != previousWindowStart { t.Errorf("Unexpected error, winStart is not same") } - mb2, ok := wr2.value.(MetricBucket) + mb2, ok := wr2.value.(*MetricBucket) if !ok { t.Errorf("Unexcepted error") } @@ -132,7 +132,7 @@ func TestWindowAfterOneInterval(t *testing.T) { if (wr3.windowStart - uint64(windowLengthImMs_)) != previousWindowStart { t.Errorf("Unexpected error") } - mb3, ok := wr3.value.(MetricBucket) + mb3, ok := wr3.value.(*MetricBucket) if !ok { t.Errorf("Unexcepted error") } @@ -159,7 +159,7 @@ func _task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testi if err != nil { t.Errorf("Unexcepted error") } - mb, ok := wr.value.(MetricBucket) + mb, ok := wr.value.(*MetricBucket) if !ok { t.Errorf("Unexcepted error") } @@ -173,26 +173,26 @@ func _task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testi func _nTestMultiGoroutineUpdateEmptyWindow(t *testing.T) { slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) - firstTime := uint64(time2.Now().UnixNano() / 1e6) + firstTime := util.GetTimeMilli() - const GoroutineNum = 10000 + const GoroutineNum = 1000 wg := &sync.WaitGroup{} wg.Add(GoroutineNum) - st := time2.Now().UnixNano() + st := util.GetTimeNano() var cnt = uint64(0) for i := 0; i < GoroutineNum; i++ { go _task(wg, slidingWindow, firstTime, t, &cnt) } wg.Wait() t.Logf("finish goroutines: %d", atomic.LoadUint64(&cnt)) - et := time2.Now().UnixNano() + et := util.GetTimeNano() dif := et - st - t.Logf("finish all goroutines, cost time is %d", dif) + t.Logf("finish all goroutines, cost time is %d ns", dif) wr2, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) if err != nil { t.Errorf("Unexcepted error") } - mb2, ok := wr2.value.(MetricBucket) + mb2, ok := wr2.value.(*MetricBucket) if !ok { t.Errorf("Unexcepted error") } diff --git a/core/slots/statistic/data/metric_bucket.go b/core/slots/statistic/data/metric_bucket.go index ca4fedfe3..58d3155b3 100644 --- a/core/slots/statistic/data/metric_bucket.go +++ b/core/slots/statistic/data/metric_bucket.go @@ -23,14 +23,13 @@ MetricBucket store the metric statistic of each event */ type MetricBucket struct { // value of statistic - counters []uint64 + counters [metricEventNum]uint64 minRt uint64 } -func newEmptyMetricBucket() MetricBucket { - mb := MetricBucket{ - minRt: math.MaxUint64, - counters: make([]uint64, metricEventNum, metricEventNum), +func newEmptyMetricBucket() *MetricBucket { + mb := &MetricBucket{ + minRt: math.MaxUint64, } return mb } diff --git a/core/util/time.go b/core/util/time.go new file mode 100644 index 000000000..6d1e680d4 --- /dev/null +++ b/core/util/time.go @@ -0,0 +1,23 @@ +/** + * @description: + * + * @author: helloworld + * @date:2019-07-13 + */ +package util + +import "time" + +func GetTimeMilli() uint64 { + return uint64(time.Now().UnixNano() / 1e6) +} + +//noinspection GoUnusedExportedFunction +func GetTimeNano() uint64 { + return uint64(time.Now().UnixNano()) +} + +//noinspection GoUnusedExportedFunction +func GetTimeSecond() uint64 { + return uint64(time.Now().Unix()) +} From 489ab099888e9c8c395855efc2c6458544a378a3 Mon Sep 17 00:00:00 2001 From: ytlou Date: Sat, 13 Jul 2019 15:32:01 +0800 Subject: [PATCH 12/12] Issues 1: refactor CurrentWindowWithTime function --- core/slots/flow/flow_rule.go | 2 +- core/slots/statistic/data/leap_array.go | 37 +++++++++------- core/slots/statistic/data/leap_array_test.go | 44 ++++++++++---------- 3 files changed, 45 insertions(+), 38 deletions(-) diff --git a/core/slots/flow/flow_rule.go b/core/slots/flow/flow_rule.go index 77392f2dd..6ffa74082 100644 --- a/core/slots/flow/flow_rule.go +++ b/core/slots/flow/flow_rule.go @@ -8,7 +8,7 @@ import ( const ( LimitAppDefault = "default" - LIMIT_APP_OTHER = "other" + LimitAppOther = "other" ResourceNameDefault = "default" ) diff --git a/core/slots/statistic/data/leap_array.go b/core/slots/statistic/data/leap_array.go index ede269ca6..c3bafbcb1 100644 --- a/core/slots/statistic/data/leap_array.go +++ b/core/slots/statistic/data/leap_array.go @@ -55,12 +55,19 @@ func (la *LeapArray) CurrentWindowWithTime(timeMillis uint64, sw BucketGenerator // must be thread safe, // some extreme condition,may newer override old empty WindowWrap // 使用cas, 确保la.array[idx]更新前是nil - la.mux.Lock() - if la.array[idx] == nil { + //la.mux.Lock() + //if la.array[idx] == nil { + // la.array[idx] = newWrap + //} + //la.mux.Unlock() + //return la.array[idx], nil + if la.mux.TryLock() && la.array[idx] == nil { la.array[idx] = newWrap + la.mux.Unlock() + return la.array[idx], nil + } else { + runtime.Gosched() } - la.mux.Unlock() - return la.array[idx], nil } else if windowStart == old.windowStart { return old, nil } else if windowStart > old.windowStart { @@ -98,23 +105,23 @@ func (la *LeapArray) valuesWithTime(timeMillis uint64) []*WindowWrap { return nil } wwp := make([]*WindowWrap, 0) - for _, wwp_ := range la.array { - if wwp_ == nil { + for _, wwPtr := range la.array { + if wwPtr == nil { //fmt.Printf("current bucket is nil, index is %d \n", idx) - wwp_ = &WindowWrap{ + wwPtr = &WindowWrap{ windowLengthInMs: 200, windowStart: util.GetTimeMilli(), value: newEmptyMetricBucket(), } - wwp = append(wwp, wwp_) + wwp = append(wwp, wwPtr) continue } - ww := &WindowWrap{ - windowLengthInMs: wwp_.windowLengthInMs, - windowStart: wwp_.windowStart, - value: wwp_.value, + newWW := &WindowWrap{ + windowLengthInMs: wwPtr.windowLengthInMs, + windowStart: wwPtr.windowStart, + value: wwPtr.value, } - wwp = append(wwp, ww) + wwp = append(wwp, newWW) } return wwp } @@ -138,13 +145,13 @@ func NewSlidingWindow(sampleCount uint32, intervalInMs uint32) *SlidingWindow { panic(fmt.Sprintf("invalid parameters, intervalInMs is %d, sampleCount is %d.", intervalInMs, sampleCount)) } winLengthInMs := intervalInMs / sampleCount - array_ := make([]*WindowWrap, 5) + arr := make([]*WindowWrap, 5) return &SlidingWindow{ data: &LeapArray{ windowLengthInMs: winLengthInMs, sampleCount: sampleCount, intervalInMs: intervalInMs, - array: array_, + array: arr, }, BucketType: "metrics", } diff --git a/core/slots/statistic/data/leap_array_test.go b/core/slots/statistic/data/leap_array_test.go index ce5250309..7c285406a 100644 --- a/core/slots/statistic/data/leap_array_test.go +++ b/core/slots/statistic/data/leap_array_test.go @@ -8,14 +8,14 @@ import ( ) const ( - windowLengthImMs_ uint32 = 200 - sampleCount_ uint32 = 5 - intervalInMs_ uint32 = 1000 + WindowLengthImMs uint32 = 200 + SampleCount uint32 = 5 + IntervalInMs uint32 = 1000 ) //Test sliding windows create windows func TestNewWindow(t *testing.T) { - slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) + slidingWindow := NewSlidingWindow(SampleCount, IntervalInMs) time := util.GetTimeMilli() wr, err := slidingWindow.data.CurrentWindowWithTime(time, slidingWindow) @@ -25,10 +25,10 @@ func TestNewWindow(t *testing.T) { if err != nil { t.Errorf("Unexcepted error") } - if wr.windowLengthInMs != windowLengthImMs_ { + if wr.windowLengthInMs != WindowLengthImMs { t.Errorf("Unexcepted error, winlength is not same") } - if wr.windowStart != (time - time%uint64(windowLengthImMs_)) { + if wr.windowStart != (time - time%uint64(WindowLengthImMs)) { t.Errorf("Unexcepted error, winlength is not same") } if wr.value == nil { @@ -41,15 +41,15 @@ func TestNewWindow(t *testing.T) { // Test the logic get window start time. func TestLeapArrayWindowStart(t *testing.T) { - slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) + slidingWindow := NewSlidingWindow(SampleCount, IntervalInMs) firstTime := util.GetTimeMilli() - previousWindowStart := firstTime - firstTime%uint64(windowLengthImMs_) + previousWindowStart := firstTime - firstTime%uint64(WindowLengthImMs) wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) if err != nil { t.Errorf("Unexcepted error") } - if wr.windowLengthInMs != windowLengthImMs_ { + if wr.windowLengthInMs != WindowLengthImMs { t.Errorf("Unexpected error, winLength is not same") } if wr.windowStart != previousWindowStart { @@ -59,15 +59,15 @@ func TestLeapArrayWindowStart(t *testing.T) { // test sliding window has multi windows func TestWindowAfterOneInterval(t *testing.T) { - slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) + slidingWindow := NewSlidingWindow(SampleCount, IntervalInMs) firstTime := util.GetTimeMilli() - previousWindowStart := firstTime - firstTime%uint64(windowLengthImMs_) + previousWindowStart := firstTime - firstTime%uint64(WindowLengthImMs) wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) if err != nil { t.Errorf("Unexcepted error") } - if wr.windowLengthInMs != windowLengthImMs_ { + if wr.windowLengthInMs != WindowLengthImMs { t.Errorf("Unexpected error, winLength is not same") } if wr.windowStart != previousWindowStart { @@ -98,7 +98,7 @@ func TestWindowAfterOneInterval(t *testing.T) { t.Errorf("Unexcepted error") } - middleTime := previousWindowStart + uint64(windowLengthImMs_)/2 + middleTime := previousWindowStart + uint64(WindowLengthImMs)/2 wr2, err := slidingWindow.data.CurrentWindowWithTime(middleTime, slidingWindow) if err != nil { t.Errorf("Unexcepted error") @@ -121,15 +121,15 @@ func TestWindowAfterOneInterval(t *testing.T) { t.Errorf("Unexcepted error") } - lastTime := middleTime + uint64(windowLengthImMs_)/2 + lastTime := middleTime + uint64(WindowLengthImMs)/2 wr3, err := slidingWindow.data.CurrentWindowWithTime(lastTime, slidingWindow) if err != nil { t.Errorf("Unexcepted error") } - if wr3.windowLengthInMs != windowLengthImMs_ { + if wr3.windowLengthInMs != WindowLengthImMs { t.Errorf("Unexpected error") } - if (wr3.windowStart - uint64(windowLengthImMs_)) != previousWindowStart { + if (wr3.windowStart - uint64(WindowLengthImMs)) != previousWindowStart { t.Errorf("Unexpected error") } mb3, ok := wr3.value.(*MetricBucket) @@ -172,22 +172,22 @@ func _task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testi } func _nTestMultiGoroutineUpdateEmptyWindow(t *testing.T) { - slidingWindow := NewSlidingWindow(sampleCount_, intervalInMs_) + slidingWindow := NewSlidingWindow(SampleCount, IntervalInMs) firstTime := util.GetTimeMilli() const GoroutineNum = 1000 wg := &sync.WaitGroup{} wg.Add(GoroutineNum) - st := util.GetTimeNano() + //st := util.GetTimeNano() var cnt = uint64(0) for i := 0; i < GoroutineNum; i++ { go _task(wg, slidingWindow, firstTime, t, &cnt) } wg.Wait() - t.Logf("finish goroutines: %d", atomic.LoadUint64(&cnt)) - et := util.GetTimeNano() - dif := et - st - t.Logf("finish all goroutines, cost time is %d ns", dif) + //t.Logf("finish goroutines: %d", atomic.LoadUint64(&cnt)) + //et := util.GetTimeNano() + //dif := et - st + //t.Logf("finish all goroutines, cost time is %d ns", dif) wr2, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) if err != nil { t.Errorf("Unexcepted error")