/
watcher.go
58 lines (50 loc) · 1.44 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package podwatcher
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
kubeinformer "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/flomesh-io/fsm/pkg/cni/config"
)
// watcher wires caller-supplied callbacks to a Pod informer.
//
// start builds the informer from these fields and shutdown stops it.
type watcher struct {
	// Client is the Kubernetes client the informer factory is created from.
	Client kubernetes.Interface
	// CurrentNodeName is the value matched against "spec.nodeName" when the
	// Pod list/watch is filtered by node (not applied when config.IsKind is
	// set).
	CurrentNodeName string
	// OnAddFunc, OnUpdateFunc and OnDeleteFunc are installed as the Pod
	// informer's add, update and delete event handlers respectively.
	OnAddFunc    func(obj any)
	OnUpdateFunc func(oldObj, newObj any)
	OnDeleteFunc func(obj any)
	// Stop is passed to the informer factory as its stop channel; shutdown
	// closes it.
	Stop chan struct{}
}
// start creates a Pod informer covering all namespaces, registers the
// watcher's callbacks on it and starts the informer factory with w.Stop as
// its stop channel.
//
// Unless config.IsKind is set, list/watch requests carry a "spec.nodeName"
// field selector equal to w.CurrentNodeName, so only Pods bound to that node
// are delivered. When config.IsKind is set no node filter is applied.
//
// It returns the error reported by the informer when the event handlers
// cannot be registered; in that case the factory is not started. Otherwise it
// returns nil.
func (w *watcher) start() error {
	nodeSelector := ""
	if !config.IsKind {
		nodeSelector = fields.OneTermEqualSelector("spec.nodeName", w.CurrentNodeName).String()
	}

	// NewSharedInformerFactoryWithOptions is the non-deprecated equivalent of
	// NewFilteredSharedInformerFactory: same namespace scope, same resync
	// period, same list-option tweak.
	factory := kubeinformer.NewSharedInformerFactoryWithOptions(
		w.Client,
		30*time.Second,
		kubeinformer.WithNamespace(metav1.NamespaceAll),
		kubeinformer.WithTweakListOptions(func(o *metav1.ListOptions) {
			o.FieldSelector = nodeSelector
		}),
	)

	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    w.OnAddFunc,
		UpdateFunc: w.OnUpdateFunc,
		DeleteFunc: w.OnDeleteFunc,
	}
	// Surface a failed registration instead of silently running an informer
	// that will never invoke the callbacks.
	if _, err := factory.Core().V1().Pods().Informer().AddEventHandler(handlers); err != nil {
		return err
	}

	factory.Start(w.Stop)
	return nil
}
// shutdown closes w.Stop, the channel start hands to the informer factory as
// its stop signal.
//
// Calling it again after the channel is already closed, or on a watcher whose
// Stop channel is nil, is a no-op rather than a panic. The check is not
// synchronised: concurrent calls from several goroutines must be serialised
// by the caller.
func (w *watcher) shutdown() {
	if w.Stop == nil {
		// Nothing was ever started with this channel; close(nil) would panic.
		return
	}
	select {
	case <-w.Stop:
		// Already closed by an earlier shutdown; closing twice would panic.
	default:
		close(w.Stop)
	}
}
// newWatcher returns a pointer to a new watcher carrying the client, node
// name and callbacks of watch. Any Stop channel set on watch is discarded and
// replaced with a freshly made one, so every returned watcher owns its own
// stop channel.
func newWatcher(watch watcher) *watcher {
	// watch is received by value, so it is already a private copy of the
	// caller's struct; only the stop channel needs replacing.
	watch.Stop = make(chan struct{})
	return &watch
}