/
image_pull.go
395 lines (367 loc) · 15.5 KB
/
image_pull.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
gocontext "context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/containerd/containerd/content"
containerdimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
containerdrootfs "github.com/containerd/containerd/rootfs"
"github.com/golang/glog"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// For image management:
// 1) We have an in-memory metadata index to:
// a. Maintain ImageID -> RepoTags, ImageID -> RepoDigests relationships; ImageID
// is the digest of image config, which conforms to oci image spec.
// b. Cache constant and useful information such as image chainID, config etc.
// c. An image will be added into the in-memory metadata only when it's successfully
// pulled and unpacked.
//
// 2) We use containerd image metadata store and content store:
// a. To resolve image reference (digest/tag) locally. During pulling image, we
// normalize the image reference provided by user, and put it into image metadata
// store with resolved descriptor. For the other operations, if image id is provided,
// we'll access the in-memory metadata index directly; if image reference is
// provided, we'll normalize it, resolve it in containerd image metadata store
// to get the image id.
// b. As the backup of in-memory metadata in 1). During startup, the in-memory
// metadata could be re-constructed from image metadata store + content store.
//
// Several problems with current approach:
// 1) An entry in containerd image metadata store doesn't mean a "READY" (successfully
// pulled and unpacked) image. E.g. during pulling, the client gets killed. In that case,
// if we see an image without snapshots or with incomplete contents during startup,
// should we re-pull the image? Or should we remove the entry?
//
// 2) Containerd suggests that users add the entry before pulling the image. However, if
// an error occurs during pulling, should we remove the entry from metadata
// store? Or should we leave it there until next startup (resource leakage)?
//
// 3) CRI-containerd only exposes "READY" (successfully pulled and unpacked) images
// to the user, which are maintained in the in-memory metadata index. However, it's
// still possible that someone else removes the content or snapshot, bypassing cri-containerd;
// how do we detect that and update the in-memory metadata correspondingly? Always
// check whether corresponding snapshot is ready when reporting image status?
//
// 4) Is the content important if we cached necessary information in-memory
// after we pull the image? How to manage the disk usage of contents? If some
// contents are missing but snapshots are ready, is the image still "READY"?
// PullImage pulls an image with authentication config.
//
// It pulls and unpacks the image named in the request via c.pullImage, then
// records the result in c.imageMetadataStore:
//   - if metadata for the resulting image id already exists, the pulled repo
//     tag / repo digest are merged into it;
//   - otherwise new metadata (id, chain id, size, config) is created.
//
// On success the response's ImageRef is the image id. On failure the response
// is nil and the error describes the step that failed.
// TODO(mikebrow): harden api (including figuring out at what layer we should be blocking on duplicate requests.)
func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (retRes *runtime.PullImageResponse, retErr error) {
	image := r.GetImage().GetImage()
	// Deliberately do not print r.GetAuth(): the CRI auth config carries registry
	// credentials (password / tokens) which must not end up in logs. Only record
	// whether an auth config was supplied.
	glog.V(2).Infof("PullImage %q (auth config provided: %t)", image, r.GetAuth() != nil)
	defer func() {
		if retErr == nil {
			glog.V(2).Infof("PullImage %q returns image reference %q",
				image, retRes.GetImageRef())
		}
	}()

	// NOTE: schema 1 manifests are handled inside pullImage through the schema1 converter.
	// TODO(mikebrow): add truncIndex for image id
	imageID, repoTag, repoDigest, err := c.pullImage(ctx, image)
	if err != nil {
		return nil, fmt.Errorf("failed to pull image %q: %v", image, err)
	}
	glog.V(4).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q", image, imageID,
		repoTag, repoDigest)

	_, err = c.imageMetadataStore.Get(imageID)
	if err != nil && !metadata.IsNotExistError(err) {
		return nil, fmt.Errorf("failed to get image %q metadata: %v", imageID, err)
	}
	// There is a known race here because the image metadata could be created after `Get`.
	// TODO(random-liu): [P1] Do not use metadata store. Use simple in-memory data structure to
	// maintain the id -> information index. And use the container image store as backup and
	// recover in-memory state during startup.
	if err == nil {
		// Metadata already exists: only merge in the reference used for this pull.
		if err := c.imageMetadataStore.Update(imageID, func(m metadata.ImageMetadata) (metadata.ImageMetadata, error) {
			updateImageMetadata(&m, repoTag, repoDigest)
			return m, nil
		}); err != nil {
			return nil, fmt.Errorf("failed to update image %q metadata: %v", imageID, err)
		}
		return &runtime.PullImageResponse{ImageRef: imageID}, nil
	}

	// The metadata does not exist yet: collect image information and create it.
	chainID, size, config, err := c.getImageInfo(ctx, image)
	if err != nil {
		return nil, fmt.Errorf("failed to get image %q information: %v", image, err)
	}
	// NOTE(random-liu): the actual state in containerd is the source of truth, even we maintain
	// in-memory image metadata, it's only for in-memory indexing. The image could be removed
	// by someone else anytime, before/during/after we create the metadata. We should always
	// check the actual state in containerd before using the image or returning status of the
	// image.

	// Create corresponding image metadata.
	newMeta := metadata.ImageMetadata{
		ID:      imageID,
		ChainID: chainID.String(),
		Size:    size,
		Config:  config,
	}
	// Add the image reference used into repo tags. Note if the image is pulled with
	// repo digest, it will also be added in to repo tags, which is fine.
	updateImageMetadata(&newMeta, repoTag, repoDigest)
	if err := c.imageMetadataStore.Create(newMeta); err != nil {
		return nil, fmt.Errorf("failed to create image %q metadata: %v", imageID, err)
	}
	return &runtime.PullImageResponse{ImageRef: imageID}, nil
}
// resourceSet is the helper struct to help tracking all resources associated
// with an image. It is a mutex-guarded set of resource keys. The zero value is
// an empty set that is ready to use. Because it embeds a sync.Mutex, a
// resourceSet must be passed by pointer and not copied.
type resourceSet struct {
	sync.Mutex
	resources map[string]struct{}
}

