forked from cilium/cilium
/
informer.go
95 lines (83 loc) · 2.71 KB
/
informer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Copyright 2019 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package informer
import (
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
// ConvertFunc takes an object and returns an object. NewInformer and
// NewInformerWithStore apply it to the object of every delta they process and
// keep the returned value in the local cache in place of the original.
type ConvertFunc func(obj interface{}) interface{}
// NewInformer mirrors k8s.io/client-go/tools/cache.NewInformer but takes an
// additional convertFunc argument, which turns each received object into the
// object that is kept in the local cache.
//
// It creates a fresh cache.Store keyed by
// cache.DeletionHandlingMetaNamespaceKeyFunc and returns that store together
// with the cache.Controller that NewInformerWithStore builds on top of it.
func NewInformer(
	lw cache.ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h cache.ResourceEventHandler,
	convertFunc ConvertFunc,
) (cache.Store, cache.Controller) {
	// The store is the client's view of the state, as far as it is known.
	store := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
	controller := NewInformerWithStore(lw, objType, resyncPeriod, h, convertFunc, store)
	return store, controller
}
// NewInformerWithStore behaves like NewInformer, except that the caller
// supplies the cache.Store in which the converted objects are kept.
//
// Arguments:
//   - lw, objType and resyncPeriod are handed to the controller configuration
//     as its ListerWatcher, ObjectType and FullResyncPeriod.
//   - h is notified after each successful store mutation.
//   - convertFunc is applied to the object of every delta before it is stored
//     and before h is notified. A nil convertFunc leaves objects unchanged.
//   - clientState is the store that is mutated; it is also given to the delta
//     queue as its set of known objects.
//
// For every delta, from oldest to newest:
//   - Sync, Added, Updated: if the converted object is already in clientState
//     it is updated and h.OnUpdate(old, new) is called; otherwise it is added
//     and h.OnAdd(new) is called.
//   - Deleted: the converted object is removed and h.OnDelete is called.
//
// The first error returned by clientState aborts processing of the remaining
// deltas and is returned without notifying h for the failed delta.
func NewInformerWithStore(
	lw cache.ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h cache.ResourceEventHandler,
	convertFunc ConvertFunc,
	clientState cache.Store,
) cache.Controller {
	// Treat a nil convertFunc as the identity conversion instead of panicking
	// on the first processed delta.
	if convertFunc == nil {
		convertFunc = func(obj interface{}) interface{} { return obj }
	}

	// This will hold incoming changes. Note how we pass clientState in as a
	// KeyLister, that way resync operations will result in the correct set
	// of update/delete deltas.
	fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState)

	cfg := &cache.Config{
		Queue:            fifo,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: resyncPeriod,
		RetryOnError:     false,

		Process: func(obj interface{}) error {
			// The queue is expected to hand over cache.Deltas; any other type
			// panics on this assertion.
			deltas := obj.(cache.Deltas)

			// from oldest to newest
			for _, d := range deltas {
				converted := convertFunc(d.Object)

				switch d.Type {
				case cache.Sync, cache.Added, cache.Updated:
					// A failed lookup is treated like a miss, so the object
					// takes the add path below.
					if old, exists, err := clientState.Get(converted); err == nil && exists {
						if err := clientState.Update(converted); err != nil {
							return err
						}
						h.OnUpdate(old, converted)
					} else {
						if err := clientState.Add(converted); err != nil {
							return err
						}
						h.OnAdd(converted)
					}
				case cache.Deleted:
					if err := clientState.Delete(converted); err != nil {
						return err
					}
					h.OnDelete(converted)
				}
			}
			return nil
		},
	}
	return cache.New(cfg)
}