/
w-round-robin.go
145 lines (122 loc) · 3.88 KB
/
w-round-robin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package w_round_robin
import (
"math/rand"
"strconv"
"sync/atomic"
"time"
types "github.com/aaydin-tr/divisor/core/types"
"github.com/aaydin-tr/divisor/internal/proxy"
"github.com/aaydin-tr/divisor/pkg/config"
"github.com/aaydin-tr/divisor/pkg/helper"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
type serverMap struct {
proxy proxy.IProxyClient
weight uint
isHostAlive bool
i int
}
type WRoundRobin struct {
serversMap map[uint32]*serverMap
isHostAlive types.IsHostAlive
hashFunc types.HashFunc
stopHealthChecker chan bool
servers []proxy.IProxyClient
len uint64
i uint64
healthCheckerTime time.Duration
}
func NewWRoundRobin(config *config.Config, proxyFunc proxy.ProxyFunc) types.IBalancer {
wRoundRobin := &WRoundRobin{
isHostAlive: config.HealthCheckerFunc,
healthCheckerTime: config.HealthCheckerTime,
serversMap: make(map[uint32]*serverMap),
hashFunc: config.HashFunc,
}
for i, b := range config.Backends {
if !wRoundRobin.isHostAlive(b.GetHealthCheckURL()) {
zap.S().Warnf("Could not add for load balancing because the server is not live, Addr: %s", b.Url)
continue
}
proxy := proxyFunc(b, config.CustomHeaders)
for i := 0; i < int(b.Weight); i++ {
wRoundRobin.servers = append(wRoundRobin.servers, proxy)
}
wRoundRobin.serversMap[wRoundRobin.hashFunc(helper.S2b(b.Url+strconv.Itoa(i)))] = &serverMap{proxy: proxy, weight: b.Weight, isHostAlive: true, i: i}
zap.S().Infof("Server add for load balancing successfully Addr: %s", b.Url)
}
if len(wRoundRobin.servers) <= 0 {
return nil
}
wRoundRobin.len = uint64(len(wRoundRobin.servers))
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(wRoundRobin.servers), func(i, j int) {
wRoundRobin.servers[i], wRoundRobin.servers[j] = wRoundRobin.servers[j], wRoundRobin.servers[i]
})
go wRoundRobin.healthChecker(config.Backends)
return wRoundRobin
}
func (w *WRoundRobin) Serve() func(ctx *fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
w.next().ReverseProxyHandler(ctx)
}
}
func (w *WRoundRobin) next() proxy.IProxyClient {
v := atomic.AddUint64(&w.i, 1)
return w.servers[v%w.len]
}
func (w *WRoundRobin) healthChecker(backends []config.Backend) {
for {
select {
case <-w.stopHealthChecker:
return
default:
time.Sleep(w.healthCheckerTime)
for i, backend := range backends {
w.healthCheck(backend, i)
}
}
}
}
func (w *WRoundRobin) healthCheck(backend config.Backend, index int) {
status := w.isHostAlive(backend.GetHealthCheckURL())
backendHash := w.hashFunc(helper.S2b(backend.Url + strconv.Itoa(index)))
proxyMap, ok := w.serversMap[backendHash]
if ok && (!status && proxyMap.isHostAlive) {
w.servers = helper.RemoveByValue(w.servers, proxyMap.proxy)
w.len = w.len - uint64(proxyMap.weight)
proxyMap.isHostAlive = false
zap.S().Infof("Server is down, removing from load balancer, Addr: %s", backend.Url)
if w.len == 0 {
panic("All backends are down")
}
} else if ok && (status && !proxyMap.isHostAlive) {
for i := 0; i < int(proxyMap.weight); i++ {
w.servers = append(w.servers, proxyMap.proxy)
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(w.servers), func(i, j int) {
w.servers[i], w.servers[j] = w.servers[j], w.servers[i]
})
w.len = w.len + uint64(proxyMap.weight)
proxyMap.isHostAlive = true
zap.S().Infof("Server is live again, adding back to load balancer, Addr: %s", backend.Url)
}
}
func (w *WRoundRobin) Stats() []types.ProxyStat {
stats := make([]types.ProxyStat, len(w.serversMap))
for hash, p := range w.serversMap {
s := p.proxy.Stat()
stats[p.i] = types.ProxyStat{
Addr: s.Addr,
TotalReqCount: s.TotalReqCount,
AvgResTime: s.AvgResTime,
LastUseTime: s.LastUseTime,
ConnsCount: s.ConnsCount,
IsHostAlive: p.isHostAlive,
BackendHash: hash,
}
}
return stats
}