forked from hashicorp/consul
/
manager.go
519 lines (442 loc) · 16 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
package proxy
import (
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-multierror"
)
const (
// ManagerCoalescePeriod and ManagerQuiescentPeriod relate to how
// notifications in updates from the local state are colaesced to prevent
// lots of churn in the manager.
//
// When the local state updates, the manager will wait for quiescence.
// For each update, the quiscence timer is reset. If the coalesce period
// is reached, the manager will update proxies regardless of the frequent
// changes. Then the whole cycle resets.
ManagerCoalescePeriod = 5 * time.Second
ManagerQuiescentPeriod = 500 * time.Millisecond
// ManagerSnapshotPeriod is the interval that snapshots are taken.
// The last snapshot state is preserved and if it matches a file isn't
// written, so its safe for this to be reasonably frequent.
ManagerSnapshotPeriod = 1 * time.Second
)
// Manager starts, stops, snapshots, and restores managed proxies.
//
// The manager will not start or stop any processes until Start is called.
// Prior to this, any configuration, snapshot loading, etc. can be done.
// Even if a process is no longer running after loading the snapshot, it
// will not be restarted until Start is called.
//
// The Manager works by subscribing to change notifications on a local.State
// structure. Whenever a change is detected, the Manager syncs its internal
// state with the local.State and starts/stops any necessary proxies. The
// manager never holds a lock on local.State (except to read the proxies)
// and state updates may occur while the Manger is syncing. This is okay,
// since a change notification will be queued to trigger another sync.
//
// The change notifications from the local state are coalesced (see
// ManagerCoalescePeriod) so that frequent changes within the local state
// do not trigger dozens of proxy resyncs.
type Manager struct {
// State is the local state that is the source of truth for all
// configured managed proxies.
State *local.State
// Logger is the logger for information about manager behavior.
// Output for proxies will not go here generally but varies by proxy
// implementation type.
Logger *log.Logger
// DataDir is the path to the directory where data for proxies is
// written, including snapshots for any state changes in the manager.
// Within the data dir, files will be written in the following locatins:
//
// * logs/ - log files named <service id>-std{out|err}.log
// * pids/ - pid files for daemons named <service id>.pid
// * snapshot.json - the state of the manager
//
DataDir string
// Extra environment variables to set for the proxies
ProxyEnv []string
// SnapshotPeriod is the duration between snapshots. This can be set
// relatively low to ensure accuracy, because if the new snapshot matches
// the last snapshot taken, no file will be written. Therefore, setting
// this low causes only slight CPU/memory usage but doesn't result in
// disk IO. If this isn't set, ManagerSnapshotPeriod will be the default.
//
// This only has an effect if snapshots are enabled (DataDir is set).
SnapshotPeriod time.Duration
// CoalescePeriod and QuiescencePeriod control the timers for coalescing
// updates from the local state. See the defaults at the top of this
// file for more documentation. These will be set to those defaults
// by NewManager.
CoalescePeriod time.Duration
QuiescentPeriod time.Duration
// AllowRoot configures whether proxies can be executed as root (EUID == 0).
// If this is false then the manager will run and proxies can be added
// and removed but none will be started an errors will be logged
// to the logger.
AllowRoot bool
// lock is held while reading/writing any internal state of the manager.
// cond is a condition variable on lock that is broadcasted for runState
// changes.
lock *sync.Mutex
cond *sync.Cond
// runState is the current state of the manager. To read this the
// lock must be held. The condition variable cond can be waited on
// for changes to this value.
runState managerRunState
// lastSnapshot stores a pointer to the last snapshot that successfully
// wrote to disk. This is used for dup detection to prevent rewriting
// the same snapshot multiple times. snapshots should never be that
// large so keeping it in-memory should be cheap even for thousands of
// proxies (unlikely scenario).
lastSnapshot *snapshot
proxies map[string]Proxy
}
// NewManager initializes a Manager. After initialization, the exported
// fields should be configured as desired. To start the Manager, execute
// Run in a goroutine.
func NewManager() *Manager {
var lock sync.Mutex
return &Manager{
Logger: defaultLogger,
SnapshotPeriod: ManagerSnapshotPeriod,
CoalescePeriod: ManagerCoalescePeriod,
QuiescentPeriod: ManagerQuiescentPeriod,
lock: &lock,
cond: sync.NewCond(&lock),
proxies: make(map[string]Proxy),
}
}
// defaultLogger is the defaultLogger for NewManager so there it is never nil
var defaultLogger = log.New(os.Stderr, "", log.LstdFlags)
// managerRunState is the state of the Manager.
//
// This is a basic state machine with the following transitions:
//
// * idle => running, stopped
// * running => stopping, stopped
// * stopping => stopped
// * stopped => <>
//
type managerRunState uint8
const (
managerStateIdle managerRunState = iota
managerStateRunning
managerStateStopping
managerStateStopped
)
// Close stops the manager. Managed processes are NOT stopped.
func (m *Manager) Close() error {
m.lock.Lock()
defer m.lock.Unlock()
return m.stop(func(p Proxy) error {
return p.Close()
})
}
// Kill will Close the manager and Kill all proxies that were being managed.
// Only ONE of Kill or Close must be called. If Close has been called already
// then this will have no effect.
func (m *Manager) Kill() error {
m.lock.Lock()
defer m.lock.Unlock()
return m.stop(func(p Proxy) error {
return p.Stop()
})
}
// stop stops the run loop and cleans up all the proxies by calling
// the given cleaner. If the cleaner returns an error the proxy won't be
// removed from the map.
//
// The lock must be held while this is called.
func (m *Manager) stop(cleaner func(Proxy) error) error {
for {
// Special case state that exits the for loop
if m.runState == managerStateStopped {
break
}
switch m.runState {
case managerStateIdle:
// Idle so just set it to stopped and return. We notify
// the condition variable in case others are waiting.
m.runState = managerStateStopped
m.cond.Broadcast()
return nil
case managerStateRunning:
// Set the state to stopping and broadcast to all waiters,
// since Run is sitting on cond.Wait.
m.runState = managerStateStopping
m.cond.Broadcast()
m.cond.Wait() // Wait on the stopping event
case managerStateStopping:
// Still stopping, wait...
m.cond.Wait()
}
}
// Clean up all the proxies
var err error
for id, proxy := range m.proxies {
if err := cleaner(proxy); err != nil {
err = multierror.Append(
err, fmt.Errorf("failed to stop proxy %q: %s", id, err))
continue
}
// Remove it since it is already stopped successfully
delete(m.proxies, id)
}
return err
}
// Run syncs with the local state and supervises existing proxies.
//
// This blocks and should be run in a goroutine. If another Run is already
// executing, this will do nothing and return.
func (m *Manager) Run() {
m.lock.Lock()
if m.runState != managerStateIdle {
m.lock.Unlock()
return
}
// Set the state to running
m.runState = managerStateRunning
m.lock.Unlock()
// Start a goroutine that just waits for a stop request
stopCh := make(chan struct{})
go func() {
defer close(stopCh)
m.lock.Lock()
defer m.lock.Unlock()
// We wait for anything not running, just so we're more resilient
// in the face of state machine issues. Basically any state change
// will cause us to quit.
for m.runState == managerStateRunning {
m.cond.Wait()
}
}()
// When we exit, we set the state to stopped and broadcast to any
// waiting Close functions that they can return.
defer func() {
m.lock.Lock()
m.runState = managerStateStopped
m.cond.Broadcast()
m.lock.Unlock()
}()
// Register for proxy catalog change notifications
notifyCh := make(chan struct{}, 1)
m.State.NotifyProxy(notifyCh)
defer m.State.StopNotifyProxy(notifyCh)
// Start the timer for snapshots. We don't use a ticker because disk
// IO can be slow and we don't want overlapping notifications. So we only
// reset the timer once the snapshot is complete rather than continously.
snapshotTimer := time.NewTimer(m.SnapshotPeriod)
defer snapshotTimer.Stop()
m.Logger.Println("[DEBUG] agent/proxy: managed Connect proxy manager started")
SYNC:
for {
// Sync first, before waiting on further notifications so that
// we can start with a known-current state.
m.sync()
// Note for these variables we don't use a time.Timer because both
// periods are relatively short anyways so they end up being eligible
// for GC very quickly, so overhead is not a concern.
var quiescent, quantum <-chan time.Time
// Start a loop waiting for events from the local state store. This
// loops rather than just `select` so we can coalesce many state
// updates over a period of time.
for {
select {
case <-notifyCh:
// If this is our first notification since the last sync,
// reset the quantum timer which is the max time we'll wait.
if quantum == nil {
quantum = time.After(m.CoalescePeriod)
}
// Always reset the quiescent timer
quiescent = time.After(m.QuiescentPeriod)
case <-quantum:
continue SYNC
case <-quiescent:
continue SYNC
case <-snapshotTimer.C:
// Perform a snapshot
if path := m.SnapshotPath(); path != "" {
if err := m.snapshot(path, true); err != nil {
m.Logger.Printf("[WARN] agent/proxy: failed to snapshot state: %s", err)
}
}
// Reset
snapshotTimer.Reset(m.SnapshotPeriod)
case <-stopCh:
// Stop immediately, no cleanup
m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager")
return
}
}
}
}
// sync syncs data with the local state store to update the current manager
// state and start/stop necessary proxies.
func (m *Manager) sync() {
m.lock.Lock()
defer m.lock.Unlock()
// If we don't allow root and we're root, then log a high sev message.
if !m.AllowRoot && isRoot() {
m.Logger.Println("[WARN] agent/proxy: running as root, will not start managed proxies")
return
}
// Get the current set of proxies
state := m.State.Proxies()
// Go through our existing proxies that we're currently managing to
// determine if they're still in the state or not. If they're in the
// state, we need to diff to determine if we're starting a new proxy
// If they're not in the state, then we need to stop the proxy since it
// is now orphaned.
for id, proxy := range m.proxies {
// Get the proxy.
stateProxy, ok := state[id]
if ok {
// Remove the proxy from the state so we don't start it new.
delete(state, id)
// Make the proxy so we can compare. This does not start it.
proxy2, err := m.newProxy(stateProxy)
if err != nil {
m.Logger.Printf("[ERROR] agent/proxy: failed to initialize proxy for %q: %s", id, err)
continue
}
// If the proxies are equal, then do nothing
if proxy.Equal(proxy2) {
continue
}
// Proxies are not equal, so we should stop it. We add it
// back to the state here (unlikely case) so the loop below starts
// the new one.
state[id] = stateProxy
// Continue out of `if` as if proxy didn't exist so we stop it
}
// Proxy is deregistered. Remove it from our map and stop it
delete(m.proxies, id)
if err := proxy.Stop(); err != nil {
m.Logger.Printf("[ERROR] agent/proxy: failed to stop deregistered proxy for %q: %s", id, err)
}
}
// Remaining entries in state are new proxies. Start them!
for id, stateProxy := range state {
proxy, err := m.newProxy(stateProxy)
if err != nil {
m.Logger.Printf("[ERROR] agent/proxy: failed to initialize proxy for %q: %s", id, err)
continue
}
if err := proxy.Start(); err != nil {
m.Logger.Printf("[ERROR] agent/proxy: failed to start proxy for %q: %s", id, err)
continue
}
m.proxies[id] = proxy
}
}
// newProxy creates the proper Proxy implementation for the configured
// local managed proxy.
func (m *Manager) newProxy(mp *local.ManagedProxy) (Proxy, error) {
// Defensive because the alternative is to panic which is not desired
if mp == nil || mp.Proxy == nil {
return nil, fmt.Errorf("internal error: nil *local.ManagedProxy or Proxy field")
}
p := mp.Proxy
// We reuse the service ID a few times
id := p.ProxyService.ID
// Create the Proxy. We could just as easily switch on p.ExecMode
// but I wanted there to be only location where ExecMode => Proxy so
// it lowers the chance that is wrong.
proxy, err := m.newProxyFromMode(p.ExecMode, id)
if err != nil {
return nil, err
}
// Depending on the proxy type we configure the rest from our ManagedProxy
switch proxy := proxy.(type) {
case *Daemon:
command := p.Command
// This should never happen since validation should happen upstream
// but verify it because the alternative is to panic below.
if len(command) == 0 {
return nil, fmt.Errorf("daemon mode managed proxy requires command")
}
// Build the command to execute.
var cmd exec.Cmd
cmd.Path = command[0]
cmd.Args = command // idx 0 is path but preserved since it should be
if err := m.configureLogDir(id, &cmd); err != nil {
return nil, fmt.Errorf("error configuring proxy logs: %s", err)
}
// Pass in the environmental variables for the proxy process
cmd.Env = append(m.ProxyEnv, os.Environ()...)
// Build the daemon structure
proxy.Command = &cmd
proxy.ProxyID = id
proxy.ProxyToken = mp.ProxyToken
return proxy, nil
default:
return nil, fmt.Errorf("unsupported managed proxy type: %q", p.ExecMode)
}
}
// newProxyFromMode just initializes the proxy structure from only the mode
// and the service ID. This is a shared method between newProxy and Restore
// so that we only have one location where we turn ExecMode into a Proxy.
func (m *Manager) newProxyFromMode(mode structs.ProxyExecMode, id string) (Proxy, error) {
switch mode {
case structs.ProxyExecModeDaemon:
return &Daemon{
Logger: m.Logger,
PidPath: pidPath(filepath.Join(m.DataDir, "pids"), id),
}, nil
default:
return nil, fmt.Errorf("unsupported managed proxy type: %q", mode)
}
}
// configureLogDir sets up the file descriptors to stdout/stderr so that
// they log to the proper file path for the given service ID.
func (m *Manager) configureLogDir(id string, cmd *exec.Cmd) error {
// Create the log directory
logDir := ""
if m.DataDir != "" {
logDir = filepath.Join(m.DataDir, "logs")
if err := os.MkdirAll(logDir, 0700); err != nil {
return err
}
}
// Configure the stdout, stderr paths
stdoutPath := logPath(logDir, id, "stdout")
stderrPath := logPath(logDir, id, "stderr")
// Open the files. We want to append to each. We expect these files
// to be rotated by some external process.
stdoutF, err := os.OpenFile(stdoutPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("error creating stdout file: %s", err)
}
stderrF, err := os.OpenFile(stderrPath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
if err != nil {
// Don't forget to close stdoutF which successfully opened
stdoutF.Close()
return fmt.Errorf("error creating stderr file: %s", err)
}
cmd.Stdout = stdoutF
cmd.Stderr = stderrF
return nil
}
// logPath is a helper to return the path to the log file for the given
// directory, service ID, and stream type (stdout or stderr).
func logPath(dir, id, stream string) string {
return filepath.Join(dir, fmt.Sprintf("%s-%s.log", id, stream))
}
// pidPath is a helper to return the path to the pid file for the given
// directory and service ID.
func pidPath(dir, id string) string {
// If no directory is given we do not write a pid
if dir == "" {
return ""
}
return filepath.Join(dir, fmt.Sprintf("%s.pid", id))
}