-
Notifications
You must be signed in to change notification settings - Fork 13
/
parallelize.go
154 lines (132 loc) · 3.66 KB
/
parallelize.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package main
import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/ActiveState/cli/internal/constraints"
"github.com/ActiveState/cli/internal/errs"
"github.com/ActiveState/cli/internal/fileutils"
"github.com/ActiveState/cli/internal/installation/storage"
"github.com/ActiveState/cli/internal/osutils"
"github.com/ActiveState/cli/pkg/project"
"github.com/gammazero/workerpool"
)
// Job describes one command to execute. Jobs are decoded from the JSON array
// passed on the command line; the struct has no JSON tags, so keys are matched
// against the field names.
type Job struct {
// ID identifies the job. It names the output file ("<ID>.out" inside the
// jobs directory) and prefixes the job's log lines.
ID string
// Args is the command line: Args[0] is the executable (the platform's
// executable extension is appended to it) and Args[1:] are its arguments.
// A job without any Args is reported as a failure.
Args []string
// Env holds environment entries that are passed to the command together
// with the current process environment.
Env []string
// If is an optional conditional expression. When non-empty it is evaluated
// against the current project, and the job is skipped if it evaluates to false.
If string
}
// main delegates to run. When run reports an error, its joined message is
// printed to stderr and the process exits with status 1.
func main() {
	err := run()
	if err == nil {
		return
	}

	fmt.Fprintln(os.Stderr, errs.JoinMessage(err))
	os.Exit(1)
}
// run is the entry point behind main. It supports two invocations:
//
//   - "results <ID>": replays the recorded output of a job via readJob.
//   - anything else: all arguments are concatenated (without a separator) and
//     parsed as a JSON array of Job values, which are then executed on a pool
//     of three workers.
//
// It returns an error when no arguments are given, when "results" is not
// followed by exactly one job ID, or when the JSON does not decode into []Job.
// Failures of individual jobs are not returned from here; runJob has no return
// value and records the outcome in each job's output file instead.
func run() error {
	if len(os.Args) <= 1 {
		// The sub-command checked below is "results", so the usage text must say so.
		return errs.New("Must provide single argument with JSON blob, or [results <ID>] to check the results of a job.")
	}

	if os.Args[1] == "results" {
		if len(os.Args) != 3 {
			return errs.New("Must provide job ID")
		}
		return readJob(os.Args[2])
	}

	jsonData := []byte(strings.Join(os.Args[1:], ""))
	var jobs []Job
	if err := json.Unmarshal(jsonData, &jobs); err != nil {
		return errs.Wrap(err, "Invalid JSON. Data: %s", jsonData)
	}

	wp := workerpool.New(3)
	for _, job := range jobs {
		job := job // per-iteration copy so each submitted closure sees its own job
		wp.Submit(func() {
			t := time.Now()
			fmt.Printf("Running: %s\n", job.ID)
			runJob(job)
			fmt.Printf("Finished %s after %s\n", job.ID, time.Since(t))
		})
	}
	// Block until every queued job has run.
	wp.StopWait()

	return nil
}
// jobDir returns the "jobs" directory underneath the application data path,
// creating it when it does not exist yet. It panics if the application data
// path cannot be determined or the directory cannot be created.
func jobDir() string {
	base, err := storage.AppDataPath()
	if err != nil {
		panic(err)
	}

	dir := filepath.Join(base, "jobs")
	err = fileutils.MkdirUnlessExists(dir)
	if err != nil {
		panic(err)
	}

	return dir
}
// runJob executes a single job and records the outcome in "<job.ID>.out"
// inside jobDir().
//
// The output file holds the command's combined stdout/stderr (and any failure
// message), followed by a blank separator line and the exit code on the last
// line. readJob discards the last two lines when replaying a job, so every
// record written here is newline-terminated before the exit code is appended;
// otherwise the final line of real output would be lost.
//
// A job whose If conditional evaluates to false is skipped; nothing is written
// to its output file in that case. The function panics if the output file
// cannot be created.
func runJob(job Job) {
	outname := filepath.Join(jobDir(), fmt.Sprintf("%s.out", job.ID))
	fmt.Printf("%s: saving to %s\n", job.ID, outname)

	outfile, err := os.Create(outname)
	if err != nil {
		panic(fmt.Sprintf("Could not create: %#v, error: %s\n", job, errs.JoinMessage(err)))
	}
	defer outfile.Close()

	// report writes a failure message to the output file and to stderr. The
	// message is always terminated by exactly one newline, and the job ID is
	// passed as an argument rather than spliced into the format string.
	report := func(msg string, args ...interface{}) {
		text := strings.TrimRight(fmt.Sprintf(msg, args...), "\n")
		fmt.Fprintf(outfile, "%s\n", text)
		fmt.Fprintf(os.Stderr, "%s: %s\n", job.ID, text)
	}

	// writeCode appends the blank separator and the exit code; it must be the
	// last thing written to the file.
	writeCode := func(code int) {
		if _, werr := fmt.Fprintf(outfile, "\n%d", code); werr != nil {
			fmt.Printf("Could not write exit code to file: %s\n", errs.JoinMessage(werr))
		}
	}

	// failure records a job that could not be started, with exit code 1.
	failure := func(msg string, args ...interface{}) {
		report(msg, args...)
		writeCode(1)
	}

	if job.If != "" {
		pj, err := project.GetOnce()
		if err != nil {
			failure("Could not get project: %s", errs.JoinMessage(err))
			return
		}

		cond := constraints.NewPrimeConditional(nil, pj, "")
		shouldRun, err := cond.Eval(job.If)
		if err != nil {
			failure("Could not evaluate conditonal: %s, error: %s\n", job.If, errs.JoinMessage(err))
			return
		}
		if !shouldRun {
			fmt.Printf("%s: Skipping as per conditional: %s\n", job.ID, job.If)
			return
		}
	}

	if len(job.Args) == 0 {
		failure("Job must have arguments: %#v\n", job)
		return
	}

	code, _, err := osutils.Execute(job.Args[0]+osutils.ExeExtension, job.Args[1:], func(cmd *exec.Cmd) error {
		cmd.Stdout = outfile
		cmd.Stderr = outfile
		// os/exec keeps only the LAST value of a duplicated key, so the job's
		// own entries have to follow the inherited environment to take effect.
		// os.Environ returns a fresh slice, so job.Env is never mutated.
		cmd.Env = append(os.Environ(), job.Env...)
		return nil
	})

	// Output that does not end in a newline would otherwise merge with the
	// record written below and lose its last line when replayed.
	if nlErr := ensureTrailingNewline(outfile); nlErr != nil {
		fmt.Printf("%s: Could not newline-terminate output: %v\n", job.ID, nlErr)
	}

	if err != nil {
		// Only the message is recorded here; the exit code reported by the
		// execution is written once, below.
		report("Executing job %s failed, error: %s", job.ID, errs.JoinMessage(err))
	}

	writeCode(code) // last entry is the exit code
	fmt.Printf("%s: Completed, exit code: %d\n", job.ID, code)
}

