-
Notifications
You must be signed in to change notification settings - Fork 684
/
d_resource.go
406 lines (364 loc) · 10.9 KB
/
d_resource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
package daemon
import (
"fmt"
"strings"
"syscall"
"time"
"github.com/pkg/errors"
"github.com/datawire/ambassador/pkg/supervisor"
)
// Resource represents one thing managed by edgectl daemon. Examples include
// network intercepts (via teleproxy intercept) and cluster connectivity.
type Resource interface {
	// Name returns the resource's name.
	Name() string
	// IsOkay reports whether the resource passed its most recent health check.
	IsOkay() bool
	// Close shuts the resource down and waits for its processing to finish.
	Close() error
}
// ResourceBase has helpers to create a monitored resource.
//
// All mutable fields are owned by the processor goroutine (see processor);
// external callers interact with them only by submitting closures on the
// tasks channel, which serializes access without a mutex.
type ResourceBase struct {
	name    string
	doCheck func(*supervisor.Process) error // resource-specific health probe
	doQuit  func(*supervisor.Process) error // resource-specific shutdown hook
	tasks   chan func(*supervisor.Process) error // work queue drained by processor
	okay    bool          // (monitor) cmd is running and check passes
	transAt time.Time     // (monitor) time of transition (okay value changed)
	done    bool          // (Close) to get everything to quit
	end     chan struct{} // (Close) closed when the processor finishes
}
// Name implements Resource. The read is routed through the task queue so
// it is serialized with every other access to the resource's state.
func (rb *ResourceBase) Name() string {
	reply := make(chan string)
	rb.tasks <- func(_ *supervisor.Process) error {
		reply <- rb.name
		return nil
	}
	return <-reply
}
// IsOkay returns whether the resource is okay as far as monitoring is
// aware. Like Name, it reads the state via the task queue so the answer
// is consistent with the monitor loop.
func (rb *ResourceBase) IsOkay() bool {
	reply := make(chan bool)
	rb.tasks <- func(_ *supervisor.Process) error {
		reply <- rb.okay
		return nil
	}
	return <-reply
}
// Close shuts down this resource. It is idempotent: the first call queues
// quit() and waits for the processor to exit; later calls are no-ops.
func (rb *ResourceBase) Close() error {
	if rb.tasks == nil {
		return nil // already closed
	}
	rb.tasks <- rb.quit // ask the processor to run the quit hook
	<-rb.end            // wait until the processor loop has finished
	rb.tasks = nil      // mark closed so a second Close does nothing
	return nil
}
// setup initializes the ResourceBase's channels and registers two workers
// with the supervisor: the main task/monitor loop (processor) and a
// companion that turns daemon shutdown into a Close() of this resource.
func (rb *ResourceBase) setup(sup *supervisor.Supervisor, name string) {
	rb.name = name
	// Buffered so submitters (Name, IsOkay, subprocess workers) rarely
	// block waiting for the processor loop to pick up the task.
	rb.tasks = make(chan func(*supervisor.Process) error, 10)
	rb.end = make(chan struct{})
	rb.transAt = time.Now()
	// Main worker: drains rb.tasks and runs the periodic health check.
	sup.Supervise(&supervisor.Worker{
		Name: name,
		Work: rb.processor,
	})
	// Shutdown worker: closes the resource on daemon shutdown, or exits
	// quietly if the resource was closed first (rb.end closed).
	sup.Supervise(&supervisor.Worker{
		Name: name + "/shutdown",
		Work: func(p *supervisor.Process) error {
			select {
			case <-p.Shutdown():
				p.Log("daemon is shutting down")
				return rb.Close()
			case <-rb.end:
				p.Log("Close() complete")
				return nil
			}
		},
	})
}
// quit runs on the processor goroutine (queued by Close) and delegates to
// the resource-specific shutdown hook, which is expected to arrange for
// rb.done to become true.
func (rb *ResourceBase) quit(p *supervisor.Process) error {
	p.Log("Close() / resource quit() called")
	return rb.doQuit(p)
}
// monitor runs the resource-specific health check, records the result in
// rb.okay, and sends a notification whenever the health state flips,
// including how long the previous state lasted.
func (rb *ResourceBase) monitor(p *supervisor.Process) error {
	wasOkay := rb.okay
	p.Log("monitor: checking...")
	err := rb.doCheck(p)
	rb.okay = err == nil
	if err != nil {
		p.Logf("monitor: check failed: %v", err)
	} else {
		p.Log("monitor: check passed")
	}
	if wasOkay != rb.okay {
		// Health state changed; report the transition and reset the clock.
		Notify(p, fmt.Sprintf("%s: %t -> %t after %s", rb.name, wasOkay, rb.okay, time.Since(rb.transAt)))
		rb.transAt = time.Now()
	}
	return nil
}
// processor is the resource's event loop. It serializes all access to the
// resource's state by executing queued tasks one at a time, and runs the
// health check every three seconds when no task is pending. It exits when
// a task sets rb.done; closing rb.end then unblocks Close() and the
// shutdown worker registered in setup.
func (rb *ResourceBase) processor(p *supervisor.Process) error {
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()
	defer close(rb.end) // signal Close() and the shutdown worker
	p.Ready()
	for {
		var task func(*supervisor.Process) error
		select {
		case fn := <-rb.tasks: // There is work to do
			task = fn
		case <-ticker.C: // Ticker says it's time to monitor
			task = rb.monitor
		}
		if err := task(p); err != nil {
			p.Logf("task failed: %v", err)
			return err
		}
		if rb.done {
			// A task (quit/doCheck) flagged shutdown; announce and stop.
			Notify(p, fmt.Sprintf("%s: %t -> Closed after %s", rb.name, rb.okay, time.Since(rb.transAt)))
			p.Log("done")
			return nil
		}
	}
}
// KCluster is a Kubernetes cluster reference
type KCluster struct {
	context   string // kubectl context name (resolved in TrackKCluster if empty)
	namespace string // kubectl namespace (resolved in TrackKCluster if empty)
	server    string // API server URL read from the kubeconfig
	rai       *RunAsInfo
	kargs     []string // extra kubectl arguments applied to every invocation
	// isBridgeOkay, when set via SetBridgeCheck, short-circuits the
	// connectivity check: a healthy bridge implies a reachable cluster.
	isBridgeOkay func() bool
	ResourceBase
}
// RAI returns the RunAsInfo for this cluster.
func (c *KCluster) RAI() *RunAsInfo {
	return c.rai
}
// GetKubectlArgs returns the kubectl command arguments to run a
// kubectl command with this cluster, including the namespace argument.
func (c *KCluster) GetKubectlArgs(args ...string) []string {
	return c.getKubectlArgs(true, args...)
}
// GetKubectlArgsNoNamespace returns the kubectl command arguments to run a
// kubectl command with this cluster, but without the namespace argument.
func (c *KCluster) GetKubectlArgsNoNamespace(args ...string) []string {
	return c.getKubectlArgs(false, args...)
}
// getKubectlArgs assembles the argument vector for invoking kubectl
// against this cluster: the "kubectl" command itself, the --context and
// (when includeNamespace is true) --namespace flags if configured, the
// cluster's standing extra arguments, and finally the caller's arguments.
func (c *KCluster) getKubectlArgs(includeNamespace bool, args ...string) []string {
	// Capacity: "kubectl" plus up to two two-element flag pairs plus the
	// variable parts, so the appends below never reallocate.
	cmdArgs := make([]string, 0, 5+len(c.kargs)+len(args))
	cmdArgs = append(cmdArgs, "kubectl")
	if c.context != "" {
		cmdArgs = append(cmdArgs, "--context", c.context)
	}
	if includeNamespace && c.namespace != "" {
		cmdArgs = append(cmdArgs, "--namespace", c.namespace)
	}
	cmdArgs = append(cmdArgs, c.kargs...)
	cmdArgs = append(cmdArgs, args...)
	return cmdArgs
}
// GetKubectlCmd returns a Cmd that runs kubectl with the given arguments and
// the appropriate environment to talk to the cluster
func (c *KCluster) GetKubectlCmd(p *supervisor.Process, args ...string) *supervisor.Cmd {
	return c.rai.Command(p, c.GetKubectlArgs(args...)...)
}
// GetKubectlCmdNoNamespace returns a Cmd that runs kubectl with the given arguments and
// the appropriate environment to talk to the cluster, but it doesn't supply a namespace
// arg.
func (c *KCluster) GetKubectlCmdNoNamespace(p *supervisor.Process, args ...string) *supervisor.Cmd {
	return c.rai.Command(p, c.GetKubectlArgsNoNamespace(args...)...)
}
// Context returns the cluster's context name.
func (c *KCluster) Context() string {
	return c.context
}
// Server returns the cluster's server configuration.
func (c *KCluster) Server() string {
	return c.server
}
// SetBridgeCheck sets the callable used to check whether the Teleproxy bridge
// is functioning. If this is nil/unset, cluster monitoring checks the cluster
// directly (via kubectl)
func (c *KCluster) SetBridgeCheck(isBridgeOkay func() bool) {
	c.isBridgeOkay = isBridgeOkay
}
// check probes cluster connectivity for the monitor loop.
func (c *KCluster) check(p *supervisor.Process) error {
	// A healthy Teleproxy bridge implies a reachable cluster, so skip the
	// slower kubectl round-trip when the bridge reports okay.
	if bridgeOkay := c.isBridgeOkay; bridgeOkay != nil && bridgeOkay() {
		return nil
	}
	// Otherwise ask the API server for a pod that should not exist; any
	// answer (even "not found") proves connectivity.
	return c.GetKubectlCmd(p, "get", "po", "ohai", "--ignore-not-found").Run()
}
// TrackKCluster tracks connectivity to a cluster.
//
// It performs an initial connectivity check, fills in the cluster's
// context, namespace, and server address from kubectl configuration when
// they were not supplied, and then starts background monitoring via
// ResourceBase.setup. It returns an error if the initial check or any of
// the kubectl config queries fail.
func TrackKCluster(
	p *supervisor.Process, rai *RunAsInfo, context, namespace string, kargs []string,
) (*KCluster, error) {
	c := &KCluster{
		rai:       rai,
		kargs:     kargs,
		context:   context,
		namespace: namespace,
	}
	c.doCheck = c.check
	// Quitting cluster tracking just flags the processor loop to stop.
	c.doQuit = func(p *supervisor.Process) error { c.done = true; return nil }
	if err := c.check(p); err != nil {
		return nil, errors.Wrap(err, "initial cluster check")
	}

	if err := c.resolveContext(p); err != nil {
		return nil, err
	}
	p.Logf("Context: %s", c.context)

	if err := c.resolveNamespace(p); err != nil {
		return nil, err
	}
	p.Logf("Namespace: %s", c.namespace)

	if err := c.resolveServer(p); err != nil {
		return nil, err
	}
	p.Logf("Server: %s", c.server)

	c.setup(p.Supervisor(), "cluster")
	return c, nil
}

