-
Notifications
You must be signed in to change notification settings - Fork 3
/
etcd.go
119 lines (94 loc) · 2.72 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package etcdv3
import (
"context"
"fmt"
"log/slog"
"time"
"dario.cat/mergo"
"github.com/go-kod/kod"
"github.com/go-kod/kod/ext/registry"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/client/v3/naming/endpoints"
"google.golang.org/grpc"
gresolver "google.golang.org/grpc/resolver"
)
// Config describes how to connect to etcd for service registration.
//
// Build fills zero-valued Timeout and TTL fields with defaults (3s and 60)
// before the configuration is used.
//
// nolint
type Config struct {
// Endpoints is the list of etcd endpoints handed to the etcd client.
Endpoints []string
// Timeout is used as the etcd client's dial timeout.
Timeout time.Duration
// TTL is the session TTL passed to concurrency.WithTTL (seconds, per the
// etcd concurrency package). A session is only created when TTL > 0.
TTL int
}
// Build creates an etcd-backed registry client from the configuration.
//
// Zero-valued Timeout and TTL fields are first filled with defaults
// (3s and 60) via mergo.Merge; r is a value receiver, so the caller's Config
// is not modified. The etcd client is dialed in blocking mode with Timeout
// as the dial timeout. When the resulting TTL is positive, a concurrency
// session with that TTL is created and bound to ctx. Finally an endpoint
// manager is created for the prefix derived from the kod configuration
// carried by ctx.
//
// On any failure, every resource created so far (session, etcd client) is
// closed and a nil client is returned together with the wrapped error, so
// callers never receive a partially initialised client.
//
// nolint
func (r Config) Build(ctx context.Context) (*client, error) {
	err := mergo.Merge(&r, Config{
		Timeout: 3 * time.Second,
		TTL:     60,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to merge config: %w", err)
	}

	etcd, err := clientv3.New(clientv3.Config{
		Endpoints:   r.Endpoints,
		DialTimeout: r.Timeout,
		DialOptions: []grpc.DialOption{grpc.WithBlock()},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create etcd client: %w", err)
	}

	cc := &client{client: etcd}

	if r.TTL > 0 {
		ss, err := concurrency.NewSession(etcd, concurrency.WithTTL(r.TTL), concurrency.WithContext(ctx))
		if err != nil {
			// Best-effort cleanup: the session error is the one worth reporting.
			_ = etcd.Close()
			return nil, fmt.Errorf("failed to create etcd session: %w", err)
		}
		cc.session = ss
	}

	manager, err := endpoints.NewManager(etcd, cc.registryPrefix(ctx))
	if err != nil {
		// Best-effort cleanup in reverse creation order; the manager error is
		// the one worth reporting.
		if cc.session != nil {
			_ = cc.session.Close()
		}
		_ = etcd.Close()
		return nil, fmt.Errorf("failed to create etcd endpoint manager: %w", err)
	}
	cc.manager = manager

	return cc, nil
}
// client is the etcd-backed registry implementation returned by Config.Build.
type client struct {
// client is the underlying etcd v3 client.
client *clientv3.Client
// manager adds and deletes endpoints under the registry prefix.
manager endpoints.Manager
// session is set by Build only when the configured TTL is positive; it is
// nil otherwise. When set, Register attaches its lease to the endpoint.
session *concurrency.Session
}
// Compile-time check that *client satisfies registry.Registry.
var _ registry.Registry = (*client)(nil)
// Register adds info as an endpoint under its registry key.
//
// If the client holds a session, the session's lease is attached to the
// write. The etcd call itself is issued with context.Background(), so it is
// not bound to ctx's cancellation or deadline; ctx is used for logging and
// for deriving the key. Any error from the endpoint manager is returned
// unchanged.
//
// nolint
func (r *client) Register(ctx context.Context, info registry.ServiceInfo) error {
	var leaseOpts []clientv3.OpOption
	if sess := r.session; sess != nil {
		leaseOpts = append(leaseOpts, clientv3.WithLease(sess.Lease()))
	}

	key := r.registryKey(ctx, info)
	slog.InfoContext(ctx, "Register service", "key", key)

	endpoint := endpoints.Endpoint{
		Addr:     info.Addr,
		Metadata: info.Metadata,
	}
	return r.manager.AddEndpoint(context.Background(), key, endpoint, leaseOpts...)
}
// UnRegister deletes the endpoint stored under info's registry key.
//
// The delete is issued with context.Background(), so it is not bound to
// ctx's cancellation or deadline; ctx is used for logging and for deriving
// the key.
//
// nolint
func (r *client) UnRegister(ctx context.Context, info registry.ServiceInfo) error {
	key := r.registryKey(ctx, info)
	slog.InfoContext(ctx, "UnRegister service", "key", key)

	return r.manager.DeleteEndpoint(context.Background(), key)
}
// registryPrefix returns "<env>/<name>", built from the kod configuration
// found in ctx. Build uses it as the endpoint manager's target.
//
// nolint
func (r *client) registryPrefix(ctx context.Context) string {
	cfg := kod.FromContext(ctx).Config()

	return fmt.Sprintf("%s/%s", cfg.Env, cfg.Name)
}
// registryKey returns "<env>/<name>/<scheme>/<addr>": the kod environment and
// application name from ctx, followed by the service's scheme and address.
//
// nolint
func (r *client) registryKey(ctx context.Context, info registry.ServiceInfo) string {
	cfg := kod.FromContext(ctx).Config()

	return fmt.Sprintf("%s/%s/%s/%s", cfg.Env, cfg.Name, info.Scheme, info.Addr)
}
// ResolveBuilder returns a gRPC resolver builder that wraps this client's
// etcd connection. It always succeeds; ctx is not used.
//
// nolint
func (r *client) ResolveBuilder(ctx context.Context) (gresolver.Builder, error) {
	b := builder{c: r.client}

	return b, nil
}