forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.go
196 lines (174 loc) · 6.58 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package controller
import (
"errors"
"github.com/golang/glog"
kapi "k8s.io/kubernetes/pkg/api"
apierrs "k8s.io/kubernetes/pkg/api/errors"
"github.com/openshift/origin/pkg/client"
"github.com/openshift/origin/pkg/image/api"
)
var ErrNotImportable = errors.New("the specified stream cannot be imported")
type ImportController struct {
streams client.ImageStreamsNamespacer
}
// Notifier provides information about when the controller makes a decision
type Notifier interface {
// Importing is invoked when the controller is going to import an image stream
Importing(stream *api.ImageStream)
}
// NotifierFunc implements Notifier
type NotifierFunc func(stream *api.ImageStream)
// Importing adapts NotifierFunc to Notifier
func (fn NotifierFunc) Importing(stream *api.ImageStream) {
fn(stream)
}
// tagImportable is true if the given TagReference is importable by this controller
func tagImportable(tagRef api.TagReference) bool {
if tagRef.From == nil {
return false
}
if tagRef.From.Kind != "DockerImage" || tagRef.Reference {
return false
}
return true
}
// tagNeedsImport is true if the observed tag generation for this tag is older than the
// specified tag generation (if no tag generation is specified, the controller does not
// need to import this tag).
func tagNeedsImport(stream *api.ImageStream, tag string, tagRef api.TagReference, importWhenGenerationNil bool) bool {
if !tagImportable(tagRef) {
return false
}
if tagRef.Generation == nil {
return importWhenGenerationNil
}
return *tagRef.Generation > api.LatestObservedTagGeneration(stream, tag)
}
// needsImport returns true if the provided image stream should have tags imported. Partial is returned
// as true if the spec.dockerImageRepository does not need to be refreshed (if only tags have to be imported).
func needsImport(stream *api.ImageStream) (ok bool, partial bool) {
if stream.Annotations == nil || len(stream.Annotations[api.DockerImageRepositoryCheckAnnotation]) == 0 {
if len(stream.Spec.DockerImageRepository) > 0 {
return true, false
}
// for backwards compatibility, if any of our tags are importable, trigger a partial import when the
// annotation is cleared.
for _, tagRef := range stream.Spec.Tags {
if tagImportable(tagRef) {
return true, true
}
}
}
// find any tags with a newer generation than their status
for tag, tagRef := range stream.Spec.Tags {
if tagNeedsImport(stream, tag, tagRef, false) {
return true, true
}
}
return false, false
}
// needsScheduling returns true if this image stream has any scheduled tags
func needsScheduling(stream *api.ImageStream) bool {
for _, tagRef := range stream.Spec.Tags {
if tagImportable(tagRef) && tagRef.ImportPolicy.Scheduled {
return true
}
}
return false
}
// resetScheduledTags artificially increments the generation on the tags that should be imported.
func resetScheduledTags(stream *api.ImageStream) {
next := stream.Generation + 1
for tag, tagRef := range stream.Spec.Tags {
if tagImportable(tagRef) && tagRef.ImportPolicy.Scheduled {
tagRef.Generation = &next
stream.Spec.Tags[tag] = tagRef
}
}
}
// retryCount is the number of times to retry on a conflict when updating an image stream
const retryCount = 2
// Next processes the given image stream, looking for streams that have DockerImageRepository
// set but have not yet been marked as "ready". If transient errors occur, err is returned but
// the image stream is not modified (so it will be tried again later). If a permanent
// failure occurs the image is marked with an annotation and conditions are set on the status
// tags. The tags of the original spec image are left as is (those are updated through status).
//
// There are 3 scenarios:
//
// 1. spec.DockerImageRepository defined without any tags results in all tags being imported
// from upstream image repository
//
// 2. spec.DockerImageRepository + tags defined - import all tags from upstream image repository,
// and all the specified which (if name matches) will overwrite the default ones.
// Additionally:
// for kind == DockerImage import or reference underlying image, exact tag (not provided means latest),
// for kind != DockerImage reference tag from the same or other ImageStream
//
// 3. spec.DockerImageRepository not defined - import tags per each definition.
//
// Notifier, if passed, will be invoked if the stream is going to be imported.
func (c *ImportController) Next(stream *api.ImageStream, notifier Notifier) error {
ok, partial := needsImport(stream)
if !ok {
return nil
}
glog.V(3).Infof("Importing stream %s/%s partial=%t...", stream.Namespace, stream.Name, partial)
if notifier != nil {
notifier.Importing(stream)
}
isi := &api.ImageStreamImport{
ObjectMeta: kapi.ObjectMeta{
Name: stream.Name,
Namespace: stream.Namespace,
ResourceVersion: stream.ResourceVersion,
UID: stream.UID,
},
Spec: api.ImageStreamImportSpec{Import: true},
}
for tag, tagRef := range stream.Spec.Tags {
if !(partial && tagImportable(tagRef)) && !tagNeedsImport(stream, tag, tagRef, true) {
continue
}
isi.Spec.Images = append(isi.Spec.Images, api.ImageImportSpec{
From: kapi.ObjectReference{Kind: "DockerImage", Name: tagRef.From.Name},
To: &kapi.LocalObjectReference{Name: tag},
ImportPolicy: tagRef.ImportPolicy,
})
}
if repo := stream.Spec.DockerImageRepository; !partial && len(repo) > 0 {
insecure := stream.Annotations[api.InsecureRepositoryAnnotation] == "true"
isi.Spec.Repository = &api.RepositoryImportSpec{
From: kapi.ObjectReference{Kind: "DockerImage", Name: repo},
ImportPolicy: api.TagImportPolicy{Insecure: insecure},
}
}
result, err := c.streams.ImageStreams(stream.Namespace).Import(isi)
if err != nil {
if apierrs.IsNotFound(err) && client.IsStatusErrorKind(err, "imageStream") {
return ErrNotImportable
}
glog.V(4).Infof("Import stream %s/%s partial=%t error: %v", stream.Namespace, stream.Name, partial, err)
} else {
glog.V(5).Infof("Import stream %s/%s partial=%t import: %#v", stream.Namespace, stream.Name, partial, result.Status.Import)
}
return err
}
func (c *ImportController) NextTimedByName(namespace, name string) error {
stream, err := c.streams.ImageStreams(namespace).Get(name)
if err != nil {
if apierrs.IsNotFound(err) {
return ErrNotImportable
}
return err
}
return c.NextTimed(stream)
}
func (c *ImportController) NextTimed(stream *api.ImageStream) error {
if !needsScheduling(stream) {
return ErrNotImportable
}
resetScheduledTags(stream)
glog.V(3).Infof("Scheduled import of stream %s/%s...", stream.Namespace, stream.Name)
return c.Next(stream, nil)
}