forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
imagestream_controller.go
276 lines (241 loc) · 9.03 KB
/
imagestream_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package controller
import (
"errors"
"fmt"
"time"
"github.com/golang/glog"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
kapi "k8s.io/kubernetes/pkg/apis/core"
kcontroller "k8s.io/kubernetes/pkg/controller"
imageapi "github.com/openshift/origin/pkg/image/apis/image"
imageclient "github.com/openshift/origin/pkg/image/generated/internalclientset/typed/image/internalversion"
imageinformer "github.com/openshift/origin/pkg/image/generated/listers/image/internalversion"
)
// ErrNotImportable is returned by handleImageStream when creating the image
// stream import fails with a NotFound API error whose status details report
// the kind "imageStream".
var ErrNotImportable = errors.New("requested image cannot be imported")
// Notifier is told when the controller has decided to import an image stream.
// The controller tolerates a nil Notifier and simply skips the notification.
type Notifier interface {
// Importing is invoked by handleImageStream after it has determined that
// the stream needs an import and before the import request is created.
Importing(stream *imageapi.ImageStream)
}
// ImageStreamController takes image stream keys from a rate-limited work
// queue and, for each stream that needs it, requests an import through the
// image client.
type ImageStreamController struct {
// client is the image client used to create ImageStreamImport requests
client imageclient.ImageInterface
// queue holds the keys of image streams that need to be synced.
queue workqueue.RateLimitingInterface
// syncHandler is invoked by processNextWorkItem with each dequeued key.
// NOTE(review): presumably set to syncImageStream by a constructor that is
// not part of this view - confirm.
syncHandler func(isKey string) error
// lister can list/get image streams from a shared informer's cache
lister imageinformer.ImageStreamLister
// listerSynced reports whether the image stream store has synced; Run waits
// on it before starting any workers
listerSynced cache.InformerSynced
// notifier, when non-nil, is informed that an import is about to be performed
notifier Notifier
}
// SetNotifier replaces the controller's notifier with n. The assignment is not
// guarded by any lock in this method.
func (c *ImageStreamController) SetNotifier(n Notifier) {
c.notifier = n
}
// Run starts the controller and blocks until stopCh is closed.
//
// It first waits for the image stream store to report that it has synced; if
// that wait fails the error is reported and Run returns without starting any
// workers. Otherwise it launches `workers` goroutines, each running c.worker
// through wait.Until with a one second period. The work queue is shut down
// when Run returns.
func (c *ImageStreamController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	glog.Infof("Starting image stream controller")

	// Nothing may be processed until the stream store has been populated.
	synced := cache.WaitForCacheSync(stopCh, c.listerSynced)
	if !synced {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(c.worker, time.Second, stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down image stream controller")
}
// addImageStream enqueues obj when it is an *imageapi.ImageStream; values of
// any other type are ignored.
func (c *ImageStreamController) addImageStream(obj interface{}) {
	stream, isStream := obj.(*imageapi.ImageStream)
	if !isStream {
		return
	}
	c.enqueueImageStream(stream)
}
// updateImageStream enqueues the current image stream when its resource
// version differs from the previous one. If either argument is not an
// *imageapi.ImageStream the call is a no-op.
func (c *ImageStreamController) updateImageStream(old, cur interface{}) {
	newStream, isStream := cur.(*imageapi.ImageStream)
	if !isStream {
		return
	}
	prevStream, isStream := old.(*imageapi.ImageStream)
	if !isStream {
		return
	}

	// Only the resource version is compared here; whether the stream really
	// has to be re-imported is decided later in syncImageStream.
	//
	// FIXME: the versions will only ever be equal on cache resync
	if newStream.ResourceVersion != prevStream.ResourceVersion {
		c.enqueueImageStream(newStream)
	}
}
// enqueueImageStream computes the queue key for stream and adds it to the work
// queue. If the key cannot be computed the error is reported through
// utilruntime.HandleError and nothing is queued.
func (c *ImageStreamController) enqueueImageStream(stream *imageapi.ImageStream) {
	if key, err := kcontroller.KeyFunc(stream); err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for image stream %#v: %v", stream, err))
	} else {
		c.queue.Add(key)
	}
}
// worker processes queue items one after another and returns once
// processNextWorkItem reports that there is nothing more to process.
func (c *ImageStreamController) worker() {
	for {
		if more := c.processNextWorkItem(); !more {
			return
		}
	}
}
// processNextWorkItem takes one key from the queue and passes it to
// syncHandler. A key that syncs successfully is forgotten by the rate limiter;
// a key that fails is reported and re-added with rate limiting. It returns
// false only when the queue signals that it is shutting down.
func (c *ImageStreamController) processNextWorkItem() bool {
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	// Always mark the item as done, whatever the outcome of the sync.
	defer c.queue.Done(item)

	if syncErr := c.syncHandler(item.(string)); syncErr != nil {
		utilruntime.HandleError(fmt.Errorf("Error syncing image stream %q: %v", item, syncErr))
		c.queue.AddRateLimited(item)
		return true
	}

	c.queue.Forget(item)
	return true
}
// syncImageStream resolves key to an image stream through the lister and hands
// the stream to handleImageStream. A stream that is no longer found is treated
// as deleted and is not an error. Any other lookup error, as well as a
// malformed key, is returned to the caller.
func (c *ImageStreamController) syncImageStream(key string) error {
	began := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing image stream %q (%v)", key, time.Since(began))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	stream, err := c.lister.ImageStreams(namespace).Get(name)
	switch {
	case apierrs.IsNotFound(err):
		glog.V(4).Infof("ImageStream has been deleted: %v", key)
		return nil
	case err != nil:
		return err
	}

	glog.V(3).Infof("Queued import of stream %s/%s...", stream.Namespace, stream.Name)
	return handleImageStream(stream, c.client, c.notifier)
}
// tagImportable reports whether this controller can import tagRef: the tag
// must have a From reference of kind "DockerImage" and must not be flagged as
// a pure reference.
func tagImportable(tagRef imageapi.TagReference) bool {
	if tagRef.From == nil {
		return false
	}
	if tagRef.From.Kind != "DockerImage" {
		return false
	}
	return !tagRef.Reference
}
// tagNeedsImport reports whether the given spec tag has to be imported.
//
// Tags that are not importable never need an import. For a tag without a
// generation the answer is whatever the caller passes in
// importWhenGenerationNil. Otherwise the tag needs an import when its
// generation is greater than the latest generation observed for that tag.
func tagNeedsImport(stream *imageapi.ImageStream, tag string, tagRef imageapi.TagReference, importWhenGenerationNil bool) bool {
	switch {
	case !tagImportable(tagRef):
		return false
	case tagRef.Generation == nil:
		return importWhenGenerationNil
	default:
		observed := imageapi.LatestObservedTagGeneration(stream, tag)
		return *tagRef.Generation > observed
	}
}
// needsImport decides whether anything has to be imported for stream.
//
// ok is true when an import is required. partial is true when only tags have
// to be imported, i.e. spec.dockerImageRepository does not need a refresh.
// When ok is false, partial is false as well.
func needsImport(stream *imageapi.ImageStream) (ok bool, partial bool) {
	// Reading a missing key (or a nil map) yields the empty string, so this
	// covers both "no annotations at all" and "check annotation cleared".
	if stream.Annotations[imageapi.DockerImageRepositoryCheckAnnotation] == "" {
		if len(stream.Spec.DockerImageRepository) > 0 {
			return true, false
		}
		// Backwards compatibility: with the annotation cleared, a single
		// importable tag is enough to trigger a partial import.
		for _, tagRef := range stream.Spec.Tags {
			if tagImportable(tagRef) {
				return true, true
			}
		}
	}

	// Otherwise look for a tag whose spec generation is ahead of its status.
	for tag, tagRef := range stream.Spec.Tags {
		if tagNeedsImport(stream, tag, tagRef, false) {
			return true, true
		}
	}

	return false, false
}
// handleImageStream requests an import for stream when needsImport says one is
// required, and does nothing otherwise. The stream object itself is not
// modified; the work is delegated to an ImageStreamImport created through
// client in the stream's namespace.
//
// The import request is assembled as follows:
//
//  1. Every importable spec tag is added as an image to import. During a
//     partial import only the tags that tagNeedsImport selects (a tag with no
//     generation counts as needing import) are added; during a full import
//     all importable tags are added.
//
//  2. During a full import with spec.DockerImageRepository set, the repository
//     itself is added to the request. Its import policy is marked insecure
//     when the stream's insecure-repository annotation equals "true".
//
//  3. Tags that are not importable (no From, a From kind other than
//     "DockerImage", or marked as reference) are never part of the request.
//
// notifier, if non-nil, is invoked before the request is created.
//
// If creation fails with a NotFound error whose status details name the kind
// "imageStream", ErrNotImportable is returned. Any other creation error is
// returned unchanged.
func handleImageStream(stream *imageapi.ImageStream, client imageclient.ImageInterface, notifier Notifier) error {
	shouldImport, partial := needsImport(stream)
	if !shouldImport {
		return nil
	}
	glog.V(3).Infof("Importing stream %s/%s partial=%t...", stream.Namespace, stream.Name, partial)

	if notifier != nil {
		notifier.Importing(stream)
	}

	request := &imageapi.ImageStreamImport{
		ObjectMeta: metav1.ObjectMeta{
			Name:            stream.Name,
			Namespace:       stream.Namespace,
			ResourceVersion: stream.ResourceVersion,
			UID:             stream.UID,
		},
		Spec: imageapi.ImageStreamImportSpec{Import: true},
	}

	for tag, tagRef := range stream.Spec.Tags {
		if !tagImportable(tagRef) {
			continue
		}
		// A partial import only carries the tags that actually need it.
		if !tagNeedsImport(stream, tag, tagRef, true) && partial {
			continue
		}
		request.Spec.Images = append(request.Spec.Images, imageapi.ImageImportSpec{
			From:            kapi.ObjectReference{Kind: "DockerImage", Name: tagRef.From.Name},
			To:              &kapi.LocalObjectReference{Name: tag},
			ImportPolicy:    tagRef.ImportPolicy,
			ReferencePolicy: tagRef.ReferencePolicy,
		})
	}

	if !partial && len(stream.Spec.DockerImageRepository) > 0 {
		request.Spec.Repository = &imageapi.RepositoryImportSpec{
			From: kapi.ObjectReference{Kind: "DockerImage", Name: stream.Spec.DockerImageRepository},
			ImportPolicy: imageapi.TagImportPolicy{
				Insecure: stream.Annotations[imageapi.InsecureRepositoryAnnotation] == "true",
			},
		}
	}

	result, err := client.ImageStreamImports(stream.Namespace).Create(request)
	if err == nil {
		glog.V(5).Infof("Import stream %s/%s partial=%t import: %#v", stream.Namespace, stream.Name, partial, result.Status.Import)
		return nil
	}
	if apierrs.IsNotFound(err) && isStatusErrorKind(err, "imageStream") {
		return ErrNotImportable
	}
	glog.V(4).Infof("Import stream %s/%s partial=%t error: %v", stream.Namespace, stream.Name, partial, err)
	return err
}
// isStatusErrorKind reports whether err is an API status error whose details
// are present and name the given kind. Errors that do not implement
// apierrs.APIStatus, and status errors without details, yield false.
func isStatusErrorKind(err error, kind string) bool {
	statusErr, isStatus := err.(apierrs.APIStatus)
	if !isStatus {
		return false
	}
	details := statusErr.Status().Details
	return details != nil && kind == details.Kind
}