forked from alpacahq/marketstore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process.go
179 lines (162 loc) · 4.01 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package utils
import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
)
// runningProcesses is the package-wide registry of started processes.
// The embedded mutex guards procs. rootContext is the parent used for any
// process created without an explicit parent context, so cancelling it via
// kill cancels the context of every such process.
var runningProcesses struct {
	sync.Mutex
	procs       map[uint32]*Process
	rootContext context.Context
	kill        context.CancelFunc
}

// lastPID holds the most recently issued process ID; it is advanced
// atomically by NewPID.
var lastPID uint32
// init prepares the process registry: it creates the cancellable root
// context that processes inherit by default and allocates the PID map.
func init() {
	root, cancelRoot := context.WithCancel(context.Background())
	runningProcesses.rootContext = root
	runningProcesses.kill = cancelRoot
	runningProcesses.procs = map[uint32]*Process{}
}
// GetProcFromPID looks up a registered process by its ID while holding the
// registry lock. It returns nil when no process is registered under pid.
func GetProcFromPID(pid uint32) *Process {
	runningProcesses.Lock()
	defer runningProcesses.Unlock()

	// Indexing a map with a missing key yields the zero value, i.e. nil.
	return runningProcesses.procs[pid]
}
// IsRunning reports whether a process registered under pid is running.
// An unknown pid yields false, because running() accepts a nil receiver.
func IsRunning(pid uint32) bool {
	return GetProcFromPID(pid).running()
}
// Process describes one managed goroutine-based process.
//
// NOTE: the field order is significant; NewProcess builds this struct with a
// positional (unkeyed) literal.
type Process struct {
	// PID is the identifier the process is registered under.
	PID uint32
	// Context is the cancellable context created for this process.
	Context context.Context
	// kill cancels Context.
	kill context.CancelFunc
	// input carries messages to the run function.
	input chan interface{}
	// output carries messages produced by the run function.
	output chan interface{}
	// runfunc is the function executed as the process body.
	runfunc func(chan interface{}, chan interface{})
	// Messages buffers what the process has written to output.
	Messages *MessageQueue
}
// Kill marks the process as stopped, cancels its context and removes it from
// the registry.
//
// It is a no-op on a nil receiver, so the result of GetProcFromPID (which is
// nil for an unknown PID) can be killed without a prior nil check. Calling
// Kill more than once is harmless: cancelling an already-cancelled context
// and deleting a missing map key both do nothing.
func (pr *Process) Kill() {
	if pr == nil {
		return
	}

	// Dropping the channels is what makes running() report false.
	pr.input, pr.output = nil, nil

	// A Process built without NewProcess may not carry a cancel func.
	if pr.kill != nil {
		pr.kill()
	}

	runningProcesses.Lock()
	defer runningProcesses.Unlock()
	delete(runningProcesses.procs, pr.PID)
}
// running reports whether the process still has both of its channels.
// It is safe to call on a nil *Process, in which case it returns false.
func (pr *Process) running() bool {
	return pr != nil && pr.input != nil && pr.output != nil
}
// PutInput delivers msg to the process' input channel.
//
// The send blocks until the process' run function receives the message or
// the process' context is cancelled (for example by Kill or by cancelling a
// parent context). It returns an error when the process is not running or is
// cancelled before the message could be delivered.
func (pr *Process) PutInput(msg interface{}) (err error) {
	if !pr.running() {
		return fmt.Errorf("process is not running")
	}

	// A nil channel never becomes ready in a select, so a Process without a
	// context simply falls back to a plain blocking send.
	var done <-chan struct{}
	if pr.Context != nil {
		done = pr.Context.Done()
	}

	select {
	case pr.input <- msg:
		return nil
	case <-done:
		// Without this case a caller blocked here would hang forever once
		// the process has been killed.
		return fmt.Errorf("process is not running")
	}
}
// GetOutput returns the messages currently buffered for the process along
// with their timestamps, as reported by the process' MessageQueue.
// It returns an error (and nil slices) when the process is not running.
func (pr *Process) GetOutput() (timestamps []time.Time, messages []interface{}, err error) {
	if !pr.running() {
		return nil, nil, fmt.Errorf("process is not running")
	}
	timestamps, messages = pr.Messages.GetMessages()
	return timestamps, messages, nil
}
// NewPID atomically advances the package-wide PID counter and returns the
// new value.
func NewPID() uint32 {
	next := atomic.AddUint32(&lastPID, 1)
	return next
}
// NewProcess starts run as a managed process and returns its PID.
//
// run is executed in its own goroutine and receives the process' input and
// output channels, in that order. Everything run sends on the output channel
// is buffered in a 50-slot MessageQueue by a second goroutine.
//
// parentContext allows processes to be grouped so that everything under one
// parent can be cancelled together; when it is nil the package root context
// is used.
//
// If run is nil nothing is started or allocated and 0 is returned.
func NewProcess(run func(chan interface{}, chan interface{}), parentContext context.Context) (pid uint32) {
	// Bail out before deriving a context: a child context whose cancel func
	// is never called stays attached to its parent until the parent itself
	// is cancelled.
	if run == nil {
		return 0
	}
	if parentContext == nil {
		parentContext = runningProcesses.rootContext
	}

	ctx, kill := context.WithCancel(parentContext)
	input, output := make(chan interface{}), make(chan interface{})
	mq := NewMessageQueue(50) // Buffer output from process in a message queue

	proc := &Process{
		PID:      NewPID(),
		Context:  ctx,
		kill:     kill,
		input:    input,
		output:   output,
		runfunc:  run,
		Messages: mq,
	}

	// Register first so the process can be looked up from the moment it
	// starts executing.
	runningProcesses.Lock()
	runningProcesses.procs[proc.PID] = proc
	runningProcesses.Unlock()

	go run(input, output)
	go BufferProcessOutput(output, mq)

	return proc.PID
}
// MessageQueue is a fixed-capacity ring buffer of timestamped messages.
// Once full, each new message overwrites the oldest one. The embedded mutex
// guards all fields.
type MessageQueue struct {
	sync.Mutex
	// length is the capacity of the ring; cursor is the slot the next
	// message will be written to.
	length, cursor int
	// timeStamp[i] is the arrival time of messages[i]; a zero time marks a
	// slot that has never been written.
	timeStamp []time.Time
	messages  []interface{}
}

