forked from vulcand/vulcand
/
fsm.go
201 lines (177 loc) · 5.52 KB
/
fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package roundrobin
import (
"fmt"
"time"
"github.com/mailgun/vulcand/Godeps/_workspace/src/github.com/mailgun/timetools"
"github.com/mailgun/vulcand/Godeps/_workspace/src/github.com/mailgun/vulcan/metrics"
)
// FSMHandler adjusts endpoint weights for the round-robin balancer. It grows
// the weights of endpoints whose failure rate is lower than that of their
// peers, and when the endpoints no longer differ in quality it steps the
// weights back towards their original values.
type FSMHandler struct {
	// timeProvider supplies the current time; it is injectable so tests can
	// control the clock.
	timeProvider timetools.TimeProvider
	// backoffDuration is how long the state machine stays frozen after a
	// weight change so that fresh stats can accumulate.
	backoffDuration time.Duration
	// timer is the instant after which the next weight adjustment may happen.
	timer time.Time
	// endpoints are the endpoints balanced in the current round.
	endpoints []*WeightedEndpoint
	// originalWeights holds the precalculated original weight of each endpoint.
	originalWeights []SuggestedWeight
	// lastWeights holds the weights returned by the most recent call.
	lastWeights []SuggestedWeight
}
const (
	// FSMMaxWeight is the ceiling: the handler never grows an endpoint's
	// weight beyond this value.
	FSMMaxWeight = 1 << 12 // 4096

	// FSMGrowFactor is the step size: weights are multiplied by it when
	// growing and divided by it when converging back.
	FSMGrowFactor = 1 << 4 // 16
)
// NewFSMHandler returns an FSMHandler that reads time from the real clock.
func NewFSMHandler() (*FSMHandler, error) {
	clock := &timetools.RealTime{}
	return NewFSMHandlerWithOptions(clock)
}
// NewFSMHandlerWithOptions returns an FSMHandler that reads time from the
// given provider. It returns an error if timeProvider is nil.
func NewFSMHandlerWithOptions(timeProvider timetools.TimeProvider) (*FSMHandler, error) {
	if timeProvider != nil {
		handler := &FSMHandler{timeProvider: timeProvider}
		return handler, nil
	}
	return nil, fmt.Errorf("time provider can not be nil")
}
// Init resets the handler for a new set of endpoints. Both the original and
// the last-returned weights start out as the endpoints' original weights.
func (fsm *FSMHandler) Init(endpoints []*WeightedEndpoint) {
	original := makeOriginalWeights(endpoints)
	fsm.originalWeights = original
	fsm.lastWeights = original
	fsm.endpoints = endpoints
	if len(endpoints) != 0 {
		// Back off for half of the first endpoint's meter window after each change.
		fsm.backoffDuration = endpoints[0].meter.GetWindowSize() / 2
	}
	// Place the timer one second in the past so it is already expired.
	fsm.timer = fsm.timeProvider.UtcNow().Add(-time.Second)
}
// AdjustWeights is called on every load balancer NextEndpoint call and
// returns the suggested weights. When the backoff timer has expired it
// either grows the weights of the better-performing endpoints or, if the
// endpoints are indistinguishable, steps the weights back towards the
// originals.
func (fsm *FSMHandler) AdjustWeights() ([]SuggestedWeight, error) {
	switch {
	case len(fsm.endpoints) < 2:
		// With fewer than two endpoints, reweighting cannot change anything.
		return fsm.originalWeights, nil
	case !metricsReady(fsm.endpoints):
		// Not every meter is ready yet; fall back to the original weights.
		return fsm.originalWeights, nil
	case !fsm.timerExpired():
		// Still backing off after the previous change.
		return fsm.lastWeights, nil
	}

	good, bad := splitEndpoints(fsm.endpoints)
	if len(good) != 0 && len(bad) != 0 {
		// Endpoints differ in quality: favour the good ones.
		fsm.lastWeights = fsm.adjustWeights(good, bad)
		fsm.setTimer()
		return fsm.lastWeights, nil
	}

	// No quality difference between endpoints: converge towards the originals.
	if weights, changed := fsm.convergeWeights(); changed {
		fsm.lastWeights = weights
		fsm.setTimer()
	}
	return fsm.lastWeights, nil
}
// convergeWeights moves every endpoint's weight one step back towards its
// original weight. The effective weight is divided by FSMGrowFactor, and the
// result is never less than the original weight. The returned weights are
// normalized by their greatest common divisor. changed is true when at least
// one endpoint's effective weight still differs from its original weight.
func (fsm *FSMHandler) convergeWeights() ([]SuggestedWeight, bool) {
	weights := make([]SuggestedWeight, len(fsm.endpoints))
	changed := false
	for i, e := range fsm.endpoints {
		original, effective := e.GetOriginalWeight(), e.GetEffectiveWeight()
		// Keyed fields, as in makeOriginalWeights, so the literal does not
		// depend on EndpointWeight's field order.
		weights[i] = &EndpointWeight{Endpoint: e, Weight: decrease(original, effective)}
		if effective != original {
			changed = true
		}
	}
	return normalizeWeights(weights), changed
}
// adjustWeights multiplies the effective weight of every endpoint whose ID is
// in good by FSMGrowFactor, unless the result would exceed FSMMaxWeight. All
// other endpoints keep their current effective weight. The result is
// normalized by the greatest common divisor of the weights.
//
// bad is currently unused, because endpoints outside good keep their weight.
func (fsm *FSMHandler) adjustWeights(good map[string]bool, bad map[string]bool) []SuggestedWeight {
	weights := make([]SuggestedWeight, len(fsm.endpoints))
	for i, e := range fsm.endpoints {
		weight := e.GetEffectiveWeight()
		if good[e.GetId()] {
			if grown := increase(weight); grown <= FSMMaxWeight {
				weight = grown
			}
		}
		// Keyed fields, as in makeOriginalWeights.
		weights[i] = &EndpointWeight{Endpoint: e, Weight: weight}
	}
	return normalizeWeights(weights)
}
// weightsGcd folds gcd over all weights and returns the greatest common
// divisor. It returns -1 for an empty slice; -1 also serves as the
// "nothing accumulated yet" marker during the fold.
func weightsGcd(weights []SuggestedWeight) int {
	result := -1
	for i := range weights {
		current := weights[i].GetWeight()
		if result != -1 {
			current = gcd(result, current)
		}
		result = current
	}
	return result
}
// normalizeWeights divides every weight in place by the weights' greatest
// common divisor when that divisor is greater than 1. It returns the same
// slice it was given.
func normalizeWeights(weights []SuggestedWeight) []SuggestedWeight {
	if divisor := weightsGcd(weights); divisor > 1 {
		for i := range weights {
			weights[i].SetWeight(weights[i].GetWeight() / divisor)
		}
	}
	return weights
}
// setTimer starts a new backoff period that ends backoffDuration from now.
func (fsm *FSMHandler) setTimer() {
	now := fsm.timeProvider.UtcNow()
	fsm.timer = now.Add(fsm.backoffDuration)
}
// timerExpired reports whether the current time is strictly later than fsm.timer.
func (fsm *FSMHandler) timerExpired() bool {
	now := fsm.timeProvider.UtcNow()
	return now.After(fsm.timer)
}
// metricsReady reports whether meter.IsReady() holds for every endpoint.
// It returns true for an empty slice.
func metricsReady(endpoints []*WeightedEndpoint) bool {
	for i := range endpoints {
		if ready := endpoints[i].meter.IsReady(); !ready {
			return false
		}
	}
	return true
}
// increase grows weight by one FSMGrowFactor step.
func increase(weight int) int {
	grown := FSMGrowFactor * weight
	return grown
}
// decrease shrinks current by one FSMGrowFactor step, but never returns a
// value below target.
func decrease(target, current int) int {
	if shrunk := current / FSMGrowFactor; shrunk >= target {
		return shrunk
	}
	return target
}
// makeOriginalWeights builds one suggested weight per endpoint, set to that
// endpoint's original weight, in the same order as endpoints.
func makeOriginalWeights(endpoints []*WeightedEndpoint) []SuggestedWeight {
	out := make([]SuggestedWeight, 0, len(endpoints))
	for _, endpoint := range endpoints {
		out = append(out, &EndpointWeight{Endpoint: endpoint, Weight: endpoint.GetOriginalWeight()})
	}
	return out
}
// splitEndpoints partitions endpoints into a good set and a bad set, both
// keyed by endpoint ID, according to their failure rates. The split is
// relative (delegated to metrics.SplitFloat64), so if all endpoints have
// approximately the same failure rate the result is as if they were all
// equally good.
func splitEndpoints(endpoints []*WeightedEndpoint) (map[string]bool, map[string]bool) {
	// Sample each endpoint's fail rate exactly once and classify using that
	// same sample. Re-reading e.failRate() for the lookup is redundant, and if
	// the meter's value changes between reads the new value would not match
	// any key in g, so the endpoint would be misclassified as bad.
	failRates := make([]float64, len(endpoints))
	for i, e := range endpoints {
		failRates[i] = e.failRate()
	}
	g, b := metrics.SplitFloat64(1.5, 0, failRates)
	good, bad := make(map[string]bool, len(g)), make(map[string]bool, len(b))
	for i, e := range endpoints {
		if g[failRates[i]] {
			good[e.GetId()] = true
		} else {
			bad[e.GetId()] = true
		}
	}
	return good, bad
}