This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 60
/
k8s.go
105 lines (97 loc) · 2.86 KB
/
k8s.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package utils
import (
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var NotTheOwnerError = errors.Errorf("FlytePropeller is not the owner")
// ResourceNvidiaGPU is the name of the Nvidia GPU resource.
const ResourceNvidiaGPU = "nvidia.com/gpu"
func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar {
envVars := make([]v1.EnvVar, 0, len(env))
for _, kv := range env {
envVars = append(envVars, v1.EnvVar{Name: kv.Key, Value: kv.Value})
}
return envVars
}
// TODO we should modify the container resources to contain a map of enum values?
// Also we should probably create tolerations / taints, but we could do that as a post process
func ToK8sResourceList(resources []*core.Resources_ResourceEntry) (v1.ResourceList, error) {
k8sResources := make(v1.ResourceList, len(resources))
for _, r := range resources {
rVal := r.Value
v, err := resource.ParseQuantity(rVal)
if err != nil {
return nil, errors.Wrap(err, "Failed to parse resource as a valid quantity.")
}
switch r.Name {
case core.Resources_CPU:
if !v.IsZero() {
k8sResources[v1.ResourceCPU] = v
}
case core.Resources_MEMORY:
if !v.IsZero() {
k8sResources[v1.ResourceMemory] = v
}
case core.Resources_STORAGE:
if !v.IsZero() {
k8sResources[v1.ResourceStorage] = v
}
case core.Resources_GPU:
if !v.IsZero() {
k8sResources[ResourceNvidiaGPU] = v
}
}
}
return k8sResources, nil
}
func ToK8sResourceRequirements(resources *core.Resources) (*v1.ResourceRequirements, error) {
res := &v1.ResourceRequirements{}
if resources == nil {
return res, nil
}
req, err := ToK8sResourceList(resources.Requests)
if err != nil {
return res, err
}
lim, err := ToK8sResourceList(resources.Limits)
if err != nil {
return res, err
}
res.Limits = lim
res.Requests = req
return res, nil
}
func GetWorkflowIDFromObject(obj metav1.Object) (v1alpha1.WorkflowID, error) {
controller := metav1.GetControllerOf(obj)
if controller == nil {
return "", NotTheOwnerError
}
if controller.Kind == v1alpha1.FlyteWorkflowKind {
return obj.GetNamespace() + "/" + controller.Name, nil
}
return "", NotTheOwnerError
}
func GetWorkflowIDFromOwner(reference *metav1.OwnerReference, namespace string) (v1alpha1.WorkflowID, error) {
if reference == nil {
return "", NotTheOwnerError
}
if reference.Kind == v1alpha1.FlyteWorkflowKind {
return namespace + "/" + reference.Name, nil
}
return "", NotTheOwnerError
}
func GetProtoTime(t *metav1.Time) *timestamp.Timestamp {
if t != nil {
pTime, err := ptypes.TimestampProto(t.Time)
if err == nil {
return pTime
}
}
return ptypes.TimestampNow()
}