-
Notifications
You must be signed in to change notification settings - Fork 12
/
utils.go
649 lines (589 loc) · 16.7 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
// Copyright © 2016-2018 Genome Research Limited
// Author: Sendu Bala <sb10@sanger.ac.uk>.
//
// This file is part of wr.
//
// wr is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// wr is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with wr. If not, see <http://www.gnu.org/licenses/>.
package jobqueue
// This file contains some general utility functions for use by client and
// server.
import (
"bufio"
"bytes"
"compress/zlib"
crand "crypto/rand"
"crypto/subtle"
"encoding/base64"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/VertebrateResequencing/wr/internal"
"github.com/VertebrateResequencing/wr/jobqueue/scheduler"
"github.com/dgryski/go-farm"
multierror "github.com/hashicorp/go-multierror"
"github.com/jpillora/backoff"
"github.com/shirou/gopsutil/process"
)
// AppName gets used in certain places like naming the base directory of created
// working directories during Client.Execute().
var AppName = "jobqueue"

// mkHashedLevels is the number of directory levels we create in mkHashedDirs
const mkHashedLevels = 4

// tokenLength is the fixed size of our authentication token
const tokenLength = 43

// reqForScheduler() leaves RAM requests of at least this much (MB) unchanged
const reqSchedSpecialRAM = 924

// reqForScheduler() adds this much extra RAM (MB) to smaller requests
const reqSchedExtraRAM = 100

// reqForScheduler() rounds Time requests up to a multiple of this duration
const reqSchedTimeRound = 30 * time.Minute

// pss is the /proc/*/smaps line prefix whose values currentMemory() sums
var pss = []byte("Pss:")