// newResourceSet returns a new, empty resourceSet.
func newResourceSet() *resourceSet {
	return &resourceSet{resources: make(map[string]struct{})}
}

// add records resource in the set. Adding a resource that is already present
// leaves the set unchanged.
func (r *resourceSet) add(resource string) {
	r.Lock()
	defer r.Unlock()
	if r.resources == nil {
		// Allocate lazily so that a zero-value resourceSet does not panic on
		// its first write.
		r.resources = make(map[string]struct{})
	}
	r.resources[resource] = struct{}{}
}

// all returns a copy of the set of all resources added so far. The returned
// map is never nil and is owned by the caller: modifying it does not affect
// the resourceSet.
func (r *resourceSet) all() map[string]struct{} {
	r.Lock()
	defer r.Unlock()
	resources := make(map[string]struct{}, len(r.resources))
	for resource := range r.resources {
		resources[resource] = struct{}{}
	}
	return resources
}
// pullImage pulls image and returns image id (config digest), repoTag and repoDigest.
//
// The steps, in order, are:
// 1) normalize rawRef and resolve it with a docker resolver (plain HTTP, no
// credentials configured) to obtain the target descriptor and a fetcher;
// 2) dispatch handlers over the descriptor that record the ref key of every
// visited resource and fetch content into c.contentStoreService (for a schema 1
// manifest, a schema1.Converter is used in place of the fetch/children handlers);
// 3) poll with waitForResourcesDownloading until the content store reports no
// status for any of the recorded ref keys;
// 4) for a schema 1 image, replace the descriptor with the converter's result;
// 5) put repoTag/repoDigest -> descriptor into c.imageStoreService;
// 6) read the manifest, pair the rootfs diff ids with the manifest layers and
// apply them with containerdrootfs.ApplyLayers;
// 7) put image id -> descriptor into c.imageStoreService.
//
// On any failure it returns three empty strings and an error naming the failed
// step; nothing already written is cleaned up.
func (c *criContainerdService) pullImage(ctx context.Context, rawRef string) (
// TODO(random-liu): Replace with client.Pull.
string, string, string, error) {
namedRef, err := normalizeImageRef(rawRef)
if err != nil {
return "", "", "", fmt.Errorf("failed to parse image reference %q: %v", rawRef, err)
}
// TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference.
// ref is the normalized string form; it is what gets resolved and what is
// later looked up in the image store.
ref := namedRef.String()
if ref != rawRef {
glog.V(4).Infof("PullImage using normalized image ref: %q", ref)
}
// Resolve the image reference to get descriptor and fetcher.
resolver := docker.NewResolver(docker.ResolverOptions{
// TODO(random-liu): Add authentication by setting credentials.
// TODO(random-liu): Handle https.
PlainHTTP: true,
Client: http.DefaultClient,
})
_, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
return "", "", "", fmt.Errorf("failed to resolve ref %q: %v", ref, err)
}
fetcher, err := resolver.Fetcher(ctx, ref)
if err != nil {
return "", "", "", fmt.Errorf("failed to get fetcher for ref %q: %v", ref, err)
}
// Currently, the resolved image name is the same with ref in docker resolver,
// but they may be different in the future.
// TODO(random-liu): Always resolve image reference and use resolved image name in
// the system.
glog.V(4).Infof("Start downloading resources for image %q", ref)
// resources collects the ref key of every descriptor the handler chain visits,
// so that the wait below knows which content statuses belong to this pull.
resources := newResourceSet()
// The tracking handler only records the key; it returns no descriptors and no error.
resourceTrackHandler := containerdimages.HandlerFunc(func(ctx gocontext.Context, desc imagespec.Descriptor) (
[]imagespec.Descriptor, error) {
resources.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
})
// Fetch all image resources into content store.
// Dispatch a handler which will run a sequence of handlers to:
// 1) track all resources associated using a customized handler;
// 2) fetch the object using a FetchHandler;
// 3) recurse through any sub-layers via a ChildrenHandler.
// Support schema1 image: when the resolved descriptor is a docker schema 1
// manifest, the schema1 converter is used as the handler after the tracker.
// schema1Converter stays nil for every other media type and doubles as the
// "is schema 1" flag below.
var (
schema1Converter *schema1.Converter
handler containerdimages.Handler
)
if desc.MediaType == containerdimages.MediaTypeDockerSchema1Manifest {
schema1Converter = schema1.NewConverter(c.contentStoreService, fetcher)
handler = containerdimages.Handlers(
resourceTrackHandler,
schema1Converter,
)
} else {
handler = containerdimages.Handlers(
resourceTrackHandler,
remotes.FetchHandler(c.contentStoreService, fetcher),
containerdimages.ChildrenHandler(c.contentStoreService),
)
}
// A Dispatch error is only logged at V(5) and is not returned; the function
// falls through to waiting on the tracked resources.
if err := containerdimages.Dispatch(ctx, handler, desc); err != nil {
// Dispatch returns error when requested resources are locked.
// In that case, we should start waiting and checking the pulling
// progress.
// TODO(random-liu): Check specific resource locked error type.
glog.V(5).Infof("Dispatch for %q returns error: %v", ref, err)
}
// Wait for the image pulling to finish
if err := c.waitForResourcesDownloading(ctx, resources.all()); err != nil {
return "", "", "", fmt.Errorf("failed to wait for image %q downloading: %v", ref, err)
}
glog.V(4).Infof("Finished downloading resources for image %q", ref)
// For a schema 1 image, desc is replaced by the descriptor returned from Convert;
// everything below uses that descriptor.
if schema1Converter != nil {
desc, err = schema1Converter.Convert(ctx)
if err != nil {
return "", "", "", fmt.Errorf("failed to convert schema 1 image %q: %v", ref, err)
}
}
// In the future, containerd will rely on the information in the image store to perform image
// garbage collection.
// For now, we simply use it to store and retrieve information required for pulling an image.
// @stevvooe said we should `Put` before downloading content, However:
// 1) Containerd client put image metadata after downloading;
// 2) We need desc returned by schema1 converter.
// So just put the image metadata after downloading now.
// TODO(random-liu): Fix the potential garbage collection race.
// NOTE(review): getRepoDigestAndTag is defined outside this file view; the check
// below requires ref to equal one of the two values it returns.
repoDigest, repoTag := getRepoDigestAndTag(namedRef, desc.Digest, schema1Converter != nil)
if ref != repoTag && ref != repoDigest {
return "", "", "", fmt.Errorf("unexpected repo tag %q and repo digest %q for %q", repoTag, repoDigest, ref)
}
// Register each non-empty reference against the descriptor; an empty repoTag
// or repoDigest is skipped.
for _, r := range []string{repoTag, repoDigest} {
if r == "" {
continue
}
if err := c.imageStoreService.Put(ctx, r, desc); err != nil {
return "", "", "", fmt.Errorf("failed to put image reference %q desc %v into containerd image store: %v",
r, desc, err)
}
}
// Do not cleanup if following operations fail so as to make resumable download possible.
// TODO(random-liu): Replace with image.Unpack.
// Unpack the image layers into snapshots.
image, err := c.imageStoreService.Get(ctx, ref)
if err != nil {
return "", "", "", fmt.Errorf("failed to get image %q from containerd image store: %v", ref, err)
}
// Read the image manifest from content store.
manifestDigest := image.Target.Digest
p, err := content.ReadBlob(ctx, c.contentStoreService, manifestDigest)
if err != nil {
return "", "", "", fmt.Errorf("readblob failed for manifest digest %q: %v", manifestDigest, err)
}
var manifest imagespec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return "", "", "", fmt.Errorf("unmarshal blob to manifest failed for manifest digest %q: %v",
manifestDigest, err)
}
diffIDs, err := image.RootFS(ctx, c.contentStoreService)
if err != nil {
return "", "", "", fmt.Errorf("failed to get image rootfs: %v", err)
}
// diffIDs[i] and manifest.Layers[i] are paired by index below, so the two
// lists must have the same length.
if len(diffIDs) != len(manifest.Layers) {
return "", "", "", fmt.Errorf("mismatched image rootfs and manifest layers")
}
layers := make([]containerdrootfs.Layer, len(diffIDs))
for i := range diffIDs {
layers[i].Diff = imagespec.Descriptor{
// TODO: derive media type from compressed type
MediaType: imagespec.MediaTypeImageLayer,
Digest: diffIDs[i],
}
layers[i].Blob = manifest.Layers[i]
}
if _, err := containerdrootfs.ApplyLayers(ctx, layers, c.snapshotService, c.diffService); err != nil {
return "", "", "", fmt.Errorf("failed to apply layers %+v: %v", layers, err)
}
// TODO(random-liu): Considering how to deal with the disk usage of content.
configDesc, err := image.Config(ctx, c.contentStoreService)
if err != nil {
return "", "", "", fmt.Errorf("failed to get config descriptor for image %q: %v", ref, err)
}
// Use config digest as imageID to conform to oci image spec, and also add image id as
// image reference.
imageID := configDesc.Digest.String()
if err := c.imageStoreService.Put(ctx, imageID, desc); err != nil {
return "", "", "", fmt.Errorf("failed to put image id %q into containerd image store: %v",
imageID, err)
}
return imageID, repoTag, repoDigest, nil
}
// waitDownloadingPollInterval is the interval to check resource downloading progress.
const waitDownloadingPollInterval = 200 * time.Millisecond

