/
config.go
104 lines (91 loc) · 2.31 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package load_balance
import (
"fmt"
"github.com/donscoco/gateway_be/pkg/base_server/zookeeper"
)
// LoadBalanceConf is the configuration subject of the observer pattern used
// by the load balancer: observers attach to it, and it exposes the current
// configuration together with the hooks used to watch and replace it.
type LoadBalanceConf interface {
	// Attach registers o as an observer of this configuration.
	Attach(o Observer)
	// GetConf returns the current configuration entries.
	GetConf() []string
	// WatchConf starts watching the configuration source for changes.
	WatchConf()
	// UpdateConf replaces the current configuration with conf.
	UpdateConf(conf []string)
}
// LoadBalanceZkConf is a load-balancer configuration whose active server
// list comes from ZooKeeper and which notifies attached observers when that
// list is replaced.
//
// None of the fields is guarded by a lock, although WatchConf replaces
// activeList and walks observers from a background goroutine.
type LoadBalanceZkConf struct {
	observers    []Observer        // observers notified after the active list is replaced
	path         string            // ZooKeeper path whose server list is read and watched
	zkHosts      []string          // hosts passed to zookeeper.NewZkManager
	confIpWeight map[string]string // weight per active-list entry; missing entries use a default
	activeList   []string          // most recent server list obtained for path
	format       string            // fmt format applied to each active-list entry by GetConf
}
// Attach registers o so that it is told about later configuration changes.
// Observers are kept in the order in which they were attached.
//
// A nil observer is ignored: storing it would make the next notification
// call Update on a nil interface and panic, possibly inside the goroutine
// started by WatchConf.
//
// Attach does no locking of its own.
func (s *LoadBalanceZkConf) Attach(o Observer) {
	if o == nil {
		return
	}
	s.observers = append(s.observers, o)
}
// NotifyAllObservers calls Update once on every attached observer, in the
// order in which they were attached.
func (s *LoadBalanceZkConf) NotifyAllObservers() {
	// Work on a snapshot of the slice header so that the set of observers
	// visited is fixed when the walk starts.
	attached := s.observers
	for i := 0; i < len(attached); i++ {
		attached[i].Update()
	}
}
// GetConf renders the active server list as configuration entries.
//
// Each entry is the active item formatted with s.format, followed by a comma
// and the item's weight from confIpWeight; items without a configured weight
// get "50". The returned slice is never nil, even when no server is active.
func (s *LoadBalanceZkConf) GetConf() []string {
	// Weight used for servers that have no entry in confIpWeight.
	const defaultWeight = "50"

	entries := make([]string, 0)
	for i := range s.activeList {
		ip := s.activeList[i]
		weight := defaultWeight
		if configured, found := s.confIpWeight[ip]; found {
			weight = configured
		}
		entries = append(entries, fmt.Sprintf(s.format, ip)+","+weight)
	}
	return entries
}
// WatchConf starts a background goroutine that watches s.path in ZooKeeper.
// Every server list received from the watch is applied through UpdateConf,
// which also notifies the attached observers; errors received from the watch
// are only printed.
//
// The watch opens its own ZooKeeper connection through a new ZkManager. The
// goroutine ends, and that manager is closed, once the server-list channel
// is closed. A closed error channel merely stops error reporting. Without
// these checks a closed channel would be ready on every iteration, so the
// loop would spin and keep pushing nil lists into UpdateConf.
//
// NOTE(review): whatever GetConnect reports is not inspected here; confirm
// whether a failed connection needs handling before the watch is set up.
func (s *LoadBalanceZkConf) WatchConf() {
	zkManager := zookeeper.NewZkManager(s.zkHosts)
	zkManager.GetConnect()
	fmt.Println("watchConf")
	chanList, chanErr := zkManager.WatchServerListByPath(s.path)
	go func() {
		defer zkManager.Close()
		for {
			select {
			case changeErr, ok := <-chanErr:
				if !ok {
					// A nil channel is never ready, so this case is
					// disabled while the list channel keeps being served.
					chanErr = nil
					continue
				}
				fmt.Println("changeErr", changeErr)
			case changedList, ok := <-chanList:
				if !ok {
					// No further updates can arrive; stop watching.
					return
				}
				fmt.Println("watch node changed")
				s.UpdateConf(changedList)
			}
		}
	}()
}
// UpdateConf replaces the active server list with conf and then notifies
// every attached observer of the change.
//
// conf is copied rather than stored directly, so later changes the caller
// makes to its slice do not silently alter the active list. An empty conf
// leaves the active list nil. Notification goes through NotifyAllObservers
// so there is a single place that walks the observer list.
func (s *LoadBalanceZkConf) UpdateConf(conf []string) {
	s.activeList = append([]string(nil), conf...)
	s.NotifyAllObservers()
}
// NewLoadBalanceZkConf builds a LoadBalanceZkConf seeded with the server list
// currently stored under path in ZooKeeper and starts watching that path.
//
// format is the fmt format later applied to each server by GetConf, zkHosts
// are the ZooKeeper hosts to connect to, and conf maps a server to its
// weight. The connection used for the initial read is closed before
// returning; WatchConf opens its own. If the initial read fails, its error
// is returned unchanged and no watch is started.
func NewLoadBalanceZkConf(format, path string, zkHosts []string, conf map[string]string) (*LoadBalanceZkConf, error) {
	manager := zookeeper.NewZkManager(zkHosts)
	manager.GetConnect()
	defer manager.Close()

	servers, err := manager.GetServerListByPath(path)
	if err != nil {
		return nil, err
	}

	zkConf := &LoadBalanceZkConf{
		path:         path,
		zkHosts:      zkHosts,
		format:       format,
		confIpWeight: conf,
		activeList:   servers,
	}
	zkConf.WatchConf()
	return zkConf, nil
}
// Observer is the listener side of the observer pattern: anything attached
// to a configuration subject implements it.
type Observer interface {
	// Update is called by the subject to signal that its configuration has
	// changed. It carries no arguments, so implementations fetch the new
	// state themselves.
	Update()
}
// LoadBalanceObserver is an Observer bound to one LoadBalanceZkConf, which
// it reads back whenever it is updated.
type LoadBalanceObserver struct {
	// ModuleConf is the configuration this observer reads in Update.
	ModuleConf *LoadBalanceZkConf
}
// Update reads the current configuration from l.ModuleConf and prints it to
// standard output.
func (l *LoadBalanceObserver) Update() {
	current := l.ModuleConf.GetConf()
	fmt.Println("Update get conf:", current)
}
// NewLoadBalanceObserver returns an observer that reads from conf. It does
// not attach the observer to conf; callers do that through Attach.
func NewLoadBalanceObserver(conf *LoadBalanceZkConf) *LoadBalanceObserver {
	observer := new(LoadBalanceObserver)
	observer.ModuleConf = conf
	return observer
}