-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserving.go
111 lines (101 loc) · 2.47 KB
/
serving.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package types
import (
v1 "k8s.io/api/core/v1"
"fmt"
"github.com/kubeflow/arena/pkg/util"
log "github.com/sirupsen/logrus"
app_v1 "k8s.io/api/apps/v1"
"k8s.io/client-go/kubernetes"
)
type Serving struct {
Name string
ServeType string
Namespace string
Version string
pods []v1.Pod
deploy app_v1.Deployment
client *kubernetes.Clientset
}
var SERVING_CHARTS = map[string]string{
"tensorflow-serving-0.2.0": "Tensorflow",
"tensorrt-inference-server-0.0.1": "TensorRT",
}
var SERVING_TYPE = map[string]string{
"tf-serving": "Tensorflow",
"trt-serving": "TensorRT",
}
func NewServingJob(client *kubernetes.Clientset, deploy app_v1.Deployment, allPods []v1.Pod) Serving {
servingTypeLabel := deploy.Labels["servingType"]
serviceVersion := deploy.Labels["serviceVersion"]
servingName := deploy.Labels["servingName"]
servingType := "Tensorflow"
if serveType, ok := SERVING_TYPE[servingTypeLabel]; ok {
servingType = serveType
}
serving := Serving{
Name: servingName,
client: client,
ServeType: servingType,
Namespace: deploy.Namespace,
Version: serviceVersion,
deploy: deploy,
}
for _, pod := range allPods {
if IsPodControllerByDeploment(pod, deploy) {
serving.pods = append(serving.pods, pod)
}
}
return serving
}
func (s Serving) GetName() string {
if s.Version != "" {
return fmt.Sprintf("%s-%s", s.Name, s.Version)
}
return s.Name
}
func (s Serving) AllPods() []v1.Pod {
return s.pods
}
func (s Serving) GetClusterIP() string {
serviceName := fmt.Sprintf("%s-%s", s.deploy.Labels["release"], s.deploy.Labels["app"])
allServices, err := util.AcquireServingServices(s.Namespace, s.client)
if err != nil {
log.Errorf("failed to list serving services, err: %++v", err)
return "N/A"
}
for _, service := range allServices {
if service.Name == serviceName {
return service.Spec.ClusterIP
}
}
return "N/A"
}
func (s Serving) GetStatus() string {
hasPendingPod := false
for _, pod := range s.pods {
if pod.Status.Phase == v1.PodPending {
log.Debugf("pod %s is pending", pod.Name)
hasPendingPod = true
break
}
if hasPendingPod {
return "PENDING"
}
}
return "RUNNING"
}
func IsPodControllerByDeploment(pod v1.Pod, deploy app_v1.Deployment) bool {
if len(pod.OwnerReferences) == 0 {
return false
}
podLabel := pod.GetLabels()
if len(podLabel) == 0 {
return false
}
for key, value := range deploy.Spec.Selector.MatchLabels {
if podLabel[key] != value {
return false
}
}
return true
}