/
run.go
106 lines (90 loc) · 2.45 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package run
/*
User defines a task.Task, after validation, it goes to a queue.Queue
and a run.Runner pick it and run.Runner#Prepare then run.Runner#Run it.
*/
import (
"bytes"
"fmt"
"io"
"github.com/factorysh/microdensity/task"
"github.com/factorysh/microdensity/volumes"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
serviceRun = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "run_total",
Help: "Total tasks run",
}, []string{"service", "project"})
)
// Context is a run context, with a STDOUT and a STDERR
type Context struct {
Stdout io.WriteCloser
Stderr io.WriteCloser
task *task.Task
run Runnable
}
type Runnable interface {
Prepare(map[string]string, string, uuid.UUID, []string) error
Run(stdout io.WriteCloser, stderr io.WriteCloser) (int, error)
Cancel()
}
type Runner struct {
tasks map[uuid.UUID]*Context
servicesDir string
volumes *volumes.Volumes
hosts []string
}
func NewRunner(servicesDir string, volumesRoot string, hosts []string) (*Runner, error) {
v, err := volumes.New(volumesRoot)
if err != nil {
return nil, err
}
return &Runner{
tasks: make(map[uuid.UUID]*Context),
servicesDir: servicesDir,
volumes: v,
hosts: hosts,
}, nil
}
// Prepare the run
// Prepare is synchronous, in order to raise an error in the REST endpoint.
// Prepare checks volumes stuff.
func (r *Runner) Prepare(t *task.Task, env map[string]string) (string, error) {
if t.Id == uuid.Nil {
return "", fmt.Errorf("task requires an ID to be prepared")
}
if _, found := r.tasks[t.Id]; found {
return "", fmt.Errorf("task with id `%s` already prepared", t.Id)
}
runnable, err := NewComposeRun(fmt.Sprintf("%s/%s", r.servicesDir, t.Service), env)
if err != nil {
return "", err
}
err = runnable.Prepare(env,
r.volumes.Path(t.Service, t.Project, t.Branch, t.Id.String()),
t.Id,
r.hosts)
if err != nil {
return "", err
}
r.tasks[t.Id] = &Context{
task: t,
Stdout: &ClosingBuffer{&bytes.Buffer{}},
Stderr: &ClosingBuffer{&bytes.Buffer{}},
run: runnable,
}
return runnable.run, nil
}
func (r *Runner) Run(t *task.Task) (int, error) {
ctx, found := r.tasks[t.Id]
if !found {
return 0, fmt.Errorf("task with id `%s` not found in runner", t.Id)
}
defer serviceRun.With(prometheus.Labels{
"service": t.Service,
"project": t.Project}).Inc()
return ctx.run.Run(ctx.Stdout, ctx.Stderr)
}