forked from go-chassis/go-chassis
/
latency_strategy.go
executable file
·161 lines (143 loc) · 4.63 KB
/
latency_strategy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package loadbalance
import (
"github.com/ServiceComb/go-chassis/core/registry"
"github.com/ServiceComb/go-chassis/third_party/forked/go-micro/selector"
"sort"
"strings"
"sync"
"time"
)
// ByDuration is for calculating the duration
type ByDuration []time.Duration
func (a ByDuration) Len() int { return len(a) }
func (a ByDuration) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByDuration) Less(i, j int) bool { return a[i] < a[j] }
// variables for latency map, rest and highway requests count
var (
//LatencyMap key is the combination of instance addr and microservice name separated by "/"
LatencyMap map[string][]time.Duration
//maintain different locks since multiple goroutine access the map
LatencyMapRWMutex sync.RWMutex
avgmtx sync.RWMutex
weightedRespMutex sync.Mutex
)
// SetLatency for each requests
func SetLatency(duration time.Duration, addr, microServiceNameAndProtocol string) {
key := addr + "/" + microServiceNameAndProtocol
LatencyMapRWMutex.RLock()
val, ok := LatencyMap[key]
LatencyMapRWMutex.RUnlock()
if !ok {
var durationQueue []time.Duration
durationQueue = append(durationQueue, duration)
LatencyMapRWMutex.Lock()
LatencyMap[key] = durationQueue
LatencyMapRWMutex.Unlock()
} else {
LatencyMapRWMutex.Lock()
if len(val) < 10 {
val = append(val, duration)
LatencyMap[key] = val
} else { // save only latest 10 data for one micro service's protocol endpoint
val = val[1:]
val = append(val, duration)
LatencyMap[key] = val
}
LatencyMapRWMutex.Unlock()
}
}
// WeightedResponse is a strategy plugin,interface must be a service/protocol string
func WeightedResponse(instances []*registry.MicroServiceInstance, serviceAndProtocol interface{}) selector.Next {
return selectWeightedInstance(instances, serviceAndProtocol)
}
// SortingLatencyDuration sorting the average latencies recored for each instance
// and returning the instance addr which has the least latency
func SortingLatencyDuration(serviceAndProtocol string, avgLatencyMap map[string]time.Duration) string {
var mtx sync.Mutex
var tempLatencyMap = make(map[string]time.Duration)
for k, v := range avgLatencyMap {
epMs := strings.Split(k, "/")
//comparing the microservice and protocol name
if (epMs[1] + "/" + epMs[2]) == serviceAndProtocol {
mtx.Lock()
tempLatencyMap[epMs[0]] = v
mtx.Unlock()
}
}
//Inverting maps
invMap := make(map[time.Duration]string, len(tempLatencyMap))
for k, v := range tempLatencyMap {
mtx.Lock()
invMap[v] = k
mtx.Unlock()
}
//Sorting
sortedKeys := make([]time.Duration, len(invMap))
var i int
for k := range invMap {
sortedKeys[i] = k
i++
}
sort.Sort(ByDuration(sortedKeys))
return invMap[sortedKeys[0]]
}
// FindingAvgLatency Calculating the average latency for each instance using the statistics collected,
// key is addr/service/protocol
func FindingAvgLatency(metadata string) (avgMap map[string]time.Duration, protocol string) {
avgMap = make(map[string]time.Duration)
LatencyMapRWMutex.RLock()
defer LatencyMapRWMutex.RUnlock()
for k, v := range LatencyMap {
epMs := strings.Split(k, "/")
//comparing the microservice/protocol name
if (epMs[1] + "/" + epMs[2]) == metadata {
protocol = epMs[2]
var sum time.Duration
for i := 0; i < len(v); i++ {
sum = sum + v[i]
}
avgmtx.Lock()
avgMap[k] = time.Duration(sum.Nanoseconds() / int64(len(v)))
avgmtx.Unlock()
}
}
return avgMap, protocol
}
// selectWeightedInstance select instance based on protocol and less latency
func selectWeightedInstance(instances []*registry.MicroServiceInstance, serviceAndProtocol interface{}) selector.Next {
var instanceAddr string
avgLatencyMap, protocol := FindingAvgLatency(serviceAndProtocol.(string))
if len(avgLatencyMap) == 0 {
return func() (*registry.MicroServiceInstance, error) {
if len(instances) == 0 {
return nil, selector.ErrNoneAvailable
}
//if no instances are selected round robin will be done
weightedRespMutex.Lock()
node := instances[i%len(instances)]
i++
weightedRespMutex.Unlock()
return node, nil
}
}
instanceAddr = SortingLatencyDuration(serviceAndProtocol.(string), avgLatencyMap)
return func() (*registry.MicroServiceInstance, error) {
if len(instances) == 0 {
return nil, selector.ErrNoneAvailable
}
for _, node := range instances {
weightedRespMutex.Lock()
if instanceAddr == node.EndpointsMap[protocol] {
weightedRespMutex.Unlock()
return node, nil
}
weightedRespMutex.Unlock()
}
//if no instances are selected round robin will be done
weightedRespMutex.Lock()
node := instances[i%len(instances)]
i++
weightedRespMutex.Unlock()
return node, nil
}
}