-
Notifications
You must be signed in to change notification settings - Fork 1
/
etcd.go
132 lines (117 loc) · 2.71 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package discovery
import (
"github.com/coreos/etcd/clientv3"
"time"
"context"
"encoding/json"
"github.com/coreos/etcd/mvcc/mvccpb"
"strings"
"github.com/bysir-zl/bygo/log"
"strconv"
)
var root = "/service/"
type Etcd struct {
cli *clientv3.Client
ttl int64
etcdEndpoints []string
}
func (p *Etcd) GetServers() (servers map[string]*Server, err error) {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
rsp, err := p.cli.Get(ctx, root, clientv3.WithPrefix())
if err != nil {
return
}
servers = make(map[string]*Server, len(rsp.Kvs))
for _, kv := range rsp.Kvs {
server := Server{}
err = json.Unmarshal(kv.Value, &server)
if err != nil {
return
}
servers[server.Id] = &server
}
return
}
func (p *Etcd) WatchServer(fun ServersChanged) {
ctx := context.TODO()
ch := p.cli.Watch(ctx, root, clientv3.WithPrefix())
go func() {
for {
select {
case c := <-ch:
for _, e := range c.Events {
change := SC_Online
server := Server{}
if e.Type == mvccpb.PUT {
json.Unmarshal(e.Kv.Value, &server)
} else {
change = SC_Offline
server.Id = strings.Split(string(e.Kv.Key), root)[1]
}
fun(&server, change)
}
}
}
}()
// 通知所有服务上线
servers, err := p.GetServers()
if err != nil {
log.ErrorT("etcd", err)
return
}
for _, s := range servers {
fun(s, SC_Online)
}
}
func (p *Etcd) UpdateServerTTL(leaseId string) (err error) {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
id, err := strconv.ParseInt(leaseId, 10, 64)
if err != nil {
return
}
r, err := p.cli.KeepAlive(ctx, clientv3.LeaseID(id))
if err != nil {
return
}
// 这里必须接受并取出才能KeepAlive, 我也不知道为什么
<-r
return
}
func (p *Etcd) RegisterService(server *Server) (leaseId string, err error) {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
bs, _ := json.Marshal(server)
leaseOption, _ := p.cli.Grant(context.TODO(), p.ttl)
_, err = p.cli.Put(ctx, root+server.Id, string(bs), clientv3.WithLease(leaseOption.ID))
if err != nil {
return
}
leaseId = strconv.FormatInt(int64(leaseOption.ID), 10)
return
}
func (p *Etcd) UnRegisterService(serverId string) (err error) {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
_, err = p.cli.Delete(ctx, root+serverId)
return
}
func (p *Etcd) init() (err error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: p.etcdEndpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
return
}
p.cli = cli
return
}
func NewEtcd(etcdEndpoints []string, ttl int64) Discoverer {
etcd := &Etcd{
ttl: ttl,
etcdEndpoints: etcdEndpoints,
}
err := etcd.init()
if err != nil {
panic(err)
}
return etcd
}