forked from goodrain/rainbond
/
client.go
145 lines (124 loc) · 4.21 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package client
import (
"errors"
"fmt"
"strings"
"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/goodrain/rainbond/pkg/mq/api/grpc/pb"
clientv3 "github.com/coreos/etcd/clientv3"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/naming"
)
// NewMqClient dials the message-queue gRPC service at endpoint using
// grpc.WithInsecure() and returns a TaskQueue client bound to that connection.
// The dial error, if any, is returned unchanged.
//
// TODO: implement client-side service discovery and load balancing, e.g.
//
//	b := grpc.RoundRobin(newResolver())
//	conn, err := grpc.DialContext(ctx, "http://127.0.0.1:2379", grpc.WithInsecure(), grpc.WithBalancer(b))
//	time.Sleep(time.Second * 4)
//
// Load balancing in the current grpc version has a bug, so it is not enabled
// for now and the endpoint is dialed directly.
func NewMqClient(endpoint string) (pb.TaskQueueClient, error) {
	conn, dialErr := grpc.DialContext(context.Background(), endpoint, grpc.WithInsecure())
	if dialErr != nil {
		return nil, dialErr
	}
	mq := pb.NewTaskQueueClient(conn)
	return mq, nil
}
// resolver is the implementation of grpc.naming.Resolver.
// Its Resolve method hands out etcd-backed watchers for serviceName.
type resolver struct {
// serviceName is interpolated into the etcd key prefix that the watcher
// queries and watches; Resolve rejects an empty value.
serviceName string // service name to resolve
}
// newResolver builds a resolver bound to the fixed service name "rainbond_mq".
func newResolver() *resolver {
	r := new(resolver)
	r.serviceName = "rainbond_mq"
	return r
}
// Resolve creates an etcd client for target and returns a naming.Watcher that
// resolves the resolver's service through it.
//
// target is the dial address of etcd: a comma-separated list of endpoints,
// for example
// "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379".
//
// It returns an error when the resolver has no service name or when the etcd
// client cannot be created.
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
	// Infof (not Info) so that the %s verb is actually expanded.
	logrus.Infof("Resolve target %s", target)
	if re.serviceName == "" {
		return nil, errors.New("grpclb: no service name provided")
	}

	// generate etcd client
	client, err := clientv3.New(clientv3.Config{
		Endpoints: strings.Split(target, ","),
	})
	if err != nil {
		return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())
	}

	// The watcher stores the client by value, hence the dereference.
	return &watcher{re: re, client: *client}, nil
}
// watcher is the implementation of grpc.naming.Watcher.
// It reports the addresses stored in etcd under the resolver's service prefix.
type watcher struct {
re *resolver // re: Etcd Resolver that created this watcher; supplies the service name
client clientv3.Client // etcd client used by Next to query and watch the prefix
// isInitialized is set by the first call to Next, after which the initial
// full query is no longer attempted.
isInitialized bool
}
// Close releases the etcd client owned by the watcher.
//
// Resolve creates a dedicated etcd client for every watcher, so leaving it
// open would leak its connection for as long as the process lives. The
// naming.Watcher interface gives Close no return value, so a failure to close
// is only logged.
func (w *watcher) Close() {
	if err := w.client.Close(); err != nil {
		logrus.Warnf("grpclb: close etcd3 client failed: %s", err.Error())
	}
}
// Next returns the next batch of address updates for the watched service.
//
// The first call queries etcd for every key under the service prefix and, if
// any value is found, returns all of them as naming.Add updates. Otherwise,
// and on every later call, it blocks on an etcd watch over the same prefix and
// returns one update per PUT/DELETE event of the first watch response that
// carries such events.
//
// An error is returned when the watch reports a failure or its channel is
// closed (for example because the client was closed); Next never returns an
// empty batch together with a nil error.
//
// Note: every call opens a fresh watch, so events that happen between two
// calls are not observed.
func (w *watcher) Next() ([]*naming.Update, error) {
	// prefix is the etcd prefix/value to watch
	prefix := fmt.Sprintf("/traefik/backends/%s/servers", w.re.serviceName)

	// check if is initialized
	if !w.isInitialized {
		w.isInitialized = true
		// query addresses from etcd
		resp, err := w.client.Get(context.Background(), prefix, clientv3.WithPrefix())
		if err != nil {
			// Not fatal: fall through and wait for watch events instead.
			logrus.Warnf("grpclb: query %s from etcd3 failed: %s", prefix, err.Error())
		} else if addrs := extractAddrs(resp); len(addrs) != 0 {
			// if not empty, return the updates; otherwise watch the prefix
			updates := make([]*naming.Update, 0, len(addrs))
			for _, addr := range addrs {
				updates = append(updates, &naming.Update{Op: naming.Add, Addr: addr})
			}
			logrus.Infof("Servers: %v", addrs)
			return updates, nil
		}
	}

	// Cancel the watch when returning so that each call does not leave a
	// live watch stream behind on the client.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// WithPrevKV is required to learn the address removed by a DELETE event:
	// etcd delete events carry only the key in Kv, the old value is in PrevKv.
	rch := w.client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithPrevKV())
	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			return nil, fmt.Errorf("grpclb: watch %s failed: %s", prefix, err.Error())
		}
		// Collect every event of the response; returning on the first one
		// would silently drop the rest of the batch.
		var updates []*naming.Update
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT:
				updates = append(updates, &naming.Update{Op: naming.Add, Addr: string(ev.Kv.Value)})
			case mvccpb.DELETE:
				removed := ev.Kv.Value
				if ev.PrevKv != nil {
					removed = ev.PrevKv.Value
				}
				updates = append(updates, &naming.Update{Op: naming.Delete, Addr: string(removed)})
			}
		}
		if len(updates) != 0 {
			return updates, nil
		}
	}

	// The channel only closes when the watch can no longer deliver events.
	// Report it as an error so the caller does not spin on empty results.
	return nil, errors.New("grpclb: etcd3 watch channel closed")
}
// extractAddrs collects the non-nil values of every key/value pair in resp,
// in response order, as strings. It always returns a non-nil slice; a nil
// response or one without key/value pairs yields an empty slice.
func extractAddrs(resp *clientv3.GetResponse) []string {
	if resp == nil {
		return []string{}
	}
	found := make([]string, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		if kv.Value == nil {
			continue
		}
		found = append(found, string(kv.Value))
	}
	return found
}