// Copyright © 2016 Genome Research Limited
// Author: Sendu Bala <sb10@sanger.ac.uk>.
//
// This file is part of wr.
//
// wr is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// wr is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with wr. If not, see <http://www.gnu.org/licenses/>.

package cmd

import (
"fmt"
"github.com/VertebrateResequencing/wr/internal"
"github.com/VertebrateResequencing/wr/jobqueue"
jqs "github.com/VertebrateResequencing/wr/jobqueue/scheduler"
"github.com/kardianos/osext"
"github.com/sevlyar/go-daemon"
"github.com/spf13/cobra"
"io/ioutil"
"log"
"os"
"path/filepath"
"runtime"
"strconv"
"syscall"
"time"
)

// options for this cmd
var foreground bool
var scheduler string

// managerCmd represents the manager command
var managerCmd = &cobra.Command{
Use: "manager",
Short: "Workflow manager",
Long: `The workflow management system.

The wr manager works in the background, doing all the work of ensuring your
commands get run successfully.

It maintains both a temporary queue of the commands you want to run, and a
permanent history of commands you've run in the past, along with a simple
key/val database that can be used to store result metadata associated with
output files. As commands are added to the queue, it makes sure to spawn
sufficient 'wr runner' agents to get them all run.

You'll need to start this daemon with the 'start' sub-command before you can
achieve anything useful with the other wr commands. If the background process
that is spawned when you run 'start' dies, your workflows will stall until you
run the 'start' sub-command again.

If the manager fails to start or dies unexpectedly, you can check the logs,
which by default are found in ~/.wr_[deployment]/log.

If using the openstack scheduler, note that you must already be running on an
openstack server. If you're not, you can instead use 'wr cloud deploy -p
openstack' to create an openstack server on which wr manager will be started
in openstack mode for you.`,
}

// start sub-command starts the daemon
var managerStartCmd = &cobra.Command{
Use: "start",
Short: "Start workflow management",
Long: `Start the workflow manager, daemonizing it into the background
(unless the --foreground option is supplied).`,
Run: func(cmd *cobra.Command, args []string) {
// first we need our working directory to exist
createWorkingDir()
// check to see if the manager is already running (regardless of the
// state of the pid file), giving us a meaningful error message in the
// most obvious case of failure to start
jq := connect(1 * time.Second)
if jq != nil {
sstats, err := jq.ServerStats()
var pid int
if err == nil {
pid = sstats.ServerInfo.PID
}
die("wr manager on port %s is already running (pid %d)", config.ManagerPort, pid)
}
var postCreation []byte
var extraArgs []string
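// read the --cloud_script file now (if any); its contents are passed
// through to the cloud scheduler's config in startJQ()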
if postCreationScript != "" {
var err error
postCreation, err = ioutil.ReadFile(postCreationScript)
if err != nil {
die("--cloud_script %s could not be read: %s", postCreationScript, err)
}
// daemon runs from /, so we need to convert relative to absolute
// path *** and then pretty hackily, re-specify the option by
// repeating it on the end of os.Args, where the daemonization code
// will pick it up
pcsAbs, err := filepath.Abs(postCreationScript)
if err != nil {
die("--cloud_script %s could not be converted to an absolute path: %s", postCreationScript, err)
}
if pcsAbs != postCreationScript {
extraArgs = append(extraArgs, "--cloud_script")
extraArgs = append(extraArgs, pcsAbs)
}
}
// now daemonize unless in foreground mode
if foreground {
syscall.Umask(config.ManagerUmask)
startJQ(true, postCreation)
} else {
child, context := daemonize(config.ManagerPidFile, config.ManagerUmask, extraArgs...)
if child != nil {
// parent; wait a while for our child to bring up the manager
// before exiting
jq := connect(10 * time.Second)
if jq == nil {
die("wr manager failed to start on port %s after 10s", config.ManagerPort)
}
sstats, err := jq.ServerStats()
if err != nil {
die("wr manager started but doesn't seem to be functional: %s", err)
}
logStarted(sstats.ServerInfo)
} else {
// daemonized child, that will run until signalled to stop
defer context.Release()
startJQ(false, postCreation)
}
}
},
}

// stop sub-command stops the daemon by sending it a term signal
var managerStopCmd = &cobra.Command{
Use: "stop",
Short: "Stop workflow management",
Long: `Immediately stop the workflow manager, saving its state.

Note that any runners that are currently running will die, along with any
commands they were running. It is more graceful to use 'drain' instead.`,
Run: func(cmd *cobra.Command, args []string) {
// the daemon could be running but be non-responsive, or it could have
// exited but left the pid file in place; to best cover all
// eventualities we check the pid file first, try and terminate its pid,
// then confirm we can't connect
pid, err := daemon.ReadPidFile(config.ManagerPidFile)
var stopped bool
if err == nil {
stopped = stopdaemon(pid, "pid file "+config.ManagerPidFile, "manager")
} else {
// probably no pid file, we'll see if the daemon is up by trying to
// connect
jq := connect(1 * time.Second)
if jq == nil {
die("wr manager does not seem to be running on port %s", config.ManagerPort)
}
}
var jq *jobqueue.Client
if stopped {
// we'll do a quick test to confirm the daemon is down
jq = connect(1 * time.Second)
if jq != nil {
warn("according to the pid file %s, wr manager was running with pid %d, and I terminated that pid, but the manager is still up on port %s!", config.ManagerPidFile, pid, config.ManagerPort)
} else {
info("wr manager running on port %s was gracefully shut down", config.ManagerPort)
return
}
} else {
// we failed to SIGTERM the pid in the pid file, let's take some
// time to confirm the daemon is really up
jq = connect(5 * time.Second)
if jq == nil {
die("according to the pid file %s, wr manager for port %s was running with pid %d, but that process could not be terminated and the manager could not be connected to; most likely the pid file is wrong and the manager is not running - after confirming, delete the pid file before trying to start the manager again", config.ManagerPidFile, config.ManagerPort, pid)
}
}
// we managed to connect to the daemon; get its real pid and try to
// stop it again
sstats, err := jq.ServerStats()
if err != nil {
die("even though I was able to connect to the manager, it failed to tell me its true pid; giving up trying to stop it")
}
// though it may actually be running on a remote host that we managed
// to connect to via ssh port forwarding; compare the server's ip to
// our own
myAddr := jobqueue.CurrentIP() + ":" + config.ManagerPort
sAddr := sstats.ServerInfo.Addr
if myAddr == sAddr {
jq.Disconnect()
stopped = stopdaemon(sstats.ServerInfo.PID, "the manager itself", "manager")
} else {
// use the client command to stop it
stopped = jq.ShutdownServer()
// since I don't trust using a client connection to shut down the
// server, double check I can no longer connect
if stopped {
jq = connect(1 * time.Second)
if jq != nil {
warn("I requested shut down of the remote manager at %s, but it still up!", sAddr)
stopped = false
}
}
}
if stopped {
info("wr manager running at %s was gracefully shut down", sAddr)
} else {
info("I've tried everything; giving up trying to stop the manager at %s", sAddr)
}
},
}

// drain sub-command makes the server stop spawning new runners and stop
// letting existing runners reserve jobs; once there are no more runners
// running, the server exits by itself
var managerDrainCmd = &cobra.Command{
Use: "drain",
Short: "Drain the workflow manager of running jobs and then stop",
Long: `Wait for currently running jobs to finish and then gracefully stop the workflow manager, saving its state.

While draining you can continue to add new Jobs, but nothing new will start
running until the drain completes (or the manager is stopped) and the manager
is then started again.

It is safe to repeat this command to get an update on how long before the drain
completes.`,
Run: func(cmd *cobra.Command, args []string) {
// first try and connect
jq := connect(5 * time.Second)
if jq == nil {
die("could not connect to the manager on port %s, so could not initiate a drain; has it already been stopped?", config.ManagerPort)
}
// we managed to connect to the daemon; ask it to go into drain mode
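// DrainServer reports how many jobs are still running, along with an
// estimate of how long until they have all completed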
numLeft, etc, err := jq.DrainServer()
if err != nil {
die("even though I was able to connect to the manager, it failed to enter drain mode: %s", err)
}
if numLeft == 0 {
info("wr manager running on port %s is drained: there were no jobs still running, so the manger should stop right away.", config.ManagerPort)
} else if numLeft == 1 {
info("wr manager running on port %s is now draining; there is a job still running, and it should complete in less than %s", config.ManagerPort, etc)
} else {
info("wr manager running on port %s is now draining; there are %d jobs still running, and they should complete in less than %s", config.ManagerPort, numLeft, etc)
}
jq.Disconnect()
},
}

// status sub-command tells you whether the manager is up or down
var managerStatusCmd = &cobra.Command{
Use: "status",
Short: "Get status of the workflow manager",
Long: `Find out if the workflow manager is currently running or not.`,
Run: func(cmd *cobra.Command, args []string) {
// see if pid file suggests it is supposed to be running
pid, err := daemon.ReadPidFile(config.ManagerPidFile)
if err == nil {
// confirm
jq := connect(5 * time.Second)
if jq != nil {
reportLiveStatus(jq)
return
}
die("wr manager on port %s is supposed to be running with pid %d, but is non-responsive", config.ManagerPort, pid)
}
// no pid file, so it's supposed to be down; confirm
jq := connect(1 * time.Second)
if jq == nil {
fmt.Println("stopped")
} else {
reportLiveStatus(jq)
}
},
}

// reportLiveStatus is used by the status command on a working connection to
// distinguish between the server being in a normal 'started' state or the
// 'drain' state.
func reportLiveStatus(jq *jobqueue.Client) {
sstats, err := jq.ServerStats()
if err != nil {
die("even though I was able to connect to the manager, it wasn't able to tell me about itself: %s", err)
}
mode := sstats.ServerInfo.Mode
fmt.Println(mode)
}
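
// a typical session, as a quick reference (all sub-commands are defined
// above):
//   wr manager start -s local   # start the manager, daemonized
//   wr manager status           # prints the server's mode, or 'stopped'
//   wr manager drain            # let running jobs finish, then stop
//   wr manager stop             # stop immediately, killing any runners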
func init() {
RootCmd.AddCommand(managerCmd)
managerCmd.AddCommand(managerStartCmd)
managerCmd.AddCommand(managerDrainCmd)
managerCmd.AddCommand(managerStopCmd)
managerCmd.AddCommand(managerStatusCmd)
// flags specific to these sub-commands
managerStartCmd.Flags().BoolVarP(&foreground, "foreground", "f", false, "do not daemonize")
managerStartCmd.Flags().StringVarP(&scheduler, "scheduler", "s", internal.DefaultScheduler(), "['local','lsf','openstack'] job scheduler")
managerStartCmd.Flags().StringVarP(&osPrefix, "cloud_os", "o", "Ubuntu 16", "for cloud schedulers, prefix name of the OS image your servers should use")
managerStartCmd.Flags().StringVarP(&osUsername, "cloud_username", "u", "ubuntu", "for cloud schedulers, username needed to log in to the OS image specified by --cloud_os")
managerStartCmd.Flags().IntVarP(&osRAM, "cloud_ram", "r", 2048, "for cloud schedulers, ram (MB) needed by the OS image specified by --cloud_os")
managerStartCmd.Flags().StringVarP(&flavorRegex, "cloud_flavor", "l", "", "for cloud schedulers, a regular expression to limit server flavors that can be automatically picked")
managerStartCmd.Flags().StringVarP(&postCreationScript, "cloud_script", "p", "", "for cloud schedulers, path to a start-up script that will be run on each server created")
managerStartCmd.Flags().IntVarP(&serverKeepAlive, "cloud_keepalive", "k", 120, "for cloud schedulers, how long in seconds to keep idle spawned servers alive for")
managerStartCmd.Flags().IntVarP(&maxServers, "cloud_servers", "m", 0, "for cloud schedulers, maximum number of servers to spawn; 0 means unlimited (default 0)")
}
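
// logStarted tells the user that the manager started, giving its address,
// pid and the url of the web interface.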
func logStarted(s *jobqueue.ServerInfo) {
info("wr manager started on %s, pid %d", sAddr(s), s.PID)
info("wr's web interface can be reached at http://%s:%s", s.Host, s.WebPort)
}
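
// startJQ configures the chosen scheduler and starts the jobqueue server,
// blocking until the server stops. sayStarted should be true when running
// in the foreground, where there is no parent process to report the
// successful start.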
func startJQ(sayStarted bool, postCreation []byte) {
runtime.GOMAXPROCS(runtime.NumCPU())
// we will spawn runners, which means we need to know the path to ourselves
// in case we're not in the user's $PATH
exe, err := osext.Executable()
if err != nil {
log.Printf("wr manager failed to start : %s\n", err)
os.Exit(1)
}
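// build the configuration appropriate to the chosen --scheduler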
var schedulerConfig interface{}
switch scheduler {
case "local":
schedulerConfig = &jqs.ConfigLocal{Shell: config.RunnerExecShell}
case "lsf":
schedulerConfig = &jqs.ConfigLSF{Deployment: config.Deployment, Shell: config.RunnerExecShell}
case "openstack":
mport, _ := strconv.Atoi(config.ManagerPort)
schedulerConfig = &jqs.ConfigOpenStack{
ResourceName: "wr-" + config.Deployment,
SavePath: filepath.Join(config.ManagerDir, "cloud_resources.openstack"),
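// ports to open on spawned servers: ssh, plus the manager's own port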
ServerPorts: []int{22, mport},
OSPrefix: osPrefix,
OSUser: osUsername,
OSRAM: osRAM,
FlavorRegex: flavorRegex,
PostCreationScript: postCreation,
ServerKeepTime: time.Duration(serverKeepAlive) * time.Second,
MaxInstances: maxServers,
Shell: config.RunnerExecShell,
}
}
// start the jobqueue server
server, msg, err := jobqueue.Serve(jobqueue.ServerConfig{
Port: config.ManagerPort,
WebPort: config.ManagerWeb,
SchedulerName: scheduler,
SchedulerConfig: schedulerConfig,
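// template command line used to spawn 'wr runner' agents; the server
// fills in the placeholders itself when it spawns runners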
RunnerCmd: exe + " runner -q %s -s '%s' --deployment %s --server '%s' -r %d -m %d",
DBFile: config.ManagerDbFile,
DBFileBackup: config.ManagerDbBkFile,
Deployment: config.Deployment,
})
if sayStarted && err == nil {
logStarted(server.ServerInfo)
}
// start logging to configured file
logfile, errlog := os.OpenFile(config.ManagerLogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if errlog != nil {
warn("could not log to %s, will log to STDOUT: %v", config.ManagerLogFile, errlog)
} else {
defer logfile.Close()
log.SetOutput(logfile)
}
// log any failure of Serve() to the file
if err != nil {
if msg != "" {
log.Printf("wr manager : %s\n", msg)
}
log.Printf("wr manager failed to start : %s\n", err)
os.Exit(1)
}
// log to file that we started
addr := sAddr(server.ServerInfo)
log.Printf("wr manager started on %s\n", addr)
if msg != "" {
log.Printf("wr manager : %s\n", msg)
}
// block forever while the jobqueue does its work
err = server.Block()
if err != nil {
jqerr, ok := err.(jobqueue.Error)
switch {
case ok && jqerr.Err == jobqueue.ErrClosedTerm:
log.Printf("wr manager on %s gracefully stopped (received SIGTERM)\n", addr)
case ok && jqerr.Err == jobqueue.ErrClosedInt:
log.Printf("wr manager on %s gracefully stopped (received SIGINT)\n", addr)
case ok && jqerr.Err == jobqueue.ErrClosedStop:
log.Printf("wr manager on %s gracefully stopped (following a drain)\n", addr)
default:
log.Printf("wr manager on %s exited unexpectedly: %s\n", addr, err)
}
}
}