/
grpc_etcd.go
86 lines (82 loc) · 2.56 KB
/
grpc_etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package grpc
import (
"context"
"fmt"
"time"
"github.com/hyetpang/go-frame/pkgs/logs"
clientv3 "go.etcd.io/etcd/client/v3"
resolver "go.etcd.io/etcd/client/v3/naming/resolver"
"go.uber.org/fx"
"go.uber.org/zap"
"google.golang.org/grpc"
)
// 使用etcd作为服务发现
func NewServerEtcd(lc fx.Lifecycle, zapLog *zap.Logger, etcdClient *clientv3.Client) *grpc.Server {
s, lis, conf := newServer(zapLog)
serviceNamePrefix := conf.ServicePrefix
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
// 开始处理
err := make(chan error, 1)
go func() {
if e := s.Serve(lis); e != nil {
logs.Error("grpc监听出错", zap.Error(e))
err <- e
}
}()
select {
case e := <-err:
logs.Fatal("启动grpc serve出错", zap.Error(e))
return e
case <-ctx.Done():
logs.Fatal("启动grpc serve超时", zap.Error(ctx.Err()))
return ctx.Err()
case <-time.After(time.Second):
if len(conf.ServiceNames) > 0 {
for _, serviceName := range conf.ServiceNames {
// 服务注册
err := etcdRegisterService(context.TODO(), serviceNamePrefix, serviceName, conf.Address, etcdClient)
if err != nil {
logs.Fatal("注册服务出错", zap.Error(err))
}
logs.Info("注册GRPC服务", zap.String("服务名", serviceName))
}
} else {
for serviceName := range s.GetServiceInfo() {
// 服务注册
err := etcdRegisterService(context.TODO(), serviceNamePrefix, serviceName, conf.Address, etcdClient)
if err != nil {
logs.Fatal("注册服务出错", zap.Error(err))
}
logs.Info("注册GRPC服务", zap.String("服务名", serviceName))
}
}
logs.Debug("grpc start success", zap.String("address", conf.Address))
return nil
}
},
OnStop: func(ctx context.Context) error {
// 关闭服务
s.GracefulStop()
return nil
},
})
return s
}
// 使用etcd作为服务发现
func NewClientEtcd(lc fx.Lifecycle, zapLog *zap.Logger, etcdClient *clientv3.Client) map[string]*grpc.ClientConn {
conf := newConfig()
if len(conf.ServiceNames) < 1 {
logs.Fatal("grpc client 必须配置一个服务名字!")
}
etcdResolver, err := resolver.NewBuilder(etcdClient)
if err != nil {
logs.Fatal("创建etcd服务解析器对象出错", zap.Error(err))
}
clients := make(map[string]*grpc.ClientConn, len(conf.ServiceNames))
for _, serviceName := range conf.ServiceNames {
conn := newClient(fmt.Sprintf("%s:///%s/%s", etcdResolver.Scheme(), conf.ServicePrefix, serviceName), lc, zapLog, etcdResolver)
clients[serviceName] = conn
}
return clients
}