-
Notifications
You must be signed in to change notification settings - Fork 687
/
process.go
153 lines (134 loc) · 3.72 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package supervisor
import (
"context"
"fmt"
"runtime/debug"
"sync/atomic"
"github.com/pkg/errors"
)
// A Process represents a goroutine being run from a Worker.
type Process struct {
supervisor *Supervisor
worker *Worker
// Used to signal graceful shutdown.
shutdown chan struct{}
ready bool
shutdownClosed bool
}
// Supervisor returns the Supervisor that is managing this Process.
func (p *Process) Supervisor() *Supervisor {
return p.supervisor
}
// Worker returns the Worker that this Process is running.
func (p *Process) Worker() *Worker {
return p.worker
}
// Context returns the Process' context.
func (p *Process) Context() context.Context {
return p.supervisor.context
}
// Ready is called by the Process' Worker to notify the supervisor
// that it is now ready.
func (p *Process) Ready() {
p.Supervisor().change(func() {
p.ready = true
})
}
// Shutdown is used for graceful shutdown...
func (p *Process) Shutdown() <-chan struct{} {
return p.shutdown
}
// Log is used for logging...
func (p *Process) Log(obj interface{}) {
p.supervisor.Logger.Printf("%s: %v", p.Worker().Name, obj)
}
// Logf is used for logging...
func (p *Process) Logf(format string, args ...interface{}) {
p.supervisor.Logger.Printf("%s: %v", p.Worker().Name, fmt.Sprintf(format, args...))
}
// We would _like_ to have Debug and Debugf, but we can't really support that with
// dlog right now. So for now, these are no-ops.
func (p *Process) Debug(obj interface{}) {
// Yes, this is a no-op, see above.
if false {
p.supervisor.Logger.Printf("%s: %v", p.Worker().Name, obj)
}
}
func (p *Process) Debugf(format string, args ...interface{}) {
// Yes, this is a no-op, see above.
if false {
p.supervisor.Logger.Printf("%s: %v", p.Worker().Name, fmt.Sprintf(format, args...))
}
}
func (p *Process) allocateID() int64 {
return atomic.AddInt64(&p.Worker().children, 1)
}
// Go is shorthand for launching a child worker... it is named
// "<parent>[<child-count>]".
func (p *Process) Go(fn func(*Process) error) *Worker {
w := &Worker{
Name: fmt.Sprintf("%s[%d]", p.Worker().Name, p.allocateID()),
Work: fn,
}
p.Supervisor().Supervise(w)
return w
}
// GoName is shorthand for launching a named worker... it is named
// "<parent>.<name>".
func (p *Process) GoName(name string, fn func(*Process) error) *Worker {
w := &Worker{
Name: fmt.Sprintf("%s.%s", p.Worker().Name, name),
Work: fn,
}
p.Supervisor().Supervise(w)
return w
}
// Do is shorthand for proper shutdown handling while doing a
// potentially blocking activity. This method will return nil if the
// activity completes normally and an error if the activity panics or
// returns an error.
//
// If you want to know whether the work was aborted or might still be
// running when Do returns, then use DoClean like so:
//
// aborted := errors.New("aborted")
//
// err := p.DoClean(..., func() { return aborted })
//
// if err == aborted {
// ...
// }
func (p *Process) Do(fn func() error) (err error) {
return p.DoClean(fn, func() error { return nil })
}
// DoClean is the same as Process.Do() but executes the supplied clean
// function on abort.
func (p *Process) DoClean(fn, clean func() error) error {
sup := p.Supervisor()
done := make(chan error)
go func() {
var err error
defer func() {
if r := recover(); r != nil {
stack := string(debug.Stack())
err := errors.Errorf("FUNCTION PANICKED: %v\n%s", r, stack)
sup.mutex.Lock()
sup.errors = append(sup.errors, err)
sup.wantsShutdown = true
sup.mutex.Unlock()
}
select {
case done <- err:
default: // don't block if p.Shutdown() caused us to return early
}
close(done)
}()
err = fn()
}()
select {
case <-p.Shutdown():
return clean()
case err := <-done:
return err
}
}