-
Notifications
You must be signed in to change notification settings - Fork 0
/
runner.go
104 lines (83 loc) · 2.15 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package jobs
import (
"context"
"github.com/Noah-Huppert/kube-git-deploy/api/models"
"github.com/Noah-Huppert/golog"
etcd "go.etcd.io/etcd/client"
)
// JobRunner is responsible for running jobs.
type JobRunner struct {
// ctx is context
ctx context.Context
// logger prints debug information
logger golog.Logger
// etcdKV is an etcd key value API client
etcdKV etcd.KeysAPI
// jobs holds all the currently running jobs. Keys are JobIDs.
jobs map[models.JobID]*models.Job
// jobsChan accepts Jobs to run
jobsChan chan *models.Job
}
// NewJobRunner creates a new JobRunner
func NewJobRunner(ctx context.Context, logger golog.Logger,
etcdKV etcd.KeysAPI) *JobRunner {
return &JobRunner{
ctx: ctx,
logger: logger,
etcdKV: etcdKV,
jobs: map[models.JobID]*models.Job{},
jobsChan: make(chan *models.Job),
}
}
// Submit sends a job to the runner main loop for future execution
func (r *JobRunner) Submit(job *models.Job) {
r.jobsChan <- job
}
// Run starts the JobRunner main logic loop
func (r *JobRunner) Run() error {
// Wait for job to be submitted
for true {
select {
case job := <-r.jobsChan:
// Check job isn't already running
_, ok := r.jobs[job.ID]
if ok {
// Already running
break
}
// Add to jobs map
r.jobs[job.ID] = job
// Execute job
go r.executeJob(job)
case <-r.ctx.Done():
r.logger.Info("Job runner stopping")
return nil
}
}
return nil
}
// executeJob runs the logic for a job. Should be started in a Go routine as
// it will block execution until the job finishes.
func (r *JobRunner) executeJob(job *models.Job) {
// Prepare
// ... Run
prepareAction := NewPrepareAction(r.ctx, r.logger, r.etcdKV)
prepareOK := true
err := prepareAction.Run(job, job.State.PrepareState)
if err != nil {
r.logger.Errorf("error running prepare action, Job.ID: %#v "+
", error: %s", job.ID, err.Error())
job.State.PrepareState.SetError(err.Error())
prepareOK = false
}
// ... Save
err = job.Set(r.ctx, r.etcdKV)
if err != nil {
r.logger.Errorf("error saving job after prepare action, "+
"Job.ID: %#v, error: %s", job.ID, err.Error())
return
}
if !prepareOK {
return
}
}