// cr, lf and ellipses get used by stdFilter()
var cr = []byte("\r")
var lf = []byte("\n")
var ellipses = []byte("[...]\n")
// generateToken creates a cryptographically secure pseudorandom URL-safe
// base64 encoded string tokenLength (43) bytes long. Used by the server to
// create a token passed to the caller for subsequent client authentication.
// If the given file exists and contains a single tokenLength byte string,
// then that is used as the token instead.
func generateToken(tokenFile string) ([]byte, error) {
	if token, err := os.ReadFile(tokenFile); err == nil && len(token) == tokenLength {
		return token, nil
	}

	// 32 random bytes base64-encode (unpadded, URL-safe) to exactly
	// tokenLength characters
	b := make([]byte, 32)
	if _, err := crand.Read(b); err != nil {
		return nil, err
	}

	token := make([]byte, tokenLength)
	base64.URLEncoding.WithPadding(base64.NoPadding).Encode(token, b)
	return token, nil
}
// tokenMatches tells you whether a token supplied by a client matches a
// server token (eg. one generated by generateToken()). The comparison is
// performed in constant time to avoid leaking information via timing attacks.
func tokenMatches(input, expected []byte) bool {
	return subtle.ConstantTimeCompare(input, expected) == 1
}
// byteKey calculates a unique key that describes a byte slice: the 32
// character hex representation of its 128 bit farm hash.
func byteKey(b []byte) string {
	low, high := farm.Hash128(b)
	return fmt.Sprintf("%016x%016x", low, high)
}
// copy a file *** should be updated to handle source being on a different
// machine or in an S3-style object store.
func copyFile(source string, dest string) error {
in, err := os.Open(source)
if err != nil {
return err
}
defer func() {
errc := in.Close()
if errc != nil {
if err == nil {
err = errc
} else {
err = fmt.Errorf("%s (and closing source failed: %s)", err.Error(), errc)
}
}
}()
out, err := os.Create(dest)
if err != nil {
return err
}
defer func() {
errc := out.Close()
if errc != nil {
if err == nil {
err = errc
} else {
err = fmt.Errorf("%s (and closing dest failed: %s)", err.Error(), errc)
}
}
}()
_, err = io.Copy(out, in)
return err
}
// compress uses zlib to compress stuff, for transferring big stuff like
// stdout, stderr and environment variables over the network, and for storing
// of same on disk.
func compress(data []byte) ([]byte, error) {
var compressed bytes.Buffer
w, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
if err != nil {
return nil, err
}
_, err = w.Write(data)
if err != nil {
return nil, err
}
err = w.Close()
if err != nil {
return nil, err
}
return compressed.Bytes(), nil
}
// decompress uses zlib to decompress stuff compressed by compress().
func decompress(compressed []byte) ([]byte, error) {
b := bytes.NewReader(compressed)
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(r)
if err != nil {
return nil, err
}
return buf.Bytes(), err
}
// currentMemory gets the current memory usage (in MB) of a pid and all its
// children, relying on modern linux /proc/*/smaps (based on
// http://stackoverflow.com/a/31881979/675083).
//
// The results are named so that the deferred Close() handler can propagate
// its failure to the caller; previously its assignments went to a plain
// local and were silently discarded.
func currentMemory(pid int) (mem int, err error) {
	f, err := os.Open(fmt.Sprintf("/proc/%d/smaps", pid))
	if err != nil {
		return 0, err
	}
	defer func() {
		errc := f.Close()
		if errc != nil {
			if err == nil {
				err = errc
			} else {
				err = fmt.Errorf("%s (and closing smaps failed: %s)", err.Error(), errc)
			}
		}
	}()

	// sum the "Pss:" (proportional set size) lines, which report kB
	kb := uint64(0)
	r := bufio.NewScanner(f)
	for r.Scan() {
		line := r.Bytes()
		if bytes.HasPrefix(line, pss) {
			var size uint64
			_, err = fmt.Sscanf(string(line[4:]), "%d", &size)
			if err != nil {
				return 0, err
			}
			kb += size
		}
	}
	if err = r.Err(); err != nil {
		return 0, err
	}

	// convert kB to MB
	mem = int(kb / 1024)

	// recurse for children
	p, err := process.NewProcess(int32(pid))
	if err != nil {
		return mem, err
	}
	children, err := p.Children()
	if err != nil && err.Error() != "process does not have children" { // err != process.ErrorNoChildren
		return mem, err
	}
	for _, child := range children {
		childMem, errr := currentMemory(int(child.Pid))
		if errr != nil {
			// best-effort: a child may have exited since we listed it
			continue
		}
		mem += childMem
	}
	return mem, nil
}
// get the current disk usage within a directory, in MBs. Optionally, provide a
// map of absolute paths to dirs (within path) that should not be checked.
func currentDisk(path string, ignore ...map[string]bool) (int64, error) {
var disk int64
skip := make(map[string]bool)
if len(ignore) == 1 && len(ignore[0]) > 0 {
skip = ignore[0]
}
dir, err := os.Open(path)
if err != nil {
return disk, err
}
defer func() {
err = dir.Close()
}()
files, err := dir.Readdir(-1)
if err != nil {
return disk, err
}
for _, file := range files {
if file.IsDir() {
abs := filepath.Join(path, file.Name())
if skip[abs] {
continue
}
recurse, errr := currentDisk(abs, ignore...)
if errr != nil {
return disk, errr
}
disk += recurse
} else {
disk += file.Size() / (1024 * 1024)
}
}
return disk, err
}
// getChildProcesses gets the child processes of the given pid, recursively.
func getChildProcesses(pid int32) ([]*process.Process, error) {
	var kids []*process.Process
	p, err := process.NewProcess(pid)
	if err != nil {
		// we ignore errors, since we allow for working on processes that we're in
		// the process of killing
		return kids, nil
	}

	kids, err = p.Children()
	if err != nil && err.Error() != "process does not have children" {
		return kids, err
	}

	for _, kid := range kids {
		grandKids, errk := getChildProcesses(kid.Pid)
		if errk != nil {
			continue
		}
		if len(grandKids) > 0 {
			kids = append(kids, grandKids...)
		}
	}
	return kids, nil
}
// this prefixSuffixSaver-related code is taken from os/exec, since they are not
// exported. prefixSuffixSaver is an io.Writer which retains the first N bytes
// and the last N bytes written to it. The Bytes() methods reconstructs it with
// a pretty error message.
type prefixSuffixSaver struct {
	N         int    // max bytes to retain at each of the head and the tail
	prefix    []byte // the first N bytes written
	suffix    []byte // ring buffer holding the most recent N bytes written
	suffixOff int    // current write offset into the suffix ring buffer
	skipped   int64  // count of bytes dropped between prefix and suffix
}
// Write implements io.Writer. It keeps the first w.N bytes in w.prefix, the
// most recent w.N bytes in the w.suffix ring buffer, and counts everything
// dropped in between in w.skipped. It always reports the full input length
// as written and never fails.
func (w *prefixSuffixSaver) Write(p []byte) (int, error) {
	lenp := len(p)
	// fill the prefix first; fill() returns whatever didn't fit
	p = w.fill(&w.prefix, p)
	// of what remains, only the final w.N bytes can ever appear in the
	// suffix, so drop (and count) the excess up front
	if overage := len(p) - w.N; overage > 0 {
		p = p[overage:]
		w.skipped += int64(overage)
	}
	// grow the suffix up to capacity, then wrap around it as a ring buffer
	p = w.fill(&w.suffix, p)
	for len(p) > 0 { // 0, 1, or 2 iterations.
		n := copy(w.suffix[w.suffixOff:], p)
		p = p[n:]
		// bytes overwritten in the ring were effectively dropped
		w.skipped += int64(n)
		w.suffixOff += n
		if w.suffixOff == w.N {
			w.suffixOff = 0
		}
	}
	return lenp, nil
}
// fill appends as much of p to *dst as will fit without *dst exceeding w.N
// bytes, and returns the portion of p that was left over.
func (w *prefixSuffixSaver) fill(dst *[]byte, p []byte) []byte {
	remain := w.N - len(*dst)
	if remain <= 0 {
		return p
	}
	take := minInt(len(p), remain)
	*dst = append(*dst, p[:take]...)
	return p[take:]
}
// Bytes reconstructs the retained output: the prefix, then (if anything was
// dropped) a note saying how many bytes were omitted, then the suffix ring
// buffer unrolled into logical (oldest-first) order.
func (w *prefixSuffixSaver) Bytes() []byte {
	if w.suffix == nil {
		return w.prefix
	}
	if w.skipped == 0 {
		return append(w.prefix, w.suffix...)
	}
	var buf bytes.Buffer
	buf.Grow(len(w.prefix) + len(w.suffix) + 50)
	buf.Write(w.prefix)
	buf.WriteString("\n... omitting ")
	buf.WriteString(strconv.FormatInt(w.skipped, 10))
	buf.WriteString(" bytes ...\n")
	// the oldest suffix byte lives at suffixOff, so emit from there to the
	// end, then wrap to the start
	buf.Write(w.suffix[w.suffixOff:])
	buf.Write(w.suffix[:w.suffixOff])
	return buf.Bytes()
}
// minInt returns the smaller of two ints.
func minInt(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// stdFilter keeps only the first and last line of any contiguous block of \r
// terminated lines (to mostly eliminate progress bars), intended for use with
// stdout/err streaming input, outputting to a prefixSuffixSaver. Because you
// must finish reading from the input before continuing, it returns a channel
// that you should wait to receive an error from (nil if everything worked).
func stdFilter(std io.Reader, out io.Writer) chan error {
	reader := bufio.NewReader(std)
	done := make(chan error)
	go func() {
		var merr *multierror.Error
		for {
			// read a whole \n-terminated line; a progress bar shows up as
			// many \r-separated segments within it
			p, err := reader.ReadBytes('\n')
			lines := bytes.Split(p, cr)
			// always emit the first segment
			_, errw := out.Write(lines[0])
			if errw != nil {
				merr = multierror.Append(merr, errw)
			}
			if len(lines) > 2 {
				// multiple \r segments: emit first and last, with an
				// ellipses marker when middle segments were dropped
				_, errw = out.Write(lf)
				if errw != nil {
					merr = multierror.Append(merr, errw)
				}
				if len(lines) > 3 {
					_, errw = out.Write(ellipses)
					if errw != nil {
						merr = multierror.Append(merr, errw)
					}
				}
				// lines[len-1] is what follows the final \r (usually empty
				// before the \n), so the last full segment is at len-2
				_, errw = out.Write(lines[len(lines)-2])
				if errw != nil {
					merr = multierror.Append(merr, errw)
				}
				_, errw = out.Write(lf)
				if errw != nil {
					merr = multierror.Append(merr, errw)
				}
			}
			if err != nil {
				// EOF (or a read failure) ends the stream
				break
			}
		}
		done <- merr.ErrorOrNil()
	}()
	return done
}
// envOverride deals with values you get from os.Environ, overriding one set
// with values from another. Returns the new slice of environment variables.
func envOverride(orig []string, over []string) []string {
	replacements := make(map[string]string)
	for _, envvar := range over {
		name := strings.Split(envvar, "=")[0]
		replacements[name] = envvar
	}

	env := orig
	for i, envvar := range env {
		name := strings.Split(envvar, "=")[0]
		if replacement, found := replacements[name]; found {
			env[i] = replacement
			delete(replacements, name)
		}
	}

	// whatever remains wasn't present in orig, so gets appended
	for _, envvar := range replacements {
		env = append(env, envvar)
	}
	return env
}
// calculateHashedDir returns the hashed directory structure corresponding to
// a given string. Returns dirs rooted at baseDir, and a leaf name.
func calculateHashedDir(baseDir, tohash string) (string, string) {
	// splitting on "" gives mkHashedLevels-1 single-character parts plus the
	// remainder as the final element
	parts := strings.SplitN(tohash, "", mkHashedLevels)
	leaf := parts[mkHashedLevels-1]
	joined := append([]string{baseDir}, parts[:mkHashedLevels-1]...)
	return filepath.Join(joined...), leaf
}
// mkHashedDir uses tohash (which should be a 32 char long string from
// byteKey()) to create a folder nested within baseDir, and in that folder
// creates 2 folders called cwd and tmp, which it returns. Returns an error if
// there were problems making the directories.
func mkHashedDir(baseDir, tohash string) (cwd, tmpDir string, err error) {
	dir, leaf := calculateHashedDir(filepath.Join(baseDir, AppName+"_cwd"), tohash)
	holdFile := filepath.Join(dir, ".hold")
	// err is a named result, so this deferred removal can fold its own
	// failure into whatever the function returns
	defer func() {
		errr := os.Remove(holdFile)
		if errr != nil && !os.IsNotExist(errr) {
			if err == nil {
				err = errr
			} else {
				err = fmt.Errorf("%s (and removing the hold file failed: %s)", err.Error(), errr)
			}
		}
	}()
	tries := 0
	for {
		err = os.MkdirAll(dir, os.ModePerm)
		if err != nil {
			tries++
			if tries <= 3 {
				// we retry a few times in case another process is calling
				// rmEmptyDirs on the same baseDir and so conflicting with us
				continue
			}
			return cwd, tmpDir, err
		}
		// and drop a temp file in here so rmEmptyDirs will not immediately
		// remove these dirs
		tries = 0
		var f *os.File
		f, err = os.OpenFile(holdFile, os.O_RDONLY|os.O_CREATE, 0600)
		if err != nil {
			tries++
			if tries <= 3 {
				// rmEmptyDirs may have removed dir between MkdirAll and
				// OpenFile; loop back and recreate it
				continue
			}
			return cwd, tmpDir, err
		}
		err = f.Close()
		if err != nil {
			return cwd, tmpDir, err
		}
		break
	}
	// if tohash is a job key then we expect that only 1 of that job is
	// running at any one time per jobqueue, but there could be multiple users
	// running the same cmd, or this user could be running the same command in
	// multiple queues, so we must still create a unique dir at the leaf of our
	// hashed dir structure, to avoid any conflict of multiple processes using
	// the same working directory
	dir, err = os.MkdirTemp(dir, leaf)
	if err != nil {
		return cwd, tmpDir, err
	}
	cwd = filepath.Join(dir, "cwd")
	err = os.Mkdir(cwd, os.ModePerm)
	if err != nil {
		return cwd, tmpDir, err
	}
	tmpDir = filepath.Join(dir, "tmp")
	return cwd, tmpDir, os.Mkdir(tmpDir, os.ModePerm)
}
// rmEmptyDirs deletes leafDir and it's parent directories if they are empty,
// stopping if it reaches baseDir (leaving that undeleted). It's ok if leafDir
// doesn't exist.
func rmEmptyDirs(leafDir, baseDir string) error {
err := os.Remove(leafDir)
if err != nil && !os.IsNotExist(err) {
if strings.Contains(err.Error(), "directory not empty") { //*** not sure where this string comes; probably not cross platform!
return nil
}
return err
}
current := leafDir
parent := filepath.Dir(current)
for ; parent != baseDir; parent = filepath.Dir(current) {
thisErr := os.Remove(parent)
if thisErr != nil {
// it's expected that we might not be able to delete parents, since
// some other Job may be running from the same Cwd, meaning this
// parent dir is not empty
break
}
current = parent
}
return nil
}
// removeAllExcept deletes the contents of a given directory (absolute path),
// except for the given folders (relative paths).
func removeAllExcept(path string, exceptions []string) error {
	path = filepath.Clean(path)
	keepDirs := make(map[string]bool)
	checkDirs := make(map[string]bool)
	for _, exception := range exceptions {
		abs := filepath.Join(path, exception)
		keepDirs[abs] = true
		// every ancestor of a kept dir must be descended into rather than
		// wholesale deleted
		for parent := filepath.Dir(abs); parent != path; parent = filepath.Dir(parent) {
			checkDirs[parent] = true
		}
	}
	return removeWithExceptions(path, keepDirs, checkDirs)
}
// removeWithExceptions is the recursive part of removeAllExcept's
// implementation that does the real work of deleting stuff.
func removeWithExceptions(path string, keepDirs map[string]bool, checkDirs map[string]bool) error {
entries, errr := os.ReadDir(path)
if errr != nil {
return errr
}
for _, entry := range entries {
abs := filepath.Join(path, entry.Name())
if !entry.IsDir() {
err := os.Remove(abs)
if err != nil {
return err
}
continue
}
if keepDirs[abs] {
continue
}
if checkDirs[abs] {
err := removeWithExceptions(abs, keepDirs, checkDirs)
if err != nil {
return err
}
} else {
err := os.RemoveAll(abs)
if err != nil {
return err
}
}
}
return nil
}
// compressFile reads the content of the given file then compresses that.
// Since this happens in memory, only suitable for small files! A leading
// tilda in the path is expanded to the user's home directory first.
func compressFile(path string) ([]byte, error) {
	content, err := os.ReadFile(internal.TildaToHome(path))
	if err != nil {
		return nil, err
	}
	return compress(content)
}
// reqForScheduler takes a job's Requirements and returns a possibly modified
// version: jobs using less than 924MB memory get +100MB to allow some leeway
// in case the job scheduler calculates used memory differently, and for
// other memory usage vagaries. It also rounds up the Time to the nearest
// half hour.
func reqForScheduler(req *scheduler.Requirements) *scheduler.Requirements {
	ram := req.RAM
	if ram < reqSchedSpecialRAM {
		ram += reqSchedExtraRAM
	}

	// Round() may round down; bump up a step so we only ever round up
	t := req.Time.Round(reqSchedTimeRound)
	if t < req.Time {
		t += reqSchedTimeRound
	}

	return &scheduler.Requirements{
		RAM:   ram,
		Time:  t,
		Cores: req.Cores,
		Disk:  req.Disk,
		Other: req.Other,
	}
}
// calculateItemDelay returns a delay based on a backoff and the number of
// previous delays.
func calculateItemDelay(numPreviousDelays int) time.Duration {
	b := &backoff.Backoff{
		Min:    ClientReleaseDelayMin,
		Max:    ClientReleaseDelayMax,
		Factor: ClientReleaseDelayStepFactor,
		Jitter: false, // don't like the behaviour of its jitter
	}
	d := b.ForAttempt(float64(numPreviousDelays))
	// NOTE(review): rand.Float64()*Min - Min is always in (-Min, 0], so this
	// subtraction actually *increases* d by up to ClientReleaseDelayMin. If
	// the intent was to subtract a random jitter, the trailing
	// "- float64(ClientReleaseDelayMin)" term looks suspect — confirm intent
	// before changing.
	d -= time.Duration(rand.Float64()*float64(ClientReleaseDelayMin) - float64(ClientReleaseDelayMin)) // #nosec
	return d
}