This repository has been archived by the owner on Apr 3, 2022. It is now read-only.
/
watcher.go
116 lines (96 loc) · 2.5 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package clb
import (
"net"
"strconv"
"time"
"google.golang.org/grpc/naming"
"github.com/golang/glog"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
// Operation labels that process writes to the log alongside each update it
// publishes.
const (
	opAdd = "Add" // logged when the update carries naming.Add
	opDel = "Del" // logged when the update carries naming.Delete
)
// watchPods builds an indexer/informer pair for pods using clb.listFunc and
// clb.watchFunc, stores both on clb, and starts the controller in a background
// goroutine.
//
// Add and delete notifications are handed to clb.process unchanged; for update
// notifications only the new object is forwarded. A ten-minute resync period
// is passed to the informer.
//
// NOTE(review): the stop channel created here is local and never closed, so
// nothing in this function can stop the controller once it is running.
func (clb *Clb) watchPods() {
	const resyncPeriod = 10 * time.Minute

	// Where the informer gets its initial list and subsequent change stream.
	source := &cache.ListWatch{
		ListFunc:  clb.listFunc,
		WatchFunc: clb.watchFunc,
	}

	// Every kind of event ends up in the same handler.
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    clb.process,
		DeleteFunc: clb.process,
		UpdateFunc: func(_, current interface{}) {
			clb.process(current)
		},
	}

	clb.indexer, clb.controller = cache.NewIndexerInformer(
		source,
		&v1.Pod{},
		resyncPeriod,
		handlers,
		cache.Indexers{},
	)

	stopCh := make(chan struct{})
	go clb.controller.Run(stopCh)
}
// process is the informer event handler: it turns a pod event into a naming
// update and publishes it on the update channel registered for the pod's
// target.
//
// The target is the value of the pod's "app" label. A pod that carries a
// deletion timestamp produces a naming.Delete; any other pod produces a
// naming.Add, but only once verifyPodReady accepts it. Objects that are not
// *v1.Pod are ignored, and so are pods whose target has no update channel.
//
// NOTE(review): an informer's DeleteFunc may be handed a
// cache.DeletedFinalStateUnknown tombstone instead of a pod when a delete was
// missed; such objects fail the type assertion below and are dropped, so the
// corresponding address is never withdrawn. Confirm whether that matters here.
func (clb *Clb) process(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return
	}

	opn, op := opAdd, naming.Add
	if pod.GetDeletionTimestamp() != nil {
		opn, op = opDel, naming.Delete
	}
	// Additions are only announced for pods that are ready; deletions always
	// go through.
	if op != naming.Delete && !verifyPodReady(pod) {
		return
	}

	target := pod.Labels["app"]

	// Indexing with an unknown target yields a nil channel, and a send on a
	// nil channel blocks forever, which would wedge this event handler. Drop
	// the event instead.
	updates := clb.updates[target]
	if updates == nil {
		glog.Errorf("[clb] No update channel for target '%s' (pod %s), dropping %s", target, pod.Name, opn)
		return
	}

	// Resolve the address only after the checks above so that skipped pods do
	// not trigger hostPort's missing-port error log.
	addr := hostPort(pod, clb.ports[target])

	glog.Infof("[clb] %s, %s, %s, %s\n", target, pod.Name, pod.Status.PodIP, opn)
	updates <- []*naming.Update{{Op: op, Addr: addr}}
}
// listFunc lists the pods in clb.namespace, restricted to clb.selector. Any
// label selector already present in options is overwritten. It is used as the
// ListFunc of the ListWatch built in watchPods.
func (clb *Clb) listFunc(options metav1.ListOptions) (runtime.Object, error) {
	options.LabelSelector = clb.selector

	pods := clb.clientset.CoreV1().Pods(clb.namespace)
	return pods.List(options)
}
// watchFunc opens a watch on the pods in clb.namespace, restricted to
// clb.selector. Any label selector already present in options is overwritten.
// It is used as the WatchFunc of the ListWatch built in watchPods.
func (clb *Clb) watchFunc(options metav1.ListOptions) (watch.Interface, error) {
	options.LabelSelector = clb.selector

	pods := clb.clientset.CoreV1().Pods(clb.namespace)
	return pods.Watch(options)
}
// verifyPodReady reports whether pod has been assigned an IP, lists at least
// one container status, and has every listed container in the Running state.
func verifyPodReady(pod *v1.Pod) bool {
	statuses := pod.Status.ContainerStatuses
	if pod.Status.PodIP == "" || len(statuses) == 0 {
		return false
	}

	// A single container that is not running disqualifies the whole pod.
	for i := range statuses {
		if statuses[i].State.Running == nil {
			return false
		}
	}
	return true
}
// hostPort returns the "host:port" address for pod, combining the pod's IP
// with the container port named portName. When the pod declares no port with
// that name, an error is logged and port 8080 is used instead.
func hostPort(pod *v1.Pod, portName string) string {
	if found := findPort(pod, portName); found != "" {
		return net.JoinHostPort(pod.Status.PodIP, found)
	}

	glog.Errorf("[clb] Unable to find a '%s' port on %s", portName, pod.Name)
	return net.JoinHostPort(pod.Status.PodIP, "8080")
}
// findPort scans the pod's containers in order and returns, as a decimal
// string, the container port of the first port entry named portName. It
// returns the empty string when no container declares such a port.
func findPort(pod *v1.Pod, portName string) string {
	containers := pod.Spec.Containers
	for c := range containers {
		ports := containers[c].Ports
		for p := range ports {
			if ports[p].Name != portName {
				continue
			}
			return strconv.Itoa(int(ports[p].ContainerPort))
		}
	}
	return ""
}