// resolveContext fills in c.context from kubectl's current-context when the
// caller did not supply one.
func (c *KCluster) resolveContext(p *supervisor.Process) error {
	if c.context != "" {
		return nil
	}
	cmd := c.GetKubectlCmd(p, "config", "current-context")
	p.Logf("%s %v", cmd.Path, cmd.Args[1:])
	output, err := cmd.CombinedOutput()
	if err != nil {
		return errors.Wrap(err, "kubectl config current-context")
	}
	c.context = strings.TrimSpace(string(output))
	return nil
}

// resolveNamespace fills in c.namespace from the kubeconfig entry for
// c.context when the caller did not supply one, defaulting to "default".
func (c *KCluster) resolveNamespace(p *supervisor.Process) error {
	if c.namespace != "" {
		return nil
	}
	nsQuery := fmt.Sprintf("jsonpath={.contexts[?(@.name==\"%s\")].context.namespace}", c.context)
	cmd := c.GetKubectlCmd(p, "config", "view", "-o", nsQuery)
	p.Logf("%s %v", cmd.Path, cmd.Args[1:])
	output, err := cmd.CombinedOutput()
	if err != nil {
		return errors.Wrap(err, "kubectl config view ns")
	}
	c.namespace = strings.TrimSpace(string(output))
	if c.namespace == "" { // This is what kubens does
		c.namespace = "default"
	}
	return nil
}

