/
sinking.go
115 lines (104 loc) · 2.95 KB
/
sinking.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package sinking_sdk_go
import (
"math/rand"
"strings"
)
var (
checkTime = 5 //轮询间隔
)
// Register 注册中心
type Register struct {
Servers string `json:"-"` //注册中心
TokenName string `json:"-"` //通信密匙名称
Token string `json:"-"` //通信密匙
Name string `json:"name"` //服务名称
AppName string `json:"app_name"` //所属应用
EnvName string `json:"env_name"` //环境标识
GroupName string `json:"group_name"` //分组名称
Addr string `json:"addr"` //服务地址(规则ip:port)
server string //使用节点
useService map[string][]string //使用服务
}
// New 实例化
func New(server string, tokenName string, token string, appName string, envName string) *Register {
return &Register{
Servers: server,
TokenName: tokenName,
Token: token,
AppName: appName,
EnvName: envName,
}
}
// Register 注册服务
func (r *Register) Register(groupName string, name string, addr string) *Register {
r.GroupName = groupName
r.Name = name
r.Addr = addr
return r
}
// UseService 使用服务
func (r *Register) UseService(use map[string][]string) *Register {
r.useService = use
return r
}
// changeServer 更改注册中心
func (r *Register) changeServer(rand bool) {
if !rand {
r.changeServerByHash() //根据hash修改server
} else {
r.changeServerByRand() //根据rand修改server
}
}
// changeServerByHash 根据hash获取server
func (r *Register) changeServerByRand() {
data := strings.Split(r.Servers, ",")
if len(data) <= 1 {
return
}
var temp []string
for _, v := range data {
if v != r.server {
temp = append(temp, v)
}
}
r.server = temp[rand.Intn(len(temp))]
}
// changeServerByHash 根据hash获取server
func (r *Register) changeServerByHash() {
key := Md5Encode(r.AppName + r.EnvName + r.GroupName + r.Name + r.Addr)
test := NewConsistent()
data := strings.Split(r.Servers, ",")
for _, v := range data {
test.Add(v)
}
server, err := test.Get(key)
if err != nil {
return
}
r.server = server
}
// Listen 监听配置变动及发送服务心跳
func (r *Register) Listen() {
r.changeServer(false) //初始化节点根据hash获取
r.getConfigs(false) //监听配置列表
r.getConfigs(true) //监听配置列表
r.registerServices(false) //注册节点并维持心跳
r.registerServices(true) //注册节点并维持心跳
r.getServices(false) //监听服务列表
r.getServices(true) //监听服务列表
}
// SetOnline 设置服务上线下线
func (r *Register) SetOnline(online bool, imEf bool) {
OnlineStatusLock.RLock()
OnlineStatus = online
OnlineStatusLock.RUnlock()
if !imEf {
return
}
//更改服务状态(即时)
status := 0
if !OnlineStatus {
status = 1
}
r.changeServerStatus(Md5Encode(r.AppName+r.EnvName+r.GroupName+r.Addr), status)
}