-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathwatch.go
99 lines (81 loc) · 2.08 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package k8s
import (
	"sync"

	"github.com/dbunion/com/scheduler"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/watch"
)
// k8sWatcher adapts a Kubernetes watch.Interface to scheduler.Interface by
// converting every watch.Event it receives into a scheduler.WatchEvent.
type k8sWatcher struct {
	// watcher is the underlying Kubernetes watch being adapted.
	watcher watch.Interface
	// done is closed by Stop to tell the forwarding goroutine to exit.
	done chan struct{}
	// stopOnce makes Stop idempotent and safe to call concurrently.
	stopOnce sync.Once
	// resultOnce guards the lazy creation of result and of the single
	// forwarding goroutine that feeds it.
	resultOnce sync.Once
	// result is the channel handed out by ResultChan.
	result chan scheduler.WatchEvent
}

// NewWatcher - create new k8s watcher
//
// The returned value wraps w; events are only pulled from w once ResultChan
// has been called for the first time.
func NewWatcher(w watch.Interface) scheduler.Interface {
	return &k8sWatcher{
		watcher: w,
		done:    make(chan struct{}),
	}
}

// Stop stops watching. It stops the underlying watch and causes the channel
// returned by ResultChan() to be closed. Stop never blocks on the consumer and
// may be called any number of times, including before ResultChan().
func (w *k8sWatcher) Stop() {
	w.stopOnce.Do(func() {
		w.watcher.Stop()
		// Closing (rather than sending on) done never blocks and is observed
		// by the forwarding goroutine regardless of what it is waiting on.
		close(w.done)
	})
}

// ResultChan returns a chan which will receive all the events. If the
// underlying watch channel is closed or Stop() is called, this channel will be
// closed, in which case the watch should be completely cleaned up.
//
// Every call returns the same channel; the forwarding goroutine is started on
// the first call only. Events of type watch.Error are dropped rather than
// forwarded.
func (w *k8sWatcher) ResultChan() <-chan scheduler.WatchEvent {
	w.resultOnce.Do(func() {
		w.result = make(chan scheduler.WatchEvent)
		go w.forwardEvents(w.watcher.ResultChan(), w.result)
	})
	return w.result
}

// forwardEvents copies converted events from src to dst until src is closed or
// Stop is called, and closes dst on exit.
func (w *k8sWatcher) forwardEvents(src <-chan watch.Event, dst chan<- scheduler.WatchEvent) {
	defer close(dst)
	for {
		select {
		case <-w.done:
			return
		case e, ok := <-src:
			if !ok {
				// The upstream watch ended; without this check the loop would
				// spin forwarding zero-value events.
				return
			}
			if e.Type == watch.Error {
				continue
			}
			converted := w.processEvent(e)
			// Also watch done while sending so that a consumer that stopped
			// reading cannot keep this goroutine alive after Stop.
			select {
			case dst <- converted:
			case <-w.done:
				return
			}
		}
	}
}
// processEvent translates a Kubernetes watch.Event into a scheduler.WatchEvent.
// The event type is carried over by conversion, and the payload is converted
// with the helper that matches its concrete Kubernetes type. A payload of any
// other type leaves the resulting event's Object at its zero value.
func (w *k8sWatcher) processEvent(e watch.Event) scheduler.WatchEvent {
	var payload scheduler.Object

	switch obj := e.Object.(type) {
	// core/v1 resources
	case *v1.ConfigMap:
		payload = convertToConfig(obj)
	case *v1.Namespace:
		payload = convertToNamespace(obj)
	case *v1.Node:
		payload = convertToNode(obj)
	case *v1.Pod:
		payload = convertToPod(obj)
	case *v1.ReplicationController:
		payload = convertToRC(obj)
	case *v1.Service:
		payload = convertToService(obj)
	// apps/v1 resources
	case *appsv1.Deployment:
		payload = convertToDeployment(obj)
	case *appsv1.ReplicaSet:
		payload = convertToReplicaSet(obj)
	case *appsv1.StatefulSet:
		payload = convertToSTS(obj)
	case *appsv1.DaemonSet:
		payload = convertToDaemonSet(obj)
	}

	var out scheduler.WatchEvent
	out.Type = scheduler.EventType(e.Type)
	out.Object = payload
	return out
}