-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
143 lines (133 loc) · 3.26 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package rununit
import (
uberatomic "go.uber.org/atomic"
"github.com/cybriq/p9/pkg/log"
"github.com/cybriq/p9/pkg/pipe"
"github.com/cybriq/p9/pkg/interrupt"
"github.com/cybriq/p9/pkg/qu"
)
// RunUnit handles correctly starting and stopping child processes that have StdConn pipe logging enabled, allowing
// custom hooks to run on start and stop,
type RunUnit struct {
name string
args []string
running, shuttingDown uberatomic.Bool
commandChan chan bool
worker *pipe.Worker
quit qu.C
}
// New creates and starts a new rununit. run and stop functions are executed after starting and stopping. logger
// receives log entries and processes them (such as logging them).
func New(
name string, run, stop func(), logger func(ent *log.Entry) (e error),
pkgFilter func(pkg string) (out bool), quit qu.C, args ...string,
) (r *RunUnit) {
r = &RunUnit{
name: name,
args: args,
commandChan: make(chan bool),
quit: qu.T(),
}
r.running.Store(false)
r.shuttingDown.Store(false)
go func() {
D.Ln("run unit command loop", args)
var e error
out:
for {
select {
case cmd := <-r.commandChan:
switch cmd {
case true:
D.Ln(r.running.Load(), "run called for", args)
if r.running.Load() {
D.Ln("already running", args)
continue
}
if r.worker != nil {
if e = r.worker.Kill(); E.Chk(e) {
}
}
// quit from rununit's quit, which closes after the main quit triggers stopping in the watcher loop
r.worker = pipe.LogConsume(r.quit, logger, pkgFilter,
args...,
)
// D.S(r.worker)
pipe.Start(r.worker)
r.running.Store(true)
run()
// D.Ln(r.running.Load())
case false:
running := r.running.Load()
D.Ln("stop called for", args, running)
if !running {
D.Ln("wasn't running", args)
continue
}
pipe.Kill(r.worker)
// var e error
// if e = r.worker.Wait(); E.Chk(e) {
// }
r.running.Store(false)
stop()
D.Ln(args, "after stop", r.running.Load())
}
break
case <-r.quit.Wait():
D.Ln("runner stopped for", args)
break out
}
}
}()
// when the main quit signal is triggered, stop the run unit cleanly
go func() {
out:
select {
case <-quit.Wait():
D.Ln("runner quit trigger called", args)
running := r.running.Load()
if !running {
D.Ln("wasn't running", args)
break out
}
// r.quit.Q()
pipe.Kill(r.worker)
var e error
if e = r.worker.Wait(); E.Chk(e) {
}
r.running.Store(false)
stop()
D.Ln(args, "after stop", r.running.Load())
}
}()
interrupt.AddHandler(
func() {
quit.Q()
},
)
return
}
// Running returns whether the unit is running
func (r *RunUnit) Running() bool {
return r.running.Load()
}
// Start signals the run unit to start
func (r *RunUnit) Start() {
r.commandChan <- true
}
// Stop signals the run unit to stop
func (r *RunUnit) Stop() {
r.commandChan <- false
}
// Shutdown terminates the run unit
func (r *RunUnit) Shutdown() {
// debug.PrintStack()
if !r.shuttingDown.Load() {
r.shuttingDown.Store(true)
r.quit.Q()
}
}
// ShuttingDown returns true if the server is shuting down
func (r *RunUnit) ShuttingDown() bool {
return r.shuttingDown.Load()
}