/
exec.go
444 lines (377 loc) · 12 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
package main
import (
"io"
"os"
"os/exec"
"sync"
"syscall"
"time"
)
/*
Runnable is a type of function that runs a process, one of the actions in the
action chain a user defines for Wago.
It takes a channel to receive a kill signal. When a file change occurs (or
program exit), a kill signal is sent to the Runnable by closing the channel.
Runnable then kills and cleans up the process.
It returns two channels:
- The first channel will send a signal if the runnable is "done", that the
chain can start the next action. This might mean the process has finished
or that has finished starting (as in the case of a daemon). The boolean
represents success, if it is false, the chain should abort.
- The second channel indicates by closing that the process has exited
completely. All processes must be completely exited to ensure resources
have been properly freed and the action chain can be safely started again.
New Runnables are created with one of the three constructors:
NewRunWait, NewDaemonTimer, NewDaemonTrigger.
*/
type Runnable func(chan struct{}) (chan bool, chan struct{})
// Cmd extends exec.Cmd to include channels and i/o necessary for advanced
// process management.
type Cmd struct {
*exec.Cmd
Name string
done chan bool
dead chan struct{}
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}
// newCmd is a constructor for Runnables to set up their internal exec.Cmd along
// with channels to manage state and i/o pipes.
func newCmd(command string) *Cmd {
cmd := &Cmd{
// -c is the POSIX switch for a shell to run a command
Cmd: exec.Command(*shell, "-c", command),
Name: command,
// These channels will only be used once.
// done is buffered so that the send can always succeed and the Runnable can
// proceed to cleanup.
done: make(chan bool, 1),
// dead sends by closing the channel and so does not need buffering.
dead: make(chan struct{}),
}
// Processes are set as a process group leader in a new process group. If it
// creates any child processes, they will also belong to the new group and
// allows us to kill all processes when necessary.
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
var err error
cmd.Stdin, err = cmd.StdinPipe()
if err != nil {
log.Fatal("Error making stdin (command, error):", cmd.Name, err)(9)
}
cmd.Stdout, err = cmd.StdoutPipe()
if err != nil {
log.Fatal("Error making stdout (command, error):", cmd.Name, err)(9)
}
cmd.Stderr, err = cmd.StderrPipe()
if err != nil {
log.Fatal("Error making stderr (command, error):", cmd.Name, err)(9)
}
return cmd
}
// kill nicely kills a process with escalating signals. This can only be called
// after a process has actually been started and so is only called internally
// by Runnables.
func (cmd *Cmd) kill(proc chan error) {
log.Info("Sending signal SIGTERM to command:", cmd.Name)
pgid, err := syscall.Getpgid(cmd.Process.Pid)
if err != nil {
if err.Error() == "no such process" {
log.Info("Process exited before SIGTERM:", cmd.Name)
} else {
log.Err("Error getting process group:", err)
}
return
}
if err := syscall.Kill(-pgid, syscall.SIGTERM); err != nil {
log.Warn("Failed to send SIGTERM, command must have exited (name, error):", cmd.Name, err)
return
}
// Give process time to exit…
timerDone := make(chan struct{})
timer := time.AfterFunc(time.Duration(*exitWait)*time.Millisecond, func() {
close(timerDone)
})
select {
case <-timerDone:
case <-proc:
timer.Stop()
return
}
log.Info("After exitwait, command still running, sending SIGKILL…")
if err := syscall.Kill(-pgid, syscall.SIGKILL); err != nil {
if err.Error() == "no such process" {
log.Info("Process exited before SIGKILL:", cmd.Name)
} else {
log.Err("Error killing command (cmd, error):", cmd.Name, err)
}
}
}
// NewRunWait constructs the Runnable RunWait.
func NewRunWait(command string) Runnable {
return func(kill chan struct{}) (chan bool, chan struct{}) {
log.Info("Running command, waiting:", command)
cmd := newCmd(command)
go cmd.RunWait(kill)
return cmd.done, cmd.dead
}
}
// RunWait starts a process and signals done after the process has exited.
//
// This is appropriate for actions like build commands which must complete successfully
// for the action chain to continue.
func (cmd *Cmd) RunWait(kill chan struct{}) {
defer close(cmd.done)
defer close(cmd.dead)
err := cmd.Start()
if err != nil {
// This error is a program, system or environment error (shell is set wrong).
// Because it is not recoverable between builds, it is fatal. The user needs
// to adjust their system or command invocation.
log.Fatal("Error starting command:", err)(6)
}
// The active process is now managed concurrently with signal management (below).
// proc signals the process exit by closing.
proc := make(chan error)
go func() {
var wg sync.WaitGroup
// Subscribe to stdin, allows the process to receive input from the user.
subStdin <- cmd
copyPipe(cmd.Stdout, os.Stdout, &wg)
copyPipe(cmd.Stderr, os.Stderr, &wg)
// Wait for both copyPipes to finish. They will exit when the process has exited.
wg.Wait()
unsubStdin <- cmd
proc <- cmd.Wait()
close(proc)
}()
// We wait now for either the process to exit or a kill request. If the process
// exits, we return success status so that action chain can conditionally continue.
// If we receive a kill signal, the exit status no longer matters and isn't tracked.
select {
case err := <-proc:
if err != nil {
log.Err("Command error:", err)
cmd.done <- false
} else {
cmd.done <- true
}
case <-kill:
cmd.kill(proc)
}
// Runnables must not return until the process has exited completely.
// TODO: All three Runnables end with <-proc however a code read suggests it is no
// longer necessary. At this point either the process has exited or been killed.
<-proc
}
// NewDaemonTimer constructs the Runnable RunDaemonTimer.
func NewDaemonTimer(command string, period int) Runnable {
return func(kill chan struct{}) (chan bool, chan struct{}) {
log.Info("Starting daemon:", command)
cmd := newCmd(command)
go cmd.RunDaemonTimer(kill, period)
return cmd.done, cmd.dead
}
}
// RunDaemonTimer starts a daemon, waits period milliseconds, then signals done
// for the action chain to continue.
//
// This is used for daemons without a timer, period is set to 0.
//
// Because Wago will warn the user when a daemon has exited (even with success)
// this can be used for for regular commands that do not have any output as the
// user will be told when it has completed.
func (cmd *Cmd) RunDaemonTimer(kill chan struct{}, period int) {
defer close(cmd.done)
defer close(cmd.dead)
err := cmd.Start()
if err != nil {
// This error is a program, system or environment error (shell is set wrong).
// Because it is not recoverable between builds, it is fatal. The user needs
// to adjust their system or command invocation.
log.Fatal("Error starting daemon:", err)(7)
}
// The active process is now managed concurrently with signal management (below).
// proc signals the process exit by closing.
proc := make(chan error)
go func() {
var wg sync.WaitGroup
// Subscribe to stdin, allows the process to receive input from the user.
subStdin <- cmd
copyPipe(cmd.Stdout, os.Stdout, &wg)
copyPipe(cmd.Stderr, os.Stderr, &wg)
wg.Wait()
unsubStdin <- cmd
proc <- cmd.Wait()
close(proc)
}()
// timerDone signals by closing.
timerDone := make(chan struct{})
log.Debug("Waiting milliseconds:", period)
timer := time.AfterFunc(time.Duration(period)*time.Millisecond, func() {
close(timerDone)
})
// Signal management.
select {
case <-timerDone:
log.Debug("Daemon timer done")
cmd.done <- true
// Timer is done, but we still need to wait for an exit/kill. This nested
// select duplicates the two cases of the parent select.
select {
case err := <-proc:
if err != nil {
log.Err("Daemon error:", err)
cmd.done <- false
} else {
// A daemon probably shouldn't be exiting, warn the user.
log.Warn("Daemon exited cleanly")
cmd.done <- true
}
case <-kill:
cmd.kill(proc)
}
case err := <-proc:
timer.Stop()
if err != nil {
log.Err("Daemon error:", err)
cmd.done <- false
} else {
log.Warn("Daemon exited cleanly")
cmd.done <- true
}
case <-kill:
timer.Stop()
cmd.kill(proc)
}
// Runnables must not return until the process has exited completely.
// TODO: All three Runnables end with <-proc however a code read suggests it is no
// longer necessary. At this point either the process has exited or been killed.
<-proc
}
// NewDaemonTrigger constructs the Runnable RunDaemonTrigger.
func NewDaemonTrigger(command string, trigger string) Runnable {
return func(kill chan struct{}) (chan bool, chan struct{}) {
log.Info("Starting daemon:", command)
cmd := newCmd(command)
go cmd.RunDaemonTrigger(kill, trigger)
return cmd.done, cmd.dead
}
}
// RunDaemonTrigger starts a daemon, waits until trigger is emitted by the process
// stdout or stderr, then signals done for the action chain to continue.
//
// This is useful for running daemons that have some setup and then output a ready
// status like "Listening on port…"
func (cmd *Cmd) RunDaemonTrigger(kill chan struct{}, trigger string) {
defer close(cmd.done)
defer close(cmd.dead)
err := cmd.Start()
if err != nil {
log.Fatal("Error starting daemon:", err)(7)
}
key := []byte(trigger)
match := make(chan struct{})
// watchPipe watches an output stream for the trigger text. When the trigger
// text is seen, it stops checking and simply copies.
//
// TODO: Consider abstracting further and moving to io.go.
// TODO: This reads one byte at a time. Intead, we should fill a buffer, similar
// to what ManageUserInput does.
watchPipe := func(in io.Reader, out io.Writer) {
b := make([]byte, 1)
spot := 0
for {
// Check for a trigger match
select {
case <-match:
// If there is a match, call io.Copy which will simply copy output to stdout
// until the process exits.
_, err := io.Copy(out, in)
if err != nil {
log.Err("Unwatched pipe has errored:", err)
}
return
default:
// If there is no match, fallthrough and check the next byte. This paradigm
// is necessary as len(chan) only works when chan is buffered.
}
n, err := in.Read(b)
if n > 0 {
out.Write(b[:n])
if b[0] == key[spot] {
spot++
if spot == len(key) {
log.Debug("Trigger match")
close(match)
}
}
}
if err != nil {
if err.Error() != "EOF" {
log.Err("Watched pipe error:", err)
}
return
}
}
}
// The active process is now managed concurrently with signal management (below).
// proc signals the process exit by closing.
proc := make(chan error)
go func() {
var wg sync.WaitGroup
// Subscribe to stdin, allows the process to receive input from the user.
subStdin <- cmd
wg.Add(2)
go func() {
watchPipe(cmd.Stdout, os.Stdout)
wg.Done()
}()
go func() {
watchPipe(cmd.Stderr, os.Stderr)
wg.Done()
}()
wg.Wait()
unsubStdin <- cmd
proc <- cmd.Wait()
close(proc)
}()
// Signal management.
select {
case <-match:
log.Debug("Daemon trigger matched")
cmd.done <- true
// Trigger is matched and we have signaled done, but we still need to wait for
// an exit/kill. This nested select duplicates the two cases of the parent select.
select {
case err := <-proc:
if err != nil {
log.Err("Daemon error:", err)
cmd.done <- false
} else {
// A daemon probably shouldn't be exiting, warn the user.
log.Warn("Daemon exited cleanly")
cmd.done <- true
}
case <-kill:
cmd.kill(proc)
}
case err := <-proc:
if err != nil {
log.Err("Daemon error:", err)
cmd.done <- false
} else {
// A daemon probably shouldn't be exiting, warn the user.
log.Warn("Daemon exited cleanly")
cmd.done <- true
}
case <-kill:
cmd.kill(proc)
}
// Runnables must not return until the process has exited completely.
// TODO: All three Runnables end with <-proc however a code read suggests it is no
// longer necessary. At this point either the process has exited or been killed.
<-proc
}