// waitForResourcesDownloading waits for all resource downloading to finish.
//
// Every waitDownloadingPollInterval it lists the statuses reported by
// c.contentStoreService and returns nil as soon as none of them has a Ref that
// is a key of resources. The first check happens one interval after the call.
//
// It returns an error if the status query fails, or if ctx is done before the
// downloads finish; in the latter case the error text includes ctx.Err() so the
// caller can tell a deadline from an explicit cancellation.
func (c *criContainerdService) waitForResourcesDownloading(ctx context.Context, resources map[string]struct{}) error {
	ticker := time.NewTicker(waitDownloadingPollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// TODO(random-liu): Use better regexp when containerd `MakeRefKey` contains more
			// information.
			statuses, err := c.contentStoreService.Status(ctx, "")
			if err != nil {
				return fmt.Errorf("failed to get content status: %v", err)
			}
			pulling := false
			// TODO(random-liu): Move Dispatch into a separate goroutine, so that we could report
			// image pulling progress concurrently.
			// Do not stop at the first match: progress is logged for every
			// tracked resource that still has a status.
			for _, status := range statuses {
				if _, ok := resources[status.Ref]; !ok {
					continue
				}
				glog.V(5).Infof("Pulling resource %q with progress %d/%d",
					status.Ref, status.Offset, status.Total)
				pulling = true
			}
			if !pulling {
				return nil
			}
		case <-ctx.Done():
			// TODO(random-liu): Abort ongoing pulling if cancelled.
			return fmt.Errorf("image resources pulling is cancelled: %v", ctx.Err())
		}
	}
}
// insertToStringSlice returns ss with s appended, unless ss already contains
// s, in which case ss is returned unchanged.
func insertToStringSlice(ss []string, s string) []string {
	for i := range ss {
		if ss[i] == s {
			// Already present: nothing to insert.
			return ss
		}
	}
	return append(ss, s)
}
// updateImageMetadata merges repoTag and repoDigest into meta in place.
// An empty repoTag or repoDigest is ignored, and a value that is already in
// the corresponding list (RepoTags / RepoDigests) is not added a second time.
func updateImageMetadata(meta *metadata.ImageMetadata, repoTag, repoDigest string) {
	if len(repoDigest) > 0 {
		meta.RepoDigests = insertToStringSlice(meta.RepoDigests, repoDigest)
	}
	if len(repoTag) > 0 {
		meta.RepoTags = insertToStringSlice(meta.RepoTags, repoTag)
	}
}