/
router_db.go
222 lines (203 loc) · 6.92 KB
/
router_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package service_router
import (
"sync"
"time"
"github.com/zentures/cityhash"
)
// RouterDb is the routing-table abstraction of this package: it stores
// per-service configs and server lists and answers server-selection queries.
// RouterDbImpl is the implementation visible in this file.
type RouterDb interface {
// Embedded subscriber interfaces (declared elsewhere in the package).
ServiceSubscriber
ConfigSubscriber
// GetConfigs returns the configs of all known services, keyed by service name.
GetConfigs() map[string]ServiceConfig
// GetServiceConfig returns the key/value config of serviceName; the bool
// reports whether such a config is available.
GetServiceConfig(serviceName string) (map[string]string, bool)
// GetServiceRouterConfig returns the router config to use for serviceName.
GetServiceRouterConfig(serviceName string) ServiceRouterConfig
// PickServers filters serviceList down to the servers usable for serviceName.
PickServers(serviceName string, serviceList ServerList) ServerList
// UpdateServers records serverList as the current servers of serviceName.
UpdateServers(serviceName string, serverList ServerList)
// UpdateConfig records config as the current config of serviceName.
UpdateConfig(serviceName string, config ServiceConfig)
// SelectServers returns the servers stored for the combination of
// serviceName, protocol, routerId, shardType and dc; the bool reports
// whether an entry exists for that combination.
SelectServers(serviceName string, protocol string, routerId int64, shardType string, dc string) (ServerList, bool)
// BatchSelectServers performs the SelectServers lookup for several router
// ids at once and returns the hits keyed by router id.
BatchSelectServers(serviceName, protocol string, routerIds []int64, shardType string, dc string) map[int64]ServerList
}
// RouterDbImpl is the map-backed RouterDb implementation.
//
// Data returned by the APIs that RouterDbImpl provides must be copies,
// because Go has no way to restrict write access through a reference.
type RouterDbImpl struct {
// servicesLock is taken around accesses to services.
servicesLock sync.RWMutex
// partitionKeyLock is taken around accesses to lastPartitionKeys.
partitionKeyLock sync.RWMutex
// configsLock is taken around accesses to configs.
configsLock sync.RWMutex
// services maps a key produced by GetServiceKey to the servers stored under it.
services map[int64]ServerList
// lastPartitionKeys records, per service name, the partition keys that the
// most recent UpdateServers call wrote into services, so that the next call
// can delete them first.
lastPartitionKeys map[string][]int64
// configs holds the config of each service, keyed by service name.
configs map[string]ServiceConfig
}
// Process-wide singleton state used by GetRouterDb.
var (
// routerDbInstance is the shared RouterDb; it is assigned inside routerDbOnce.
routerDbInstance RouterDb
// routerDbOnce ensures routerDbInstance is initialised exactly once.
routerDbOnce sync.Once
)
// GetServiceKey derives the int64 key under which servers for the given
// (serviceName, protocol, routerId, shardType, dc) combination are stored.
//
// The key is a chain of seeded CityHash64 hashes: routerId seeds the hash of
// dc, and each result seeds the hash of the next field, in the order
// dc, serviceName, protocol, shardType.
func GetServiceKey(serviceName string, protocol string, routerId int64, shardType string, dc string) int64 {
	hash := uint64(routerId)
	// The field order is part of the key definition and must not change.
	for _, field := range [...]string{dc, serviceName, protocol, shardType} {
		hash = cityhash.CityHash64WithSeed([]byte(field), uint32(len(field)), hash)
	}
	return int64(hash)
}
// GetRouterDb returns the process-wide RouterDb, creating it with empty maps
// the first time it is called.
func GetRouterDb() RouterDb {
	routerDbOnce.Do(func() {
		db := new(RouterDbImpl)
		db.services = map[int64]ServerList{}
		db.configs = map[string]ServiceConfig{}
		db.lastPartitionKeys = map[string][]int64{}
		routerDbInstance = db
	})
	return routerDbInstance
}
// GetConfigs returns a snapshot of all service configs keyed by service name.
//
// The returned map is a new map, so callers may add or remove entries freely.
// Each ServiceConfig is copied by value only; reference-typed fields such as
// Configs still alias the stored data and must not be mutated by the caller.
func (rd *RouterDbImpl) GetConfigs() map[string]ServiceConfig {
	// Iterating rd.configs while UpdateConfig writes to it is a data race, so
	// hold the read lock for the duration of the copy.
	rd.configsLock.RLock()
	defer rd.configsLock.RUnlock()

	result := make(map[string]ServiceConfig, len(rd.configs))
	for k, v := range rd.configs {
		result[k] = v
	}
	return result
}
// GetServiceConfig returns a copy of the key/value config stored for
// serviceName. It returns (nil, false) when the service is unknown or when
// its Configs map is nil.
func (rd *RouterDbImpl) GetServiceConfig(serviceName string) (map[string]string, bool) {
	rd.configsLock.RLock()
	defer rd.configsLock.RUnlock()

	entry, found := rd.configs[serviceName]
	if !found || entry.Configs == nil {
		return nil, false
	}

	// Hand out a copy so callers cannot modify the stored map.
	snapshot := make(map[string]string, len(entry.Configs))
	for name, value := range entry.Configs {
		snapshot[name] = value
	}
	return snapshot, true
}
// GetServiceRouterConfig returns the Router section of the config stored for
// serviceName, or DefaultServiceRouterConfig() when the service has no config.
func (rd *RouterDbImpl) GetServiceRouterConfig(serviceName string) ServiceRouterConfig {
	rd.configsLock.RLock()
	defer rd.configsLock.RUnlock()

	if entry, found := rd.configs[serviceName]; found {
		return entry.Router
	}
	return DefaultServiceRouterConfig()
}
// PickServers returns the servers in serviceList that are still usable: their
// UpdateTime plus the service's TtlInMs has not passed yet and their Status is
// ServerStatus_AVAILABLE. The result is nil when no server qualifies.
func (rd *RouterDbImpl) PickServers(serviceName string, serviceList ServerList) ServerList {
	// Wall-clock time in milliseconds.
	nowMs := uint64(time.Now().UnixNano() / 1e6)
	ttlMs := uint64(rd.GetServiceRouterConfig(serviceName).TtlInMs)

	var picked ServerList
	for _, server := range serviceList {
		fresh := nowMs <= server.UpdateTime+ttlMs
		if fresh && server.Status == ServerStatus_AVAILABLE {
			picked = append(picked, server)
		}
	}
	return picked
}
// UpdateServers rebuilds the routing entries of serviceName from serverList.
//
// An empty serverList is ignored. Otherwise the list is filtered with
// PickServers; if nothing survives the filter the unfiltered list is used.
// Every remaining server is indexed under keys produced by GetServiceKey:
//
//   - edge nodes: one FOLLOWER key per entry of PartitionList;
//   - other servers: LEADER and ALL keys per available shard, and FOLLOWER and
//     ALL keys per available follower shard. A server with neither ShardList
//     nor FollowerShardList is additionally indexed under shard id UINT32MAX.
//
// The new entries are then written into rd.services inside one critical
// section: partition keys recorded by the previous call for this service are
// deleted, the new partition keys are stored and recorded, and the shard
// entries are overwritten.
//
// NOTE(review): shard keys written by earlier calls are never deleted, only
// overwritten when the same key is produced again — confirm this is intended.
func (rd *RouterDbImpl) UpdateServers(serviceName string, serverList ServerList) {
	if len(serverList) == 0 {
		return
	}

	pickedResult := rd.PickServers(serviceName, serverList)
	// If every server looks down, gamble on the full list anyway: one of them
	// might still answer.
	if len(pickedResult) == 0 {
		pickedResult = serverList
	}

	// Build the new index without holding any lock.
	shardServers := make(map[int64]ServerList)
	partitionServers := make(map[int64]ServerList)
	for _, sv := range pickedResult {
		if len(sv.Dc) == 0 {
			sv.Dc = DEFAULT_DC
		}

		if sv.IsEdgeNode {
			for _, partitionHash := range sv.PartitionList {
				key := GetServiceKey(serviceName, sv.Protocol, partitionHash, ShardType_FOLLOWER, sv.Dc)
				partitionServers[key] = append(partitionServers[key], sv)
			}
			continue
		}

		availableShardList := sv.AvailableShardList
		if len(sv.ShardList) == 0 && len(sv.FollowerShardList) == 0 {
			// Treat the server as unsharded, using the sentinel shard id 2^32 - 1.
			// The full slice expression caps the capacity so that append
			// allocates a new array instead of writing into the backing array
			// of sv.AvailableShardList.
			n := len(availableShardList)
			availableShardList = append(availableShardList[:n:n], uint32(UINT32MAX))
		}

		for _, as := range availableShardList {
			shardId := int64(as)
			key := GetServiceKey(serviceName, sv.Protocol, shardId, ShardType_LEADER, sv.Dc)
			shardServers[key] = append(shardServers[key], sv)
			key = GetServiceKey(serviceName, sv.Protocol, shardId, ShardType_ALL, sv.Dc)
			shardServers[key] = append(shardServers[key], sv)
		}

		for _, afs := range sv.FollowerAvailableShardList {
			shardId := int64(afs)
			key := GetServiceKey(serviceName, sv.Protocol, shardId, ShardType_FOLLOWER, sv.Dc)
			shardServers[key] = append(shardServers[key], sv)
			key = GetServiceKey(serviceName, sv.Protocol, shardId, ShardType_ALL, sv.Dc)
			shardServers[key] = append(shardServers[key], sv)
		}
	}

	// Apply everything in a single critical section. Both maps are only
	// touched while their lock is held, and readers holding servicesLock never
	// see the table between the removal of the old partition keys and the
	// insertion of the new ones. Lock order: servicesLock, then
	// partitionKeyLock.
	rd.servicesLock.Lock()
	defer rd.servicesLock.Unlock()
	rd.partitionKeyLock.Lock()
	defer rd.partitionKeyLock.Unlock()

	// Drop the partition entries written by the previous update.
	for _, key := range rd.lastPartitionKeys[serviceName] {
		delete(rd.services, key)
	}

	partitionKeys := make([]int64, 0, len(partitionServers))
	for key, servers := range partitionServers {
		rd.services[key] = servers
		partitionKeys = append(partitionKeys, key)
	}
	rd.lastPartitionKeys[serviceName] = partitionKeys

	for key, servers := range shardServers {
		rd.services[key] = servers
	}
}
// UpdateConfig stores config as the current config of serviceName, replacing
// any previous value.
func (rd *RouterDbImpl) UpdateConfig(serviceName string, config ServiceConfig) {
	// This is a map write, so it needs the exclusive lock: a read lock would
	// let several writers (and readers) touch the map at the same time.
	rd.configsLock.Lock()
	defer rd.configsLock.Unlock()
	rd.configs[serviceName] = config
}
// ServiceNotify forwards a server-list notification for serviceName to
// UpdateServers. Presumably this is the ServiceSubscriber callback — confirm
// against the interface declaration.
func (rd *RouterDbImpl) ServiceNotify(serviceName string, services ServerList) {
rd.UpdateServers(serviceName, services)
}
// ConfigNotify forwards a config notification for serviceName to
// UpdateConfig. Presumably this is the ConfigSubscriber callback — confirm
// against the interface declaration.
func (rd *RouterDbImpl) ConfigNotify(serviceName string, serviceConfig ServiceConfig) {
rd.UpdateConfig(serviceName, serviceConfig)
}
// BatchSelectServers runs SelectServers once per entry of routerIds, with the
// other arguments fixed, and returns the successful lookups keyed by router
// id. Router ids without a stored entry are absent from the result.
func (rd *RouterDbImpl) BatchSelectServers(serviceName, protocol string, routerIds []int64, shardType string, dc string) map[int64]ServerList {
	selected := map[int64]ServerList{}
	for i := range routerIds {
		id := routerIds[i]
		if servers, found := rd.SelectServers(serviceName, protocol, id, shardType, dc); found {
			selected[id] = servers
		}
	}
	return selected
}
// SelectServers looks up the servers stored under the key that GetServiceKey
// derives from the arguments. It returns a copy of the stored list and true,
// or (nil, false) when no entry exists for that key.
func (rd *RouterDbImpl) SelectServers(serviceName string, protocol string, routerId int64, shardType string, dc string) (ServerList, bool) {
	lookupKey := GetServiceKey(serviceName, protocol, routerId, shardType, dc)

	rd.servicesLock.RLock()
	defer rd.servicesLock.RUnlock()

	stored, found := rd.services[lookupKey]
	if !found {
		return nil, false
	}

	// Return a copy so callers cannot modify the stored slice.
	snapshot := make(ServerList, len(stored))
	copy(snapshot, stored)
	return snapshot, true
}