/
image_import.go
408 lines (358 loc) · 12.5 KB
/
image_import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
package containerd
import (
"bufio"
"bytes"
"context"
"encoding/json"
"io"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
cerrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/builder/dockerfile"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/image"
"github.com/docker/docker/internal/compatcontext"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/pools"
"github.com/google/uuid"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// ImportImage imports an image, getting the archived layer data from layerReader.
// Layer archive is imported as-is if the compression is gzip or zstd.
// Uncompressed, xz and bzip2 archives are recompressed into gzip.
// The image is tagged with the given reference.
// If the platform is nil, the default host platform is used.
// The message is used as the history comment.
// Image configuration is derived from the dockerfile instructions in changes.
//
// Returns the image ID (the manifest digest) on success. On unpack failure the
// image record has already been saved, so the ID is returned alongside the error.
func (i *ImageService) ImportImage(ctx context.Context, ref reference.Named, platform *ocispec.Platform, msg string, layerReader io.Reader, changes []string) (image.ID, error) {
	refString := ""
	if ref != nil {
		refString = ref.String()
	}
	logger := logrus.WithField("ref", refString)

	// Hold a lease for the duration of the import so the freshly written
	// blobs are not garbage-collected before the image record references them.
	ctx, release, err := i.client.WithLease(ctx)
	if err != nil {
		return "", errdefs.System(err)
	}
	defer func() {
		// Release the lease even if the caller's context was cancelled.
		if err := release(compatcontext.WithoutCancel(ctx)); err != nil {
			logger.WithError(err).Warn("failed to release lease created for import")
		}
	}()

	if platform == nil {
		def := platforms.DefaultSpec()
		platform = &def
	}

	// Apply the dockerfile-style instructions in `changes` on top of an empty
	// config to derive the imported image's container configuration.
	imageConfig, err := dockerfile.BuildFromConfig(ctx, &container.Config{}, changes, platform.OS)
	if err != nil {
		logger.WithError(err).Debug("failed to process changes")
		return "", errdefs.InvalidParameter(err)
	}

	cs := i.client.ContentStore()

	// Store the layer blob (recompressing if necessary) and capture both the
	// compressed digest and the uncompressed (diff ID) digest.
	compressedDigest, uncompressedDigest, mt, err := saveArchive(ctx, cs, layerReader)
	if err != nil {
		logger.WithError(err).Debug("failed to write layer blob")
		return "", err
	}
	logger = logger.WithFields(logrus.Fields{
		"compressedDigest":   compressedDigest,
		"uncompressedDigest": uncompressedDigest,
	})

	// Label the compressed blob with its uncompressed digest; the returned
	// size is the compressed blob size needed for the layer descriptor.
	size, err := fillUncompressedLabel(ctx, cs, compressedDigest, uncompressedDigest)
	if err != nil {
		logger.WithError(err).Debug("failed to set uncompressed label on the compressed blob")
		return "", err
	}

	compressedRootfsDesc := ocispec.Descriptor{
		MediaType: mt,
		Digest:    compressedDigest,
		Size:      size,
	}

	ociCfg := containerConfigToOciImageConfig(imageConfig)
	createdAt := time.Now()
	// Single-layer OCI image config with one history entry carrying the
	// caller-supplied comment.
	config := ocispec.Image{
		Platform: *platform,
		Created:  &createdAt,
		Author:   "",
		Config:   ociCfg,
		RootFS: ocispec.RootFS{
			Type:    "layers",
			DiffIDs: []digest.Digest{uncompressedDigest},
		},
		History: []ocispec.History{
			{
				Created:    &createdAt,
				CreatedBy:  "",
				Author:     "",
				Comment:    msg,
				EmptyLayer: false,
			},
		},
	}
	configDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageConfig, config, nil)
	if err != nil {
		return "", err
	}

	manifest := ocispec.Manifest{
		MediaType: ocispec.MediaTypeImageManifest,
		Versioned: specs.Versioned{
			SchemaVersion: 2,
		},
		Config: configDesc,
		Layers: []ocispec.Descriptor{
			compressedRootfsDesc,
		},
	}
	// GC labels keep the config and layer blobs alive once the lease expires.
	manifestDesc, err := storeJson(ctx, cs, ocispec.MediaTypeImageManifest, manifest, map[string]string{
		"containerd.io/gc.ref.content.config": configDesc.Digest.String(),
		"containerd.io/gc.ref.content.l.0":    compressedDigest.String(),
	})
	if err != nil {
		return "", err
	}

	id := image.ID(manifestDesc.Digest.String())
	img := images.Image{
		Name:      refString,
		Target:    manifestDesc,
		CreatedAt: createdAt,
	}
	// An unnamed import is stored under a dangling-image name derived from the digest.
	if img.Name == "" {
		img.Name = danglingImageName(manifestDesc.Digest)
	}

	err = i.saveImage(ctx, img)
	if err != nil {
		logger.WithError(err).Debug("failed to save image")
		return "", err
	}

	// Unpack into the snapshotter; on failure the image is still saved, so the
	// id is returned together with the unpack error.
	err = i.unpackImage(ctx, img, *platform)
	if err != nil {
		logger.WithError(err).Debug("failed to unpack image")
	} else {
		i.LogImageEvent(id.String(), id.String(), "import")
	}

	return id, err
}
// saveArchive saves the archive from bufRd to the content store, compressing it if necessary.
// Returns compressed blob digest, digest of the uncompressed data and media type of the stored blob.
func saveArchive(ctx context.Context, cs content.Store, layerReader io.Reader) (digest.Digest, digest.Digest, string, error) {
	// Wrap the reader in buffered reader to allow peeks.
	p := pools.BufioReader32KPool
	bufRd := p.Get(layerReader)
	defer p.Put(bufRd)

	// Peek at the stream to determine its compression without consuming it.
	compression, err := detectCompression(bufRd)
	if err != nil {
		return "", "", "", err
	}

	var uncompressedReader io.Reader = bufRd
	switch compression {
	case archive.Gzip, archive.Zstd:
		// If the input is already a compressed layer, just save it as is.
		mediaType := ocispec.MediaTypeImageLayerGzip
		if compression == archive.Zstd {
			mediaType = ocispec.MediaTypeImageLayerZstd
		}

		compressedDigest, uncompressedDigest, err := writeCompressedBlob(ctx, cs, mediaType, bufRd)
		if err != nil {
			return "", "", "", err
		}

		return compressedDigest, uncompressedDigest, mediaType, nil
	case archive.Bzip2, archive.Xz:
		// Decompress bzip2/xz here, then fall through to the uncompressed
		// path below which recompresses the stream as gzip.
		r, err := archive.DecompressStream(bufRd)
		if err != nil {
			return "", "", "", errdefs.InvalidParameter(err)
		}
		defer r.Close()
		uncompressedReader = r
		fallthrough
	case archive.Uncompressed:
		// Recompress the (now) uncompressed stream into gzip before storing.
		mediaType := ocispec.MediaTypeImageLayerGzip
		compression := archive.Gzip

		compressedDigest, uncompressedDigest, err := compressAndWriteBlob(ctx, cs, compression, mediaType, uncompressedReader)
		if err != nil {
			return "", "", "", err
		}

		return compressedDigest, uncompressedDigest, mediaType, nil
	}

	// detectCompression returned a compression type we don't handle.
	return "", "", "", errdefs.InvalidParameter(errors.New("unsupported archive compression"))
}
// writeCompressedBlob writes the blob and simultaneously computes the digest of the uncompressed data.
// The compressed stream is stored as-is while being teed through a pipe to a
// decompressor whose output feeds the uncompressed digester.
func writeCompressedBlob(ctx context.Context, cs content.Store, mediaType string, bufRd *bufio.Reader) (digest.Digest, digest.Digest, error) {
	pr, pw := io.Pipe()
	defer pw.Close()
	defer pr.Close()

	c := make(chan digest.Digest)
	// Start copying the blob to the content store from the pipe and tee it to the pipe.
	go func() {
		compressedDigest, err := writeBlobAndReturnDigest(ctx, cs, mediaType, io.TeeReader(bufRd, pw))
		// Propagate the write error (or EOF via nil) to the pipe reader below.
		pw.CloseWithError(err)
		c <- compressedDigest
	}()

	digester := digest.Canonical.Digester()

	// Decompress the piped blob.
	decompressedStream, err := archive.DecompressStream(pr)
	if err == nil {
		// Feed the digester with decompressed data.
		_, err = io.Copy(digester.Hash(), decompressedStream)
		decompressedStream.Close()
	}
	// Unblock the writer side if the decompress/copy failed, then wait for the
	// goroutine so the compressed digest (and the channel send) completes.
	pr.CloseWithError(err)

	compressedDigest := <-c
	if err != nil {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return "", "", errdefs.Cancelled(err)
		}
		return "", "", errdefs.System(err)
	}

	uncompressedDigest := digester.Digest()
	return compressedDigest, uncompressedDigest, nil
}
// compressAndWriteBlob compresses the uncompressedReader and stores it in the content store.
// Returns the digest of the stored (compressed) blob and the digest of the
// uncompressed input data.
func compressAndWriteBlob(ctx context.Context, cs content.Store, compression archive.Compression, mediaType string, uncompressedLayerReader io.Reader) (digest.Digest, digest.Digest, error) {
	pr, pw := io.Pipe()
	defer pr.Close()
	defer pw.Close()

	compressor, err := archive.CompressStream(pw, compression)
	if err != nil {
		return "", "", errdefs.InvalidParameter(err)
	}
	defer compressor.Close()

	writeChan := make(chan digest.Digest)
	// Start copying the blob to the content store from the pipe.
	go func() {
		dgst, err := writeBlobAndReturnDigest(ctx, cs, mediaType, pr)
		// Propagate the store error to the writer side, then report the digest.
		pr.CloseWithError(err)
		writeChan <- dgst
	}()

	// Copy archive to the pipe and tee it to a digester.
	// This will feed the pipe the above goroutine is reading from.
	uncompressedDigester := digest.Canonical.Digester()
	readFromInputAndDigest := io.TeeReader(uncompressedLayerReader, uncompressedDigester.Hash())
	_, err = io.Copy(compressor, readFromInputAndDigest)
	// Flush the compressor and signal EOF (or the copy error) to the reader,
	// otherwise the goroutine above would block forever.
	compressor.Close()
	pw.CloseWithError(err)

	// Wait for the store goroutine to finish before inspecting err.
	compressedDigest := <-writeChan
	if err != nil {
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return "", "", errdefs.Cancelled(err)
		}
		return "", "", errdefs.System(err)
	}

	// err is guaranteed nil here; return a literal nil for clarity.
	return compressedDigest, uncompressedDigester.Digest(), nil
}
// writeBlobAndReturnDigest stores the bytes from reader as a content blob with
// the given media type and reports the canonical digest of the data written.
// A random UUID is used as the ingest reference since the digest is not known
// up front.
func writeBlobAndReturnDigest(ctx context.Context, cs content.Store, mt string, reader io.Reader) (digest.Digest, error) {
	dgstr := digest.Canonical.Digester()
	tee := io.TeeReader(reader, dgstr.Hash())

	err := content.WriteBlob(ctx, cs, uuid.New().String(), tee, ocispec.Descriptor{MediaType: mt})
	if err != nil {
		return "", errdefs.System(err)
	}
	return dgstr.Digest(), nil
}
// saveImage creates an image in the ImageService or updates it if it exists.
func (i *ImageService) saveImage(ctx context.Context, img images.Image) error {
	store := i.client.ImageService()

	// Try an update first; the common case is overwriting an existing tag.
	_, err := store.Update(ctx, img)
	if err == nil {
		return nil
	}
	if !cerrdefs.IsNotFound(err) {
		return errdefs.Unknown(err)
	}

	// The image doesn't exist yet — create it instead.
	if _, err := store.Create(ctx, img); err != nil {
		return errdefs.Unknown(err)
	}
	return nil
}
// unpackImage unpacks the image into the snapshotter.
// It is a no-op when the image is already unpacked for the configured snapshotter.
func (i *ImageService) unpackImage(ctx context.Context, img images.Image, platform ocispec.Platform) error {
	c8dImg := containerd.NewImageWithPlatform(i.client, img, platforms.Only(platform))

	unpacked, err := c8dImg.IsUnpacked(ctx, i.snapshotter)
	if err != nil {
		return err
	}
	if unpacked {
		return nil
	}
	return c8dImg.Unpack(ctx, i.snapshotter)
}
// detectCompression detects the compression type of the data buffered in
// bufRd. Only a Peek is performed, so the stream itself is not consumed.
func detectCompression(bufRd *bufio.Reader) (archive.Compression, error) {
	bs, err := bufRd.Peek(10)
	if err != nil && err != io.EOF {
		// Note: we'll ignore any io.EOF error because there are some odd
		// cases where the layer.tar file will be empty (zero bytes) and
		// that results in an io.EOF from the Peek() call. So, in those
		// cases we'll just treat it as a non-compressed stream and
		// that means just create an empty layer.
		// See Issue 18170
		return archive.Uncompressed, errdefs.Unknown(err)
	}

	return archive.DetectCompression(bs), nil
}
// fillUncompressedLabel sets the uncompressed digest label on the compressed blob metadata
// and returns the compressed blob size.
// The "containerd.io/uncompressed" label lets containerd map the compressed
// blob to its uncompressed (diff ID) counterpart.
func fillUncompressedLabel(ctx context.Context, cs content.Store, compressedDigest digest.Digest, uncompressedDigest digest.Digest) (int64, error) {
	info, err := cs.Info(ctx, compressedDigest)
	if err != nil {
		// errors.Wrap (not Wrapf): the message has no format verbs.
		return 0, errdefs.Unknown(errors.Wrap(err, "couldn't open previously written blob"))
	}
	size := info.Size

	info.Labels = map[string]string{"containerd.io/uncompressed": uncompressedDigest.String()}
	// Update only the labels field; "labels.*" is the fieldpath mask.
	if _, err = cs.Update(ctx, info, "labels.*"); err != nil {
		return 0, errdefs.System(errors.Wrap(err, "couldn't set uncompressed label"))
	}

	return size, nil
}
// storeJson marshals the provided object as json and stores it.
// The blob is named by the digest of the marshalled data; the optional labels
// (e.g. GC references) are attached to the stored content.
func storeJson(ctx context.Context, cs content.Ingester, mt string, obj interface{}, labels map[string]string) (ocispec.Descriptor, error) {
	data, err := json.Marshal(obj)
	if err != nil {
		return ocispec.Descriptor{}, errdefs.InvalidParameter(err)
	}
	// Note: the original code re-checked err after digest.FromBytes, but err
	// cannot change there — the dead duplicate check has been removed.
	desc := ocispec.Descriptor{
		MediaType: mt,
		Digest:    digest.FromBytes(data),
		Size:      int64(len(data)),
	}

	var opts []content.Opt
	if labels != nil {
		opts = append(opts, content.WithLabels(labels))
	}

	if err := content.WriteBlob(ctx, cs, desc.Digest.String(), bytes.NewReader(data), desc, opts...); err != nil {
		return ocispec.Descriptor{}, errdefs.System(err)
	}
	return desc, nil
}
// containerConfigToOciImageConfig translates the Docker container configuration
// fields that have OCI image-config equivalents into an ocispec.ImageConfig.
func containerConfigToOciImageConfig(cfg *container.Config) ocispec.ImageConfig {
	out := ocispec.ImageConfig{
		User:        cfg.User,
		Env:         cfg.Env,
		Entrypoint:  cfg.Entrypoint,
		Cmd:         cfg.Cmd,
		Volumes:     cfg.Volumes,
		WorkingDir:  cfg.WorkingDir,
		Labels:      cfg.Labels,
		StopSignal:  cfg.StopSignal,
		ArgsEscaped: cfg.ArgsEscaped,
	}

	// Port keys need a string conversion from Docker's nat.Port type.
	if n := len(cfg.ExposedPorts); n > 0 {
		ports := make(map[string]struct{}, n)
		for port, v := range cfg.ExposedPorts {
			ports[string(port)] = v
		}
		out.ExposedPorts = ports
	}

	return out
}