forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
docker_registry_service.go
410 lines (345 loc) · 14 KB
/
docker_registry_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
package controllers
import (
"encoding/json"
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
kclientset "k8s.io/client-go/kubernetes"
listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/credentialprovider"
)
// DockerRegistryServiceControllerOptions bundles the settings handed to
// NewDockerRegistryServiceController.
type DockerRegistryServiceControllerOptions struct {
	// Resync is the interval at which services are fully re-listed.
	// A zero value delays the re-list for as long as possible.
	Resync time.Duration

	// ClusterDNSSuffix is the in-cluster DNS suffix that may be appended to
	// service names when building registry host names.
	ClusterDNSSuffix string

	// DockercfgController is the controller that is told about the current
	// set of docker registry URLs whenever that set is computed.
	DockercfgController *DockercfgController

	// AdditionalRegistryURLs lists URLs that are always part of the registry
	// URL set, independent of the services that are found.
	AdditionalRegistryURLs []string

	// DockerURLsInitialized is closed once the initial set of docker URLs has
	// been handed to the DockercfgController, signalling that it now holds the
	// correct set.
	DockerURLsInitialized chan struct{}
}
// serviceLocation identifies a service by the namespace it lives in and its name.
type serviceLocation struct {
	namespace, name string
}
// serviceLocations enumerates the namespace/name pairs of the services that
// are treated as the integrated docker registry.
var serviceLocations = []serviceLocation{
	{
		namespace: "default",
		name:      "docker-registry",
	},
	{
		namespace: "openshift-image-registry",
		name:      "registry",
	},
}
// NewDockerRegistryServiceController returns a new *DockerRegistryServiceController.
//
// The controller registers a filtered event handler on serviceInformer so that
// any add, update or delete of one of the services listed in serviceLocations
// queues a re-evaluation of the registry locations. Secrets are read from the
// indexer of the secrets informer, and cl is used to write updated secrets.
func NewDockerRegistryServiceController(secrets informers.SecretInformer, serviceInformer informers.ServiceInformer, cl kclientset.Interface, options DockerRegistryServiceControllerOptions) *DockerRegistryServiceController {
	e := &DockerRegistryServiceController{
		client:                 cl,
		additionalRegistryURLs: options.AdditionalRegistryURLs,
		clusterDNSSuffix:       options.ClusterDNSSuffix,
		dockercfgController:    options.DockercfgController,
		registryLocationQueue:  workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		secretsToUpdate:        workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		dockerURLsInitialized:  options.DockerURLsInitialized,
	}

	// we're only watching two of these, but we already watch all services for the service serving cert signer
	// and this correctly handles namespaces coming and going
	serviceInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				// A delete whose final state was missed is delivered wrapped in a
				// tombstone. Unwrap it so the deletion of a registry service is not
				// silently filtered out.
				if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
					obj = tombstone.Obj
				}
				service, ok := obj.(*v1.Service)
				if !ok {
					return false
				}
				for _, location := range serviceLocations {
					if service.Namespace == location.namespace && service.Name == location.name {
						return true
					}
				}
				return false
			},
			// Every kind of change results in the same action: recompute the locations.
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					e.enqueueRegistryLocationQueue()
				},
				UpdateFunc: func(old, cur interface{}) {
					e.enqueueRegistryLocationQueue()
				},
				DeleteFunc: func(obj interface{}) {
					e.enqueueRegistryLocationQueue()
				},
			}},
	)
	e.servicesSynced = serviceInformer.Informer().HasSynced
	e.serviceLister = serviceInformer.Lister()
	e.syncRegistryLocationHandler = e.syncRegistryLocationChange

	e.secretCache = secrets.Informer().GetIndexer()
	e.secretsSynced = secrets.Informer().GetController().HasSynced
	e.syncSecretHandler = e.syncSecretUpdate

	return e
}
// DockerRegistryServiceController watches the docker registry services and
// queues service account dockercfg secrets for rewriting whenever the set of
// registry URLs changes.
type DockerRegistryServiceController struct {
	// client is used to write updated secrets back to the API server.
	client kclientset.Interface

	// clusterDNSSuffix is the suffix for in cluster DNS that can be added to service names
	clusterDNSSuffix string
	// additionalRegistryURLs is a list of URLs that are always included
	additionalRegistryURLs []string

	// dockercfgController is informed of every new set of docker URLs.
	dockercfgController *DockercfgController

	// Service side: lister, sync check and the handler run for each queued
	// registry location check.
	serviceLister               listers.ServiceLister
	servicesSynced              func() bool
	syncRegistryLocationHandler func() error

	// Secret side: store, sync check and the handler run for each queued secret key.
	secretCache       cache.Store
	secretsSynced     func() bool
	syncSecretHandler func(key string) error

	// registryURLs is the current set of registry URLs; guarded by registryURLLock.
	registryURLs    sets.String
	registryURLLock sync.RWMutex

	// registryLocationQueue holds requests to re-evaluate the registry locations.
	registryLocationQueue workqueue.RateLimitingInterface
	// secretsToUpdate holds the keys of secrets whose content must be re-checked.
	secretsToUpdate workqueue.RateLimitingInterface

	// dockerURLsInitialized is closed after the initial URL set has been computed.
	dockerURLsInitialized chan struct{}

	// initialSecretsCheckDone records that one full pass over all secrets has
	// already been queued. While it is false, the next registry location sync
	// queues every managed secret even if the registry location did not change;
	// this happens on controller start to verify the content of dockercfg
	// entries in secrets.
	initialSecretsCheckDone bool
}
// Run starts the controller loops and blocks until stopCh is closed.
//
// It first waits for the initial set of docker URLs to be determined (or for
// stopCh to close), then starts a single registry location worker and
// `workers` secret update workers. Both work queues are shut down when Run
// returns so that workers blocked waiting for an item are released.
func (e *DockerRegistryServiceController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer e.registryLocationQueue.ShutDown()
	// Without this the secret workers would stay blocked in Get() forever after stop.
	defer e.secretsToUpdate.ShutDown()

	glog.Infof("Starting DockerRegistryServiceController controller")
	defer glog.Infof("Shutting down DockerRegistryServiceController controller")

	// Wait for the store to sync before starting any work in this controller.
	ready := make(chan struct{})
	go e.waitForDockerURLs(ready, stopCh)
	select {
	case <-ready:
	case <-stopCh:
		return
	}
	glog.V(1).Infof("caches synced")

	go wait.Until(e.watchForDockerURLChanges, time.Second, stopCh)
	for i := 0; i < workers; i++ {
		go wait.Until(e.watchForDockercfgSecretUpdates, time.Second, stopCh)
	}
	<-stopCh
}
// enqueueRegistryLocationQueue requests a re-evaluation of the registry
// locations. The queue only ever carries one fixed key: the item itself is
// never inspected because the set of watched services is already known.
func (e *DockerRegistryServiceController) enqueueRegistryLocationQueue() {
	const checkKey = "check"
	e.registryLocationQueue.Add(checkKey)
}
// waitForDockerURLs blocks until the service and secret caches have synced,
// then computes the current registry URLs, records them on this controller and
// on the dockercfg controller, and finally closes dockerURLsInitialized and
// ready. If the caches do not sync (stopCh closed first) it returns without
// closing either channel.
func (e *DockerRegistryServiceController) waitForDockerURLs(ready chan<- struct{}, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// Wait for the stores to fill
	synced := cache.WaitForCacheSync(stopCh, e.servicesSynced, e.secretsSynced)
	if !synced {
		return
	}

	// Take the state right after syncing as the baseline. Skipping this would
	// cause an initial storm of dockercfg secret rewrites on every startup.
	initialURLs := e.getDockerRegistryLocations()
	e.setRegistryURLs(initialURLs...)
	e.dockercfgController.SetDockerURLs(initialURLs...)

	close(e.dockerURLsInitialized)
	close(ready)
}
// setRegistryURLs replaces the stored registry URL set with the given URLs
// while holding the write lock.
func (e *DockerRegistryServiceController) setRegistryURLs(registryURLs ...string) {
	replacement := sets.NewString(registryURLs...)

	e.registryURLLock.Lock()
	e.registryURLs = replacement
	e.registryURLLock.Unlock()
}
// getRegistryURLs returns a copy of the stored registry URL set, taken under
// the read lock, so callers cannot modify the controller's own set.
func (e *DockerRegistryServiceController) getRegistryURLs() sets.String {
	e.registryURLLock.RLock()
	defer e.registryURLLock.RUnlock()

	snapshot := sets.NewString()
	for url := range e.registryURLs {
		snapshot.Insert(url)
	}
	return snapshot
}
// watchForDockerURLChanges drains the registry location queue, running the
// registry location sync handler for every item, and returns once the queue
// has been shut down.
func (e *DockerRegistryServiceController) watchForDockerURLChanges() {
	// processNext handles a single queue item and reports whether the queue is shut down.
	processNext := func() bool {
		item, shutdown := e.registryLocationQueue.Get()
		if shutdown {
			return true
		}
		defer e.registryLocationQueue.Done(item)

		err := e.syncRegistryLocationHandler()
		if err != nil {
			// The item was not handled, so put it back with rate limiting.
			utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))
			e.registryLocationQueue.AddRateLimited(item)
			return false
		}

		// Handled successfully: forget the item so its retry history is reset.
		e.registryLocationQueue.Forget(item)
		return false
	}

	for !processNext() {
	}
}
// getDockerRegistryLocations returns the additional registry URLs followed by
// the locations found for each entry of serviceLocations.
func (e *DockerRegistryServiceController) getDockerRegistryLocations() []string {
	locations := make([]string, 0, len(e.additionalRegistryURLs))
	locations = append(locations, e.additionalRegistryURLs...)

	for i := range serviceLocations {
		found := getDockerRegistryLocations(e.serviceLister, serviceLocations[i], e.clusterDNSSuffix)
		locations = append(locations, found...)
	}

	glog.V(4).Infof("found docker registry urls: %v", locations)
	return locations
}
// getDockerRegistryLocations returns the host:port forms under which the
// service at location can be reached: its cluster IP, its
// <name>.<namespace>.svc name and, when clusterDNSSuffix is non-empty,
// <name>.<namespace>.svc.<clusterDNSSuffix>. Only the first service port is
// used.
//
// An empty (non-nil) slice is returned when the service cannot be fetched from
// the lister, when its cluster IP does not parse as an IP address, or when it
// declares no ports.
func getDockerRegistryLocations(lister listers.ServiceLister, location serviceLocation, clusterDNSSuffix string) []string {
	service, err := lister.Services(location.namespace).Get(location.name)
	if err != nil {
		return []string{}
	}

	// net.ParseIP returns nil for the empty string as well as for any other
	// value that is not an IP address.
	clusterIP := service.Spec.ClusterIP
	if net.ParseIP(clusterIP) == nil || len(service.Spec.Ports) == 0 {
		return []string{}
	}

	port := fmt.Sprintf("%d", service.Spec.Ports[0].Port)
	serviceHost := fmt.Sprintf("%s.%s.svc", service.Name, service.Namespace)

	ret := []string{
		net.JoinHostPort(clusterIP, port),
		net.JoinHostPort(serviceHost, port),
	}
	if len(clusterDNSSuffix) > 0 {
		// The suffix is appended as data, never as part of a format string, so
		// a '%' in it cannot corrupt the generated host name.
		ret = append(ret, net.JoinHostPort(serviceHost+"."+clusterDNSSuffix, port))
	}
	return ret
}
// syncRegistryLocationChange recomputes the registry locations and, when they
// differ from the stored set (or when the initial full check has not happened
// yet), stores the new set, passes it to the dockercfg controller and queues
// every managed service account dockercfg secret for an update.
func (e *DockerRegistryServiceController) syncRegistryLocationChange() error {
	newDockerRegistryLocations := sets.NewString(e.getDockerRegistryLocations()...)
	existingURLs := e.getRegistryURLs()

	unchanged := existingURLs.Equal(newDockerRegistryLocations)
	if unchanged && e.initialSecretsCheckDone {
		glog.V(3).Infof("No effective update: %v", newDockerRegistryLocations)
		return nil
	}
	glog.V(1).Infof("Updating registry URLs from %v to %v", existingURLs, newDockerRegistryLocations)

	// make sure that new dockercfg secrets get the correct locations
	e.dockercfgController.SetDockerURLs(newDockerRegistryLocations.List()...)
	e.setRegistryURLs(newDockerRegistryLocations.List()...)
	e.initialSecretsCheckDone = true

	// The registry URLs changed: queue all known secrets. Secrets created from
	// now on already receive the updated value.
	for _, obj := range e.secretCache.List() {
		secret, isSecret := obj.(*v1.Secret)
		if !isSecret {
			utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj))
			continue
		}
		if secret.Type != v1.SecretTypeDockercfg {
			continue
		}
		if secret.Annotations == nil {
			continue
		}
		// Do not manage dockercfg secrets we haven't created (eg. secrets created by user for private repositories).
		_, hasTokenSecret := secret.Annotations[ServiceAccountTokenSecretNameKey]
		if !hasTokenSecret {
			continue
		}

		secretKey, err := controller.KeyFunc(obj)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
			continue
		}
		e.secretsToUpdate.Add(secretKey)
	}
	return nil
}
// watchForDockercfgSecretUpdates watches the work queue for entries that indicate that it should modify dockercfg secrets with new
// docker registry URLs. It runs the secret sync handler for each queued key and returns once the queue has been shut down.
func (e *DockerRegistryServiceController) watchForDockercfgSecretUpdates() {
	workFn := func() bool {
		key, quit := e.secretsToUpdate.Get()
		if quit {
			return true
		}
		defer e.secretsToUpdate.Done(key)

		if err := e.syncSecretHandler(key.(string)); err == nil {
			// this means the request was successfully handled. We should "forget" the item so that any retry
			// later on is reset
			e.secretsToUpdate.Forget(key)
		} else {
			// if we had an error it means that we didn't handle it, which means that we want to requeue the work.
			// Name the secret in the message so the failing item can be identified.
			utilruntime.HandleError(fmt.Errorf("error syncing secret %v, it will be retried: %v", key, err))
			e.secretsToUpdate.AddRateLimited(key)
		}

		return false
	}

	for {
		if workFn() {
			return
		}
	}
}
// syncSecretUpdate rewrites the dockercfg secret stored under key so that it
// has exactly one entry per current registry URL.
//
// Nothing is written when the secret no longer exists in the store or when its
// entries already match the current registry URLs. The password for the new
// entries is taken, in order of preference, from the secret's
// ServiceAccountTokenValueAnnotation annotation, from the first existing
// dockercfg entry, or from the service account token secret named by the
// ServiceAccountTokenSecretNameKey annotation.
//
// An error is returned (so the caller retries) only when reading the secret
// from the store or updating it through the client fails; problems resolving
// the token or encoding the config are reported via utilruntime.HandleError
// and nil is returned.
func (e *DockerRegistryServiceController) syncSecretUpdate(key string) error {
	obj, exists, err := e.secretCache.GetByKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to retrieve secret %v from store: %v", key, err))
		return err
	}
	if !exists {
		return nil
	}

	dockerRegistryURLs := e.getRegistryURLs()
	sharedDockercfgSecret := obj.(*v1.Secret)

	dockercfg := &credentialprovider.DockerConfig{}
	// an error here doesn't matter. If we can't deserialize this, we'll replace it with one that works.
	_ = json.Unmarshal(sharedDockercfgSecret.Data[v1.DockerConfigKey], dockercfg)

	dockercfgMap := map[string]credentialprovider.DockerConfigEntry(*dockercfg)
	existingDockercfgSecretLocations := sets.StringKeySet(dockercfgMap)
	// if the existingDockercfgSecretLocations haven't changed, don't make an update and check the next one
	if existingDockercfgSecretLocations.Equal(dockerRegistryURLs) {
		return nil
	}

	// we need to update it, make a copy so the object held by the shared cache is never mutated
	dockercfgSecret := obj.(runtime.Object).DeepCopyObject().(*v1.Secret)

	dockerCredentials := dockercfgSecret.Annotations[ServiceAccountTokenValueAnnotation]
	if len(dockerCredentials) == 0 && len(existingDockercfgSecretLocations) > 0 {
		dockerCredentials = dockercfgMap[existingDockercfgSecretLocations.List()[0]].Password
	}
	if len(dockerCredentials) == 0 {
		tokenSecretKey := dockercfgSecret.Namespace + "/" + dockercfgSecret.Annotations[ServiceAccountTokenSecretNameKey]
		tokenObj, tokenExists, err := e.secretCache.GetByKey(tokenSecretKey)
		// Check the error first: on a lookup failure the "exists" result carries
		// no information, and the real error would otherwise be reported as a
		// missing secret.
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("cannot determine SA token: %v", err))
			return nil
		}
		if !tokenExists {
			utilruntime.HandleError(fmt.Errorf("cannot determine SA token due to missing secret: %v", tokenSecretKey))
			return nil
		}
		tokenSecret, isSecret := tokenObj.(*v1.Secret)
		if !isSecret {
			utilruntime.HandleError(fmt.Errorf("cannot determine SA token: %v is a %T, not a secret", tokenSecretKey, tokenObj))
			return nil
		}
		dockerCredentials = string(tokenSecret.Data[v1.ServiceAccountTokenKey])
	}

	newDockercfgMap := credentialprovider.DockerConfig{}
	for key := range dockerRegistryURLs {
		newDockercfgMap[key] = credentialprovider.DockerConfigEntry{
			Username: "serviceaccount",
			Password: dockerCredentials,
			Email:    "serviceaccount@example.org",
		}
	}

	dockercfgContent, err := json.Marshal(&newDockercfgMap)
	if err != nil {
		utilruntime.HandleError(err)
		return nil
	}

	// A secret without any data has a nil map; writing to it would panic.
	if dockercfgSecret.Data == nil {
		dockercfgSecret.Data = map[string][]byte{}
	}
	dockercfgSecret.Data[v1.DockerConfigKey] = dockercfgContent

	if _, err := e.client.Core().Secrets(dockercfgSecret.Namespace).Update(dockercfgSecret); err != nil {
		return err
	}

	return nil
}