/
resolver.go
99 lines (86 loc) · 2.04 KB
/
resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package loadbalance
import (
"context"
"fmt"
"sync"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
api "github.com/heyjun3/proglog/api/v1"
)
// Resolver discovers the servers of a cluster by calling the GetServers RPC
// and reports them to gRPC. The same value serves as both the
// resolver.Builder (Build, Scheme) and the resolver.Resolver (ResolveNow,
// Close).
type Resolver struct {
// mu is held for the whole duration of ResolveNow.
mu sync.Mutex
// clientConn is the resolver.ClientConn handed to Build; resolved
// addresses and the service config are pushed to it via UpdateState.
clientConn resolver.ClientConn
// resolverConn is the resolver's own connection, dialed in Build, over
// which GetServers is called. It is closed by Close.
resolverConn *grpc.ClientConn
// serviceConfig is the parsed service config produced in Build and sent
// along with every state update.
serviceConfig *serviceconfig.ParseResult
// logger is the global zap logger named "resolver", set in Build.
logger *zap.Logger
}
// Compile-time assertion that *Resolver satisfies resolver.Builder.
var _ resolver.Builder = (*Resolver)(nil)
// Build prepares the receiver for use as a resolver.Resolver and returns it.
//
// It stores cc, parses a service config whose loadBalancingConfig entry is
// keyed by Name, dials target.URL.Host (using opts.DialCreds as transport
// credentials when they are set), and then performs an initial ResolveNow.
// A dial failure is returned as-is with a nil resolver.
func (r *Resolver) Build(
	target resolver.Target,
	cc resolver.ClientConn,
	opts resolver.BuildOptions,
) (resolver.Resolver, error) {
	r.logger = zap.L().Named("resolver")
	r.clientConn = cc

	// Service config that selects the load balancing config keyed by Name.
	lbConfig := fmt.Sprintf(`{"loadBalancingConfig": [{"%s": {}}]}`, Name)
	r.serviceConfig = cc.ParseServiceConfig(lbConfig)

	// Only pass transport credentials when the caller supplied some.
	var dialOpts []grpc.DialOption
	if creds := opts.DialCreds; creds != nil {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
	}

	conn, err := grpc.Dial(target.URL.Host, dialOpts...)
	r.resolverConn = conn
	if err != nil {
		return nil, err
	}

	// Populate the client connection with an initial set of addresses.
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}
// Name is the value returned by Scheme and the key used for the
// loadBalancingConfig entry in the service config that Build parses.
const Name = "proglog"
// Scheme returns Name as this resolver's scheme identifier.
func (r *Resolver) Scheme() string {
return Name
}
// init registers a zero-value Resolver with the gRPC resolver package when
// this package is initialized.
func init() {
resolver.Register(&Resolver{})
}
// Compile-time assertion that *Resolver satisfies resolver.Resolver.
var _ resolver.Resolver = (*Resolver)(nil)
// ResolveNow asks the cluster for its current servers and pushes them to the
// client connection.
//
// It calls the GetServers RPC over resolverConn, converts every returned
// server into a resolver.Address (its RPC address plus an "is_leader"
// attribute), and hands the list, together with the stored service config,
// to clientConn.UpdateState. The options argument is ignored.
//
// Failures are logged rather than returned because the method has no error
// result: a failed GetServers call leaves the previous state untouched, and
// an error from UpdateState is logged instead of being silently dropped.
// The mutex is held for the whole call, so concurrent invocations are
// serialized.
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
	r.mu.Lock()
	defer r.mu.Unlock()

	client := api.NewLogClient(r.resolverConn)
	res, err := client.GetServers(context.Background(), &api.GetServersRequest{})
	if err != nil {
		r.logger.Error(
			"failed to resolve server",
			zap.Error(err),
		)
		return
	}

	// The final length is known, so allocate the slice once.
	addrs := make([]resolver.Address, 0, len(res.Servers))
	for _, server := range res.Servers {
		addrs = append(addrs, resolver.Address{
			Addr: server.RpcAddr,
			Attributes: attributes.New(
				"is_leader",
				server.IsLeader,
			),
		})
	}

	// UpdateState reports when the client connection could not apply the
	// new state; surface that in the log instead of discarding it.
	if err := r.clientConn.UpdateState(resolver.State{
		Addresses:     addrs,
		ServiceConfig: r.serviceConfig,
	}); err != nil {
		r.logger.Error(
			"failed to update state",
			zap.Error(err),
		)
	}
}
// Close shuts down the connection dialed in Build. A failure to close is
// logged; nothing is returned to the caller.
func (r *Resolver) Close() {
	closeErr := r.resolverConn.Close()
	if closeErr == nil {
		return
	}
	r.logger.Error("failed to close conn", zap.Error(closeErr))
}