forked from kubeflow/training-operator
/
informer.go
123 lines (107 loc) · 3.67 KB
/
informer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package tensorflow
import (
"fmt"
"time"
log "github.com/sirupsen/logrus"
metav1unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
restclientset "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1"
"github.com/kubeflow/tf-operator/pkg/apis/tensorflow/validation"
tfjobinformers "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions"
tfjobinformersv1 "github.com/kubeflow/tf-operator/pkg/client/informers/externalversions/tensorflow/v1"
"github.com/kubeflow/tf-operator/pkg/common/util/v1/unstructured"
tflogger "github.com/kubeflow/tf-operator/pkg/logger"
)
const (
	// resyncPeriod is the resync period handed to the unstructured TFJob
	// informer built by NewUnstructuredTFJobInformer.
	resyncPeriod = 30 * time.Second

	// failedMarshalMsg is the log format used by tfJobFromUnstructured when
	// the conversion to TFJob, or the subsequent spec validation, fails.
	failedMarshalMsg = "Failed to marshal the object to TFJob: %v"
)
// Sentinel errors returned by the TFJob lookup and conversion helpers in this
// file. They are returned as-is (never wrapped).
var (
	// errGetFromKey is returned when the informer index lookup fails or the
	// object handed over is not an *Unstructured.
	errGetFromKey = fmt.Errorf("failed to get TFJob from key")

	// errNotExists is returned when the key is not present in the informer index.
	errNotExists = fmt.Errorf("the object is not found")

	// errFailedMarshal is returned when the unstructured object cannot be
	// converted to a TFJob or the converted spec fails validation.
	errFailedMarshal = fmt.Errorf("failed to marshal the object to TFJob")
)
// NewUnstructuredTFJobInformer builds a TFJob informer on top of a dynamic
// client created from restConfig, scoped to namespace.
//
// The informer targets the TFJob resource identified by tfv1.GroupName,
// tfv1.GroupVersion and tfv1.Plural, uses resyncPeriod as its resync period
// and is created with an empty set of indexers.
//
// It panics if the dynamic client cannot be created from restConfig.
func NewUnstructuredTFJobInformer(restConfig *restclientset.Config, namespace string) tfjobinformersv1.TFJobInformer {
	dynamicClient, err := dynamic.NewForConfig(restConfig)
	if err != nil {
		panic(err)
	}

	return unstructured.NewTFJobInformer(
		schema.GroupVersionResource{
			Group:    tfv1.GroupName,
			Version:  tfv1.GroupVersion,
			Resource: tfv1.Plural,
		},
		dynamicClient,
		namespace,
		resyncPeriod,
		cache.Indexers{},
	)
}
// NewTFJobInformer returns the v1 TFJob informer exposed by the Kubeflow group
// of the given shared informer factory.
func (tc *TFController) NewTFJobInformer(tfJobInformerFactory tfjobinformers.SharedInformerFactory) tfjobinformersv1.TFJobInformer {
	v1Informers := tfJobInformerFactory.Kubeflow().V1()
	return v1Informers.TFJobs()
}
// getTFJobFromName looks up a TFJob by namespace and name. It builds the
// "<namespace>/<name>" key and delegates to getTFJobFromKey, returning whatever
// that lookup returns.
func (tc *TFController) getTFJobFromName(namespace, name string) (*tfv1.TFJob, error) {
	return tc.getTFJobFromKey(namespace + "/" + name)
}
// getTFJobFromKey fetches the object stored under key in the TFJob informer's
// index and converts it to a typed TFJob.
//
// It returns errGetFromKey when the index lookup itself fails (the underlying
// error is logged, not returned), errNotExists when the key is absent, and
// otherwise the result of tfJobFromUnstructured.
func (tc *TFController) getTFJobFromKey(key string) (*tfv1.TFJob, error) {
	item, found, lookupErr := tc.tfJobInformer.GetIndexer().GetByKey(key)
	keyLogger := tflogger.LoggerForKey(key)

	switch {
	case lookupErr != nil:
		keyLogger.Errorf("Failed to get TFJob '%s' from informer index: %+v", key, lookupErr)
		return nil, errGetFromKey
	case !found:
		// This happens after a tfjob was deleted, but the work queue still had an entry for it.
		return nil, errNotExists
	default:
		return tfJobFromUnstructured(item)
	}
}
// tfJobFromUnstructured converts obj into a typed, validated TFJob.
//
// obj must be a *metav1unstructured.Unstructured; any other type is logged and
// reported as errGetFromKey. A conversion failure and a spec validation failure
// are both logged with failedMarshalMsg and reported as errFailedMarshal.
func tfJobFromUnstructured(obj interface{}) (*tfv1.TFJob, error) {
	// Only unstructured objects can be converted.
	raw, isUnstructured := obj.(*metav1unstructured.Unstructured)
	if !isUnstructured {
		log.Errorf("The object in index is not an unstructured; %+v", obj)
		return nil, errGetFromKey
	}

	job := new(tfv1.TFJob)
	convertErr := runtime.DefaultUnstructuredConverter.FromUnstructured(raw.Object, job)
	jobLogger := tflogger.LoggerForUnstructured(raw, tfv1.Kind)
	if convertErr != nil {
		jobLogger.Errorf(failedMarshalMsg, convertErr)
		return nil, errFailedMarshal
	}

	// This is a simple validation for TFJob to close
	// https://github.com/kubeflow/tf-operator/issues/641
	// TODO(gaocegege): Add more validation here.
	if specErr := validation.ValidateV1TFJobSpec(&job.Spec); specErr != nil {
		// An invalid spec is reported with the same message and sentinel
		// as a failed conversion.
		jobLogger.Errorf(failedMarshalMsg, specErr)
		return nil, errFailedMarshal
	}

	return job, nil
}
// unstructuredFromTFJob replaces the content of obj with the unstructured
// representation of tfJob.
//
// obj must be a *metav1unstructured.Unstructured; otherwise a warning is logged
// and errGetFromKey is returned. If the conversion fails, the converter's error
// is returned and obj is left untouched.
func unstructuredFromTFJob(obj interface{}, tfJob *tfv1.TFJob) error {
	un, ok := obj.(*metav1unstructured.Unstructured)
	logger := tflogger.LoggerForJob(tfJob)
	if !ok {
		logger.Warn("The object in index isn't type Unstructured")
		return errGetFromKey
	}

	// Convert into a local first: assigning the result straight to un.Object
	// would overwrite the caller's object even when the conversion fails.
	content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tfJob)
	if err != nil {
		logger.Error("The TFJob convert failed")
		return err
	}

	un.Object = content
	return nil
}