/
watch.go
130 lines (122 loc) · 6.2 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package serviceaccount
import (
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
)
// Watch watch all serviceaccount resources.
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) Watch(addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
return h.WithNamespace(metav1.NamespaceAll).WatchByLabel("", addFunc, deleteFunc, modifyFunc)
}
// WatchByNamespace watch all serviceaccount resources in the specified namespace.
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) WatchByNamespace(namespace string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
if len(namespace) == 0 {
namespace = metav1.NamespaceDefault
}
return h.WithNamespace(namespace).WatchByLabel("", addFunc, deleteFunc, modifyFunc)
}
// WatchByName watch a single serviceaccount reseource.
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) WatchByName(name string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
listOptions := metav1.SingleObject(metav1.ObjectMeta{Name: name, Namespace: h.namespace})
listOptions.TimeoutSeconds = new(int64)
return h.watchServiceAccount(listOptions, addFunc, modifyFunc, deleteFunc)
}
// WatchByLabel watch a single or multiple ServiceAccount resources selected by the label.
// Multiple labels are separated by ",", label key and value conjunctaed by "=".
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) WatchByLabel(labels string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
return h.watchServiceAccount(metav1.ListOptions{LabelSelector: labels, TimeoutSeconds: new(int64)},
addFunc, modifyFunc, deleteFunc)
}
// WatchByField watch a single or multiple ServiceAccount resources selected by the field.
//
// Object as the parameter of addFunc, modifyFunc, deleteFunc:
// * If Event.Type is Added or Modified: the new state of the object.
// * If Event.Type is Deleted: the state of the object immediately before deletion.
// * If Event.Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Event.Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
func (h *Handler) WatchByField(field string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
fieldSelector, err := fields.ParseSelector(field)
if err != nil {
return err
}
listOptions := metav1.ListOptions{FieldSelector: fieldSelector.String(), TimeoutSeconds: new(int64)}
return h.watchServiceAccount(listOptions, addFunc, modifyFunc, deleteFunc)
}
// watchServiceAccount watch serviceaccount resources according to listOptions.
func (h *Handler) watchServiceAccount(listOptions metav1.ListOptions,
addFunc, modifyFunc, deleteFunc func(obj interface{})) (err error) {
var watcher watch.Interface
// if event channel is closed, it means the server has closed the connection,
// reconnect to kubernetes API server.
for {
if watcher, err = h.clientset.CoreV1().ServiceAccounts(h.namespace).Watch(h.ctx, listOptions); err != nil {
return err
}
// kubernetes retains the resource event history, which includes this
// initial event, so that when our program first start, we are automatically
// notified of the serviceaccount existence and current state.
// There we will not ignore the first resource added event.
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Added:
addFunc(event.Object)
case watch.Modified:
modifyFunc(event.Object)
case watch.Deleted:
deleteFunc(event.Object)
case watch.Bookmark:
log.Debug("watch serviceaccount: bookmark")
case watch.Error:
log.Debug("watch serviceaccount: error")
}
}
// If event channel is closed, it means the server has closed the connection
log.Debug("watch serviceaccount: reconnect to kubernetes")
watcher.Stop()
}
}