// NewMessageQueue returns an empty queue with room for length messages.
func NewMessageQueue(length int) *MessageQueue {
	mq := new(MessageQueue)
	mq.length = length
	mq.timeStamp = make([]time.Time, length)
	mq.messages = make([]interface{}, length)
	return mq
}

// Len returns the capacity of the queue (not the number of stored messages).
// Together with Swap and Less it implements sort.Interface.
func (mq *MessageQueue) Len() int { return mq.length }

// Swap exchanges slots i and j, timestamps and messages alike.
//
// NOTE: reordering slots breaks the ring order that AddMessage relies on to
// overwrite the oldest entry; GetMessages therefore does not sort in place.
func (mq *MessageQueue) Swap(i, j int) {
	mq.Lock()
	defer mq.Unlock()
	mq.timeStamp[j], mq.timeStamp[i] = mq.timeStamp[i], mq.timeStamp[j]
	mq.messages[j], mq.messages[i] = mq.messages[i], mq.messages[j]
}

// Less reports whether slot i is newer than slot j, i.e. the sort order is
// newest first.
func (mq *MessageQueue) Less(i, j int) bool {
	mq.Lock()
	defer mq.Unlock()
	return mq.timeStamp[i].After(mq.timeStamp[j])
}

// AddMessage stores msg with the current time in the slot at the cursor and
// advances the cursor, wrapping around at the end of the ring. On a queue
// with no capacity the message is dropped.
func (mq *MessageQueue) AddMessage(msg interface{}) {
	mq.Lock()
	defer mq.Unlock()
	if mq.length <= 0 {
		// Nothing to store into, and the modulo below would divide by zero.
		return
	}
	mq.messages[mq.cursor] = msg
	mq.timeStamp[mq.cursor] = time.Now()
	mq.cursor = (mq.cursor + 1) % mq.length
}

// GetMessages returns the stored messages and their timestamps, newest
// first. Both results are nil when the queue holds no messages.
//
// The queue itself is left untouched: the ordering is computed on a snapshot
// taken under a single lock, so the cursor keeps pointing at the oldest slot
// and later AddMessage calls keep evicting in arrival order.
func (mq *MessageQueue) GetMessages() (times []time.Time, messages []interface{}) {
	mq.Lock()
	defer mq.Unlock()

	n := len(mq.timeStamp)
	order := make([]int, 0, n)
	// Walk backwards from the most recently written slot so that, for an
	// undisturbed ring, the slots are already listed newest first.
	for back := 1; back <= n; back++ {
		slot := ((mq.cursor-back)%n + n) % n
		if !mq.timeStamp[slot].IsZero() {
			order = append(order, slot)
		}
	}
	if len(order) == 0 {
		return nil, nil
	}

	// Stable sort keeps the walk order for equal timestamps and still
	// yields newest-first if the slots were reordered through Swap.
	sort.SliceStable(order, func(a, b int) bool {
		return mq.timeStamp[order[a]].After(mq.timeStamp[order[b]])
	})

	times = make([]time.Time, 0, len(order))
	messages = make([]interface{}, 0, len(order))
	for _, slot := range order {
		times = append(times, mq.timeStamp[slot])
		messages = append(messages, mq.messages[slot])
	}
	return times, messages
}
// BufferProcessOutput copies every message received on output into mq.
//
// It blocks until output is closed and then returns. Ranging over the
// channel matters here: a bare receive on a closed channel returns
// immediately with a nil value, which would otherwise spin forever and flood
// the queue with nil messages.
func BufferProcessOutput(output chan interface{}, mq *MessageQueue) {
	for msg := range output {
		mq.AddMessage(msg)
	}
}