/
watch.go
130 lines (122 loc) · 6.15 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package rolebinding
import (
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
)
// Watch watches rolebinding resources in all namespaces
// (metav1.NamespaceAll) without any label filtering.
//
// Callbacks receive event.Object from the watch stream:
//   - addFunc is called for Added events with the new state of the object.
//   - modifyFunc is called for Modified events with the new state of the object.
//   - deleteFunc is called for Deleted events with the state of the object
//     immediately before deletion.
//
// Bookmark and Error events are only logged at debug level and are not
// delivered to any callback.
//
// The call blocks, re-establishing the watch whenever the event channel is
// closed, and returns the error produced when a watch cannot be established.
func (h *Handler) Watch(addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	// WatchByLabel expects the callbacks in (add, modify, delete) order;
	// passing them in any other order routes events to the wrong callback.
	return h.WithNamespace(metav1.NamespaceAll).WatchByLabel("", addFunc, modifyFunc, deleteFunc)
}
// WatchByNamespace watches all rolebinding resources in the specified
// namespace. An empty namespace falls back to metav1.NamespaceDefault.
//
// Callbacks receive event.Object from the watch stream:
//   - addFunc is called for Added events with the new state of the object.
//   - modifyFunc is called for Modified events with the new state of the object.
//   - deleteFunc is called for Deleted events with the state of the object
//     immediately before deletion.
//
// Bookmark and Error events are only logged at debug level and are not
// delivered to any callback.
//
// The call blocks, re-establishing the watch whenever the event channel is
// closed, and returns the error produced when a watch cannot be established.
func (h *Handler) WatchByNamespace(namespace string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	if len(namespace) == 0 {
		namespace = metav1.NamespaceDefault
	}
	// WatchByLabel expects the callbacks in (add, modify, delete) order;
	// passing them in any other order routes events to the wrong callback.
	return h.WithNamespace(namespace).WatchByLabel("", addFunc, modifyFunc, deleteFunc)
}
// WatchByName watches a single rolebinding resource, identified by name, in
// the handler's current namespace.
//
// Callbacks receive event.Object from the watch stream:
//   - addFunc is called for Added events with the new state of the object.
//   - modifyFunc is called for Modified events with the new state of the object.
//   - deleteFunc is called for Deleted events with the state of the object
//     immediately before deletion.
//
// Bookmark and Error events are only logged at debug level and are not
// delivered to any callback.
func (h *Handler) WatchByName(name string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	// TimeoutSeconds points at a zero value, exactly as new(int64) would.
	var timeout int64

	opts := metav1.SingleObject(metav1.ObjectMeta{
		Namespace: h.namespace,
		Name:      name,
	})
	opts.TimeoutSeconds = &timeout

	return h.watchRoleBinding(opts, addFunc, modifyFunc, deleteFunc)
}
// WatchByLabel watches one or more rolebinding resources selected by label.
// Multiple labels are separated by ","; a label key and its value are joined
// by "=". The string is passed through unchanged as the label selector.
//
// Callbacks receive event.Object from the watch stream:
//   - addFunc is called for Added events with the new state of the object.
//   - modifyFunc is called for Modified events with the new state of the object.
//   - deleteFunc is called for Deleted events with the state of the object
//     immediately before deletion.
//
// Bookmark and Error events are only logged at debug level and are not
// delivered to any callback.
func (h *Handler) WatchByLabel(labels string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	// TimeoutSeconds points at a zero value, exactly as new(int64) would.
	var timeout int64

	opts := metav1.ListOptions{
		LabelSelector:  labels,
		TimeoutSeconds: &timeout,
	}

	return h.watchRoleBinding(opts, addFunc, modifyFunc, deleteFunc)
}
// WatchByField watches one or more rolebinding resources selected by a field
// selector expression. The expression is parsed with fields.ParseSelector and
// a parse failure is returned to the caller before any watch is started.
//
// Callbacks receive event.Object from the watch stream:
//   - addFunc is called for Added events with the new state of the object.
//   - modifyFunc is called for Modified events with the new state of the object.
//   - deleteFunc is called for Deleted events with the state of the object
//     immediately before deletion.
//
// Bookmark and Error events are only logged at debug level and are not
// delivered to any callback.
func (h *Handler) WatchByField(field string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	selector, err := fields.ParseSelector(field)
	if err != nil {
		return err
	}

	// TimeoutSeconds points at a zero value, exactly as new(int64) would.
	var timeout int64

	var opts metav1.ListOptions
	opts.FieldSelector = selector.String()
	opts.TimeoutSeconds = &timeout

	return h.watchRoleBinding(opts, addFunc, modifyFunc, deleteFunc)
}
// watchRoleBinding watches rolebinding resources matching listOptions in the
// handler's namespace and dispatches each event to the matching callback.
//
// Added, Modified and Deleted events invoke addFunc, modifyFunc and deleteFunc
// respectively with event.Object; a nil callback is skipped rather than
// called. Bookmark and Error events are only logged at debug level.
//
// When the event channel is closed the watch is re-established with the same
// listOptions. The function returns only when the handler's context is done
// (returning the context's error) or when establishing a watch fails
// (returning that error).
//
// NOTE(review): there is no backoff between reconnect attempts.
func (h *Handler) watchRoleBinding(listOptions metav1.ListOptions,
	addFunc, modifyFunc, deleteFunc func(obj interface{})) (err error) {
	var watcher watch.Interface

	for {
		// Stop once the handler's context is done instead of reconnecting
		// again; otherwise termination would depend on the client reporting
		// the cancellation as a Watch error.
		if h.ctx != nil {
			if err = h.ctx.Err(); err != nil {
				return err
			}
		}

		if watcher, err = h.clientset.RbacV1().RoleBindings(h.namespace).Watch(h.ctx, listOptions); err != nil {
			return err
		}

		// No Added event is filtered out here, so any Added events the
		// server sends for already-existing objects when the watch starts
		// are forwarded to addFunc like every other event.
		for event := range watcher.ResultChan() {
			switch event.Type {
			case watch.Added:
				if addFunc != nil {
					addFunc(event.Object)
				}
			case watch.Modified:
				if modifyFunc != nil {
					modifyFunc(event.Object)
				}
			case watch.Deleted:
				if deleteFunc != nil {
					deleteFunc(event.Object)
				}
			case watch.Bookmark:
				log.Debug("watch rolebinding: bookmark")
			case watch.Error:
				log.Debug("watch rolebinding: error")
			}
		}

		// The event channel was closed, which means the server has closed
		// the connection: release the watcher and reconnect.
		log.Debug("watch rolebinding: reconnect to kubernetes")
		watcher.Stop()
	}
}