// ensureTrailingNewline appends "\n" to f unless f is empty or already ends
// with a newline. f must be open for both reading and writing.
func ensureTrailingNewline(f *os.File) error {
	info, err := f.Stat()
	if err != nil {
		return err
	}
	if info.Size() == 0 {
		return nil
	}

	last := make([]byte, 1)
	if _, err := f.ReadAt(last, info.Size()-1); err != nil {
		return err
	}
	if last[0] == '\n' {
		return nil
	}

	_, err = f.WriteString("\n")
	return err
}
// readJob replays the recorded result of the job with the given id: it prints
// the job's output to stdout and then exits the process with the job's exit
// code.
//
// The job file is expected to end with the exit code on its last line,
// preceded by a separator line; both are dropped from the printed output.
//
// It returns an error when the job file does not exist or when its last line
// is not an integer. On success it does not return, because os.Exit is called.
func readJob(id string) error {
	jobfile := filepath.Join(jobDir(), fmt.Sprintf("%s.out", id))
	if !fileutils.FileExists(jobfile) {
		return errs.New("Job does not exist: %s", jobfile)
	}

	lines := strings.Split(string(fileutils.ReadFileUnsafe(jobfile)), "\n")
	// strings.Split always yields at least one element, so this index is safe.
	last := lines[len(lines)-1]
	code, err := strconv.Atoi(last)
	if err != nil {
		return errs.Wrap(err, "Expected last line to be the exit code, instead found: %s", last)
	}

	// Drop the exit code and the separator line before it. A file consisting
	// of a single line has no separator; clamp the bound so slicing cannot
	// panic with a negative index.
	end := len(lines) - 2
	if end < 0 {
		end = 0
	}
	fmt.Println(strings.Join(lines[:end], "\n"))

	os.Exit(code)
	return nil // unreachable; required because os.Exit is not a terminating statement
}