-
Notifications
You must be signed in to change notification settings - Fork 686
/
k8s.go
140 lines (124 loc) · 4.51 KB
/
k8s.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package agent
import (
"context"
"sync"
"github.com/datawire/dlib/dlog"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)
// CallbackEventType defines the possible callback types of events.
type CallbackEventType string
const (
CallbackEventAdded CallbackEventType = "ADDED"
CallbackEventDeleted CallbackEventType = "DELETED"
CallbackEventUpdated CallbackEventType = "UPDATED"
)
// GenericCallback is used to be returned in the channel managed by the WatchGeneric method.
type GenericCallback struct {
// EventType is the event type that originated this callback.
EventType CallbackEventType
// Obj has the new resource state for this event type. If event type is CallbackEventDeleted
// it will contain the last resource state before being deleted.
Obj *unstructured.Unstructured
// Sotw has the state of the world for all resources of the type being watched.
Sotw []interface{}
}
// DynamicClient is the struct that provides the main functionality of watching
// generic Kubernetes resources that may of may not be available (installed) in
// the cluster.
type DynamicClient struct {
newInformer InformerFunc
di dynamic.Interface
done bool
mux sync.Mutex
}
// NewDynamicClient is the main contructor of DynamicClient
func NewDynamicClient(di dynamic.Interface, informerFn InformerFunc) *DynamicClient {
return &DynamicClient{
newInformer: informerFn,
di: di,
}
}
// Informer holds the operations necessary from a k8s informer in order to
// provide the functionality to watch a generic resource.
type Informer interface {
AddEventHandler(handler cache.ResourceEventHandler)
Run(stopCh <-chan struct{})
ListCache() []interface{}
}
type InformerFunc func(dynamic.Interface, string, *schema.GroupVersionResource) Informer
// K8sInformer is a real Informer implementation.
type K8sInformer struct {
cache.SharedIndexInformer
}
// ListCache will return the current state of the cache store from the Kubernetes
// informer.
func (i *K8sInformer) ListCache() []interface{} {
return i.GetStore().List()
}
// NewK8sInformer builds and returns a real Kubernetes Informer implementation.
func NewK8sInformer(cli dynamic.Interface, ns string, gvr *schema.GroupVersionResource) Informer {
f := dynamicinformer.NewFilteredDynamicSharedInformerFactory(cli, 0, ns, nil)
i := f.ForResource(*gvr).Informer()
return &K8sInformer{
SharedIndexInformer: i,
}
}
func (dc *DynamicClient) sendCallback(callbackChan chan<- *GenericCallback, callback *GenericCallback) {
dc.mux.Lock()
defer dc.mux.Unlock()
if dc.done {
return
}
callbackChan <- callback
}
// WatchGeneric will watch any resource existing in the cluster or not. This is usefull for
// watching CRDs that may or may not be available in the cluster.
func (dc *DynamicClient) WatchGeneric(ctx context.Context, ns string, gvr *schema.GroupVersionResource) <-chan *GenericCallback {
callbackChan := make(chan *GenericCallback)
go func() {
<-ctx.Done()
dc.mux.Lock()
defer dc.mux.Unlock()
dc.done = true
close(callbackChan)
}()
i := dc.newInformer(dc.di, ns, gvr)
i.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
dlog.Debugf(ctx, "WatchGeneric: AddFunc called for resource %q", gvr.String())
new := obj.(*unstructured.Unstructured)
sotw := i.ListCache()
callback := &GenericCallback{EventType: CallbackEventAdded, Obj: new, Sotw: sotw}
dc.sendCallback(callbackChan, callback)
},
UpdateFunc: func(oldObj, newObj interface{}) {
dlog.Debugf(ctx, "WatchGeneric: UpdateFunc called for resource %q", gvr.String())
new := newObj.(*unstructured.Unstructured)
sotw := i.ListCache()
callback := &GenericCallback{EventType: CallbackEventUpdated, Obj: new, Sotw: sotw}
dc.sendCallback(callbackChan, callback)
},
DeleteFunc: func(obj interface{}) {
dlog.Debugf(ctx, "WatchGeneric: DeleteFunc called for resource %q", gvr.String())
var old *unstructured.Unstructured
switch o := obj.(type) {
case cache.DeletedFinalStateUnknown:
old = o.Obj.(*unstructured.Unstructured)
case *unstructured.Unstructured:
old = o
}
sotw := i.ListCache()
callback := &GenericCallback{EventType: CallbackEventDeleted, Obj: old, Sotw: sotw}
dc.sendCallback(callbackChan, callback)
},
},
)
go i.Run(ctx.Done())
dlog.Infof(ctx, "WatchGeneric: Listening for events from resouce %q", gvr.String())
return callbackChan
}