-
Notifications
You must be signed in to change notification settings - Fork 0
/
resolver.go
111 lines (97 loc) · 2.73 KB
/
resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package grpclib
import (
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/coreos/etcd/clientv3"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/naming"
)
var defaultMapResolvers = &mapResolvers{Resolvers: make(map[string]*Resolver, 0)}
type mapResolvers struct {
Resolvers map[string]*Resolver
locker sync.RWMutex
}
func (p *mapResolvers) getResolvers(name string) (*Resolver, bool) {
p.locker.RLock()
defer p.locker.RUnlock()
r, ok := p.Resolvers[name]
if ok {
return r, true
}
return nil, false
}
func (p *mapResolvers) setResolvers(name string, r *Resolver) bool {
p.locker.Lock()
defer p.locker.Unlock()
p.Resolvers[name] = r
return true
}
// GetResolverConn get resolver connection
func GetResolverConn(name string) (*grpc.ClientConn, bool) {
r, ok := defaultMapResolvers.getResolvers(name)
if !ok {
return nil, false
}
if r.conn == nil {
return nil, false
}
return r.conn, true
}
// Resolver is the implementaion of grpc.naming.Resolver
type Resolver struct {
// service name to resolve
serviceName string
// etcd target
etcdTarget string
// timeout
timeout time.Duration
// grpc conn
client *clientv3.Client
cancel context.CancelFunc
conn *grpc.ClientConn
}
// NewResolver return resolver with service name
func NewResolver(serviceName, target string, timeout time.Duration) error {
_, ok := defaultMapResolvers.getResolvers(serviceName)
if ok {
return nil
}
// generate etcd client
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(target, ","),
})
if err != nil {
return fmt.Errorf("grpclib: creat clientv3 failed: %s", err.Error())
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
r := &Resolver{serviceName: serviceName, client: client, cancel: cancel}
b := grpc.RoundRobin(r)
conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure(), grpc.WithBalancer(b))
if err != nil {
return err
}
r.conn = conn
if defaultMapResolvers.setResolvers(serviceName, r) {
return nil
}
return fmt.Errorf("grpclib: creat clientv3 failed: set resolver, %s", serviceName)
}
// Resolve to resolve the service from etcd, target is the dial address of etcd
// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (p *Resolver) Resolve(target string) (naming.Watcher, error) {
if 0 == len(p.serviceName) {
return nil, errors.New("grpclib: no service name provided")
}
ctx, cancel := context.WithCancel(context.Background())
w := &watcher{
resolver: p,
target: fmt.Sprintf("/%s/%s/", prefix, p.serviceName),
ctx: ctx,
cancel: cancel}
// Return watcher
return w, nil
}