-
Notifications
You must be signed in to change notification settings - Fork 8
/
create_load_job.go
110 lines (102 loc) · 3.87 KB
/
create_load_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/
package load
import (
"fmt"
"os"
"time"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
alluxiov1alpha1 "github.com/alluxio/k8s-operator/api/v1alpha1"
"github.com/alluxio/k8s-operator/pkg/logger"
"github.com/alluxio/k8s-operator/pkg/utils"
)
func (r *LoadReconciler) createLoadJob(ctx LoadReconcilerReqCtx) (ctrl.Result, error) {
// Update the status before job creation instead of after, because otherwise if the status update fails,
// the reconciler will loop again and create another same job, leading to failure to create duplicated job which is confusing.
ctx.Load.Status.Phase = alluxiov1alpha1.LoadPhaseLoading
_, err := r.updateLoadStatus(ctx)
if err != nil {
logger.Infof("Job is pending because status was not updated successfully")
return ctrl.Result{}, err
}
loadJob, err := getLoadJobFromYaml()
if err != nil {
return ctrl.Result{}, err
}
constructLoadJob(ctx.AlluxioCluster, ctx.Load, loadJob)
if err := r.Create(ctx.Context, loadJob); err != nil {
logger.Errorf("Failed to load data of dataset %s: %v", ctx.NamespacedName.String(), err)
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
func getLoadJobFromYaml() (*batchv1.Job, error) {
loadJobYaml, err := os.ReadFile("/opt/alluxio-jobs/load.yaml")
if err != nil {
logger.Errorf("Failed to read load job yaml file at /opt/alluxio-jobs/load.yaml: %v", err)
return nil, err
}
loadJob, _, err := scheme.Codecs.UniversalDeserializer().Decode(loadJobYaml, nil, nil)
if err != nil {
logger.Errorf("Failed to parse load job yaml file: %v", err)
}
return loadJob.(*batchv1.Job), nil
}
func constructLoadJob(alluxio *alluxiov1alpha1.AlluxioCluster, load *alluxiov1alpha1.Load, loadJob *batchv1.Job) {
loadJob.Name = utils.GetLoadJobName(load.Name)
loadJob.Namespace = alluxio.Namespace
var imagePullSecrets []corev1.LocalObjectReference
for _, secret := range alluxio.Spec.ImagePullSecrets {
imagePullSecrets = append(imagePullSecrets, corev1.LocalObjectReference{Name: secret})
}
loadJob.Spec.Template.Spec.ImagePullSecrets = imagePullSecrets
loadJob.Spec.Template.Spec.ServiceAccountName = alluxio.Spec.ServiceAccountName
loadJob.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:%s", alluxio.Spec.Image, alluxio.Spec.ImageTag)
loadJob.Spec.Template.Spec.Containers[0].Command = []string{"go", "run", "/load.go", load.Spec.Path}
alluxioConfigMapName := utils.GetAlluxioConfigMapName(alluxio.Spec.NameOverride, alluxio.Name)
loadConfigMapName := utils.GetLoadConfigmapName(alluxio.Spec.NameOverride, alluxio.Name)
loadJob.Spec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{
{
Name: alluxioConfigMapName,
MountPath: "/opt/alluxio/conf",
},
{
Name: loadConfigMapName,
MountPath: "/load.go",
SubPath: "load.go",
},
}
loadJob.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: alluxioConfigMapName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: alluxioConfigMapName,
},
},
},
},
{
Name: loadConfigMapName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: loadConfigMapName,
},
},
},
},
}
}