/
grpc_etcd_keep_live.go
81 lines (74 loc) · 2.02 KB
/
grpc_etcd_keep_live.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package grpc
import (
"context"
"fmt"
"log"
"time"
"github.com/hyetpang/go-frame/pkgs/common"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
)
// grpc 使用etcd心跳保活
func etcdKeepLive(ctx context.Context, leaseChannel <-chan *clientv3.LeaseKeepAliveResponse, cleanFunc func(), servicePrefix, serviceName, addr string, client *clientv3.Client) {
go func() {
failedCount := 0
for {
select {
case resp := <-leaseChannel:
if resp != nil {
// log.Println("keep alive success.")
} else {
log.Println("keep alive failed.")
failedCount++
for failedCount > 3 {
cleanFunc()
if err := etcdRegisterService(ctx, servicePrefix, serviceName, addr, client); err != nil {
time.Sleep(time.Second)
continue
}
return
}
continue
}
case <-ctx.Done():
cleanFunc()
return
}
}
}()
}
// 服务注册
func etcdRegisterService(ctx context.Context, servicePrefix, serviceName, addr string, client *clientv3.Client) error {
// 创建一个租约
lease := clientv3.NewLease(client)
cancelCtx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
leaseResp, err := lease.Grant(cancelCtx, 3)
if err != nil {
return err
}
leaseChannel, err := lease.KeepAlive(ctx, leaseResp.ID) // 长链接, 不用设置超时时间
if err != nil {
return err
}
em, err := endpoints.NewManager(client, servicePrefix)
if err != nil {
return err
}
cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
defer cancel()
if err := em.AddEndpoint(cancelCtx, fmt.Sprintf("%s/%s/%s", servicePrefix, serviceName, common.GenID()), endpoints.Endpoint{
Addr: addr,
}, clientv3.WithLease(leaseResp.ID)); err != nil {
return err
}
del := func() {
cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
defer cancel()
em.DeleteEndpoint(cancelCtx, serviceName)
lease.Close()
}
// 保持注册状态(连接断开重连)
etcdKeepLive(ctx, leaseChannel, del, servicePrefix, serviceName, addr, client)
return nil
}