// resolveServer reads the API server URL for the (minified) current
// cluster from the kubeconfig into c.server.
func (c *KCluster) resolveServer(p *supervisor.Process) error {
	cmd := c.GetKubectlCmd(p, "config", "view", "--minify", "-o", "jsonpath={.clusters[0].cluster.server}")
	p.Logf("%s %v", cmd.Path, cmd.Args[1:])
	output, err := cmd.CombinedOutput()
	if err != nil {
		return errors.Wrap(err, "kubectl config view server")
	}
	c.server = strings.TrimSpace(string(output))
	return nil
}
// crCmd is a handle to a checked retrying command
type crCmd struct {
	args []string // full command line; args[0] is the executable
	rai  *RunAsInfo
	// check is the health probe run by doCheck; CheckedRetryingCommand
	// substitutes a no-op when the caller passes nil.
	check func(p *supervisor.Process) error
	// startGrace is how long after launch failed checks are tolerated
	// before the subprocess is killed (and then relaunched).
	startGrace time.Duration
	cmd        *supervisor.Cmd // (run loop) tracks the cmd for killing it
	quitting   bool            // (run loop) enables Close()
	startedAt  time.Time       // when the current subprocess was started
	ResourceBase
}
// CheckedRetryingCommand launches a command, restarting it repeatedly if it
// quits, and killing and restarting it if it fails the given check.
//
// A nil check is treated as "always healthy". startGrace is the window
// after each launch during which failed checks do not kill the process.
// The returned Resource is already set up with the supervisor and launched;
// an initial launch failure is returned as an error.
func CheckedRetryingCommand(
	p *supervisor.Process, name string, args []string, rai *RunAsInfo,
	check func(*supervisor.Process) error, startGrace time.Duration,
) (Resource, error) {
	if check == nil {
		check = func(*supervisor.Process) error { return nil }
	}
	crc := &crCmd{
		args:       args,
		rai:        rai,
		check:      check,
		startGrace: startGrace,
	}
	crc.ResourceBase.doCheck = crc.doCheck
	crc.ResourceBase.doQuit = crc.doQuit
	// Register with the supervisor before launching so the task queue and
	// processor loop exist when the subprocess worker reports back.
	crc.setup(p.Supervisor(), name)
	if err := crc.launch(p); err != nil {
		return nil, errors.Wrapf(err, "initial launch of %s", name)
	}
	return crc, nil
}
// subprocessEnded is queued by the launch worker when the child exits. It
// clears the cmd handle so doCheck will relaunch, unless we are quitting,
// in which case it marks the resource done.
func (crc *crCmd) subprocessEnded(p *supervisor.Process) error {
	p.Log("end: subprocess ended")
	crc.cmd = nil
	if !crc.quitting {
		return nil
	}
	p.Log("end: marking as done")
	crc.done = true
	return nil
}
// launch starts the subprocess inside a dedicated supervisor worker and
// waits for Start() to complete (or the daemon to begin shutting down)
// before returning. When the subprocess later exits, the worker queues
// subprocessEnded onto the task channel so the processor loop can decide
// whether to finish or relaunch.
func (crc *crCmd) launch(p *supervisor.Process) error {
	if crc.cmd != nil {
		// Programming error: the run loop must never double-launch.
		panic(fmt.Errorf("launching %s: already launched", crc.name))
	}
	// Launch the subprocess (set up logging using a worker)
	p.Logf("Launching %s...", crc.name)
	launchErr := make(chan error)
	p.Supervisor().Supervise(&supervisor.Worker{
		Name: crc.name + "/out",
		Work: func(p *supervisor.Process) error {
			crc.cmd = crc.rai.Command(p, crc.args...)
			// Hand the Start() result back to the goroutine blocked below.
			launchErr <- crc.cmd.Start()
			// Wait for the subprocess to end. Another worker will
			// call kill() on shutdown (via quit()) so we don't need
			// to worry about supervisor shutdown ourselves.
			if err := crc.cmd.Wait(); err != nil {
				p.Log(err)
			}
			crc.tasks <- crc.subprocessEnded
			return nil
		},
	})
	// Wait for it to start
	select {
	case err := <-launchErr:
		if err != nil {
			return err
		}
	case <-p.Shutdown():
		// Daemon is stopping; report success and let shutdown proceed.
		return nil
	}
	crc.startedAt = time.Now()
	p.Logf("Launched %s", crc.name)
	return nil
}
// kill sends SIGTERM to the subprocess if one is running. Signal errors
// are logged and ignored; kill always reports success.
func (crc *crCmd) kill(p *supervisor.Process) error {
	if crc.cmd == nil {
		p.Log("kill: no subprocess to kill")
		return nil
	}
	p.Log("kill: sending signal")
	if err := crc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
		p.Logf("kill: failed (ignoring): %v", err)
	}
	return nil
}
// doQuit is the ResourceBase shutdown hook: it flags that we are quitting
// (so subprocessEnded/doCheck mark the resource done instead of
// relaunching) and then signals the subprocess to terminate.
func (crc *crCmd) doQuit(p *supervisor.Process) error {
	crc.quitting = true
	return crc.kill(p)
}
// doCheck determines whether the subprocess is running and healthy.
// With no subprocess it either finishes (when quitting) or queues a
// relaunch; with a running subprocess it runs the health probe and kills
// the child once the post-launch grace period has elapsed.
func (crc *crCmd) doCheck(p *supervisor.Process) error {
	if crc.cmd == nil {
		if crc.quitting {
			p.Log("check: no subprocess + quitting -> done")
			crc.done = true
			return nil
		}
		// Subprocess died; schedule a relaunch via the task queue and
		// report unhealthy for this round.
		p.Log("check: no subprocess -> launch")
		crc.tasks <- crc.launch
		return errors.New("not running")
	}
	err := crc.check(p)
	if err == nil {
		p.Log("check: passed")
		return nil
	}
	p.Logf("check: failed: %v", err)
	runTime := time.Since(crc.startedAt)
	if runTime <= crc.startGrace {
		// Still inside the grace window; give it more time to come up.
		p.Logf("check: not killing yet (%v < %v)", runTime, crc.startGrace)
		return err
	}
	// Kill the process because it's in a bad state; the next doCheck will
	// see cmd == nil and relaunch it.
	p.Log("check: killing...")
	_ = crc.kill(p)
	return err
}