/
job.go
105 lines (84 loc) · 2.39 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package template
import (
"context"
"time"
"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// CreateJob makes sure the given Job exists.
//
// The Job is looked up by the namespace/name key derived from instance, with
// instance itself passed as the destination of the lookup. If the lookup
// reports NotFound, the Job is created from instance and the result of the
// create call is returned. Any other lookup error is returned unchanged. When
// the Job is already present nothing is created and nil is returned.
func CreateJob(ctx context.Context, c client.Client, instance *batchv1.Job, log logr.Logger) error {
	key := client.ObjectKeyFromObject(instance)
	getErr := c.Get(ctx, key, instance)

	switch {
	case getErr == nil:
		// The Job is already there; leave it alone.
		return nil
	case errors.IsNotFound(getErr):
		log.Info("Creating Job", "Name", instance.Name)
		return c.Create(ctx, instance)
	default:
		return getErr
	}
}
// DeleteJob logs the deletion and then deletes the given Job through c, asking
// for the foreground deletion propagation policy. The error of the delete
// call, if any, is returned unchanged.
func DeleteJob(ctx context.Context, c client.Client, instance *batchv1.Job, log logr.Logger) error {
	log.Info("Deleting Job", "Name", instance.Name)

	foreground := client.PropagationPolicy(metav1.DeletePropagationForeground)
	return c.Delete(ctx, instance, foreground)
}
// NewJobRunner returns a JobRunner that uses ctx, c and log for all of its
// work. The returned runner has no jobs registered and no ready field set.
func NewJobRunner(ctx context.Context, c client.Client, log logr.Logger) *JobRunner {
	runner := new(JobRunner)
	runner.ctx = ctx
	runner.client = c
	runner.log = log
	return runner
}
// JobRunner holds an ordered list of Jobs together with the state needed to
// run them one after another via Run.
type JobRunner struct {
	// ctx is the context handed to every client call made by the runner.
	ctx context.Context
	// client is used to look up, create and delete Jobs and to update the
	// owner's status.
	client client.Client
	// log receives the runner's informational messages.
	log logr.Logger
	// jobs are the registered Jobs, in the order they were passed to Add.
	jobs []jobHashField
	// readyField, when non-nil, is set to true by Run once the last
	// registered Job has been processed.
	readyField *bool
}
// Add registers job at the end of the runner's job list. hashField is the
// location Run compares the job's hash against and writes the hash to once the
// job has been processed.
func (r *JobRunner) Add(hashField *string, job *batchv1.Job) {
	entry := jobHashField{HashField: hashField, Job: job}
	r.jobs = append(r.jobs, entry)
}
// SetReady stores the location of a boolean that Run flips to true after the
// last registered job has been processed. Passing nil means no flag is written.
func (r *JobRunner) SetReady(readyField *bool) {
	r.readyField = readyField
}
// Run processes the registered jobs, in the order they were added, on behalf
// of owner.
//
// For every job it
//
//  1. sets owner as the job's controller reference,
//  2. computes the job's hash with ObjectHash and skips the job when that hash
//     already equals the value stored in the job's hash field,
//  3. makes sure the job exists (CreateJob) and, while the job's
//     Status.CompletionTime is unset, returns a result that requeues after
//     10 seconds,
//  4. once the job has a completion time, deletes it, writes the hash to the
//     hash field, sets the ready field to true if this was the last registered
//     job (and a ready field was set), and updates owner's status.
//
// It returns an empty result and nil once every job has been skipped or
// processed. Any error from the steps above, including a failure to set the
// controller reference, aborts the run and is returned unchanged.
func (r *JobRunner) Run(owner client.Object) (ctrl.Result, error) {
	for i, jh := range r.jobs {
		job := jh.Job

		// Do not continue with a Job that could not be tied to its owner:
		// creating it anyway would leave it without the controller reference.
		if err := controllerutil.SetControllerReference(owner, job, r.client.Scheme()); err != nil {
			return ctrl.Result{}, err
		}

		// The hash is taken before CreateJob, which passes job as the
		// destination of a lookup.
		jobHash, err := ObjectHash(job)
		if err != nil {
			return ctrl.Result{}, err
		}
		if *jh.HashField == jobHash {
			// The recorded hash matches: this job needs no further work.
			continue
		}

		if err := CreateJob(r.ctx, r.client, job, r.log); err != nil {
			return ctrl.Result{}, err
		}
		if job.Status.CompletionTime == nil {
			// NOTE(review): a Job that ends in failure presumably never gets
			// a CompletionTime, in which case this requeues indefinitely —
			// confirm whether failed Jobs should be surfaced as an error.
			r.log.Info("Waiting on job completion", "name", job.Name)
			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
		}

		if err := DeleteJob(r.ctx, r.client, job, r.log); err != nil {
			return ctrl.Result{}, err
		}

		// Record the hash (and readiness after the final job) before the
		// status update below.
		// NOTE(review): this assumes the hash and ready fields point into
		// owner's status so that the update persists them — confirm with callers.
		*jh.HashField = jobHash
		if i == len(r.jobs)-1 && r.readyField != nil {
			*r.readyField = true
		}
		if err := r.client.Status().Update(r.ctx, owner); err != nil {
			return ctrl.Result{}, err
		}
	}
	return ctrl.Result{}, nil
}
// jobHashField pairs a Job with the location where the hash of that Job is
// recorded.
type jobHashField struct {
	// Job is the Job to run.
	Job *batchv1.Job
	// HashField points at the stored hash; Run compares the Job's current
	// hash against it and overwrites it after the Job has been processed.
	HashField *string
}