-
Notifications
You must be signed in to change notification settings - Fork 684
/
connectwatcher.go
131 lines (104 loc) · 2.99 KB
/
connectwatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package consulwatch
import (
"context"
"errors"
"fmt"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"github.com/datawire/dlib/dlog"
)
type ConnectLeafWatcher struct {
consul *api.Client
plan *watch.Plan
}
func NewConnectLeafWatcher(consul *api.Client, service string) (*ConnectLeafWatcher, error) {
if service == "" {
err := errors.New("service name is empty")
return nil, err
}
watcher := &ConnectLeafWatcher{consul: consul}
plan, err := watch.Parse(map[string]interface{}{"type": "connect_leaf", "service": service})
if err != nil {
return nil, err
}
watcher.plan = plan
return watcher, nil
}
func (w *ConnectLeafWatcher) Watch(handler func(*Certificate, error)) {
w.plan.HybridHandler = func(val watch.BlockingParamVal, raw interface{}) {
if raw == nil {
handler(nil, fmt.Errorf("unexpected empty/nil response from consul"))
return
}
v, ok := raw.(*api.LeafCert)
if !ok {
handler(nil, fmt.Errorf("unexpected raw type. expected: %T, was: %T", &api.LeafCert{}, raw))
return
}
certificate := &Certificate{
PEM: v.CertPEM,
PrivateKeyPEM: v.PrivateKeyPEM,
ValidBefore: v.ValidBefore,
ValidAfter: v.ValidAfter,
SerialNumber: v.SerialNumber,
Service: v.Service,
ServiceURI: v.ServiceURI,
}
handler(certificate, nil)
}
}
func (w *ConnectLeafWatcher) Start(ctx context.Context) error {
return w.plan.RunWithClientAndLogger(w.consul, dlog.StdLogger(ctx, dlog.LogLevelInfo))
}
func (w *ConnectLeafWatcher) Stop() {
w.plan.Stop()
}
// ConnectCARootsWatcher watches the Consul Connect CA roots endpoint for changes and invokes a a handler function
// whenever it changes.
type ConnectCARootsWatcher struct {
consul *api.Client
plan *watch.Plan
}
func NewConnectCARootsWatcher(consul *api.Client) (*ConnectCARootsWatcher, error) {
watcher := &ConnectCARootsWatcher{consul: consul}
plan, err := watch.Parse(map[string]interface{}{"type": "connect_roots"})
if err != nil {
return nil, err
}
watcher.plan = plan
return watcher, nil
}
func (w *ConnectCARootsWatcher) Watch(handler func(*CARoots, error)) {
w.plan.HybridHandler = func(val watch.BlockingParamVal, raw interface{}) {
if raw == nil {
handler(nil, fmt.Errorf("unexpected empty/nil response from consul"))
return
}
v, ok := raw.(*api.CARootList)
if !ok {
handler(nil, fmt.Errorf("unexpected raw type. expected: %T, was: %T", &api.CARootList{}, raw))
return
}
rootsMap := make(map[string]CARoot)
for _, root := range v.Roots {
rootsMap[root.ID] = CARoot{
ID: root.ID,
Name: root.Name,
PEM: root.RootCertPEM,
Active: root.Active,
}
}
roots := &CARoots{
ActiveRootID: v.ActiveRootID,
TrustDomain: v.TrustDomain,
Roots: rootsMap,
}
handler(roots, nil)
}
}
func (w *ConnectCARootsWatcher) Start(ctx context.Context) error {
return w.plan.RunWithClientAndLogger(w.consul, dlog.StdLogger(ctx, dlog.LogLevelInfo))
}
func (w *ConnectCARootsWatcher) Stop() {
w.plan.Stop()
}