-
Notifications
You must be signed in to change notification settings - Fork 12
/
cslb.go
121 lines (102 loc) · 2.46 KB
/
cslb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package cslb
import (
"math"
"time"
"golang.org/x/sync/singleflight"
)
const (
NodeFailedKey = "node-failed."
RefreshKey = "refresh"
)
type LoadBalancer struct {
service Service
strategy Strategy
option LoadBalancerOption
sf *singleflight.Group
nodes *Group
ttlTimer *time.Timer
metrics *Metrics
}
func NewLoadBalancer(service Service, strategy Strategy, option ...LoadBalancerOption) *LoadBalancer {
opt := DefaultLoadBalancerOption
if len(option) > 0 {
opt = option[0]
}
lb := &LoadBalancer{
service: service,
strategy: strategy,
option: opt,
sf: new(singleflight.Group),
nodes: NewGroup(opt.MaxNodeCount),
ttlTimer: nil,
metrics: NewMetrics(opt.MaxNodeFailedRatio, opt.MinSampleSize),
}
<-lb.refresh()
if lb.option.TTL != TTLUnlimited {
lb.ttlTimer = time.NewTimer(lb.option.TTL)
go func() {
for {
select {
case <-lb.ttlTimer.C:
<-lb.refresh()
lb.ttlTimer.Reset(lb.option.TTL)
}
}
}()
}
return lb
}
func (lb *LoadBalancer) next(nextFunc func() (Node, error)) (Node, error) {
next, err := nextFunc()
if err != nil {
// Refresh and retry
<-lb.refresh()
next, err = nextFunc()
}
if lb.metrics != nil {
lb.metrics.NodeInc(next)
}
return next, err
}
func (lb *LoadBalancer) Next() (Node, error) {
return lb.next(lb.strategy.Next)
}
func (lb *LoadBalancer) NextFor(input interface{}) (Node, error) {
return lb.next(func() (Node, error) {
return lb.strategy.NextFor(input)
})
}
func (lb *LoadBalancer) NodeFailed(node Node) {
if lb.metrics == nil {
return
}
lb.metrics.NodeFailedInc(node)
if ratio, err := lb.metrics.GetNodeFailedRatio(node); err == nil && ratio > lb.option.MaxNodeFailedRatio {
lb.sf.Do(NodeFailedKey+node.String(), func() (interface{}, error) {
lb.metrics.ResetNode(node)
lb.nodes.Exile(node)
if fn := lb.service.NodeFailedCallbackFunc(); fn != nil {
go fn(node)
}
nodes := lb.nodes.Get()
if len(nodes) <= 0 ||
math.Round(float64(lb.nodes.GetOriginalCount())*lb.option.MinHealthyNodeRatio) > float64(lb.nodes.GetCurrentCount()) {
<-lb.refresh()
} else {
lb.strategy.SetNodes(nodes)
}
return nil, nil
})
}
}
func (lb *LoadBalancer) refresh() <-chan singleflight.Result {
return lb.sf.DoChan(RefreshKey, func() (interface{}, error) {
lb.service.Refresh()
if lb.metrics != nil {
lb.metrics.ResetAllNodes()
}
lb.nodes.Set(lb.service.Nodes())
lb.strategy.SetNodes(lb.nodes.Get())
return nil, nil
})
}