-
Notifications
You must be signed in to change notification settings - Fork 2
/
syncer.go
120 lines (99 loc) · 3.13 KB
/
syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package batch
import (
"github.com/equinor/radix-operator/pkg/apis/kube"
radixv1 "github.com/equinor/radix-operator/pkg/apis/radix/v1"
radixlabels "github.com/equinor/radix-operator/pkg/apis/utils/labels"
radixclient "github.com/equinor/radix-operator/pkg/client/clientset/versioned"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
)
// Syncer is the contract exposed by this package for synchronizing a
// RadixBatch. Instances are obtained through NewSyncer.
type Syncer interface {
	// OnSync runs one synchronization pass and reports any error encountered.
	OnSync() error
}
// NewSyncer returns a Syncer backed by the package-private syncer type,
// holding the supplied Kubernetes client, kube utility, Radix client and the
// RadixBatch that is to be synchronized.
func NewSyncer(
	kubeclient kubernetes.Interface,
	kubeutil *kube.Kube,
	radixclient radixclient.Interface,
	batch *radixv1.RadixBatch,
) Syncer {
	instance := syncer{
		batch:       batch,
		radixclient: radixclient,
		kubeutil:    kubeutil,
		kubeclient:  kubeclient,
	}
	return &instance
}
// syncer is the Syncer implementation returned by NewSyncer. It carries the
// clients and the RadixBatch that a synchronization pass operates on.
type syncer struct {
	// kubeclient is the Kubernetes clientset handed to NewSyncer.
	// NOTE(review): not referenced by the methods visible in this file section.
	kubeclient kubernetes.Interface
	// kubeutil is used to list jobs and services by label selector and to
	// fetch the referenced RadixDeployment.
	kubeutil *kube.Kube
	// radixclient is the Radix clientset handed to NewSyncer.
	// NOTE(review): not referenced by the methods visible in this file section.
	radixclient radixclient.Interface
	// batch is the RadixBatch being synchronized.
	batch *radixv1.RadixBatch
}
// OnSync performs one synchronization pass for the batch.
//
// The status is restored first; a failure there is returned immediately.
// When isBatchDone reports true for the batch nothing further happens and
// nil is returned. Otherwise the batch is reconciled and the reconcile result
// (which may be nil) is handed to syncStatus, whose return value becomes the
// result of OnSync.
func (s *syncer) OnSync() error {
	restoreErr := s.restoreStatus()
	if restoreErr != nil {
		return restoreErr
	}

	if !isBatchDone(s.batch) {
		reconcileErr := s.reconcile()
		return s.syncStatus(reconcileErr)
	}

	return nil
}
// reconcile brings the services and Kubernetes jobs of the batch in line with
// the jobs listed in the batch spec.
//
// It resolves the RadixDeployment and job component referenced by the batch,
// lists the existing jobs and services in the batch namespace that match the
// batch identifier label, and then, for every job in the spec, reconciles its
// service followed by its Kubernetes job. The first error encountered is
// returned and stops the loop.
func (s *syncer) reconcile() error {
	// A status sync is issued after each job whose zero-based index is a
	// multiple of this value (index 0, 10, 20, ...).
	const statusSyncInterval = 10

	rd, jobComponent, err := s.getRadixDeploymentAndJobComponent()
	if err != nil {
		return err
	}

	namespace := s.batch.GetNamespace()
	selector := s.batchIdentifierLabel().String()

	existingJobs, err := s.kubeutil.ListJobsWithSelector(namespace, selector)
	if err != nil {
		return err
	}

	existingServices, err := s.kubeutil.ListServicesWithSelector(namespace, selector)
	if err != nil {
		return err
	}

	for idx, job := range s.batch.Spec.Jobs {
		if svcErr := s.reconcileService(&job, rd, jobComponent, existingServices); svcErr != nil {
			return svcErr
		}

		if jobErr := s.reconcileKubeJob(&job, rd, jobComponent, existingJobs); jobErr != nil {
			return jobErr
		}

		if idx%statusSyncInterval != 0 {
			continue
		}

		// Publish intermediate status while the remaining jobs are still
		// being reconciled.
		if statusErr := s.syncStatus(nil); statusErr != nil {
			return statusErr
		}
	}

	return nil
}
// getRadixDeploymentAndJobComponent resolves the RadixDeployment named by the
// batch's RadixDeploymentJobRef and the job component inside it.
//
// A not-found lookup of the deployment is translated into the error produced
// by newReconcileRadixDeploymentNotFoundError; any other lookup error is
// returned unchanged. When the deployment exists but has no job component
// with the referenced name, the error produced by
// newReconcileRadixDeploymentJobSpecNotFoundError is returned. On any error
// both returned pointers are nil.
func (s *syncer) getRadixDeploymentAndJobComponent() (*radixv1.RadixDeployment, *radixv1.RadixDeployJobComponent, error) {
	jobRef := s.batch.Spec.RadixDeploymentJobRef

	rd, err := s.getRadixDeployment()
	switch {
	case err == nil:
		// Deployment found; continue with the job component lookup below.
	case errors.IsNotFound(err):
		return nil, nil, newReconcileRadixDeploymentNotFoundError(jobRef.Name)
	default:
		return nil, nil, err
	}

	if jobComponent := rd.GetJobComponentByName(jobRef.Job); jobComponent != nil {
		return rd, jobComponent, nil
	}

	return nil, nil, newReconcileRadixDeploymentJobSpecNotFoundError(rd.GetName(), jobRef.Job)
}
// getRadixDeployment fetches, via kubeutil, the RadixDeployment whose name is
// given by the batch's RadixDeploymentJobRef, looking in the batch namespace.
func (s *syncer) getRadixDeployment() (*radixv1.RadixDeployment, error) {
	namespace := s.batch.GetNamespace()
	deploymentName := s.batch.Spec.RadixDeploymentJobRef.Name

	return s.kubeutil.GetRadixDeployment(namespace, deploymentName)
}
// batchIdentifierLabel returns the label set built from the batch name. It is
// the selector used to find jobs and services belonging to this batch.
func (s *syncer) batchIdentifierLabel() labels.Set {
	batchNameLabel := radixlabels.ForBatchName(s.batch.GetName())

	return radixlabels.Merge(batchNameLabel)
}
// batchJobIdentifierLabel returns the label set for a single job of the batch.
//
// The result merges, in this order: the application name label for appName,
// the component name label for the job referenced by the batch, the batch
// identifier label, the job-schedule job type label and the batch job name
// label for batchJobName.
func (s *syncer) batchJobIdentifierLabel(batchJobName, appName string) labels.Set {
	// The locals are evaluated and passed in the same order as the merge
	// arguments so that merge precedence is unchanged.
	appLabel := radixlabels.ForApplicationName(appName)
	componentLabel := radixlabels.ForComponentName(s.batch.Spec.RadixDeploymentJobRef.Job)
	batchLabel := s.batchIdentifierLabel()
	jobTypeLabel := radixlabels.ForJobScheduleJobType()
	batchJobLabel := radixlabels.ForBatchJobName(batchJobName)

	return radixlabels.Merge(
		appLabel,
		componentLabel,
		batchLabel,
		jobTypeLabel,
		batchJobLabel,
	)
}