forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
rest.go
541 lines (486 loc) · 19.4 KB
/
rest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
package imagestreamimport
import (
"fmt"
"net/http"
"time"
"github.com/golang/glog"
gocontext "golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
kapierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
authorizationapi "k8s.io/kubernetes/pkg/apis/authorization"
kapi "k8s.io/kubernetes/pkg/apis/core"
kapihelper "k8s.io/kubernetes/pkg/apis/core/helper"
kapiv1 "k8s.io/kubernetes/pkg/apis/core/v1"
authorizationclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/authorization/internalversion"
imageapiv1 "github.com/openshift/api/image/v1"
authorizationutil "github.com/openshift/origin/pkg/authorization/util"
serverapi "github.com/openshift/origin/pkg/cmd/server/api"
imageapi "github.com/openshift/origin/pkg/image/apis/image"
imageclient "github.com/openshift/origin/pkg/image/generated/internalclientset/typed/image/internalversion"
"github.com/openshift/origin/pkg/image/importer"
"github.com/openshift/origin/pkg/image/importer/dockerv1client"
"github.com/openshift/origin/pkg/image/registry/imagestream"
"github.com/openshift/origin/pkg/image/registryclient"
"github.com/openshift/origin/pkg/image/util"
quotautil "github.com/openshift/origin/pkg/quota/util"
)
// ImporterFunc returns an instance of the importer that should be used per invocation.
// The argument is the repository retriever the importer should use to reach remote
// registries; Create builds it from the REST transports and the lazily loaded
// image stream secrets before calling this function.
type ImporterFunc func(r importer.RepositoryRetriever) importer.Interface
// ImporterDockerRegistryFunc returns an instance of a docker client that should be used per invocation of import,
// may be nil if no legacy import capability is required. When it returns a non-nil client, Create stores that
// client in the request context under importer.ContextKeyV1RegistryClient before running the import.
type ImporterDockerRegistryFunc func() dockerv1client.Client
// REST implements the RESTStorage interface for ImageStreamImport
type REST struct {
	// importFn builds the importer used for a single Create call.
	importFn ImporterFunc
	// streams is used to look up the image stream targeted by an import.
	streams imagestream.Registry
	// internalStreams is used to create or update the image stream once the import has been applied to it.
	internalStreams rest.CreaterUpdater
	// images is used to persist each successfully imported image.
	images rest.Creater
	// isClient is used to list the secrets associated with the target image stream.
	isClient imageclient.ImageStreamsGetter
	// transport and insecureTransport are handed to registryclient.NewContext for talking to remote registries.
	transport http.RoundTripper
	insecureTransport http.RoundTripper
	// clientFn optionally supplies a v1 registry client; it may be nil.
	clientFn ImporterDockerRegistryFunc
	// strategy provides BeforeCreate handling, allowed-registry validation and image preparation.
	strategy *strategy
	// sarClient issues the SubjectAccessReviews that decide whether allowed-registry validation applies.
	sarClient authorizationclient.SubjectAccessReviewInterface
}
// Compile-time assertion that *REST satisfies rest.Creater.
var _ rest.Creater = &REST{}
// NewREST builds the REST storage that handles ImageStreamImport requests.
//
// clientFn may be nil when importing from v1 Docker registries is not needed, and
// insecureTransport is optional as well. Neither transport should carry client
// certificates unless every user of the cluster is meant to be able to import with
// them. allowedImportRegistries and registryFn are only used to construct the
// strategy held by the returned storage.
func NewREST(importFn ImporterFunc, streams imagestream.Registry, internalStreams rest.CreaterUpdater,
	images rest.Creater, isClient imageclient.ImageStreamsGetter,
	transport, insecureTransport http.RoundTripper,
	clientFn ImporterDockerRegistryFunc,
	allowedImportRegistries *serverapi.AllowedRegistries,
	registryFn imageapi.RegistryHostnameRetriever,
	sarClient authorizationclient.SubjectAccessReviewInterface,
) *REST {
	storage := new(REST)

	// import machinery
	storage.importFn = importFn
	storage.clientFn = clientFn
	storage.transport = transport
	storage.insecureTransport = insecureTransport

	// storage and API clients
	storage.streams = streams
	storage.internalStreams = internalStreams
	storage.images = images
	storage.isClient = isClient
	storage.sarClient = sarClient

	// validation and defaulting
	storage.strategy = NewStrategy(allowedImportRegistries, registryFn)

	return storage
}
// New returns an empty ImageStreamImport. It exists only so that REST
// implements RESTStorage.
func (r *REST) New() runtime.Object {
	return new(imageapi.ImageStreamImport)
}
// Create runs the import described by an ImageStreamImport and, when isi.Spec.Import is true,
// writes the outcome to the image stream that has the same name as the import.
//
// The request passes through these stages, in order:
//  1. obj must be an *ImageStreamImport; it is run through rest.BeforeCreate with the REST
//     strategy and a deep copy is handed to createValidation.
//  2. Two SubjectAccessReviews are issued for the requesting user. Only when both are denied
//     is the import checked with strategy.ValidateAllowedRegistries.
//  3. The target image stream is loaded. If it does not exist a new in-memory stream is
//     prepared, unless the request carried a ResourceVersion or UID, in which case the lookup
//     error is returned. If it exists, a ResourceVersion mismatch yields a Conflict error and
//     a UID mismatch yields a NotFound error.
//  4. The importer produced by importFn is run against the import and the stream.
//  5. If isi.Spec.Import is false, manifests are cleared and the import is returned without
//     writing anything.
//  6. Otherwise every result is recorded on the stream, imported images are created, and the
//     stream is created or updated. Any image creation error aborts the request before the
//     stream is written.
//
// On success the returned object is isi with Status.Import set to the resulting image stream.
// The trailing bool parameter is ignored.
func (r *REST) Create(ctx apirequest.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, _ bool) (runtime.Object, error) {
	isi, ok := obj.(*imageapi.ImageStreamImport)
	if !ok {
		return nil, kapierrors.NewBadRequest(fmt.Sprintf("obj is not an ImageStreamImport: %#v", obj))
	}

	// Keep a copy of the metadata as it was before BeforeCreate runs; the ResourceVersion and UID
	// preconditions further down are evaluated against this copy.
	inputMeta := isi.ObjectMeta

	if err := rest.BeforeCreate(r.strategy, ctx, obj); err != nil {
		return nil, err
	}
	if err := createValidation(obj.DeepCopyObject()); err != nil {
		return nil, err
	}

	// Check if the user is allowed to create Images or ImageStreamMappings.
	// In case the user is allowed to create them, do not validate the ImageStreamImport
	// registry location against the registry whitelist, but instead allow to create any
	// image from any registry.
	user, ok := apirequest.UserFrom(ctx)
	if !ok {
		return nil, kapierrors.NewBadRequest("unable to get user from context")
	}

	createImageSAR := authorizationutil.AddUserToSAR(user, &authorizationapi.SubjectAccessReview{
		Spec: authorizationapi.SubjectAccessReviewSpec{
			ResourceAttributes: &authorizationapi.ResourceAttributes{
				Verb: "create",
				Group: imageapi.GroupName,
				Resource: "images",
			},
		},
	})
	isCreateImage, err := r.sarClient.Create(createImageSAR)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the resource below is spelled in the singular while the review above uses the
	// plural "images" - confirm this matches the resource name that authorization rules use.
	createImageStreamMappingSAR := authorizationutil.AddUserToSAR(user, &authorizationapi.SubjectAccessReview{
		Spec: authorizationapi.SubjectAccessReviewSpec{
			ResourceAttributes: &authorizationapi.ResourceAttributes{
				Verb: "create",
				Group: imageapi.GroupName,
				Resource: "imagestreammapping",
			},
		},
	})
	isCreateImageStreamMapping, err := r.sarClient.Create(createImageStreamMappingSAR)
	if err != nil {
		return nil, err
	}

	// Registry validation is skipped as soon as either review is allowed.
	if !isCreateImage.Status.Allowed && !isCreateImageStreamMapping.Status.Allowed {
		if errs := r.strategy.ValidateAllowedRegistries(isi); len(errs) != 0 {
			return nil, kapierrors.NewInvalid(imageapi.Kind("ImageStreamImport"), isi.Name, errs)
		}
	}

	namespace, ok := apirequest.NamespaceFrom(ctx)
	if !ok {
		return nil, kapierrors.NewBadRequest("a namespace must be specified to import images")
	}

	// Make the optional v1 registry client available to the importer through the context.
	if r.clientFn != nil {
		if client := r.clientFn(); client != nil {
			ctx = apirequest.WithValue(ctx, importer.ContextKeyV1RegistryClient, client)
		}
	}

	// create records whether the stream has to be created rather than updated at the end.
	create := false
	stream, err := r.streams.GetImageStream(ctx, isi.Name, &metav1.GetOptions{})
	if err != nil {
		if !kapierrors.IsNotFound(err) {
			return nil, err
		}
		// consistency check, stream must exist: a request that names a ResourceVersion or UID
		// refers to an existing stream, so the not-found error is returned as is
		if len(inputMeta.ResourceVersion) > 0 || len(inputMeta.UID) > 0 {
			return nil, err
		}
		create = true
		stream = &imageapi.ImageStream{
			ObjectMeta: metav1.ObjectMeta{
				Name: isi.Name,
				Namespace: namespace,
				Generation: 0,
			},
		}
	} else {
		// the stream exists: enforce the ResourceVersion and UID preconditions when they were supplied
		if len(inputMeta.ResourceVersion) > 0 && inputMeta.ResourceVersion != stream.ResourceVersion {
			glog.V(4).Infof("DEBUG: mismatch between requested ResourceVersion %s and located ResourceVersion %s", inputMeta.ResourceVersion, stream.ResourceVersion)
			return nil, kapierrors.NewConflict(imageapi.Resource("imagestream"), inputMeta.Name, fmt.Errorf("the image stream was updated from %q to %q", inputMeta.ResourceVersion, stream.ResourceVersion))
		}
		if len(inputMeta.UID) > 0 && inputMeta.UID != stream.UID {
			glog.V(4).Infof("DEBUG: mismatch between requested UID %s and located UID %s", inputMeta.UID, stream.UID)
			return nil, kapierrors.NewNotFound(imageapi.Resource("imagestream"), inputMeta.Name)
		}
	}

	// only load secrets if we need them
	credentials := importer.NewLazyCredentialsForSecrets(func() ([]corev1.Secret, error) {
		secrets, err := r.isClient.ImageStreams(namespace).Secrets(isi.Name, metav1.ListOptions{})
		if err != nil {
			return nil, err
		}
		secretsv1 := make([]corev1.Secret, len(secrets.Items))
		for i, secret := range secrets.Items {
			err := kapiv1.Convert_core_Secret_To_v1_Secret(&secret, &secretsv1[i], nil)
			if err != nil {
				// a secret that fails conversion is reported and skipped; the other secrets are
				// still returned
				utilruntime.HandleError(err)
				continue
			}
		}
		return secretsv1, nil
	})
	importCtx := registryclient.NewContext(r.transport, r.insecureTransport).WithCredentials(credentials)
	imports := r.importFn(importCtx)
	if err := imports.Import(ctx.(gocontext.Context), isi, stream); err != nil {
		return nil, kapierrors.NewInternalError(err)
	}

	// if we encountered an error loading credentials and any images could not be retrieved with an access
	// related error, modify the message.
	// TODO: set a status cause
	if err := credentials.Err(); err != nil {
		for i, image := range isi.Status.Images {
			switch image.Status.Reason {
			case metav1.StatusReasonUnauthorized, metav1.StatusReasonForbidden:
				isi.Status.Images[i].Status.Message = fmt.Sprintf("Unable to load secrets for this image: %v; (%s)", err, image.Status.Message)
			}
		}
		if r := isi.Status.Repository; r != nil {
			switch r.Status.Reason {
			case metav1.StatusReasonUnauthorized, metav1.StatusReasonForbidden:
				r.Status.Message = fmt.Sprintf("Unable to load secrets for this repository: %v; (%s)", err, r.Status.Message)
			}
		}
	}

	// TODO: perform the transformation of the image stream and return it with the ISI if import is false
	// so that clients can see what the resulting object would look like.
	if !isi.Spec.Import {
		clearManifests(isi)
		return isi, nil
	}

	if stream.Annotations == nil {
		stream.Annotations = make(map[string]string)
	}
	now := metav1.Now()
	_, hasAnnotation := stream.Annotations[imageapi.DockerImageRepositoryCheckAnnotation]
	nextGeneration := stream.Generation + 1

	// snapshot of the stream before any import result is applied; used for change detection and
	// for recording limit errors
	original := stream.DeepCopy()

	// walk the retrieved images, ensuring each one exists in etcd
	// importedImages maps an image name to the result of creating it (nil on success) and
	// updatedImages holds the stored form of each image that was created or already existed
	importedImages := make(map[string]error)
	updatedImages := make(map[string]*imageapi.Image)
	if spec := isi.Spec.Repository; spec != nil {
		for i, status := range isi.Status.Repository.Images {
			if checkImportFailure(status, stream, status.Tag, nextGeneration, now) {
				continue
			}

			image := status.Image
			ref, err := imageapi.ParseDockerImageReference(image.DockerImageReference)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to parse image reference during import: %v", err))
				continue
			}
			from, err := imageapi.ParseDockerImageReference(spec.From.Name)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to parse from reference during import: %v", err))
				continue
			}

			// the tag reported in the status wins over the tag parsed from the image reference
			tag := ref.Tag
			if len(status.Tag) > 0 {
				tag = status.Tag
			}
			// we've imported a set of tags, ensure spec tag will point to this for later imports
			from.ID, from.Tag = "", tag

			if updated, ok := r.importSuccessful(ctx, image, stream, tag, from.Exact(), nextGeneration,
				now, spec.ImportPolicy, spec.ReferencePolicy, importedImages, updatedImages); ok {
				isi.Status.Repository.Images[i].Image = updated
			}
		}
	}

	for i, spec := range isi.Spec.Images {
		// images without a destination tag are not recorded on the stream
		if spec.To == nil {
			continue
		}
		tag := spec.To.Name

		// record a failure condition
		// (the status for a spec entry is read from the same index of isi.Status.Images)
		status := isi.Status.Images[i]
		if checkImportFailure(status, stream, tag, nextGeneration, now) {
			// ensure that we have a spec tag set
			ensureSpecTag(stream, tag, spec.From.Name, spec.ImportPolicy, spec.ReferencePolicy, false)
			continue
		}

		// record success
		image := status.Image
		if updated, ok := r.importSuccessful(ctx, image, stream, tag, spec.From.Name, nextGeneration,
			now, spec.ImportPolicy, spec.ReferencePolicy, importedImages, updatedImages); ok {
			isi.Status.Images[i].Image = updated
		}
	}

	// TODO: should we allow partial failure?
	// for now the first image creation error found aborts the request before the stream is written
	for _, err := range importedImages {
		if err != nil {
			return nil, err
		}
	}

	clearManifests(isi)

	// ensure defaulting is applied by round trip converting
	// TODO: convert to using versioned types.
	external, err := legacyscheme.Scheme.ConvertToVersion(stream, imageapiv1.SchemeGroupVersion)
	if err != nil {
		return nil, err
	}
	legacyscheme.Scheme.Default(external)
	internal, err := legacyscheme.Scheme.ConvertToVersion(external, imageapi.SchemeGroupVersion)
	if err != nil {
		return nil, err
	}
	stream = internal.(*imageapi.ImageStream)

	// if and only if we have changes between the original and the imported stream, trigger
	// an import
	hasChanges := !kapihelper.Semantic.DeepEqual(original, stream)
	if create {
		stream.Annotations[imageapi.DockerImageRepositoryCheckAnnotation] = now.UTC().Format(time.RFC3339)
		glog.V(4).Infof("create new stream: %#v", stream)
		obj, err = r.internalStreams.Create(ctx, stream, rest.ValidateAllObjectFunc, false)
	} else {
		if hasAnnotation && !hasChanges {
			// nothing to write: report the stream as it was before the import
			glog.V(4).Infof("stream did not change: %#v", stream)
			obj, err = original, nil
		} else {
			if glog.V(4) {
				glog.V(4).Infof("updating stream %s", diff.ObjectDiff(original, stream))
			}
			stream.Annotations[imageapi.DockerImageRepositoryCheckAnnotation] = now.UTC().Format(time.RFC3339)
			obj, _, err = r.internalStreams.Update(ctx, stream.Name, rest.DefaultUpdatedObjectInfo(stream), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc)
		}
	}

	if err != nil {
		// if we have an admission limit error then record the conditions on the original stream. Quota errors
		// will be recorded by the importer.
		if quotautil.IsErrorLimitExceeded(err) {
			originalStream := original
			recordLimitExceededStatus(originalStream, stream, err, now, nextGeneration)
			// a failure to store the conditions is only reported; the limit error below is what
			// the caller receives either way
			var limitErr error
			obj, _, limitErr = r.internalStreams.Update(ctx, stream.Name, rest.DefaultUpdatedObjectInfo(originalStream), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc)
			if limitErr != nil {
				utilruntime.HandleError(fmt.Errorf("failed to record limit exceeded status in image stream %s/%s: %v", stream.Namespace, stream.Name, limitErr))
			}
		}

		return nil, err
	}
	isi.Status.Import = obj.(*imageapi.ImageStream)
	return isi, nil
}
// recordLimitExceededStatus attaches an import-failed condition built from err to
// every tag that is present in newStream's status but absent from originalStream's
// status. The conditions are written to originalStream; newStream is only read.
func recordLimitExceededStatus(originalStream *imageapi.ImageStream, newStream *imageapi.ImageStream, err error, now metav1.Time, nextGeneration int64) {
	for tag := range newStream.Status.Tags {
		if _, existed := originalStream.Status.Tags[tag]; existed {
			// the tag was already there before the import, leave it alone
			continue
		}
		failed := newImportFailedCondition(err, nextGeneration, now)
		imageapi.SetTagConditions(originalStream, tag, failed)
	}
}
// checkImportFailure reports whether status describes a failed import and, if it
// does, records the failure on stream.
//
// An import counts as successful only when status carries an image and its status is
// metav1.StatusSuccess; in that case false is returned and stream is untouched.
// Otherwise an ImportSuccess=False condition is built from the status message (or a
// generic message when that is empty). When tag is empty it is resolved from
// status.Tag or, failing that, from the tag in the image's docker reference. Unless
// the stream already has the condition for that tag, the condition is set and the
// generation of a matching spec tag is reset to zero. True is returned for every
// failure.
func checkImportFailure(status imageapi.ImageImportStatus, stream *imageapi.ImageStream, tag string, nextGeneration int64, now metav1.Time) bool {
	succeeded := status.Image != nil && status.Status.Status == metav1.StatusSuccess
	if succeeded {
		return false
	}

	failure := imageapi.TagEventCondition{
		Type:               imageapi.ImportSuccess,
		Status:             kapi.ConditionFalse,
		Message:            status.Status.Message,
		Reason:             string(status.Status.Reason),
		Generation:         nextGeneration,
		LastTransitionTime: now,
	}
	if len(failure.Message) == 0 {
		failure.Message = "unknown error prevented import"
	}

	// work out which tag the failure belongs to when the caller did not say
	switch {
	case tag != "":
		// the caller's tag is used as is
	case len(status.Tag) > 0:
		tag = status.Tag
	case status.Image != nil:
		if parsed, err := imageapi.ParseDockerImageReference(status.Image.DockerImageReference); err == nil {
			tag = parsed.Tag
		}
	}

	if imageapi.HasTagCondition(stream, tag, failure) {
		return true
	}

	imageapi.SetTagConditions(stream, tag, failure)
	if specTag, found := stream.Spec.Tags[tag]; found {
		var zero int64
		specTag.Generation = &zero
		stream.Spec.Tags[tag] = specTag
	}
	return true
}
// ensureSpecTag makes sure stream has a spec tag named tag and returns it.
//
// An existing spec tag is returned unchanged unless reset is true. When the tag is
// missing, or reset is true, its From is pointed at the DockerImage named from, its
// generation is set to zero, and importPolicy and referencePolicy are applied; the
// result is stored on the stream. The spec tag map is allocated when it is nil.
func ensureSpecTag(stream *imageapi.ImageStream, tag, from string, importPolicy imageapi.TagImportPolicy,
	referencePolicy imageapi.TagReferencePolicy, reset bool) imageapi.TagReference {
	if stream.Spec.Tags == nil {
		stream.Spec.Tags = map[string]imageapi.TagReference{}
	}

	current, exists := stream.Spec.Tags[tag]
	if exists && !reset {
		return current
	}

	generation := int64(0)
	current.From = &kapi.ObjectReference{Kind: "DockerImage", Name: from}
	current.Generation = &generation
	current.ImportPolicy = importPolicy
	current.ReferencePolicy = referencePolicy

	stream.Spec.Tags[tag] = current
	return current
}
// importSuccessful records a successful import of image under tag on stream and makes
// sure the image itself is stored.
//
// The spec tag and the status tag event are refreshed when the tag is new or differs
// from the imported image, and the spec tag's import policy is always overwritten.
// importedImages and updatedImages act as a per-request cache keyed by image name, so
// an image shared by several tags is created once: importedImages holds the outcome of
// the create attempt (nil on success) and updatedImages the stored image. An image that
// already exists is treated as a success.
//
// The stored image and true are returned when the image is available; nil and false
// are returned when creating it failed now or earlier in the same request.
func (r *REST) importSuccessful(
	ctx apirequest.Context,
	image *imageapi.Image, stream *imageapi.ImageStream, tag string, from string, nextGeneration int64, now metav1.Time,
	importPolicy imageapi.TagImportPolicy, referencePolicy imageapi.TagReferencePolicy,
	importedImages map[string]error, updatedImages map[string]*imageapi.Image,
) (*imageapi.Image, bool) {
	r.strategy.PrepareImageForCreate(image)

	pullSpec, _ := imageapi.MostAccuratePullSpec(image.DockerImageReference, image.Name, "")
	event := imageapi.TagEvent{
		Created:              now,
		DockerImageReference: pullSpec,
		Image:                image.Name,
		Generation:           nextGeneration,
	}

	if stream.Spec.Tags == nil {
		stream.Spec.Tags = make(map[string]imageapi.TagReference)
	}

	// bring the spec tag and the status tag in line with the imported image
	outdated := imageapi.DifferentTagEvent(stream, tag, event) || imageapi.DifferentTagGeneration(stream, tag)
	specTag, present := stream.Spec.Tags[tag]
	if outdated || !present {
		specTag = ensureSpecTag(stream, tag, from, importPolicy, referencePolicy, true)
		imageapi.AddTagEventToImageStream(stream, tag, event)
	}
	// the import policy is reset on every import
	specTag.ImportPolicy = importPolicy
	stream.Spec.Tags[tag] = specTag

	// set the tag conditions from what is already known about this image: a recorded
	// failure becomes a failed condition, anything else clears the conditions
	previousErr, alreadyImported := importedImages[image.Name]
	var conditions []imageapi.TagEventCondition
	if previousErr != nil {
		conditions = append(conditions, newImportFailedCondition(previousErr, nextGeneration, now))
	}
	imageapi.SetTagConditions(stream, tag, conditions...)

	// an image handled earlier in this request is served from the cache
	if alreadyImported {
		if cached, found := updatedImages[image.Name]; found {
			return cached, true
		}
		return nil, false
	}

	created, err := r.images.Create(ctx, image, rest.ValidateAllObjectFunc, false)
	if kapierrors.IsAlreadyExists(err) {
		// the image is already stored: use the imported copy, with its metadata filled in
		if metaErr := util.ImageWithMetadata(image); metaErr != nil {
			glog.V(4).Infof("Unable to update image metadata during image import when image already exists %q: %v", image.Name, metaErr)
		}
		created, err = image, nil
	}
	if err != nil {
		importedImages[image.Name] = err
		return nil, false
	}

	stored := created.(*imageapi.Image)
	updatedImages[image.Name] = stored
	importedImages[image.Name] = nil
	return stored, true
}
// clearManifests unsets the manifest and config for each imported image whose spec
// does not request them via IncludeManifest.
//
// Status entries in isi.Status.Images are matched to isi.Spec.Images by index. A
// status entry with no spec entry at the same index cannot have requested its
// manifest, so it is cleared as well instead of indexing past the end of the spec.
// Repository images are cleared only when a repository spec exists, does not set
// IncludeManifest, and a repository status is present.
func clearManifests(isi *imageapi.ImageStreamImport) {
	for i := range isi.Status.Images {
		if i < len(isi.Spec.Images) && isi.Spec.Images[i].IncludeManifest {
			continue
		}
		if image := isi.Status.Images[i].Image; image != nil {
			image.DockerImageManifest = ""
			image.DockerImageConfig = ""
		}
	}

	repoSpec, repoStatus := isi.Spec.Repository, isi.Status.Repository
	if repoSpec == nil || repoSpec.IncludeManifest || repoStatus == nil {
		// either manifests were requested or there is nothing to clear
		return
	}
	for i := range repoStatus.Images {
		if image := repoStatus.Images[i].Image; image != nil {
			image.DockerImageManifest = ""
			image.DockerImageConfig = ""
		}
	}
}
// newImportFailedCondition builds an ImportSuccess=False tag condition for err with
// the given generation and transition time. The message is err.Error() with an empty
// reason, unless err implements kapierrors.APIStatus, in which case both reason and
// message are taken from its status. err must not be nil.
func newImportFailedCondition(err error, gen int64, now metav1.Time) imageapi.TagEventCondition {
	reason, message := "", err.Error()
	if apiStatus, ok := err.(kapierrors.APIStatus); ok {
		details := apiStatus.Status()
		reason, message = string(details.Reason), details.Message
	}

	return imageapi.TagEventCondition{
		Type:               imageapi.ImportSuccess,
		Status:             kapi.ConditionFalse,
		Reason:             reason,
		Message:            message,
		Generation:         gen,
		LastTransitionTime: now,
	}
}