/
rrepl.go
350 lines (297 loc) · 8.58 KB
/
rrepl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
/*
startup process
===============
1. Start the master Go process. Mount the embedded R system so the child process can read it.
2. Start the child R process, reading from the FUSE-mounted filesystem.
~~~
Master Go process ---> starts --> R child process acts as REPL.
| ^ |
websocket | |launches
server | |websocket client
^ | |
| | V
\----------------------------> o-RMQ library, in Go
~~~
*/
package main
// Copyright 2015 Jason E. Aten
// License: Apache 2.0. http://www.apache.org/licenses/LICENSE-2.0
import (
"bytes"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/gorilla/websocket"
"github.com/glycerine/libzipfs"
)
// upgrader is the shared websocket.Upgrader that webSockHandler (in main)
// uses to turn an incoming HTTP GET into a websocket connection. Both I/O
// buffer sizes are set to 1024.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// Websocket tuning parameters.
const (
	// maxMessageSize is the maximum message size allowed from the peer.
	maxMessageSize = 512

	// writeWait is the time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// pongWait is the time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second

	// pingPeriod is the interval at which pings are sent to the peer.
	// It must be less than pongWait; it is nine tenths of it.
	pingPeriod = pongWait * 9 / 10
)
// main mounts the R distribution embedded in this executable, starts a
// watchdog that launches the R child process, and serves a websocket
// endpoint on localhost:10101 for the R REPL session to talk to.
// It never returns: the last statement blocks forever.
func main() {
	// Mount the attached zip file as R_HOME.
	// libzipfs.MountComboZip() serves the zipfile described by the footer
	// in the current program, as generated by the libzipfs-combiner utility.
	z, mountpoint, err := libzipfs.MountComboZip()
	panicOn(err)
	// NOTE: main ends in a blocking select{}, so this deferred Stop only
	// runs if main unwinds through one of the panics below.
	defer z.Stop() // if you want to stop serving files

	// Set R_HOME so our child process can find it.
	rhome := mountpoint + "/3.2.2_1/Frameworks/R.framework/Resources/"
	panicOn(os.Setenv("R_HOME", rhome))

	// Choose our webserver address, and set an env var so the child repl
	// can find us.
	addr := "localhost:10101"
	panicOn(os.Setenv("ROGUE_REPL_SERVER_ADDR", addr))

	// Start our child repl. Monitor it and restart it if it dies.
	cfg := &WatchdogConfig{
		PathToChildProcess: rhome + "bin/R",
	}
	watcher := NewWatchdog(cfg)
	watcher.Start()

	// Channels connecting the websocket handler to the request processor.
	requestFromReplCh := make(chan []byte)
	replyToReplCh := make(chan []byte)

	// For now just echo each request back with a small addition.
	// The channels are never closed, so this goroutine lives as long as
	// the process does.
	go func() {
		for req := range requestFromReplCh {
			var reply bytes.Buffer
			fmt.Fprintf(&reply, "echoing: ")
			_, err := io.Copy(&reply, bytes.NewBuffer(req))
			panicOn(err)
			replyToReplCh <- reply.Bytes()
		}
	}()

	fmt.Printf("ListenAndServe listening on address '%s'...\n", addr)

	// webSockHandler upgrades a GET on "/" to a websocket, reads a single
	// message, forwards it to the request processor, writes the reply back
	// as a binary message, and then closes the connection.
	webSockHandler := func(w http.ResponseWriter, r *http.Request) {
		fmt.Printf("\nwebSockHandler called.\n")
		if r.URL.Path != "/" {
			http.Error(w, "Not found", http.StatusNotFound)
			return
		}
		if r.Method != "GET" {
			http.Error(w, "Method not allowed, only GET allowed.", http.StatusMethodNotAllowed)
			return
		}

		c, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			// Upgrade has already replied to the client with an HTTP
			// error response, so writing another one here would be
			// superfluous. Just log.
			fmt.Printf("server webSockHandler() handler saw "+
				"websocket upgrader.Upgrade() error: '%s'\n", err)
			return
		}
		defer c.Close()

		// From here on the connection has been taken over by the
		// websocket, so w must not be used: HTTP error responses can no
		// longer be delivered through it. Failures are logged instead.
		_, message, err := c.ReadMessage()
		if err != nil {
			fmt.Printf("server webSockHandler() handler saw "+
				"websocket ReadMessage() error: '%s'\n", err)
			return
		}

		requestFromReplCh <- message
		reply := <-replyToReplCh

		if err = c.WriteMessage(websocket.BinaryMessage, reply); err != nil {
			fmt.Printf("server webSockHandler() handler saw "+
				"websocket WriteMessage() error: '%s'\n", err)
			return
		}
	} // end webSockHandler

	// Start a new server, to avoid registration issues with
	// the default http library mux/server which may be in use
	// already for other purposes.
	mux := http.NewServeMux()
	mux.HandleFunc("/", webSockHandler)
	server := NewWebServer(addr, mux)
	server.Start()

	// Block forever; the server and watchdog run in the background.
	select {}
}
// WatchdogConfig holds the settings for a Watchdog.
type WatchdogConfig struct {
// PathToChildProcess is the path of the executable that Watchdog.Start
// passes to os.StartProcess.
PathToChildProcess string
}
// Watchdog starts a child process and supervises it from a background
// goroutine (see Start), restarting the child after it has been reaped.
type Watchdog struct {
// Ready is created by NewWatchdog but not referenced by the code visible
// in this file. NOTE(review): presumably meant to signal readiness — confirm.
Ready chan bool
// RestartChild: a receive on this channel makes the supervising loop
// send SIGKILL to the current child.
RestartChild chan bool
// ReqStop is closed by Stop to ask the supervising loop to exit.
ReqStop chan bool
// Done is closed by the supervising goroutine when it exits.
Done chan bool
// startCount is incremented each time the child is successfully started.
startCount int64
// mut is held by SetErr, GetErr and Stop while touching err and shutdown.
mut sync.Mutex
// shutdown is set to true once the supervising goroutine has finished.
shutdown bool
// cfg is the configuration handed to NewWatchdog.
cfg *WatchdogConfig
// err is the most recently recorded error; read it via GetErr.
err error
// needRestart tells the supervising loop to (re)start the child on its
// next iteration.
needRestart bool
// proc is the handle of the most recently started child process.
proc *os.Process
}
// NewWatchdog builds a Watchdog for cfg with all of its signalling
// channels allocated (unbuffered). The child is not launched until Start
// is called.
func NewWatchdog(cfg *WatchdogConfig) *Watchdog {
	return &Watchdog{
		cfg:          cfg,
		Ready:        make(chan bool),
		RestartChild: make(chan bool),
		ReqStop:      make(chan bool),
		Done:         make(chan bool),
	}
}
// AlreadyDone reports, without blocking, whether a receive on the Done
// channel would succeed right now.
func (w *Watchdog) AlreadyDone() bool {
	select {
	case <-w.Done:
	default:
		// Done is not ready: the watchdog is still running (or was never started).
		return false
	}
	return true
}
// Stop asks the supervising goroutine to exit by closing ReqStop, waits
// for Done to be closed, and returns the last recorded error.
//
// Stop may be called more than once and from several goroutines: ReqStop
// is closed at most once, and every caller waits for Done.
//
// NOTE: Done is only closed by the goroutine launched in Start, so calling
// Stop on a Watchdog that was never started blocks forever.
func (w *Watchdog) Stop() error {
	if w.AlreadyDone() {
		return w.GetErr()
	}

	w.mut.Lock()
	if w.shutdown {
		err := w.err
		w.mut.Unlock()
		return err
	}
	// Close ReqStop exactly once. Nothing ever sends on ReqStop, so a
	// successful receive can only mean it is already closed. Doing the
	// check-and-close under the mutex keeps two concurrent Stop calls from
	// both closing it, which would panic.
	select {
	case <-w.ReqStop:
	default:
		close(w.ReqStop)
	}
	w.mut.Unlock()

	// Don't wait for Done while holding the lock,
	// as that is deadlock prone.
	<-w.Done

	w.mut.Lock()
	defer w.mut.Unlock()
	w.shutdown = true
	return w.err
}
// SetErr records err as the watchdog's current error while holding the
// mutex.
func (w *Watchdog) SetErr(err error) {
	w.mut.Lock()
	w.err = err
	w.mut.Unlock()
}
// GetErr returns the watchdog's current error, read while holding the
// mutex.
func (w *Watchdog) GetErr() error {
	w.mut.Lock()
	current := w.err
	w.mut.Unlock()
	return current
}
// Start launches the child process named by cfg.PathToChildProcess and
// returns immediately; a background goroutine then supervises the child:
//
//   - when ReqStop is closed, the goroutine exits;
//   - on a receive from RestartChild, it sends SIGKILL to the child;
//   - on SIGCHLD, it polls wait4(WNOHANG) and, once the child has been
//     reaped, starts a fresh one on the next loop iteration.
//
// When the goroutine exits it releases the process handle, closes Done and
// marks the watchdog as shut down. See GetErr for any error after w.Done.
func (w *Watchdog) Start() {
	signalChild := make(chan os.Signal, 1)
	//defer close(signalChild)
	signal.Notify(signalChild, syscall.SIGCHLD)

	var attr os.ProcAttr
	argv := []string{}
	w.needRestart = true
	var ws syscall.WaitStatus

	go func() {
		defer func() {
			if w.proc != nil {
				w.proc.Release()
			}
			close(w.Done)
			// can deadlock if we don't close(w.Done) before grabbing the mutex:
			w.mut.Lock()
			w.shutdown = true
			w.mut.Unlock()
			signal.Stop(signalChild) // reverse the effect of the above Notify()
		}()
		var err error

	reaploop:
		for {
			if w.needRestart {
				if w.proc != nil {
					w.proc.Release()
				}
				VPrintf("\n debug: about to start '%s'\n", w.cfg.PathToChildProcess)
				w.proc, err = os.StartProcess(w.cfg.PathToChildProcess, argv, &attr)
				if err != nil {
					// Record through SetErr, like every other error path,
					// so the write is made under the mutex.
					w.SetErr(err)
					return
				}
				w.needRestart = false
				w.startCount++
				VPrintf("\n Start number %d: Watchdog started pid %d / new process '%s'\n", w.startCount, w.proc.Pid, w.cfg.PathToChildProcess)
			}

			select {
			case <-w.ReqStop:
				VPrintf("\n ReqStop noted, exiting watchdog.Start() loop\n")
				return

			case <-w.RestartChild:
				VPrintf("\n debug: got <-w.RestartChild\n")
				killErr := w.proc.Signal(syscall.SIGKILL)
				if killErr != nil {
					killErr = fmt.Errorf("warning: watchdog tried to SIGKILL pid %d but got error: '%s'", w.proc.Pid, killErr)
					w.SetErr(killErr)
					log.Printf("%s", killErr)
					return
				}
				// The kill is noticed via SIGCHLD, which triggers the
				// reap-and-restart path below.
				continue reaploop

			case <-signalChild:
				VPrintf("\n debug: got <-signalChild\n")

				for i := 0; i < 1000; i++ {
					pid, err := syscall.Wait4(w.proc.Pid, &ws, syscall.WNOHANG, nil)
					// pid > 0 => pid is the ID of the child that died, but
					//  there could be other children that are signalling us
					//  and not the one we in particular are waiting for.
					// pid -1 && errno == ECHILD => no new status children
					// pid -1 && errno != ECHILD => syscall interrupted by signal
					// pid == 0 => no more children to wait for.
					VPrintf("\n pid=%v  ws=%v and err == %v\n", pid, ws, err)
					switch {
					case err != nil:
						err = fmt.Errorf("wait4() got error back: '%s' and ws:%v", err, ws)
						log.Printf("warning in reaploop, wait4(WNOHANG) returned error: '%s'. ws=%v", err, ws)
						w.SetErr(err)
						continue reaploop
					case pid == w.proc.Pid:
						// Our child was reaped: start a new one on the
						// next pass through reaploop.
						w.needRestart = true
						VPrintf("\n Watchdog saw pid %d/process '%s' finish with waitstatus: %v.\n", pid, w.cfg.PathToChildProcess, ws)
						continue reaploop
					case pid == 0:
						// this is what we get when SIGSTOP is sent on OSX. ws == 0 in this case.
						// Note that on OSX we never get a SIGCONT signal.
						// Under WNOHANG, pid == 0 means there is nobody left to wait for,
						// so just go back to waiting for another SIGCHLD.
						VPrintf("pid == 0 on wait4, (perhaps SIGSTOP?): nobody left to wait for, keep looping. ws = %v\n", ws)
						continue reaploop
					default:
						VPrintf("\n warning in reaploop: wait4() negative or not our pid, sleep and try again\n")
						time.Sleep(time.Millisecond)
					}
				} // end for i

				// Log the local error value instead of re-reading w.err
				// without the lock.
				reapErr := fmt.Errorf("could not reap child PID %d or obtain wait4(WNOHANG)==0 even after 1000 attempts", w.proc.Pid)
				w.SetErr(reapErr)
				log.Printf("%s", reapErr)
				return
			} // end select
		} // end for reaploop
	}()
}
// panicOn panics with err when err is non-nil and does nothing otherwise.
func panicOn(err error) {
	if err == nil {
		return
	}
	panic(err)
}