forked from openshift/origin
/
shared_informer.go
150 lines (120 loc) · 5.16 KB
/
shared_informer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package shared
import (
"reflect"
"sync"
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/informers"
oclient "github.com/openshift/origin/pkg/client"
)
type InformerFactory interface {
// Start starts informers that can start AFTER the API server and controllers have started
Start(stopCh <-chan struct{})
// StartCore starts core informers that must initialize in order for the API server to start
StartCore(stopCh <-chan struct{})
ClusterPolicies() ClusterPolicyInformer
ClusterPolicyBindings() ClusterPolicyBindingInformer
Policies() PolicyInformer
PolicyBindings() PolicyBindingInformer
DeploymentConfigs() DeploymentConfigInformer
BuildConfigs() BuildConfigInformer
ImageStreams() ImageStreamInformer
SecurityContextConstraints() SecurityContextConstraintsInformer
ClusterResourceQuotas() ClusterResourceQuotaInformer
KubernetesInformers() informers.SharedInformerFactory
// TODO switch to the generated upstream informers once the kube 1.6 rebase is
// in
ReplicationControllers() ReplicationControllerInformer
}
// ListerWatcherOverrides allows a caller to specify special behavior for particular ListerWatchers
// For instance, authentication and authorization types need to go direct to etcd, not through an API server
type ListerWatcherOverrides interface {
// GetListerWatcher returns back a ListerWatcher for a given resource or nil if
// no particular ListerWatcher was specified for the type
GetListerWatcher(resource unversioned.GroupResource) cache.ListerWatcher
}
type DefaultListerWatcherOverrides map[unversioned.GroupResource]cache.ListerWatcher
func (o DefaultListerWatcherOverrides) GetListerWatcher(resource unversioned.GroupResource) cache.ListerWatcher {
return o[resource]
}
func NewInformerFactory(kubeInformers informers.SharedInformerFactory, kubeClient kclientset.Interface, originClient oclient.Interface, customListerWatchers ListerWatcherOverrides, defaultResync time.Duration) InformerFactory {
return &sharedInformerFactory{
kubeInformers: kubeInformers,
kubeClient: kubeClient,
originClient: originClient,
customListerWatchers: customListerWatchers,
defaultResync: defaultResync,
informers: map[reflect.Type]cache.SharedIndexInformer{},
coreInformers: map[reflect.Type]cache.SharedIndexInformer{},
startedInformers: map[reflect.Type]bool{},
startedCoreInformers: map[reflect.Type]bool{},
}
}
type sharedInformerFactory struct {
kubeInformers informers.SharedInformerFactory
kubeClient kclientset.Interface
originClient oclient.Interface
customListerWatchers ListerWatcherOverrides
defaultResync time.Duration
informers map[reflect.Type]cache.SharedIndexInformer
coreInformers map[reflect.Type]cache.SharedIndexInformer
startedInformers map[reflect.Type]bool
startedCoreInformers map[reflect.Type]bool
lock sync.Mutex
}
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
func (f *sharedInformerFactory) StartCore(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.coreInformers {
if !f.startedCoreInformers[informerType] {
go informer.Run(stopCh)
f.startedCoreInformers[informerType] = true
}
}
}
func (f *sharedInformerFactory) ClusterPolicies() ClusterPolicyInformer {
return &clusterPolicyInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) ClusterPolicyBindings() ClusterPolicyBindingInformer {
return &clusterPolicyBindingInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) Policies() PolicyInformer {
return &policyInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) PolicyBindings() PolicyBindingInformer {
return &policyBindingInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) DeploymentConfigs() DeploymentConfigInformer {
return &deploymentConfigInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) BuildConfigs() BuildConfigInformer {
return &buildConfigInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) ImageStreams() ImageStreamInformer {
return &imageStreamInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) SecurityContextConstraints() SecurityContextConstraintsInformer {
return &securityContextConstraintsInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) ClusterResourceQuotas() ClusterResourceQuotaInformer {
return &clusterResourceQuotaInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) KubernetesInformers() informers.SharedInformerFactory {
return f.kubeInformers
}
// TODO switch to upstream generated informers once kube 1.6 is in and remove these.
func (f *sharedInformerFactory) ReplicationControllers() ReplicationControllerInformer {
return &replicationControllerInformer{sharedInformerFactory: f}
}