/
policy.go
144 lines (117 loc) · 2.94 KB
/
policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package sqldb
import (
"math/rand"
"strings"
"sync"
"time"
)
/*
从库负载策略
*/
var policyHandlerMap = map[string]interface{}{
"random": RandomPolicy,
"weightrandom": WeightRandomPolicy,
"roundrobin": RoundRobinPolicy,
"weightroundrobin": WeightRoundRobinPolicy,
"leastconn": LeastConnPolicy,
}
func RegisterPolicyHandler(name string, handler interface{}) {
policyHandlerMap[strings.ToLower(name)] = handler
}
func GetPolicyHandler(name string) (handlerFunc interface{}, ok bool) {
handlerFunc, ok = policyHandlerMap[strings.ToLower(name)]
return
}
type IPolicy interface {
Slave(engine *ConnectionEngine) *Connection
}
type PolicyHandler func(engine *ConnectionEngine) *Connection
func (handler PolicyHandler) Slave(engine *ConnectionEngine) *Connection {
return handler(engine)
}
type PolicyParams struct {
Weights []int
}
// 随机访问负载策略
func RandomPolicy() PolicyHandler {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return func(engine *ConnectionEngine) *Connection {
return engine.slaves[r.Intn(len(engine.slaves))]
}
}
// 权重随机访问负载策略
func WeightRandomPolicy(params PolicyParams) PolicyHandler {
weightsLen := len(params.Weights)
rands := make([]int, 0, weightsLen)
for i := 0; i < weightsLen; i++ {
for n := 0; n < params.Weights[i]; n++ {
rands = append(rands, i)
}
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return func(engine *ConnectionEngine) *Connection {
index := rands[r.Intn(len(rands))]
count := len(engine.slaves)
if index >= count {
index = count - 1
}
return engine.slaves[index]
}
}
// 轮询访问负载策略
func RoundRobinPolicy() PolicyHandler {
pos := -1
var lock sync.Mutex
return func(engine *ConnectionEngine) *Connection {
lock.Lock()
defer lock.Unlock()
pos++
if pos >= len(engine.slaves) {
pos = 0
}
return engine.slaves[pos]
}
}
// 权重轮询访问负载策略
func WeightRoundRobinPolicy(params PolicyParams) PolicyHandler {
weightsLen := len(params.Weights)
rands := make([]int, 0, weightsLen)
for i := 0; i < weightsLen; i++ {
for n := 0; n < params.Weights[i]; n++ {
rands = append(rands, i)
}
}
pos := -1
var lock sync.Mutex
return func(engine *ConnectionEngine) *Connection {
lock.Lock()
defer lock.Unlock()
pos++
if pos >= len(rands) {
pos = 0
}
index := rands[pos]
count := len(engine.slaves)
if index > count {
index = count - 1
}
return engine.slaves[index]
}
}
// 最小连接数访问负载策略
func LeastConnPolicy() PolicyHandler {
return func(engine *ConnectionEngine) *Connection {
connections, index := 0, 0
for i, count := 0, len(engine.slaves); i < count; i++ {
openConnections := engine.slaves[i].Stats().OpenConnections
if i == 0 {
connections = openConnections
index = i
} else if openConnections <= connections {
connections = openConnections
index = i
}
}
return engine.slaves[index]
}
}