This repository has been archived by the owner on Oct 22, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
job_controller.go
75 lines (65 loc) · 2.42 KB
/
job_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package quarksjob
import (
"context"
"fmt"
batchv1 "k8s.io/api/batch/v1"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
qjv1a1 "code.cloudfoundry.org/quarks-job/pkg/kube/apis/quarksjob/v1alpha1"
"code.cloudfoundry.org/quarks-utils/pkg/config"
"code.cloudfoundry.org/quarks-utils/pkg/ctxlog"
)
// AddJob creates a new Job controller to collect the output from jobs, persist
// that output as a secret and delete the k8s job afterwards.
//
// It builds the reconciler with NewJobReconciler, registers a controller named
// "job-controller" on mgr (bounded by config.MaxQuarksJobWorkers concurrent
// reconciles) and starts watching batchv1.Job objects. Events are filtered by
// the namespace predicate and by an update-only predicate that lets a Job
// through once it has reached a final state.
//
// It returns any error produced while creating the reconciler, creating the
// controller or setting up the watch.
func AddJob(ctx context.Context, config *config.Config, mgr manager.Manager) error {
	// Mirrors the default the Kubernetes API applies to Job.spec.backoffLimit
	// when the field is left unset.
	const defaultBackoffLimit int32 = 6

	ctx = ctxlog.NewContextWithRecorder(ctx, "job-reconciler", mgr.GetEventRecorderFor("job-recorder"))

	jobReconciler, err := NewJobReconciler(ctx, config, mgr)
	if err != nil {
		return err
	}

	jobController, err := controller.New("job-controller", mgr, controller.Options{
		Reconciler:              jobReconciler,
		MaxConcurrentReconciles: config.MaxQuarksJobWorkers,
	})
	if err != nil {
		return err
	}

	nsPredicate := newNSPredicate(ctx, mgr.GetClient(), config.MonitoredID)

	// Named jobPredicate so the imported predicate package is not shadowed.
	jobPredicate := predicate.Funcs{
		// We're only interested in Jobs going from Active to final state (Succeeded or Failed)
		CreateFunc:  func(e event.CreateEvent) bool { return false },
		DeleteFunc:  func(e event.DeleteEvent) bool { return false },
		GenericFunc: func(e event.GenericEvent) bool { return false },
		UpdateFunc: func(e event.UpdateEvent) bool {
			// Checked assertion: an unexpected object type must not panic
			// the event handler.
			o, ok := e.ObjectNew.(*batchv1.Job)
			if !ok || o == nil {
				return false
			}

			// Jobs that are already being deleted are of no interest.
			if !o.GetDeletionTimestamp().IsZero() {
				return false
			}

			if !isEJobJob(e.MetaNew.GetLabels()) {
				return false
			}

			// BackoffLimit is an optional pointer; fall back to the API
			// default instead of dereferencing nil.
			backoffLimit := defaultBackoffLimit
			if o.Spec.BackoffLimit != nil {
				backoffLimit = *o.Spec.BackoffLimit
			}

			shouldProcessEvent := o.Status.Succeeded == 1 || o.Status.Failed > backoffLimit
			if shouldProcessEvent {
				ctxlog.NewPredicateEvent(o).Debug(
					ctx, e.MetaNew, "batchv1.Job",
					fmt.Sprintf("Update predicate passed for '%s/%s', existing batchv1.Job has changed to a defined final state",
						e.MetaNew.GetNamespace(),
						e.MetaNew.GetName()),
				)
			}

			return shouldProcessEvent
		},
	}

	return jobController.Watch(&source.Kind{Type: &batchv1.Job{}}, &handler.EnqueueRequestForObject{}, nsPredicate, jobPredicate)
}
// isEJobJob reports whether the given label set contains the
// qjv1a1.LabelQJobName key, i.e. whether the labelled object is one of our jobs.
// Only the presence of the key is checked; its value is ignored.
func isEJobJob(labels map[string]string) bool {
	_, found := labels[qjv1a1.LabelQJobName]
	return found
}