/
discovery.go
108 lines (94 loc) · 2.54 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package service
import (
"context"
"log"
"strconv"
"strings"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/resolver"
)
const schema = "lb"
type ServiceDiscovery struct {
cli *clientv3.Client
cc resolver.ClientConn
serverList sync.Map
prefix string
}
func NewServiceDiscovery(endpoints []string) resolver.Builder {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
}
}
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
log.Println("Build")
s.cc = cc
s.prefix = "/" + target.Scheme + "/" + target.Endpoint + "/"
resp, err := s.cli.Get(context.Background(), s.prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
go s.watcher()
return s, nil
}
func (s *ServiceDiscovery) ResolveNow(rn resolver.ResolveNowOption) {
log.Println("ResolveNow")
}
func (s *ServiceDiscovery) Scheme() string {
return schema
}
func (s *ServiceDiscovery) Close() {
log.Println("Close")
s.cli.Close()
}
func (s *ServiceDiscovery) watcher() {
rch := s.cli.Watch(context.Background(), s.prefix, clientv3.WithPrefix())
log.Println("watching prefix: %s now...", s.prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT:
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE:
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
func (s *ServiceDiscovery) SetServiceList(key, val string) {
addr := resolver.Address{Addr: strings.TrimPrefix(key, s.prefix)}
nodeWeight, err := strconv.Atoi(val)
if err != nil {
nodeWeight = 1
}
addr = weight.SetAddrInfo(addr, weight.AddrInfo{Weight: nodeWeight})
s.serverList.Store(key, addr)
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("put key: ", key, "weight: ", val)
}
func (s *ServiceDiscovery) DelServiceList(key string) {
s.serverList.Delete(key)
s.cc.UpdateState(resolver.State{Addresses: s.getServices()})
log.Println("del key:", key)
}
func (s *ServiceDiscovery) getServices() []resolver.Address {
addrs := make([]resolver.Address, 0, 10)
s.serverList.Range(func(k, v interface{}) bool {
addrs = append(addrs, v.(resolver.Address))
return true
})
return addrs
}