This repository has been archived by the owner on Oct 22, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
job_reconciler.go
161 lines (141 loc) · 5.83 KB
/
job_reconciler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package quarksjob
import (
"context"
"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
qjv1a1 "code.cloudfoundry.org/quarks-job/pkg/kube/apis/quarksjob/v1alpha1"
"code.cloudfoundry.org/quarks-utils/pkg/config"
"code.cloudfoundry.org/quarks-utils/pkg/ctxlog"
"code.cloudfoundry.org/quarks-utils/pkg/versionedsecretstore"
)
// DeleteKind is the value of the "delete" pod-template label that asks the job
// reconciler to also remove a succeeded Job's pod after the Job itself.
const DeleteKind = "pod"
// NewJobReconciler constructs a ReconcileJob wired to the manager's client and
// scheme. The versioned secret store is built on top of that same client.
// The returned error is always nil.
func NewJobReconciler(ctx context.Context, config *config.Config, mgr manager.Manager) (reconcile.Reconciler, error) {
	kubeClient := mgr.GetClient()

	reconciler := &ReconcileJob{
		ctx:                  ctx,
		config:               config,
		client:               kubeClient,
		scheme:               mgr.GetScheme(),
		versionedSecretStore: versionedsecretstore.NewVersionedSecretStore(kubeClient),
	}
	return reconciler, nil
}
// ReconcileJob reconciles a Job object that is owned by a QuarksJob.
// Instances are built by NewJobReconciler.
type ReconcileJob struct {
	// ctx is the parent context from which each Reconcile call derives its
	// timeout-bounded request context.
	ctx context.Context
	// client is the controller-runtime client used for every get, list,
	// delete and status update performed by the reconciler.
	client client.Client
	// scheme is the manager's runtime scheme, as set by NewJobReconciler.
	scheme *runtime.Scheme
	// config supplies operator settings; Reconcile reads CtxTimeOut from it.
	config *config.Config
	// versionedSecretStore is created from the manager's client by
	// NewJobReconciler.
	versionedSecretStore versionedsecretstore.VersionedSecretStore
}
// Reconcile reads the state of the cluster for a Job object that is owned by a QuarksJob and
// makes changes based on the state read and what is in the QuarksJob.Spec.
//
// When the Job reports exactly one succeeded pod, Reconcile:
//   - deletes the Job,
//   - deletes the Job's pod too when the pod template carries the label
//     "delete" with the value DeleteKind,
//   - marks the parent QuarksJob's status as Completed.
//
// Failing to delete the Job or its pod is reported as an event but does not
// prevent the QuarksJob from being marked completed.
//
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
func (r *ReconcileJob) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	instance := &batchv1.Job{}

	// Bound every API call made for this request by the configured timeout.
	ctx, cancel := context.WithTimeout(r.ctx, r.config.CtxTimeOut)
	defer cancel()

	ctxlog.Infof(ctx, "Reconciling job output '%s' in the QuarksJob context", request.NamespacedName)
	err := r.client.Get(ctx, request.NamespacedName, instance)
	if err != nil {
		if apierrors.IsNotFound(err) {
			// Do not requeue, job is probably deleted.
			ctxlog.Infof(ctx, "Failed to find job '%s', not retrying: %s", request.NamespacedName, err)
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		ctxlog.Errorf(ctx, "Failed to get job '%s': %s", request.NamespacedName, err)
		return reconcile.Result{}, err
	}

	// The parent QuarksJob is the owner reference flagged as controller.
	// OwnerReference.Controller is a *bool that may be unset, so it must be
	// nil-checked before dereferencing. The API server allows at most one
	// controller reference, so the first match is the only one.
	parentName := ""
	for _, owner := range instance.GetOwnerReferences() {
		if owner.Controller != nil && *owner.Controller {
			parentName = owner.Name
			break
		}
	}
	if parentName == "" {
		err = ctxlog.WithEvent(instance, "NotFoundError").Errorf(ctx, "could not find parent quarksJob reference for Job '%s'", request.NamespacedName)
		return reconcile.Result{}, err
	}

	qj := qjv1a1.QuarksJob{}
	err = r.client.Get(ctx, types.NamespacedName{Name: parentName, Namespace: instance.GetNamespace()}, &qj)
	if err != nil {
		return reconcile.Result{}, errors.Wrapf(err, "getting parent quarksJob in Job Reconciler for job '%s/%s'", request.Namespace, instance.GetName())
	}

	// Only succeeded jobs are cleaned up; anything else is left untouched.
	if instance.Status.Succeeded != 1 {
		return reconcile.Result{}, nil
	}

	ctxlog.WithEvent(&qj, "DeletingJob").Infof(ctx, "Deleting succeeded job '%s/%s'", request.Namespace, instance.Name)
	if err := r.client.Delete(ctx, instance); err != nil {
		_ = ctxlog.WithEvent(instance, "DeleteError").Errorf(ctx, "Cannot delete succeeded job: '%s'", err)
	}

	// A missing label reads as "" and therefore never equals DeleteKind.
	if instance.Spec.Template.Labels["delete"] == DeleteKind {
		pod, podErr := r.jobPod(ctx, instance.Name, instance.GetNamespace())
		if podErr != nil {
			// Do not return here: the Job has normally just been deleted, so a
			// later reconcile would stop at the NotFound branch above and the
			// QuarksJob would never be marked completed.
			_ = ctxlog.WithEvent(instance, "NotFoundError").Errorf(ctx, "Cannot find job's pod: '%s'", podErr)
		} else {
			ctxlog.WithEvent(&qj, "DeletingJobsPod").Infof(ctx, "Deleting succeeded job's pod '%s/%s'", pod.Namespace, pod.Name)
			if err := r.client.Delete(ctx, pod); err != nil {
				_ = ctxlog.WithEvent(instance, "DeleteError").Errorf(ctx, "Cannot delete succeeded job's pod: '%s'", err)
			}
		}
	}

	// Update QuarksJob status
	qj.Status.Completed = true
	if err := r.client.Status().Update(ctx, &qj); err != nil {
		// NOTE(review): the Job has normally been deleted by now, so requeueing
		// would only reach the NotFound branch and the update would not be
		// retried either way. Consider updating the status before deleting the
		// Job so that a failure here can be retried.
		_ = ctxlog.WithEvent(&qj, "UpdateError").Errorf(ctx, "Failed to update quarks job status '%s' (%s): %s", qj.GetNamespacedName(), qj.ResourceVersion, err)
		return reconcile.Result{Requeue: false}, nil
	}

	return reconcile.Result{}, nil
}
// jobPod returns the pod of the Job called name in namespace. Pods are found
// through the "job-name" label.
//
// Only single-pod jobs are supported when persisting the output. However, a
// failed pod can leave several pods for one Job, so when more than one matches,
// the most recently created pod is returned. On equal timestamps, the earliest
// listed pod wins.
//
// An error is returned when listing fails or when no pod matches.
func (r *ReconcileJob) jobPod(ctx context.Context, name string, namespace string) (*corev1.Pod, error) {
	list := &corev1.PodList{}
	err := r.client.List(
		ctx,
		list,
		client.InNamespace(namespace),
		client.MatchingLabels(map[string]string{"job-name": name}),
	)
	if err != nil {
		return nil, errors.Wrapf(err, "Listing job's '%s/%s' pods failed.", namespace, name)
	}
	if len(list.Items) == 0 {
		return nil, errors.Errorf("Job '%s/%s' does not own any pods?", namespace, name)
	}

	// Track the newest pod by index rather than ranging by value. corev1.Pod is
	// a large struct, and copying it on every iteration is wasted work. Time
	// comparisons are location-independent, so no UTC conversion is needed.
	latest := 0
	for i := 1; i < len(list.Items); i++ {
		if list.Items[latest].CreationTimestamp.Time.Before(list.Items[i].CreationTimestamp.Time) {
			latest = i
		}
	}

	// Copy the chosen pod out so the returned pointer does not keep the whole
	// list alive.
	pod := list.Items[latest]
	ctxlog.Infof(ctx, "Considering job pod '%s/%s' for persisting output", namespace, pod.GetName())
	return &pod, nil
}