forked from distribution/distribution
-
Notifications
You must be signed in to change notification settings - Fork 0
/
linkedblobstore.go
415 lines (335 loc) · 12 KB
/
linkedblobstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
package storage
import (
"fmt"
"net/http"
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/storage/driver"
"github.com/docker/distribution/uuid"
)
// linkPathFunc describes a function that can resolve a link based on the
// repository name and digest.
type linkPathFunc func(name string, dgst digest.Digest) (string, error)
// linkedBlobStore provides a full BlobService that namespaces the blobs to a
// given repository. Effectively, it manages the links in a given repository
// that grant access to the global blob store.
type linkedBlobStore struct {
*blobStore
registry *registry
blobServer distribution.BlobServer
blobAccessController distribution.BlobDescriptorService
repository distribution.Repository
ctx context.Context // only to be used where context can't come through method args
deleteEnabled bool
resumableDigestEnabled bool
// linkPathFns specifies one or more path functions allowing one to
// control the repository blob link set to which the blob store
// dispatches. This is required because manifest and layer blobs have not
// yet been fully merged. At some point, this functionality should be
// removed an the blob links folder should be merged. The first entry is
// treated as the "canonical" link location and will be used for writes.
linkPathFns []linkPathFunc
}
var _ distribution.BlobStore = &linkedBlobStore{}
func (lbs *linkedBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
return lbs.blobAccessController.Stat(ctx, dgst)
}
func (lbs *linkedBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
canonical, err := lbs.Stat(ctx, dgst) // access check
if err != nil {
return nil, err
}
return lbs.blobStore.Get(ctx, canonical.Digest)
}
func (lbs *linkedBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
canonical, err := lbs.Stat(ctx, dgst) // access check
if err != nil {
return nil, err
}
return lbs.blobStore.Open(ctx, canonical.Digest)
}
func (lbs *linkedBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
canonical, err := lbs.Stat(ctx, dgst) // access check
if err != nil {
return err
}
if canonical.MediaType != "" {
// Set the repository local content type.
w.Header().Set("Content-Type", canonical.MediaType)
}
return lbs.blobServer.ServeBlob(ctx, w, r, canonical.Digest)
}
func (lbs *linkedBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
dgst := digest.FromBytes(p)
// Place the data in the blob store first.
desc, err := lbs.blobStore.Put(ctx, mediaType, p)
if err != nil {
context.GetLogger(ctx).Errorf("error putting into main store: %v", err)
return distribution.Descriptor{}, err
}
if err := lbs.blobAccessController.SetDescriptor(ctx, dgst, desc); err != nil {
return distribution.Descriptor{}, err
}
// TODO(stevvooe): Write out mediatype if incoming differs from what is
// returned by Put above. Note that we should allow updates for a given
// repository.
return desc, lbs.linkBlob(ctx, desc)
}
// createOptions is a collection of blob creation modifiers relevant to general
// blob storage intended to be configured by the BlobCreateOption.Apply method.
type createOptions struct {
Mount struct {
ShouldMount bool
From reference.Canonical
}
}
type optionFunc func(interface{}) error
func (f optionFunc) Apply(v interface{}) error {
return f(v)
}
// WithMountFrom returns a BlobCreateOption which designates that the blob should be
// mounted from the given canonical reference.
func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption {
return optionFunc(func(v interface{}) error {
opts, ok := v.(*createOptions)
if !ok {
return fmt.Errorf("unexpected options type: %T", v)
}
opts.Mount.ShouldMount = true
opts.Mount.From = ref
return nil
})
}
// Writer begins a blob write session, returning a handle.
func (lbs *linkedBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
context.GetLogger(ctx).Debug("(*linkedBlobStore).Writer")
var opts createOptions
for _, option := range options {
err := option.Apply(&opts)
if err != nil {
return nil, err
}
}
if opts.Mount.ShouldMount {
desc, err := lbs.mount(ctx, opts.Mount.From.Name(), opts.Mount.From.Digest())
if err == nil {
// Mount successful, no need to initiate an upload session
return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
}
}
uuid := uuid.Generate().String()
startedAt := time.Now().UTC()
path, err := pathFor(uploadDataPathSpec{
name: lbs.repository.Name(),
id: uuid,
})
if err != nil {
return nil, err
}
startedAtPath, err := pathFor(uploadStartedAtPathSpec{
name: lbs.repository.Name(),
id: uuid,
})
if err != nil {
return nil, err
}
// Write a startedat file for this upload
if err := lbs.blobStore.driver.PutContent(ctx, startedAtPath, []byte(startedAt.Format(time.RFC3339))); err != nil {
return nil, err
}
return lbs.newBlobUpload(ctx, uuid, path, startedAt)
}
func (lbs *linkedBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
context.GetLogger(ctx).Debug("(*linkedBlobStore).Resume")
startedAtPath, err := pathFor(uploadStartedAtPathSpec{
name: lbs.repository.Name(),
id: id,
})
if err != nil {
return nil, err
}
startedAtBytes, err := lbs.blobStore.driver.GetContent(ctx, startedAtPath)
if err != nil {
switch err := err.(type) {
case driver.PathNotFoundError:
return nil, distribution.ErrBlobUploadUnknown
default:
return nil, err
}
}
startedAt, err := time.Parse(time.RFC3339, string(startedAtBytes))
if err != nil {
return nil, err
}
path, err := pathFor(uploadDataPathSpec{
name: lbs.repository.Name(),
id: id,
})
if err != nil {
return nil, err
}
return lbs.newBlobUpload(ctx, id, path, startedAt)
}
func (lbs *linkedBlobStore) Delete(ctx context.Context, dgst digest.Digest) error {
if !lbs.deleteEnabled {
return distribution.ErrUnsupported
}
// Ensure the blob is available for deletion
_, err := lbs.blobAccessController.Stat(ctx, dgst)
if err != nil {
return err
}
err = lbs.blobAccessController.Clear(ctx, dgst)
if err != nil {
return err
}
return nil
}
func (lbs *linkedBlobStore) mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
repo, err := lbs.registry.Repository(ctx, sourceRepo)
if err != nil {
return distribution.Descriptor{}, err
}
stat, err := repo.Blobs(ctx).Stat(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
desc := distribution.Descriptor{
Size: stat.Size,
// NOTE(stevvooe): The central blob store firewalls media types from
// other users. The caller should look this up and override the value
// for the specific repository.
MediaType: "application/octet-stream",
Digest: dgst,
}
return desc, lbs.linkBlob(ctx, desc)
}
// newBlobUpload allocates a new upload controller with the given state.
func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string, startedAt time.Time) (distribution.BlobWriter, error) {
fw, err := newFileWriter(ctx, lbs.driver, path)
if err != nil {
return nil, err
}
bw := &blobWriter{
blobStore: lbs,
id: uuid,
startedAt: startedAt,
digester: digest.Canonical.New(),
bufferedFileWriter: *fw,
resumableDigestEnabled: lbs.resumableDigestEnabled,
}
return bw, nil
}
// linkBlob links a valid, written blob into the registry under the named
// repository for the upload controller.
func (lbs *linkedBlobStore) linkBlob(ctx context.Context, canonical distribution.Descriptor, aliases ...digest.Digest) error {
dgsts := append([]digest.Digest{canonical.Digest}, aliases...)
// TODO(stevvooe): Need to write out mediatype for only canonical hash
// since we don't care about the aliases. They are generally unused except
// for tarsum but those versions don't care about mediatype.
// Don't make duplicate links.
seenDigests := make(map[digest.Digest]struct{}, len(dgsts))
// only use the first link
linkPathFn := lbs.linkPathFns[0]
for _, dgst := range dgsts {
if _, seen := seenDigests[dgst]; seen {
continue
}
seenDigests[dgst] = struct{}{}
blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst)
if err != nil {
return err
}
if err := lbs.blobStore.link(ctx, blobLinkPath, canonical.Digest); err != nil {
return err
}
}
return nil
}
type linkedBlobStatter struct {
*blobStore
repository distribution.Repository
// linkPathFns specifies one or more path functions allowing one to
// control the repository blob link set to which the blob store
// dispatches. This is required because manifest and layer blobs have not
// yet been fully merged. At some point, this functionality should be
// removed an the blob links folder should be merged. The first entry is
// treated as the "canonical" link location and will be used for writes.
linkPathFns []linkPathFunc
}
var _ distribution.BlobDescriptorService = &linkedBlobStatter{}
func (lbs *linkedBlobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
var (
resolveErr error
target digest.Digest
)
// try the many link path functions until we get success or an error that
// is not PathNotFoundError.
for _, linkPathFn := range lbs.linkPathFns {
var err error
target, err = lbs.resolveWithLinkFunc(ctx, dgst, linkPathFn)
if err == nil {
break // success!
}
switch err := err.(type) {
case driver.PathNotFoundError:
resolveErr = distribution.ErrBlobUnknown // move to the next linkPathFn, saving the error
default:
return distribution.Descriptor{}, err
}
}
if resolveErr != nil {
return distribution.Descriptor{}, resolveErr
}
if target != dgst {
// Track when we are doing cross-digest domain lookups. ie, sha512 to sha256.
context.GetLogger(ctx).Warnf("looking up blob with canonical target: %v -> %v", dgst, target)
}
// TODO(stevvooe): Look up repository local mediatype and replace that on
// the returned descriptor.
return lbs.blobStore.statter.Stat(ctx, target)
}
func (lbs *linkedBlobStatter) Clear(ctx context.Context, dgst digest.Digest) (err error) {
// clear any possible existence of a link described in linkPathFns
for _, linkPathFn := range lbs.linkPathFns {
blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst)
if err != nil {
return err
}
err = lbs.blobStore.driver.Delete(ctx, blobLinkPath)
if err != nil {
switch err := err.(type) {
case driver.PathNotFoundError:
continue // just ignore this error and continue
default:
return err
}
}
}
return nil
}
// resolveTargetWithFunc allows us to read a link to a resource with different
// linkPathFuncs to let us try a few different paths before returning not
// found.
func (lbs *linkedBlobStatter) resolveWithLinkFunc(ctx context.Context, dgst digest.Digest, linkPathFn linkPathFunc) (digest.Digest, error) {
blobLinkPath, err := linkPathFn(lbs.repository.Name(), dgst)
if err != nil {
return "", err
}
return lbs.blobStore.readlink(ctx, blobLinkPath)
}
func (lbs *linkedBlobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
// The canonical descriptor for a blob is set at the commit phase of upload
return nil
}
// blobLinkPath provides the path to the blob link, also known as layers.
func blobLinkPath(name string, dgst digest.Digest) (string, error) {
return pathFor(layerLinkPathSpec{name: name, digest: dgst})
}
// manifestRevisionLinkPath provides the path to the manifest revision link.
func manifestRevisionLinkPath(name string, dgst digest.Digest) (string, error) {
return pathFor(manifestRevisionLinkPathSpec{name: name, revision: dgst})
}