forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
rest.go
179 lines (155 loc) · 5.99 KB
/
rest.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package imagestreammapping
import (
"context"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metainternal "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/registry/rest"
kapihelper "k8s.io/kubernetes/pkg/apis/core/helper"
imagegroup "github.com/openshift/api/image"
imageapi "github.com/openshift/origin/pkg/image/apis/image"
"github.com/openshift/origin/pkg/image/apiserver/registry/image"
"github.com/openshift/origin/pkg/image/apiserver/registry/imagestream"
"github.com/openshift/origin/pkg/image/apiserver/registryhostname"
)
// maxRetriesOnConflict is the maximum retry count for Create calls which
// result in resource conflicts.
const maxRetriesOnConflict = 10
// REST implements the RESTStorage interface in terms of an image registry and
// image stream registry. It only supports the Create method and is used
// to simplify adding a new Image and tag to an ImageStream.
type REST struct {
imageRegistry image.Registry
imageStreamRegistry imagestream.Registry
strategy Strategy
}
var _ rest.Creater = &REST{}
var _ rest.Scoper = &REST{}
// NewREST returns a new REST.
func NewREST(imageRegistry image.Registry, imageStreamRegistry imagestream.Registry, registry registryhostname.RegistryHostnameRetriever) *REST {
return &REST{
imageRegistry: imageRegistry,
imageStreamRegistry: imageStreamRegistry,
strategy: NewStrategy(registry),
}
}
// New returns a new ImageStreamMapping for use with Create.
func (r *REST) New() runtime.Object {
return &imageapi.ImageStreamMapping{}
}
func (s *REST) NamespaceScoped() bool {
return true
}
// Create registers a new image (if it doesn't exist) and updates the
// specified ImageStream's tags. If attempts to update the ImageStream fail
// with a resource conflict, the update will be retried if the newer
// ImageStream has no tag diffs from the previous state. If tag diffs are
// detected, the conflict error is returned.
func (s *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, _ bool) (runtime.Object, error) {
if err := rest.BeforeCreate(s.strategy, ctx, obj); err != nil {
return nil, err
}
if err := createValidation(obj.DeepCopyObject()); err != nil {
return nil, err
}
mapping := obj.(*imageapi.ImageStreamMapping)
stream, err := s.findStreamForMapping(ctx, mapping)
if err != nil {
return nil, err
}
image := mapping.Image
tag := mapping.Tag
if len(tag) == 0 {
tag = imageapi.DefaultImageTag
}
imageCreateErr := s.imageRegistry.CreateImage(ctx, &image)
if imageCreateErr != nil && !errors.IsAlreadyExists(imageCreateErr) {
return nil, imageCreateErr
}
// prefer dockerImageReference set on image for the tagEvent if the image is new
ref := image.DockerImageReference
if errors.IsAlreadyExists(imageCreateErr) && image.Annotations[imageapi.ManagedByOpenShiftAnnotation] == "true" {
// the image is managed by us and, most probably, tagged in some other image stream
// let's make the reference local to this stream
if streamRef, err := imageapi.DockerImageReferenceForStream(stream); err == nil {
streamRef.ID = image.Name
ref = streamRef.Exact()
} else {
glog.V(4).Infof("Failed to get dockerImageReference for stream %s/%s: %v", stream.Namespace, stream.Name, err)
}
}
next := imageapi.TagEvent{
Created: metav1.Now(),
DockerImageReference: ref,
Image: image.Name,
}
err = wait.ExponentialBackoff(wait.Backoff{Steps: maxRetriesOnConflict}, func() (bool, error) {
lastEvent := imageapi.LatestTaggedImage(stream, tag)
next.Generation = stream.Generation
if !imageapi.AddTagEventToImageStream(stream, tag, next) {
// nothing actually changed
return true, nil
}
imageapi.UpdateTrackingTags(stream, tag, next)
_, err := s.imageStreamRegistry.UpdateImageStreamStatus(ctx, stream)
if err == nil {
return true, nil
}
if !errors.IsConflict(err) {
return false, err
}
// If the update conflicts, get the latest stream and check for tag
// updates. If the latest tag hasn't changed, retry.
latestStream, findLatestErr := s.findStreamForMapping(ctx, mapping)
if findLatestErr != nil {
return false, findLatestErr
}
// no previous tag
if lastEvent == nil {
// The tag hasn't changed, so try again with the updated stream.
stream = latestStream
return false, nil
}
// check for tag change
newerEvent := imageapi.LatestTaggedImage(latestStream, tag)
// generation and creation time differences are ignored
lastEvent.Generation = newerEvent.Generation
lastEvent.Created = newerEvent.Created
if kapihelper.Semantic.DeepEqual(lastEvent, newerEvent) {
// The tag hasn't changed, so try again with the updated stream.
stream = latestStream
return false, nil
}
// The tag changed, so return the conflict error back to the client.
return false, err
})
if err != nil {
return nil, err
}
return &metav1.Status{Status: metav1.StatusSuccess}, nil
}
// findStreamForMapping retrieves an ImageStream whose DockerImageRepository matches dockerRepo.
func (s *REST) findStreamForMapping(ctx context.Context, mapping *imageapi.ImageStreamMapping) (*imageapi.ImageStream, error) {
if len(mapping.Name) > 0 {
return s.imageStreamRegistry.GetImageStream(ctx, mapping.Name, &metav1.GetOptions{})
}
if len(mapping.DockerImageRepository) != 0 {
list, err := s.imageStreamRegistry.ListImageStreams(ctx, &metainternal.ListOptions{})
if err != nil {
return nil, err
}
for i := range list.Items {
if mapping.DockerImageRepository == list.Items[i].Spec.DockerImageRepository {
return &list.Items[i], nil
}
}
return nil, errors.NewInvalid(imagegroup.Kind("ImageStreamMapping"), "", field.ErrorList{
field.NotFound(field.NewPath("dockerImageStream"), mapping.DockerImageRepository),
})
}
return nil, errors.NewNotFound(imagegroup.Resource("imagestream"), "")
}