/
sd.go
141 lines (126 loc) · 4.21 KB
/
sd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package sd
import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"github.com/PolarPanda611/trinitygo/httputil"
"github.com/PolarPanda611/trinitygo/util"
"github.com/coreos/etcd/clientv3"
etcdnaming "github.com/coreos/etcd/clientv3/naming"
"google.golang.org/grpc"
"google.golang.org/grpc/naming"
)
// ServiceMesh is the contract a service-registry backend must satisfy so that
// service instances can be registered and deregistered.
type ServiceMesh interface {
// GetClient returns the backend's underlying client as an untyped value;
// callers type-assert it to the concrete client type of the implementation.
GetClient() interface{}
// RegService registers the instance reachable at serviceIP:servicePort for
// the given project. The etcd implementation in this file interprets
// timeout as a number of seconds.
RegService(projectName string, projectVersion string, serviceIP string, servicePort int, Tags []string, timeout int) error
// DeRegService removes the instance reachable at serviceIP:servicePort for
// the given project. The etcd implementation in this file interprets
// timeout as a number of seconds.
DeRegService(projectName string, projectVersion string, serviceIP string, servicePort int, timeout int) error
}
// ServiceMeshEtcdImpl is the etcd-backed service register. It holds the
// connection settings and the etcd v3 client built from them.
type ServiceMeshEtcdImpl struct {
// config
Address string // etcd host, used to build the "http://Address:Port" endpoint
Port int
// runtime
client *clientv3.Client
}
// NewEtcdRegister builds an etcd-backed ServiceMesh.
//
// It creates an etcd v3 client for the endpoint "http://address:port" and
// returns a register that remembers both the settings and the client. If the
// client cannot be created, the error is returned unchanged together with a
// nil ServiceMesh.
func NewEtcdRegister(address string, port int) (ServiceMesh, error) {
	endpoint := fmt.Sprintf("http://%v:%v", address, port)
	etcdClient, err := clientv3.NewFromURL(endpoint)
	if err != nil {
		return nil, err
	}
	return &ServiceMeshEtcdImpl{
		Address: address,
		Port:    port,
		client:  etcdClient,
	}, nil
}
// GetClient returns the etcd client held by s, wrapped in an interface{}.
// Callers that need the concrete client must type-assert the result to
// *clientv3.Client.
func (s *ServiceMeshEtcdImpl) GetClient() interface{} {
return s.client
}
// RegService announces a service instance in etcd.
//
// The instance is published through the etcd gRPC naming resolver as a
// naming.Add update under the name produced by util.GetServiceName(projectName).
// Its address is "serviceIP:servicePort" and its metadata is Tags rendered
// with the %v verb. projectVersion is accepted but not used. timeout is the
// number of seconds allowed for the update; the error from the resolver is
// returned as-is.
func (s *ServiceMeshEtcdImpl) RegService(projectName string, projectVersion string, serviceIP string, servicePort int, Tags []string, timeout int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
	defer cancel()

	registration := naming.Update{
		Op:       naming.Add,
		Addr:     fmt.Sprintf("%v:%v", serviceIP, servicePort),
		Metadata: fmt.Sprintf("%v", Tags),
	}
	resolver := &etcdnaming.GRPCResolver{Client: s.client}
	return resolver.Update(ctx, util.GetServiceName(projectName), registration)
}
// DeRegService withdraws a service instance from etcd.
//
// It sends a naming.Delete update for the address "serviceIP:servicePort"
// through the etcd gRPC naming resolver, under the name produced by
// util.GetServiceName(projectName). projectVersion is accepted but not used.
// timeout is the number of seconds allowed for the update; the error from
// the resolver is returned as-is.
func (s *ServiceMeshEtcdImpl) DeRegService(projectName string, projectVersion string, serviceIP string, servicePort int, timeout int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
	defer cancel()

	removal := naming.Update{
		Op:   naming.Delete,
		Addr: fmt.Sprintf("%v:%v", serviceIP, servicePort),
	}
	resolver := &etcdnaming.GRPCResolver{Client: s.client}
	return resolver.Update(ctx, util.GetServiceName(projectName), removal)
}
// NewEtcdClientConn opens a gRPC connection to serviceName, resolving its
// instances through etcd and balancing across them round-robin.
//
// address and port locate the etcd endpoint ("http://address:port").
// timeout is the number of seconds the blocking dial may take. The
// connection is insecure (no transport credentials).
//
// On success the etcd client created here is left open, because the resolver
// behind the returned connection keeps using it; it is not exposed to the
// caller. On a failed dial the etcd client is closed before returning so it
// does not leak.
func NewEtcdClientConn(address string, port int, serviceName string, timeout int) (*grpc.ClientConn, error) {
	cli, err := clientv3.NewFromURL(fmt.Sprintf("http://%v:%v", address, port))
	if err != nil {
		return nil, fmt.Errorf("failed to conn etcd client , %v", err)
	}
	r := &etcdnaming.GRPCResolver{Client: cli}
	b := grpc.RoundRobin(r)
	ctx1, cel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
	defer cel()
	conn, err := grpc.DialContext(ctx1, serviceName, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithBalancer(b))
	if err != nil {
		// Nothing will use the etcd client once the dial has failed; release
		// it. The dial error is the one worth reporting, so the Close error
		// is deliberately dropped.
		_ = cli.Close()
		return nil, err
	}
	return conn, nil
}
// NewEtcdHTTPClient looks up the instances registered in etcd under keys
// prefixed with serviceName and returns one of them, chosen by NewRoundRobin.
//
// address and port locate the etcd endpoint ("http://address:port").
// timeout is the number of seconds allowed for the etcd lookup. The etcd
// client is only needed for that single lookup and is closed before the
// function returns.
//
// An error is returned when the etcd client cannot be created, when the
// lookup fails, or when NewRoundRobin(...).Client() reports one.
func NewEtcdHTTPClient(address string, port int, serviceName string, timeout int) (*httputil.ServiceClient, error) {
	cli, err := clientv3.NewFromURL(fmt.Sprintf("http://%v:%v", address, port))
	if err != nil {
		return nil, fmt.Errorf("failed to conn etcd client , %v", err)
	}
	// The response is plain data, so the client can be released as soon as
	// this call is done; without this every call leaks an etcd client.
	defer cli.Close()

	ctx, cel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
	defer cel()
	res, err := cli.Get(ctx, serviceName, clientv3.WithPrefix())
	if err != nil {
		return nil, fmt.Errorf("failed to get service , %v", err)
	}
	clients := extractAddrs(res)
	client, err := NewRoundRobin(clients).Client()
	if err != nil {
		return nil, err
	}
	return client, nil
}
// extractAddrs converts the key/value pairs of an etcd GetResponse into
// service clients.
//
// Each value is expected to be a JSON-encoded naming.Update whose Addr has
// the form "host:port". Entries that are nil, empty, not valid JSON, missing
// the ":" separator, or carrying a non-numeric port are skipped, so a single
// malformed registration can neither panic the caller nor yield a client
// pointing at port 0.
//
// It returns nil when resp or resp.Kvs is nil, and a non-nil (possibly
// empty) slice otherwise.
func extractAddrs(resp *clientv3.GetResponse) []httputil.ServiceClient {
	if resp == nil || resp.Kvs == nil {
		return nil
	}
	addrs := make([]httputil.ServiceClient, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		if kv == nil || len(kv.Value) == 0 {
			continue
		}
		var update naming.Update
		if err := json.Unmarshal(kv.Value, &update); err != nil {
			continue
		}
		// Split on the last colon: a missing separator is detected up front
		// instead of indexing past the end of a Split result.
		sep := strings.LastIndex(update.Addr, ":")
		if sep < 0 {
			continue
		}
		port, err := strconv.Atoi(update.Addr[sep+1:])
		if err != nil {
			continue
		}
		addrs = append(addrs, httputil.ServiceClient{
			Addr: update.Addr[:sep],
			Port: port,
		})
	}
	return addrs
}