-
Notifications
You must be signed in to change notification settings - Fork 5
/
watch.go
112 lines (105 loc) · 5.04 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package node
import (
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
)
// Watch watch all node resources.
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) Watch(addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
return h.WatchByLabel("", addFunc, deleteFunc, modifyFunc)
}
// WatchByName watch a single node reseource.
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) WatchByName(name string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
listOptions := metav1.SingleObject(metav1.ObjectMeta{Name: name})
listOptions.TimeoutSeconds = new(int64)
return h.watchNode(listOptions, addFunc, modifyFunc, deleteFunc)
}
// WatchByLabel watch a single or multiple Node resources selected by the label.
// Multiple labels are separated by ",", label key and value conjunctaed by "=".
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) WatchByLabel(labels string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
return h.watchNode(metav1.ListOptions{LabelSelector: labels, TimeoutSeconds: new(int64)},
addFunc, modifyFunc, deleteFunc)
}
// WatchByField watch a single or multiple Node resources selected by the field.
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) WatchByField(field string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
fieldSelector, err := fields.ParseSelector(field)
if err != nil {
return err
}
listOptions := metav1.ListOptions{FieldSelector: fieldSelector.String(), TimeoutSeconds: new(int64)}
return h.watchNode(listOptions, addFunc, modifyFunc, deleteFunc)
}
// watchNode watch node resources according to listOptions.
func (h *Handler) watchNode(listOptions metav1.ListOptions,
addFunc, modifyFunc, deleteFunc func(obj interface{})) (err error) {
var watcher watch.Interface
// if event channel is closed, it means the server has closed the connection,
// reconnect to kubernetes API server.
for {
if watcher, err = h.clientset.CoreV1().Nodes().Watch(h.ctx, listOptions); err != nil {
return err
}
// kubernetes retains the resource event history, which includes this
// initial event, so that when our program first start, we are automatically
// notified of the node existence and current state.
// There we will not ignore the first resource added event.
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Added:
addFunc(event.Object)
case watch.Modified:
modifyFunc(event.Object)
case watch.Deleted:
deleteFunc(event.Object)
case watch.Bookmark:
log.Debug("watch node: bookmark")
case watch.Error:
log.Debug("watch node: error")
}
}
// If event channel is closed, it means the server has closed the connection
log.Debug("watch node: reconnect to kubernetes")
watcher.Stop()
}
}