/
etcd_balancer.go
145 lines (124 loc) · 3.19 KB
/
etcd_balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package etcd
import (
"context"
"time"
"go.uber.org/zap"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
)
// Resolver for grpc client
type Resolver struct {
closeCh chan struct{}
watchCh clientv3.WatchChan
cli *clientv3.Client
keyPrifix string
srvAddrsList []resolver.Address
cc resolver.ClientConn
logger *zap.Logger
}
// NewResolver create a new resolver.Builder base on etcd
func NewResolver(logger *zap.Logger) *Resolver {
return &Resolver{
logger: logger,
}
}
// Scheme returns the scheme supported by this resolver.
func (r *Resolver) Scheme() string {
return ""
}
// Build creates a new resolver.Resolver for the given target
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.cc = cc
r.keyPrifix = BuildPrefix(Server{Name: target.Endpoint, Version: target.Authority})
if _, err := r.start(); err != nil {
return nil, err
}
return r, nil
}
// ResolveNow resolver.Resolver interface
func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {}
// Close resolver.Resolver interface
func (r *Resolver) Close() {
r.closeCh <- struct{}{}
}
// start
func (r *Resolver) start() (chan<- struct{}, error) {
var err error
r.cli = cli
resolver.Register(r)
r.closeCh = make(chan struct{})
if err = r.sync(); err != nil {
return nil, err
}
go r.watch()
return r.closeCh, nil
}
// watch update events
func (r *Resolver) watch() {
ticker := time.NewTicker(time.Minute)
r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix())
for {
select {
case <-r.closeCh:
return
case res, ok := <-r.watchCh:
if ok {
r.update(res.Events)
}
case <-ticker.C:
if err := r.sync(); err != nil {
r.logger.Error("sync failed", zap.Error(err))
}
}
}
}
// update
func (r *Resolver) update(events []*clientv3.Event) {
for _, ev := range events {
var info Server
var err error
switch ev.Type {
case mvccpb.PUT:
info, err = ParseValue(ev.Kv.Value)
if err != nil {
continue
}
addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
if !Exist(r.srvAddrsList, addr) {
r.srvAddrsList = append(r.srvAddrsList, addr)
r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
}
case mvccpb.DELETE:
info, err = SplitPath(string(ev.Kv.Key))
if err != nil {
continue
}
addr := resolver.Address{Addr: info.Addr}
if s, ok := Remove(r.srvAddrsList, addr); ok {
r.srvAddrsList = s
r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
}
}
}
}
// sync 同步获取所有地址信息
func (r *Resolver) sync() error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix())
if err != nil {
return err
}
r.srvAddrsList = []resolver.Address{}
for _, v := range res.Kvs {
info, err := ParseValue(v.Value)
if err != nil {
continue
}
addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
r.srvAddrsList = append(r.srvAddrsList, addr)
}
r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
return nil
}