/
process.go
83 lines (75 loc) · 1.48 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package plugin
import (
"bufio"
"log"
"os/exec"
)
// Process owns a single plugin child process: it holds the command line used
// to launch it, the running command, and the channel used to announce that
// the process has exited.
type Process struct {
	// path is the executable handed to exec.Command.
	path string
	// args are the command-line arguments passed to the executable.
	args []string
	// doneCh is closed once the launched command has been waited on.
	doneCh chan struct{}
	// pcmd is the command created by Start; it is nil until Start is called.
	pcmd *exec.Cmd
	// actorName is the watcher/reactor name used as a prefix in log lines.
	actorName string
}
// NewProcess returns a Process configured to run the executable at path with
// the given args. actorName is the name used to tag log lines. The process is
// not launched here; the returned value only carries the configuration and a
// fresh, open done channel.
func NewProcess(path string, args []string, actorName string) *Process {
	proc := new(Process)
	proc.path = path
	proc.args = args
	proc.actorName = actorName
	proc.doneCh = make(chan struct{})
	return proc
}
// Start launches the plugin process and blocks until the plugin has written
// its first line to stdout, which signals that it is ready.
//
// stdout (after that first line) and stderr are forwarded line by line to the
// standard logger, prefixed with the actor name. A background goroutine waits
// for the process to exit and then closes the channel observed by Wait and
// WaitCh.
//
// An error is returned when a pipe cannot be created, when the process cannot
// be launched, or when stdout ends before the first line arrives. In the last
// case the process is killed and reaped before returning, and the done
// channel is left open.
func (p *Process) Start() error {
	p.pcmd = exec.Command(p.path, p.args...)

	stdout, err := p.pcmd.StdoutPipe()
	if err != nil {
		return err
	}
	stderr, err := p.pcmd.StderrPipe()
	if err != nil {
		return err
	}

	// A failed launch leaves p.pcmd.Process nil, so report it here rather
	// than falling through to the pipe read and the Stop call below.
	if err := p.pcmd.Start(); err != nil {
		return err
	}

	// forward copies lines from rd to the logger until the pipe is closed or
	// any other read error occurs.
	forward := func(rd *bufio.Reader) {
		for {
			line, err := rd.ReadString('\n')
			if err != nil {
				return
			}
			log.Printf("[Plugin %s] %s", p.actorName, line)
		}
	}

	// Drain stderr from the very beginning so that startup errors are logged
	// and the plugin cannot block on a full stderr pipe before it is ready.
	go forward(bufio.NewReader(stderr))

	// Wait for plugin to print socket information, signals plugin is ready
	rd := bufio.NewReader(stdout)
	if _, err := rd.ReadString('\n'); err != nil {
		p.Stop()
		// Reap the killed child so it does not linger unreaped; the read
		// error is the one worth reporting, so the wait result is dropped.
		_ = p.pcmd.Wait()
		return err
	}

	go forward(rd)

	go func() {
		// The exit status is not reported anywhere; only the fact that the
		// process has ended is published through doneCh.
		_ = p.pcmd.Wait()
		close(p.doneCh)
		log.Printf("[Plugin %s] Process stopped", p.actorName)
	}()

	return nil
}
// Wait blocks the caller until the process's done channel has been closed.
func (p *Process) Wait() {
	<-p.WaitCh()
}
// WaitCh exposes the done channel as receive-only so callers can select on
// it instead of blocking in Wait.
func (p *Process) WaitCh() <-chan struct{} {
	var done <-chan struct{} = p.doneCh
	return done
}
// Stop kills the plugin process. It does not wait for the process to exit;
// use Wait or WaitCh for that.
//
// Stop is a no-op when no process has been launched yet (Start was never
// called, or the launch itself failed), so it is safe to call during cleanup.
func (p *Process) Stop() {
	if p.pcmd == nil || p.pcmd.Process == nil {
		return
	}
	// Kill fails when the process has already finished; there is nothing
	// left to stop in that case and no way to report it from here.
	_ = p.pcmd.Process.Kill()
}