-
Notifications
You must be signed in to change notification settings - Fork 5
/
watch.go
152 lines (143 loc) · 7.04 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package dynamic
import (
utilrestmapper "github.com/forbearing/k8s/util/restmapper"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/watch"
)
// Watch watches every resource of the configured kind in all namespaces.
// The GroupVersionKind should always be set beforehand with WithGVK().
//
// It selects metav1.NamespaceAll through WithNamespace and then delegates to
// WatchByLabel with an empty label selector.
//
// The obj handed to addFunc, modifyFunc and deleteFunc is:
//   - for an Added or Modified event: the new state of the object.
//   - for a Deleted event: the state of the object immediately before deletion.
//
// Bookmark and Error events are not forwarded to any callback; the underlying
// watchUnstructuredObj only logs them at debug level.
func (h *Handler) Watch(addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	allNamespaces := h.WithNamespace(metav1.NamespaceAll)
	return allNamespaces.WatchByLabel("", addFunc, modifyFunc, deleteFunc)
}
// WatchByNamespace watches every resource of the configured kind inside the
// given namespace. An empty namespace falls back to metav1.NamespaceDefault.
// The GroupVersionKind should always be set beforehand with WithGVK().
//
// It selects the namespace through WithNamespace and then delegates to
// WatchByLabel with an empty label selector.
//
// The obj handed to addFunc, modifyFunc and deleteFunc is:
//   - for an Added or Modified event: the new state of the object.
//   - for a Deleted event: the state of the object immediately before deletion.
//
// Bookmark and Error events are not forwarded to any callback; the underlying
// watchUnstructuredObj only logs them at debug level.
func (h *Handler) WatchByNamespace(namespace string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	scoped := h.WithNamespace(namespace)
	return scoped.WatchByLabel("", addFunc, modifyFunc, deleteFunc)
}
// WatchByName watches a single resource of the configured kind, selected by
// name. The GroupVersionKind should always be set beforehand with WithGVK().
//
// The list options are built with metav1.SingleObject from the given name and
// the handler's current namespace, and TimeoutSeconds is set to a pointer to
// zero before the watch is started.
//
// The obj handed to addFunc, modifyFunc and deleteFunc is:
//   - for an Added or Modified event: the new state of the object.
//   - for a Deleted event: the state of the object immediately before deletion.
//
// Bookmark and Error events are not forwarded to any callback; the underlying
// watchUnstructuredObj only logs them at debug level.
func (h *Handler) WatchByName(name string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	target := metav1.ObjectMeta{Name: name, Namespace: h.namespace}
	opts := metav1.SingleObject(target)

	var timeout int64
	opts.TimeoutSeconds = &timeout

	return h.watchUnstructuredObj(opts, addFunc, modifyFunc, deleteFunc)
}
// WatchByLabel watches one or more resources of the configured kind that match
// the given label selector. Multiple labels are separated by ","; a label key
// and its value are joined by "=".
// The GroupVersionKind should always be set beforehand with WithGVK().
//
// The labels string is used verbatim as the LabelSelector of the list options,
// and TimeoutSeconds is set to a pointer to zero.
//
// The obj handed to addFunc, modifyFunc and deleteFunc is:
//   - for an Added or Modified event: the new state of the object.
//   - for a Deleted event: the state of the object immediately before deletion.
//
// Bookmark and Error events are not forwarded to any callback; the underlying
// watchUnstructuredObj only logs them at debug level.
func (h *Handler) WatchByLabel(labels string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	var opts metav1.ListOptions
	opts.LabelSelector = labels
	opts.TimeoutSeconds = new(int64)

	return h.watchUnstructuredObj(opts, addFunc, modifyFunc, deleteFunc)
}
// WatchByField watches one or more resources of the configured kind that match
// the given field selector.
// The GroupVersionKind should always be set beforehand with WithGVK().
//
// The field string is parsed with fields.ParseSelector; a parse failure is
// returned to the caller before any watch is started. The parsed selector's
// string form becomes the FieldSelector of the list options, and
// TimeoutSeconds is set to a pointer to zero.
//
// The obj handed to addFunc, modifyFunc and deleteFunc is:
//   - for an Added or Modified event: the new state of the object.
//   - for a Deleted event: the state of the object immediately before deletion.
//
// Bookmark and Error events are not forwarded to any callback; the underlying
// watchUnstructuredObj only logs them at debug level.
func (h *Handler) WatchByField(field string, addFunc, modifyFunc, deleteFunc func(obj interface{})) error {
	selector, parseErr := fields.ParseSelector(field)
	if parseErr != nil {
		return parseErr
	}

	opts := metav1.ListOptions{
		FieldSelector:  selector.String(),
		TimeoutSeconds: new(int64),
	}
	return h.watchUnstructuredObj(opts, addFunc, modifyFunc, deleteFunc)
}
// watchUnstructuredObj watches the resources selected by listOptions and
// dispatches each received event to the matching callback.
//
// Before watching, it resolves h.gvk through h.restMapper and stores the
// results in h.gvr and h.isNamespaced; a failure of either lookup is returned.
// Namespaced resources are watched in h.namespace, all others are watched
// without a namespace.
//
// Event handling:
//   - Added    -> addFunc(event.Object)
//   - Modified -> modifyFunc(event.Object)
//   - Deleted  -> deleteFunc(event.Object)
//   - Bookmark and Error are only logged at debug level.
//
// Any of addFunc, modifyFunc and deleteFunc may be nil, in which case events
// of that type are silently skipped instead of causing a nil-function panic.
//
// The function keeps running: whenever the event channel is closed the watcher
// is stopped and a new watch is opened. It returns only when opening a watch
// fails, and then returns that error.
func (h *Handler) watchUnstructuredObj(listOptions metav1.ListOptions,
	addFunc, modifyFunc, deleteFunc func(obj interface{})) (err error) {
	if h.gvr, err = utilrestmapper.GVKToGVR(h.restMapper, h.gvk); err != nil {
		return err
	}
	if h.isNamespaced, err = utilrestmapper.IsNamespaced(h.restMapper, h.gvk); err != nil {
		return err
	}

	// notify calls fn only if the caller supplied one, so that passing nil for
	// an uninteresting event type is safe.
	notify := func(fn func(obj interface{}), obj interface{}) {
		if fn != nil {
			fn(obj)
		}
	}

	var watcher watch.Interface
	// If the event channel is closed, it means the server has closed the
	// connection; reconnect to the kubernetes API server.
	for {
		if h.isNamespaced {
			watcher, err = h.dynamicClient.Resource(h.gvr).Namespace(h.namespace).Watch(h.ctx, listOptions)
		} else {
			watcher, err = h.dynamicClient.Resource(h.gvr).Watch(h.ctx, listOptions)
		}
		if err != nil {
			return err
		}

		// No event is filtered out here, so any initial Added events reported
		// for objects that already exist when the watch starts reach addFunc
		// as well; the first Added event is deliberately not ignored.
		for event := range watcher.ResultChan() {
			switch event.Type {
			case watch.Added:
				notify(addFunc, event.Object)
			case watch.Modified:
				notify(modifyFunc, event.Object)
			case watch.Deleted:
				notify(deleteFunc, event.Object)
			case watch.Bookmark:
				log.Debugf("watch %s: bookmark", h.gvr.Resource)
			case watch.Error:
				log.Debugf("watch %s: error", h.gvr.Resource)
			}
		}

		// The event channel was closed: release the old watcher before the
		// next iteration opens a new one.
		log.Debugf("watch %s: reconnect to kubernetes", h.gvr.Resource)
		watcher.Stop()
	}
}