diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..62c893550 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 000000000..8fc0e3423 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# 项目简介 +## 目录划分: + core : 核心实现 + example : 示例代码 + + diff --git a/core/node/metric_node.go b/core/node/metric_node.go new file mode 100644 index 000000000..aa76b8fb7 --- /dev/null +++ b/core/node/metric_node.go @@ -0,0 +1,11 @@ +package node + +type MetricNode struct { + Timestamp uint64 + PassQps uint64 + BlockQps uint64 + SuccessQps uint64 + ErrorQps uint64 + Rt uint64 + Resource string +} diff --git a/core/node/node.go b/core/node/node.go new file mode 100644 index 000000000..d55c27741 --- /dev/null +++ b/core/node/node.go @@ -0,0 +1,34 @@ +package node + +type Node interface { + //TotalRequest() uint64 + //TotalPass() uint64 + TotalSuccess() uint64 + //BlockRequest() uint64 + //TotalError() uint64 + //PassQps() uint64 + //BlockQps() uint64 + //TotalQps() uint64 + //SuccessQps() uint64 + //MaxSuccessQps() uint64 + //ErrorQps() uint64 + //AvgRt() float32 + //MinRt() float32 + //CurGoroutineNum() uint64 + + //PreviousBlockQps() uint64 + //PreviousPassQps() uint64 + // + //Metrics() map[uint64]*metrics.MetricNode + + AddPassRequest(count uint32) + //AddRtAndSuccess(rt uint64, success uint32) + // + //IncreaseBlockQps(count uint32) + //IncreaseErrorQps(count uint32) + // + //IncreaseGoroutineNum() + //DecreaseGoroutineNum() + + //Reset() +} diff --git a/core/property/property.go b/core/property/property.go new file mode 100644 index 000000000..9079f3b30 --- /dev/null +++ b/core/property/property.go @@ -0,0 +1,8 @@ +package property + +//PropertyListener +type PropertyListener interface { + ConfigUpdate(value interface{}) + + ConfigLoad(value interface{}) +} diff --git a/core/slots/base/base.go b/core/slots/base/base.go new file mode 100644 index 000000000..426dbaa08 --- /dev/null +++ b/core/slots/base/base.go @@ -0,0 +1,6 @@ +package base + +const ( + INBOUND = iota + OUTBOUND +) diff --git a/core/slots/base/context.go b/core/slots/base/context.go new file mode 100644 index 000000000..51eb5cb3c --- /dev/null +++ b/core/slots/base/context.go @@ -0,0 +1,13 @@ +package base + +import ( + "context" +) + +type Context struct { + name string + entranceNode DefaultNode + curEntry Entry + origin string + context context.Context +} diff --git a/core/slots/base/default_node.go b/core/slots/base/default_node.go new file mode 100644 index 000000000..ce977d891 --- /dev/null +++ b/core/slots/base/default_node.go @@ -0,0 +1,93 @@ +package base + +import ( + "github.com/sentinel-group/sentinel-golang/core/node" + "github.com/sentinel-group/sentinel-golang/core/slots/statistic/data" + "github.com/sentinel-group/sentinel-golang/core/util" + "sync/atomic" +) + +const ( + windowLengthImMs_ uint32 = 200 + sampleCount_ uint32 = 5 + intervalInMs_ uint32 = 1000 +) + +type DefaultNode struct { + rollingCounterInSecond *data.SlidingWindow + rollingCounterInMinute *data.SlidingWindow + currentGoroutineNum uint32 + lastFetchTime uint64 + resourceWrapper *ResourceWrapper +} + +func NewDefaultNode(wrapper *ResourceWrapper) *DefaultNode { + return &DefaultNode{ + rollingCounterInSecond: data.NewSlidingWindow(sampleCount_, intervalInMs_), + rollingCounterInMinute: data.NewSlidingWindow(sampleCount_, intervalInMs_), + currentGoroutineNum: 0, + lastFetchTime: util.GetTimeMilli(), + resourceWrapper: wrapper, + } +} + +func (dn *DefaultNode) AddPass(count uint64) { + dn.rollingCounterInSecond.AddCount(data.MetricEventPass, count) +} + +func (dn *DefaultNode) AddGoroutineNum(count uint32) { + atomic.AddUint32(&dn.currentGoroutineNum, count) +} + +func (dn *DefaultNode) TotalRequest() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventPass) + dn.rollingCounterInSecond.Count(data.MetricEventBlock) +} +func (dn *DefaultNode) TotalPass() uint64 { + return dn.rollingCounterInMinute.Count(data.MetricEventPass) +} +func (dn *DefaultNode) TotalSuccess() uint64 { + return dn.rollingCounterInMinute.Count(data.MetricEventSuccess) +} +func (dn *DefaultNode) BlockRequest() uint64 { + return dn.rollingCounterInMinute.Count(data.MetricEventBlock) +} +func (dn *DefaultNode) TotalError() uint64 { + return dn.rollingCounterInMinute.Count(data.MetricEventError) +} +func (dn *DefaultNode) PassQps() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventPass) / uint64(intervalInMs_) +} +func (dn *DefaultNode) BlockQps() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventBlock) / uint64(intervalInMs_) +} +func (dn *DefaultNode) TotalQps() uint64 { + return dn.PassQps() + dn.BlockQps() +} +func (dn *DefaultNode) SuccessQps() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventSuccess) / uint64(intervalInMs_) +} +func (dn *DefaultNode) MaxSuccessQps() uint64 { + return dn.rollingCounterInSecond.MaxSuccess() * uint64(sampleCount_) +} +func (dn *DefaultNode) ErrorQps() uint64 { + return dn.rollingCounterInSecond.Count(data.MetricEventError) / uint64(intervalInMs_) +} + +func (dn *DefaultNode) AvgRt() float32 { + return 0 +} +func (dn *DefaultNode) MinRt() float32 { + return 0 +} +func (dn *DefaultNode) CurGoroutineNum() uint64 { + return 0 +} +func (dn *DefaultNode) PreviousBlockQps() uint64 { + return 0 +} +func (dn *DefaultNode) PreviousPassQps() uint64 { + return 0 +} +func (dn *DefaultNode) Metrics() map[uint64]*node.MetricNode { + return nil +} diff --git a/core/slots/base/entry.go b/core/slots/base/entry.go new file mode 100644 index 000000000..66ada10ff --- /dev/null +++ b/core/slots/base/entry.go @@ -0,0 +1,13 @@ +package base + +import ( + "github.com/sentinel-group/sentinel-golang/core/node" +) + +type Entry struct { + createTime uint64 + originNode node.Node + currentNode node.Node + resourceWrap *ResourceWrapper + err error +} diff --git a/core/slots/base/result.go b/core/slots/base/result.go new file mode 100644 index 000000000..9b39ffbe8 --- /dev/null +++ b/core/slots/base/result.go @@ -0,0 +1,46 @@ +/** + * @description: + * + * @author: helloworld + * @date:2019-07-11 + */ +package base + +type ResourceWrapper struct { + // unique resource name + ResourceName string + // + ResourceType int +} + +type SlotResultStatus int8 + +const ( + ResultStatusPass = iota + ResultStatusBlocked + ResultStatusWait + ResultStatusError +) + +type TokenResult struct { + Status SlotResultStatus + BlockedReason string + WaitMs uint64 + ErrorMsg string +} + +func NewSlotResultPass() *TokenResult { + return &TokenResult{Status: ResultStatusPass} +} + +func NewSlotResultBlock(blockedReason string) *TokenResult { + return &TokenResult{Status: ResultStatusBlocked, BlockedReason: blockedReason} +} + +func NewSlotResultWait(waitMs uint64) *TokenResult { + return &TokenResult{Status: ResultStatusWait, WaitMs: waitMs} +} + +func NewSlotResultError(errorMsg string) *TokenResult { + return &TokenResult{Status: ResultStatusError, ErrorMsg: errorMsg} +} diff --git a/core/slots/chain/slot.go b/core/slots/chain/slot.go new file mode 100644 index 000000000..7c33c4916 --- /dev/null +++ b/core/slots/chain/slot.go @@ -0,0 +1,66 @@ +package chain + +import ( + "github.com/sentinel-group/sentinel-golang/core/slots/base" +) + +// a solt +type Slot interface { + /** + * Entrance of this slots. + */ + Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) + + Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error + + // 传递进入 + FireEntry(context *base.Context, resourceWrapper *base.ResourceWrapper, defaultNode *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) + + // 传递退出 + FireExit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error + + GetNext() Slot + + SetNext(next Slot) +} + +// a slot can make slot compose linked +type LinkedSlot struct { + // next linkedSlot + next Slot +} + +// 传递退出 +func (s *LinkedSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + return s.FireEntry(ctx, resWrapper, node, count, prioritized) +} + +// 传递进入 +func (s *LinkedSlot) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + return s.FireExit(context, resourceWrapper, count) +} + +// 传递进入, 没有下一个就返回 ResultStatusPass +func (s *LinkedSlot) FireEntry(context *base.Context, resourceWrapper *base.ResourceWrapper, defaultNode *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + if s.next != nil { + return s.next.Entry(context, resourceWrapper, defaultNode, count, prioritized) + } + return base.NewSlotResultPass(), nil +} + +// 传递退出,没有下一个就返回 +func (s *LinkedSlot) FireExit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + if s.next != nil { + return s.next.Exit(context, resourceWrapper, count) + } else { + return nil + } +} + +func (s *LinkedSlot) GetNext() Slot { + return s.next +} + +func (s *LinkedSlot) SetNext(next Slot) { + s.next = next +} diff --git a/core/slots/chain/slot_chain.go b/core/slots/chain/slot_chain.go new file mode 100644 index 000000000..1489e6699 --- /dev/null +++ b/core/slots/chain/slot_chain.go @@ -0,0 +1,63 @@ +package chain + +import ( + "github.com/sentinel-group/sentinel-golang/core/slots/base" +) + +type SlotChain interface { + /** + * Add a processor to the head of this slots slotchain. + * + * @param protocolProcessor processor to be added. + */ + AddFirst(slot Slot) + + /** + * Add a processor to the tail of this slots slotchain. + * + * @param protocolProcessor processor to be added. + */ + AddLast(slot Slot) + + // fire to next slot + Entry(context *base.Context, resourceWrapper *base.ResourceWrapper, defaultNode *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) + // fire to next slot + Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error +} + +// implent SlotChain +type LinkedSlotChain struct { + first Slot + end Slot +} + +func NewLinkedSlotChain() *LinkedSlotChain { + fs := new(LinkedSlot) + return &LinkedSlotChain{first: fs, end: fs} +} + +func (lsc *LinkedSlotChain) AddFirst(slot Slot) { + slot.SetNext(lsc.first.GetNext()) + lsc.first.SetNext(slot) + if lsc.end == lsc.first { + lsc.end = slot + } +} + +func (lsc *LinkedSlotChain) AddLast(slot Slot) { + lsc.end.SetNext(slot) + lsc.end = slot +} + +func (lsc *LinkedSlotChain) Entry(context *base.Context, resourceWrapper *base.ResourceWrapper, defaultNode *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + return lsc.first.Entry(context, resourceWrapper, defaultNode, count, prioritized) +} + +// 传递进入 +func (lsc *LinkedSlotChain) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + return lsc.first.Exit(context, resourceWrapper, count) +} + +type SlotChainBuilder interface { + Build() SlotChain +} diff --git a/core/slots/chain/slot_chain_test.go b/core/slots/chain/slot_chain_test.go new file mode 100644 index 000000000..f49b64c31 --- /dev/null +++ b/core/slots/chain/slot_chain_test.go @@ -0,0 +1,132 @@ +/** + * @description: + * + * @author: helloworld + * @date:2019-07-11 + */ +package chain + +import ( + "errors" + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "testing" +) + +// implent slot for unit test + +// 继承 LinkedProessorSlot 并完全实现 Slot +type IncrSlot struct { + LinkedSlot +} + +func (s *IncrSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + count++ + return s.FireEntry(ctx, resWrapper, node, count, prioritized) +} + +func (s *IncrSlot) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + count++ + return s.FireExit(context, resourceWrapper, count) +} + +// 继承 LinkedProessorSlot 并完全实现 Slot +type DecrSlot struct { + LinkedSlot +} + +func (s *DecrSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + count-- + return s.FireEntry(ctx, resWrapper, node, count, prioritized) +} + +func (s *DecrSlot) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + count-- + return s.FireExit(context, resourceWrapper, count) +} + +// 继承 LinkedProessorSlot 并完全实现 Slot +type GreaterZeroPassSlot struct { + num int + LinkedSlot +} + +func (s *GreaterZeroPassSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + if count > s.num { + return s.FireEntry(ctx, resWrapper, node, count, prioritized) + } else { + return base.NewSlotResultBlock("GreaterZeroPassSlot"), nil + } +} + +func (s *GreaterZeroPassSlot) Exit(context *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + if count <= 0 { + return errors.New("GreaterZeroPassSlot") + } + return s.FireExit(context, resourceWrapper, count) +} + +func TestLinkedSlotChain_AddFirst_Pass(t *testing.T) { + newChain := NewLinkedSlotChain() + newChain.AddFirst(new(GreaterZeroPassSlot)) + newChain.AddFirst(new(IncrSlot)) + + result, _ := newChain.Entry(nil, nil, nil, 0, false) + if result.Status != base.ResultStatusPass { + t.Fatal("TestLinkedSlotChain_AddFirst_Block") + } + err := newChain.Exit(nil, nil, 0) + if err != nil { + t.Fatal(err) + } +} + +func TestLinkedSlotChain_AddFirst_Block(t *testing.T) { + newChain := NewLinkedSlotChain() + newChain.AddFirst(new(IncrSlot)) + newChain.AddFirst(new(GreaterZeroPassSlot)) + newChain.AddFirst(new(DecrSlot)) + + result, _ := newChain.Entry(nil, nil, nil, 0, false) + if result.Status != base.ResultStatusBlocked { + t.Fatal("TestLinkedSlotChain_AddFirst_Block") + } + err := newChain.Exit(nil, nil, 0) + if err == nil { + t.Fatal("should has error") + } +} + +func TestLinkedSlotChain_AddLast_Pass(t *testing.T) { + newChain := NewLinkedSlotChain() + newChain.AddLast(new(IncrSlot)) + newChain.AddLast(new(IncrSlot)) + newChain.AddLast(new(DecrSlot)) + newChain.AddLast(new(GreaterZeroPassSlot)) + result, _ := newChain.Entry(nil, nil, nil, 0, false) + if result.Status != base.ResultStatusPass { + t.Fatal("TestLinkedSlotChain_AddLast_Pass") + } + + err := newChain.Exit(nil, nil, 0) + if err != nil { + t.Fatal(err) + } +} + +func TestLinkedSlotChain_AddLast_Block(t *testing.T) { + newChain := NewLinkedSlotChain() + newChain.AddLast(new(IncrSlot)) + newChain.AddLast(new(DecrSlot)) + newChain.AddLast(new(DecrSlot)) + newChain.AddLast(new(GreaterZeroPassSlot)) + + result, _ := newChain.Entry(nil, nil, nil, 0, false) + if result.Status != base.ResultStatusBlocked { + t.Fatal("TestLinkedSlotChain_AddLast_Block") + } + + err := newChain.Exit(nil, nil, 0) + if err == nil { + t.Fatal("should has error") + } +} diff --git a/core/slots/flow/flow_control.go b/core/slots/flow/flow_control.go new file mode 100644 index 000000000..b2a3ad86b --- /dev/null +++ b/core/slots/flow/flow_control.go @@ -0,0 +1,86 @@ +package flow + +import ( + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/util" + "math" + "sync/atomic" + "time" +) + +type TrafficShapingController interface { + CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool +} + +type DefaultController struct { + grade FlowGradeType + count uint64 +} + +func (dc *DefaultController) CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool { + curCount := dc.avgUsedTokens(node) + if (curCount + uint64(acquire)) > dc.count { + return false + } + return true +} + +func (dc *DefaultController) avgUsedTokens(node *base.DefaultNode) uint64 { + if node == nil { + return 0 + } + if dc.grade == FlowGradeThread { + return node.CurGoroutineNum() + } + return node.PassQps() +} + +type RateLimiterController struct { + count uint64 + maxQueueingTimeMs int64 + latestPassedTime int64 +} + +func (rlc *RateLimiterController) CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool { + if acquire < 0 { + return true + } + // Reject when count is less or equal than 0. + // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. + currentTime := int64(util.GetTimeMilli()) + // Calculate the interval between every two requests. + costTime := int64(math.Round(float64(uint64(acquire)/rlc.count) * 1000)) + // Expected pass time of this request. + expectedTime := costTime + atomic.LoadInt64(&rlc.latestPassedTime) + + if expectedTime < currentTime { + // Contention may exist here, but it's okay. + atomic.CompareAndSwapInt64(&rlc.latestPassedTime, rlc.latestPassedTime, currentTime) + return true + } else { + // Calculate the time to wait. + waitTime := costTime + atomic.LoadInt64(&rlc.latestPassedTime) - int64(util.GetTimeMilli()) + if waitTime > rlc.maxQueueingTimeMs { + atomic.AddInt64(&rlc.latestPassedTime, -costTime) + return false + } + if waitTime > 0 { + time.Sleep(time.Duration(waitTime) * time.Millisecond) + } + return true + } +} + +type WarmUpController struct { +} + +func (wpc WarmUpController) CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool { + return true +} + +type WarmUpRateLimiterController struct { +} + +func (wpc WarmUpRateLimiterController) CanPass(ctx *base.Context, node *base.DefaultNode, acquire uint32) bool { + return true +} diff --git a/core/slots/flow/flow_rule.go b/core/slots/flow/flow_rule.go new file mode 100644 index 000000000..6ffa74082 --- /dev/null +++ b/core/slots/flow/flow_rule.go @@ -0,0 +1,210 @@ +package flow + +import ( + "context" + "fmt" + "github.com/sentinel-group/sentinel-golang/core/node" +) + +const ( + LimitAppDefault = "default" + LimitAppOther = "other" + ResourceNameDefault = "default" +) + +type StrategyType int8 + +const ( + StrategyDirect StrategyType = iota + StrategyRelate + StrategyChain +) + +type ControlBehaviorType int8 + +const ( + ControlBehaviorDefault ControlBehaviorType = iota + ControlBehaviorWarmUp + ControlBehaviorRateLimiter + ControlBehaviorWarmUpRateLimiter +) + +type FlowGradeType int8 + +const ( + FlowGradeThread FlowGradeType = iota + FlowGradeQps +) + +type RuleChecker interface { + passCheck(ctx context.Context, node *node.Node, count int) bool +} + +type rule struct { + resource_ string + limitApp_ string + grade_ FlowGradeType + // Flow control threshold count. + count_ uint64 + strategy_ StrategyType + refResource_ string + controlBehavior_ ControlBehaviorType + warmUpPeriodSec_ int32 + /** + * Max queueing time in rate limiter behavior. + */ + maxQueueingTimeMs_ int32 + controller_ TrafficShapingController +} + +func (r *rule) PassCheck(ctx context.Context, node node.Node, count int) bool { + + return true +} + +func (r *rule) validate() error { + return nil +} + +func newRuleBuilder() *rule { + return &rule{ + resource_: ResourceNameDefault, + limitApp_: LimitAppDefault, + grade_: FlowGradeQps, + count_: 100, + strategy_: StrategyDirect, + refResource_: "", + controlBehavior_: ControlBehaviorRateLimiter, + warmUpPeriodSec_: 10, + maxQueueingTimeMs_: 500, + } +} + +func (r *rule) resource(resource string) *rule { + r.resource_ = resource + return r +} +func (r *rule) limitApp(limitApp string) *rule { + r.limitApp_ = limitApp + return r +} +func (r *rule) grade(grade_ FlowGradeType) *rule { + r.grade_ = grade_ + return r +} +func (r *rule) count(count uint64) *rule { + r.count_ = count + return r +} +func (r *rule) strategy(strategy StrategyType) *rule { + r.strategy_ = strategy + return r +} +func (r *rule) refResource(refResource string) *rule { + r.refResource_ = refResource + return r +} +func (r *rule) controlBehavior(controlBehavior ControlBehaviorType) *rule { + r.controlBehavior_ = controlBehavior + return r +} +func (r *rule) warmUpPeriodSec(warmUpPeriodSec int32) *rule { + r.warmUpPeriodSec_ = warmUpPeriodSec + return r +} +func (r *rule) maxQueueingTimeMs(maxQueueingTimeMs int32) *rule { + r.maxQueueingTimeMs_ = maxQueueingTimeMs + return r +} +func (r *rule) controller(controller TrafficShapingController) *rule { + r.controller_ = controller + return r +} + +type RuleManager struct { + flowRules map[string][]*rule +} + +func NewRuleManager() *RuleManager { + return &RuleManager{ + flowRules: nil, + } +} + +func LoadRules(rm *RuleManager, rules []*rule) { + if len(rules) == 0 { + println("RuleManager| load empty rule ") + return + } + rm.flowRules = buildFlowRuleMap(rules) +} + +// Get a copy of the rules. +func (rm *RuleManager) getAllRule() []*rule { + + ret := make([]*rule, 0) + for _, value := range rm.flowRules { + ret = append(ret, value...) + } + return ret +} + +func (rm *RuleManager) getRuleMap() map[string][]*rule { + return rm.flowRules +} + +func (rm *RuleManager) getRuleBySource(resource string) []*rule { + rules := rm.flowRules[resource] + if len(rules) == 0 { + return nil + } + return rules +} + +func buildFlowRuleMap(rules []*rule) map[string][]*rule { + ret := make(map[string][]*rule) + + for _, r := range rules { + err := r.validate() + if err != nil { + fmt.Printf("validate rule fail, the reason is %s ", err.Error()) + continue + } + r.controller_ = generateFlowControl(r) + srcName := r.resource_ + var slc = ret[srcName] + if slc == nil { + slc = make([]*rule, 0) + } + slc = append(slc, r) + ret[srcName] = slc + } + return ret +} + +func generateFlowControl(r *rule) TrafficShapingController { + if r.grade_ == FlowGradeQps { + switch r.controlBehavior_ { + case ControlBehaviorWarmUp: + return new(WarmUpController) + case ControlBehaviorRateLimiter: + return new(RateLimiterController) + case ControlBehaviorWarmUpRateLimiter: + return new(WarmUpRateLimiterController) + default: + } + } + return new(DefaultController) +} + +// +//type FlowPropertyListener struct { +//} +// +//func (fpl *FlowPropertyListener) ConfigUpdate(value interface{}) { +// +//} +//func (fpl *FlowPropertyListener) ConfigLoad(value interface{}) { +// +//} +// diff --git a/core/slots/flow/flow_slot.go b/core/slots/flow/flow_slot.go new file mode 100644 index 000000000..7f65a37e5 --- /dev/null +++ b/core/slots/flow/flow_slot.go @@ -0,0 +1,48 @@ +package flow + +import ( + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/slots/chain" +) + +type FlowSlot struct { + chain.LinkedSlot + RuleManager *RuleManager +} + +func (fs *FlowSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + // no rule return pass + if fs.RuleManager == nil { + return fs.FireEntry(ctx, resWrapper, node, count, false) + } + rules := fs.RuleManager.getRuleBySource(resWrapper.ResourceName) + if len(rules) == 0 { + return fs.FireEntry(ctx, resWrapper, node, count, false) + } + success := checkFlow(ctx, resWrapper, rules, node, count) + if success { + return fs.FireEntry(ctx, resWrapper, node, count, false) + } else { + return base.NewSlotResultBlock("FlowSlot"), nil + } +} + +func (fs *FlowSlot) Exit(ctx *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + return fs.FireExit(ctx, resourceWrapper, count) +} + +func checkFlow(ctx *base.Context, resourceWrap *base.ResourceWrapper, rules []*rule, node *base.DefaultNode, count int) bool { + if rules == nil { + return true + } + for _, rule := range rules { + if !canPass(ctx, resourceWrap, rule, node, uint32(count)) { + return false + } + } + return true +} + +func canPass(ctx *base.Context, resourceWrap *base.ResourceWrapper, rule *rule, node *base.DefaultNode, count uint32) bool { + return rule.controller_.CanPass(ctx, node, count) +} diff --git a/core/slots/statistic/data/leap_array.go b/core/slots/statistic/data/leap_array.go new file mode 100644 index 000000000..c3bafbcb1 --- /dev/null +++ b/core/slots/statistic/data/leap_array.go @@ -0,0 +1,246 @@ +package data + +import ( + "errors" + "fmt" + "github.com/sentinel-group/sentinel-golang/core/util" + "math" + "runtime" +) + +type WindowWrap struct { + windowLengthInMs uint32 + windowStart uint64 + value interface{} +} + +func (ww *WindowWrap) resetTo(startTime uint64) { + ww.windowStart = startTime +} + +func (ww *WindowWrap) isTimeInWindow(timeMillis uint64) bool { + return ww.windowStart <= timeMillis && timeMillis < ww.windowStart+uint64(ww.windowLengthInMs) +} + +// The basic data structure of sliding windows +// +type LeapArray struct { + windowLengthInMs uint32 + sampleCount uint32 + intervalInMs uint32 + array []*WindowWrap //实际保存的数据 + mux util.TriableMutex // lock +} + +func (la *LeapArray) CurrentWindow(sw BucketGenerator) (*WindowWrap, error) { + return la.CurrentWindowWithTime(util.GetTimeMilli(), sw) +} + +func (la *LeapArray) CurrentWindowWithTime(timeMillis uint64, sw BucketGenerator) (*WindowWrap, error) { + if timeMillis < 0 { + return nil, errors.New("timeMillion is less than 0") + } + + idx := la.calculateTimeIdx(timeMillis) + windowStart := la.calculateStartTime(timeMillis) + + for { + old := la.array[idx] + if old == nil { + newWrap := &WindowWrap{ + windowLengthInMs: la.windowLengthInMs, + windowStart: windowStart, + value: sw.newEmptyBucket(windowStart), + } + // must be thread safe, + // some extreme condition,may newer override old empty WindowWrap + // 使用cas, 确保la.array[idx]更新前是nil + //la.mux.Lock() + //if la.array[idx] == nil { + // la.array[idx] = newWrap + //} + //la.mux.Unlock() + //return la.array[idx], nil + if la.mux.TryLock() && la.array[idx] == nil { + la.array[idx] = newWrap + la.mux.Unlock() + return la.array[idx], nil + } else { + runtime.Gosched() + } + } else if windowStart == old.windowStart { + return old, nil + } else if windowStart > old.windowStart { + // reset WindowWrap + if la.mux.TryLock() { + old, _ = sw.resetWindowTo(old, windowStart) + la.mux.Unlock() + return old, nil + } else { + runtime.Gosched() + } + } else if windowStart < old.windowStart { + // Should not go through here, + return nil, errors.New(fmt.Sprintf("provided time timeMillis=%d is already behind old.windowStart=%d", windowStart, old.windowStart)) + } + } +} + +func (la *LeapArray) calculateTimeIdx(timeMillis uint64) uint32 { + timeId := (int)(timeMillis / uint64(la.windowLengthInMs)) + return uint32(timeId % len(la.array)) +} + +func (la *LeapArray) calculateStartTime(timeMillis uint64) uint64 { + return timeMillis - (timeMillis % uint64(la.windowLengthInMs)) +} + +// Get all the bucket in sliding window for current time; +func (la *LeapArray) Values() []*WindowWrap { + return la.valuesWithTime(util.GetTimeMilli()) +} + +func (la *LeapArray) valuesWithTime(timeMillis uint64) []*WindowWrap { + if timeMillis <= 0 { + return nil + } + wwp := make([]*WindowWrap, 0) + for _, wwPtr := range la.array { + if wwPtr == nil { + //fmt.Printf("current bucket is nil, index is %d \n", idx) + wwPtr = &WindowWrap{ + windowLengthInMs: 200, + windowStart: util.GetTimeMilli(), + value: newEmptyMetricBucket(), + } + wwp = append(wwp, wwPtr) + continue + } + newWW := &WindowWrap{ + windowLengthInMs: wwPtr.windowLengthInMs, + windowStart: wwPtr.windowStart, + value: wwPtr.value, + } + wwp = append(wwp, newWW) + } + return wwp +} + +type BucketGenerator interface { + // 根据开始时间,创建一个新的统计bucket, bucket的具体数据结构可以有多个 + newEmptyBucket(startTime uint64) interface{} + + // 将窗口ww重置startTime和空的统计bucket + resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) +} + +// The implement of sliding window based on struct LeapArray +type SlidingWindow struct { + data *LeapArray + BucketType string +} + +func NewSlidingWindow(sampleCount uint32, intervalInMs uint32) *SlidingWindow { + if intervalInMs%sampleCount != 0 { + panic(fmt.Sprintf("invalid parameters, intervalInMs is %d, sampleCount is %d.", intervalInMs, sampleCount)) + } + winLengthInMs := intervalInMs / sampleCount + arr := make([]*WindowWrap, 5) + return &SlidingWindow{ + data: &LeapArray{ + windowLengthInMs: winLengthInMs, + sampleCount: sampleCount, + intervalInMs: intervalInMs, + array: arr, + }, + BucketType: "metrics", + } +} + +func (sw *SlidingWindow) newEmptyBucket(startTime uint64) interface{} { + return newEmptyMetricBucket() +} + +func (sw *SlidingWindow) resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) { + ww.windowStart = startTime + ww.value = newEmptyMetricBucket() + return ww, nil +} + +func (sw *SlidingWindow) Count(event MetricEventType) uint64 { + _, err := sw.data.CurrentWindow(sw) + if err != nil { + fmt.Println("sliding window fail to record success") + } + count := uint64(0) + for _, ww := range sw.data.Values() { + mb, ok := ww.value.(*MetricBucket) + if !ok { + fmt.Println("assert fail") + continue + } + count += mb.Get(event) + } + return count +} + +func (sw *SlidingWindow) AddCount(event MetricEventType, count uint64) { + curWindow, err := sw.data.CurrentWindow(sw) + if err != nil || curWindow == nil || curWindow.value == nil { + fmt.Println("sliding window fail to record success") + return + } + + mb, ok := curWindow.value.(*MetricBucket) + if !ok { + fmt.Println("assert fail") + return + } + mb.Add(event, count) +} + +func (sw *SlidingWindow) MaxSuccess() uint64 { + + _, err := sw.data.CurrentWindow(sw) + if err != nil { + fmt.Println("sliding window fail to record success") + } + + succ := uint64(0) + for _, ww := range sw.data.Values() { + mb, ok := ww.value.(*MetricBucket) + if !ok { + fmt.Println("assert fail") + continue + } + s := mb.Get(MetricEventSuccess) + if err != nil { + fmt.Println("get success counter fail, reason: ", err) + } + succ = uint64(math.Max(float64(succ), float64(s))) + } + return succ +} + +func (sw *SlidingWindow) MinSuccess() uint64 { + + _, err := sw.data.CurrentWindow(sw) + if err != nil { + fmt.Println("sliding window fail to record success") + } + + succ := uint64(0) + for _, ww := range sw.data.Values() { + mb, ok := ww.value.(*MetricBucket) + if !ok { + fmt.Println("assert fail") + continue + } + s := mb.Get(MetricEventSuccess) + if err != nil { + fmt.Println("get success counter fail, reason: ", err) + } + succ = uint64(math.Min(float64(succ), float64(s))) + } + return succ +} diff --git a/core/slots/statistic/data/leap_array_test.go b/core/slots/statistic/data/leap_array_test.go new file mode 100644 index 000000000..7c285406a --- /dev/null +++ b/core/slots/statistic/data/leap_array_test.go @@ -0,0 +1,205 @@ +package data + +import ( + "github.com/sentinel-group/sentinel-golang/core/util" + "sync" + "sync/atomic" + "testing" +) + +const ( + WindowLengthImMs uint32 = 200 + SampleCount uint32 = 5 + IntervalInMs uint32 = 1000 +) + +//Test sliding windows create windows +func TestNewWindow(t *testing.T) { + slidingWindow := NewSlidingWindow(SampleCount, IntervalInMs) + time := util.GetTimeMilli() + + wr, err := slidingWindow.data.CurrentWindowWithTime(time, slidingWindow) + if wr == nil { + t.Errorf("Unexcepted error") + } + if err != nil { + t.Errorf("Unexcepted error") + } + if wr.windowLengthInMs != WindowLengthImMs { + t.Errorf("Unexcepted error, winlength is not same") + } + if wr.windowStart != (time - time%uint64(WindowLengthImMs)) { + t.Errorf("Unexcepted error, winlength is not same") + } + if wr.value == nil { + t.Errorf("Unexcepted error, value is nil") + } + if slidingWindow.Count(MetricEventPass) != 0 { + t.Errorf("Unexcepted error, pass value is invalid") + } +} + +// Test the logic get window start time. +func TestLeapArrayWindowStart(t *testing.T) { + slidingWindow := NewSlidingWindow(SampleCount, IntervalInMs) + firstTime := util.GetTimeMilli() + previousWindowStart := firstTime - firstTime%uint64(WindowLengthImMs) + + wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + if wr.windowLengthInMs != WindowLengthImMs { + t.Errorf("Unexpected error, winLength is not same") + } + if wr.windowStart != previousWindowStart { + t.Errorf("Unexpected error, winStart is not same") + } +} + +// test sliding window has multi windows +func TestWindowAfterOneInterval(t *testing.T) { + slidingWindow := NewSlidingWindow(SampleCount, IntervalInMs) + firstTime := util.GetTimeMilli() + previousWindowStart := firstTime - firstTime%uint64(WindowLengthImMs) + + wr, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + if wr.windowLengthInMs != WindowLengthImMs { + t.Errorf("Unexpected error, winLength is not same") + } + if wr.windowStart != previousWindowStart { + t.Errorf("Unexpected error, winStart is not same") + } + if wr.value == nil { + t.Errorf("Unexcepted error") + } + mb, ok := wr.value.(*MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + mb.Add(MetricEventPass, 1) + mb.Add(MetricEventBlock, 1) + mb.Add(MetricEventSuccess, 1) + mb.Add(MetricEventError, 1) + + if mb.Get(MetricEventPass) != 1 { + t.Errorf("Unexcepted error") + } + if mb.Get(MetricEventBlock) != 1 { + t.Errorf("Unexcepted error") + } + if mb.Get(MetricEventSuccess) != 1 { + t.Errorf("Unexcepted error") + } + if mb.Get(MetricEventError) != 1 { + t.Errorf("Unexcepted error") + } + + middleTime := previousWindowStart + uint64(WindowLengthImMs)/2 + wr2, err := slidingWindow.data.CurrentWindowWithTime(middleTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + if wr2.windowStart != previousWindowStart { + t.Errorf("Unexpected error, winStart is not same") + } + mb2, ok := wr2.value.(*MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + if wr != wr2 { + t.Errorf("Unexcepted error") + } + mb2.Add(MetricEventPass, 1) + if mb.Get(MetricEventPass) != 2 { + t.Errorf("Unexcepted error") + } + if mb.Get(MetricEventBlock) != 1 { + t.Errorf("Unexcepted error") + } + + lastTime := middleTime + uint64(WindowLengthImMs)/2 + wr3, err := slidingWindow.data.CurrentWindowWithTime(lastTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + if wr3.windowLengthInMs != WindowLengthImMs { + t.Errorf("Unexpected error") + } + if (wr3.windowStart - uint64(WindowLengthImMs)) != previousWindowStart { + t.Errorf("Unexpected error") + } + mb3, ok := wr3.value.(*MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + if &mb3 == nil { + t.Errorf("Unexcepted error") + } + + if mb3.Get(MetricEventPass) != 0 { + t.Errorf("Unexcepted error") + } + if mb3.Get(MetricEventBlock) != 0 { + t.Errorf("Unexcepted error") + } +} + +func TestNTimeMultiGoroutineUpdateEmptyWindow(t *testing.T) { + for i := 0; i < 1000; i++ { + _nTestMultiGoroutineUpdateEmptyWindow(t) + } +} + +func _task(wg *sync.WaitGroup, slidingWindow *SlidingWindow, ti uint64, t *testing.T, ct *uint64) { + wr, err := slidingWindow.data.CurrentWindowWithTime(ti, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + mb, ok := wr.value.(*MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + mb.Add(MetricEventPass, 1) + mb.Add(MetricEventBlock, 1) + mb.Add(MetricEventSuccess, 1) + mb.Add(MetricEventError, 1) + atomic.AddUint64(ct, 1) + wg.Done() +} + +func _nTestMultiGoroutineUpdateEmptyWindow(t *testing.T) { + slidingWindow := NewSlidingWindow(SampleCount, IntervalInMs) + firstTime := util.GetTimeMilli() + + const GoroutineNum = 1000 + wg := &sync.WaitGroup{} + wg.Add(GoroutineNum) + //st := util.GetTimeNano() + var cnt = uint64(0) + for i := 0; i < GoroutineNum; i++ { + go _task(wg, slidingWindow, firstTime, t, &cnt) + } + wg.Wait() + //t.Logf("finish goroutines: %d", atomic.LoadUint64(&cnt)) + //et := util.GetTimeNano() + //dif := et - st + //t.Logf("finish all goroutines, cost time is %d ns", dif) + wr2, err := slidingWindow.data.CurrentWindowWithTime(firstTime, slidingWindow) + if err != nil { + t.Errorf("Unexcepted error") + } + mb2, ok := wr2.value.(*MetricBucket) + if !ok { + t.Errorf("Unexcepted error") + } + if mb2.Get(MetricEventPass) != GoroutineNum { + t.Errorf("Unexcepted error, infact, %d", mb2.Get(MetricEventPass)) + } + if mb2.Get(MetricEventBlock) != GoroutineNum { + t.Errorf("Unexcepted error, infact, %d", mb2.Get(MetricEventBlock)) + } +} diff --git a/core/slots/statistic/data/metric_bucket.go b/core/slots/statistic/data/metric_bucket.go new file mode 100644 index 000000000..58d3155b3 --- /dev/null +++ b/core/slots/statistic/data/metric_bucket.go @@ -0,0 +1,104 @@ +package data + +import ( + "math" + "sync/atomic" +) + +type MetricEventType int8 + +const ( + MetricEventPass MetricEventType = iota + MetricEventBlock + MetricEventSuccess + MetricEventError + MetricEventRt + // hack for getting length of enum + metricEventNum +) + +/** +MetricBucket store the metric statistic of each event +(MetricEventPass、MetricEventBlock、MetricEventError、MetricEventSuccess、MetricEventRt) +*/ +type MetricBucket struct { + // value of statistic + counters [metricEventNum]uint64 + minRt uint64 +} + +func newEmptyMetricBucket() *MetricBucket { + mb := &MetricBucket{ + minRt: math.MaxUint64, + } + return mb +} + +func (mb *MetricBucket) Add(event MetricEventType, count uint64) { + if event > metricEventNum { + panic("event is bigger then metricEventNum") + } + atomic.AddUint64(&mb.counters[event], count) +} + +func (mb *MetricBucket) Get(event MetricEventType) uint64 { + if event > metricEventNum { + panic("event is bigger then metricEventNum") + } + return mb.counters[event] +} + +func (mb *MetricBucket) MinRt() uint64 { + return mb.minRt +} + +func (mb *MetricBucket) Reset() { + for i := 0; i < int(metricEventNum); i++ { + atomic.StoreUint64(&mb.counters[i], 0) + } + atomic.StoreUint64(&mb.minRt, math.MaxUint64) +} + +func (mb *MetricBucket) AddPass(n uint64) { + mb.Add(MetricEventPass, n) +} + +func (mb *MetricBucket) Pass() uint64 { + return mb.Get(MetricEventPass) +} + +func (mb *MetricBucket) AddBlock(n uint64) { + mb.Add(MetricEventBlock, n) +} + +func (mb *MetricBucket) Block() uint64 { + return mb.Get(MetricEventBlock) +} + +func (mb *MetricBucket) AddSuccess(n uint64) { + mb.Add(MetricEventSuccess, n) +} + +func (mb *MetricBucket) Success() uint64 { + return mb.Get(MetricEventSuccess) +} + +func (mb *MetricBucket) AddError(n uint64) { + mb.Add(MetricEventError, n) +} + +func (mb *MetricBucket) Error() uint64 { + return mb.Get(MetricEventError) +} + +func (mb *MetricBucket) AddRt(rt uint64) { + mb.Add(MetricEventRt, rt) + // Not thread-safe, but it's okay. + if rt < mb.minRt { + mb.minRt = rt + } +} + +func (mb *MetricBucket) Rt() uint64 { + return mb.Get(MetricEventRt) +} diff --git a/core/slots/statistic/data/unary_leap_array.go b/core/slots/statistic/data/unary_leap_array.go new file mode 100644 index 000000000..7828de680 --- /dev/null +++ b/core/slots/statistic/data/unary_leap_array.go @@ -0,0 +1,15 @@ +package data + +type UnaryLeapArray struct { + LeapArray +} + +func (uls *UnaryLeapArray) newEmptyBucket(startTime uint64) interface{} { + return uint64(0) +} + +func (uls *UnaryLeapArray) resetWindowTo(ww *WindowWrap, startTime uint64) (*WindowWrap, error) { + ww.windowStart = startTime + ww.value = uint64(0) + return ww, nil +} diff --git a/core/slots/statistic/statistic_slot.go b/core/slots/statistic/statistic_slot.go new file mode 100644 index 000000000..a75dcba27 --- /dev/null +++ b/core/slots/statistic/statistic_slot.go @@ -0,0 +1,40 @@ +package statistic + +import ( + "errors" + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/slots/chain" +) + +type StatisticSlot struct { + chain.LinkedSlot +} + +func (fs *StatisticSlot) Entry(ctx *base.Context, resWrapper *base.ResourceWrapper, node *base.DefaultNode, count int, prioritized bool) (*base.TokenResult, error) { + var r *base.TokenResult + var err error + defer func() { + if e := recover(); e != nil { + r = base.NewSlotResultError("StatisticSlot") + err = errors.New("panic occur") + } + }() + // fire next slot + result, err := fs.FireEntry(ctx, resWrapper, node, count, prioritized) + + if err != nil { + // TO DO + } + if result.Status == base.ResultStatusError { + // TO DO + } + if result.Status == base.ResultStatusPass { + node.AddPass(1) + + } + return result, err +} + +func (fs *StatisticSlot) Exit(ctx *base.Context, resourceWrapper *base.ResourceWrapper, count int) error { + return fs.FireExit(ctx, resourceWrapper, count) +} diff --git a/core/sphu.go b/core/sphu.go new file mode 100644 index 000000000..661156e04 --- /dev/null +++ b/core/sphu.go @@ -0,0 +1,48 @@ +package core + +import ( + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "github.com/sentinel-group/sentinel-golang/core/slots/chain" + "github.com/sentinel-group/sentinel-golang/core/slots/flow" + "github.com/sentinel-group/sentinel-golang/core/slots/statistic" +) + +type DefaultSlotChainBuilder struct { +} + +func (dsc *DefaultSlotChainBuilder) Build() chain.SlotChain { + linkedChain := chain.NewLinkedSlotChain() + linkedChain.AddFirst(new(flow.FlowSlot)) + linkedChain.AddFirst(new(statistic.StatisticSlot)) + // add all slot + return linkedChain +} + +func NewDefaultSlotChainBuilder() *DefaultSlotChainBuilder { + return &DefaultSlotChainBuilder{} +} + +var defaultChain chain.SlotChain +var defaultNode *base.DefaultNode + +func init() { + defaultChain = NewDefaultSlotChainBuilder().Build() + defaultNode = base.NewDefaultNode(nil) +} + +func Entry(resource string) (*base.TokenResult, error) { + resourceWrap := &base.ResourceWrapper{ + ResourceName: resource, + ResourceType: base.INBOUND, + } + + return defaultChain.Entry(nil, resourceWrap, defaultNode, 0, false) +} + +func Exit(resource string) error { + resourceWrap := &base.ResourceWrapper{ + ResourceName: resource, + ResourceType: base.INBOUND, + } + return defaultChain.Exit(nil, resourceWrap, 1) +} diff --git a/core/util/time.go b/core/util/time.go new file mode 100644 index 000000000..36d145b33 --- /dev/null +++ b/core/util/time.go @@ -0,0 +1,17 @@ +package util + +import "time" + +func GetTimeMilli() uint64 { + return uint64(time.Now().UnixNano() / 1e6) +} + +//noinspection GoUnusedExportedFunction +func GetTimeNano() uint64 { + return uint64(time.Now().UnixNano()) +} + +//noinspection GoUnusedExportedFunction +func GetTimeSecond() uint64 { + return uint64(time.Now().Unix()) +} diff --git a/core/util/triablemutx.go b/core/util/triablemutx.go new file mode 100644 index 000000000..61d3340ad --- /dev/null +++ b/core/util/triablemutx.go @@ -0,0 +1,17 @@ +package util + +import ( + "sync" + "sync/atomic" + "unsafe" +) + +const mutexLocked = 1 << iota + +type TriableMutex struct { + sync.Mutex +} + +func (tmux *TriableMutex) TryLock() bool { + return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&tmux.Mutex)), 0, mutexLocked) +} diff --git a/core/util/triablemutx_test.go b/core/util/triablemutx_test.go new file mode 100644 index 000000000..8b36dcf82 --- /dev/null +++ b/core/util/triablemutx_test.go @@ -0,0 +1,61 @@ +package util + +import ( + "fmt" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestTriableMutex_TryLock(t *testing.T) { + var m TriableMutex + m.Lock() + time.Sleep(time.Second) + fmt.Printf("TryLock: %t\n", m.TryLock()) //false + fmt.Printf("TryLock: %t\n", m.TryLock()) // false + m.Unlock() + fmt.Printf("TryLock: %t\n", m.TryLock()) //true + fmt.Printf("TryLock: %t\n", m.TryLock()) //false + m.Unlock() + fmt.Printf("TryLock: %t\n", m.TryLock()) //true + m.Unlock() +} + +func iiiTestConcurrent100() { + var m TriableMutex + cnt := int32(0) + wg := &sync.WaitGroup{} + wg.Add(1000) + for i := 0; i < 1000; i++ { + ci := i + go func(tm TriableMutex, wg_ *sync.WaitGroup, i_ int, cntPtr *int32) { + for { + if tm.TryLock() { + atomic.AddInt32(cntPtr, 1) + tm.Unlock() + wg_.Done() + break + } else { + runtime.Gosched() + } + } + }(m, wg, ci, &cnt) + } + wg.Wait() + //fmt.Println("count=", cnt) + if cnt != 1000 { + fmt.Println("count error") + } +} + +func TestTriableMutex_TryLock_Concurrent1000(t *testing.T) { + iiiTestConcurrent100() +} + +func BenchmarkTriableMutex_TryLock(b *testing.B) { + for n := 0; n < b.N; n++ { + iiiTestConcurrent100() + } +} diff --git a/example/main.go b/example/main.go new file mode 100644 index 000000000..28dd40b3a --- /dev/null +++ b/example/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + "github.com/sentinel-group/sentinel-golang/core" + "github.com/sentinel-group/sentinel-golang/core/slots/base" + "math/rand" + "sync" + "time" +) + +func main() { + fmt.Println("=================start=================") + wg := &sync.WaitGroup{} + wg.Add(10) + + for i := 0; i < 10; i++ { + test(wg) + } + wg.Wait() + fmt.Println("=================end=================") +} + +func test(wg *sync.WaitGroup) { + rand.Seed(1000) + r := rand.Int63() % 10 + time.Sleep(time.Duration(r) * time.Millisecond) + result, e := core.Entry("test") + if e != nil { + fmt.Println(e.Error()) + return + } + if result.Status == base.ResultStatusBlocked { + fmt.Println("reason:", result.BlockedReason) + } + if result.Status == base.ResultStatusError { + fmt.Println("reason:", result.ErrorMsg) + } + if result.Status == base.ResultStatusPass { + _ = core.Exit("test") + } + time.Sleep(time.Duration(r) * time.Millisecond) + wg.Done() +} diff --git a/go.mod b/go.mod new file mode 100644 index 000000000..abda04c93 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/sentinel-group/sentinel-golang + +go 1.12