Skip to content

Commit

Permalink
ociregistry: split PushBlobChunked in two and add offset to resume
Browse files Browse the repository at this point in the history
As a method, PushBlobChunked was getting heavy.
Depending on whether id was empty, we would start a chunked upload,
which in ociclient entails a POST to get a new upload ID,
or we would resume a previous chunked upload,
which in ociclient entails a GET to get the last offset.

For the case where we start a new upload, we now have PushBlobChunked
which no longer takes an id and whose behavior is now pretty simple.

For the case where we resume a previous upload,
we now have PushBlobChunkedResume which does require the previous id.
It also takes an offset, so that ociclient need not perform a GET
to retrieve the last offset if the user kept track of it.
It can be left as -1 to keep the old behavior with a GET.

Adding the offset parameter to the resume method is a crucial detail.
Right now, implementing an OCI registry proxy by joining a user-facing
ociserver with an ociclient pointed at a "backend" registry works.
But, since ociserver implements chunked upload PATCH and PUT requests
as a chunked upload resumption via ociclient,
such a proxy would for example turn POST+PATCH into POST+GET+PATCH.

The interface now allows the end user to keep track of the last offset
alongside the id of the chunked upload it's doing,
meaning that a proxy will soon be capable of avoiding extra GETs.
For now, the ociserver code gains two TODOs to make that change,
for the sake of making the two changes separately.

Finally, I also noticed that ocimem's chunked upload resumption
is surprisingly lenient, allowing the previous ID to not exist at all.
The tests in ociserver seem to rely on this; I've added a TODO
to revisit the matter at some point.

Signed-off-by: Daniel Martí <mvdan@mvdan.cc>
Change-Id: I6d18d00221417a26c29ac5d56a9677d6f018e2c9
Dispatch-Trailer: {"type":"trybot","CL":1170688,"patchset":1,"ref":"refs/changes/88/1170688/1","targetBranch":"main"}
  • Loading branch information
mvdan authored and porcuepine committed Oct 13, 2023
1 parent cbb5014 commit ef8bbf1
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 96 deletions.
3 changes: 1 addition & 2 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/cockroachdb/apd/v2 v2.0.1/go.mod h1:DDxRlzC2lo3/vSlmSoS7JkqbbrARPuFOGr0B9pvN3Gw=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
Expand Down Expand Up @@ -53,7 +54,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand All @@ -79,7 +79,6 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
Expand Down
46 changes: 27 additions & 19 deletions ociregistry/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,24 @@ import (
type Funcs struct {
NewError func(ctx context.Context, methodName, repo string) error

GetBlob_ func(ctx context.Context, repo string, digest Digest) (BlobReader, error)
GetBlobRange_ func(ctx context.Context, repo string, digest Digest, offset0, offset1 int64) (BlobReader, error)
GetManifest_ func(ctx context.Context, repo string, digest Digest) (BlobReader, error)
GetTag_ func(ctx context.Context, repo string, tagName string) (BlobReader, error)
ResolveBlob_ func(ctx context.Context, repo string, digest Digest) (Descriptor, error)
ResolveManifest_ func(ctx context.Context, repo string, digest Digest) (Descriptor, error)
ResolveTag_ func(ctx context.Context, repo string, tagName string) (Descriptor, error)
PushBlob_ func(ctx context.Context, repo string, desc Descriptor, r io.Reader) (Descriptor, error)
PushBlobChunked_ func(ctx context.Context, repo string, id string, chunkSize int) (BlobWriter, error)
MountBlob_ func(ctx context.Context, fromRepo, toRepo string, digest Digest) (Descriptor, error)
PushManifest_ func(ctx context.Context, repo string, tag string, contents []byte, mediaType string) (Descriptor, error)
DeleteBlob_ func(ctx context.Context, repo string, digest Digest) error
DeleteManifest_ func(ctx context.Context, repo string, digest Digest) error
DeleteTag_ func(ctx context.Context, repo string, name string) error
Repositories_ func(ctx context.Context) Iter[string]
Tags_ func(ctx context.Context, repo string) Iter[string]
Referrers_ func(ctx context.Context, repo string, digest Digest, artifactType string) Iter[Descriptor]
GetBlob_ func(ctx context.Context, repo string, digest Digest) (BlobReader, error)
GetBlobRange_ func(ctx context.Context, repo string, digest Digest, offset0, offset1 int64) (BlobReader, error)
GetManifest_ func(ctx context.Context, repo string, digest Digest) (BlobReader, error)
GetTag_ func(ctx context.Context, repo string, tagName string) (BlobReader, error)
ResolveBlob_ func(ctx context.Context, repo string, digest Digest) (Descriptor, error)
ResolveManifest_ func(ctx context.Context, repo string, digest Digest) (Descriptor, error)
ResolveTag_ func(ctx context.Context, repo string, tagName string) (Descriptor, error)
PushBlob_ func(ctx context.Context, repo string, desc Descriptor, r io.Reader) (Descriptor, error)
PushBlobChunked_ func(ctx context.Context, repo string, chunkSize int) (BlobWriter, error)
PushBlobChunkedResume_ func(ctx context.Context, repo, id string, offset int64, chunkSize int) (BlobWriter, error)
MountBlob_ func(ctx context.Context, fromRepo, toRepo string, digest Digest) (Descriptor, error)
PushManifest_ func(ctx context.Context, repo string, tag string, contents []byte, mediaType string) (Descriptor, error)
DeleteBlob_ func(ctx context.Context, repo string, digest Digest) error
DeleteManifest_ func(ctx context.Context, repo string, digest Digest) error
DeleteTag_ func(ctx context.Context, repo string, name string) error
Repositories_ func(ctx context.Context) Iter[string]
Tags_ func(ctx context.Context, repo string) Iter[string]
Referrers_ func(ctx context.Context, repo string, digest Digest, artifactType string) Iter[Descriptor]
}

// This blesses Funcs as the canonical Interface implementation.
Expand Down Expand Up @@ -122,9 +123,16 @@ func (f *Funcs) PushBlob(ctx context.Context, repo string, desc Descriptor, r io
return Descriptor{}, f.newError(ctx, "PushBlob", repo)
}

func (f *Funcs) PushBlobChunked(ctx context.Context, repo string, id string, chunkSize int) (BlobWriter, error) {
func (f *Funcs) PushBlobChunked(ctx context.Context, repo string, chunkSize int) (BlobWriter, error) {
if f != nil && f.PushBlobChunked_ != nil {
return f.PushBlobChunked_(ctx, repo, id, chunkSize)
return f.PushBlobChunked_(ctx, repo, chunkSize)
}
return nil, f.newError(ctx, "PushBlobChunked", repo)
}

func (f *Funcs) PushBlobChunkedResume(ctx context.Context, repo, id string, offset int64, chunkSize int) (BlobWriter, error) {
if f != nil && f.PushBlobChunked_ != nil {
return f.PushBlobChunkedResume_(ctx, repo, id, offset, chunkSize)
}
return nil, f.newError(ctx, "PushBlobChunked", repo)
}
Expand Down
32 changes: 26 additions & 6 deletions ociregistry/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,33 @@ type Writer interface {
PushBlob(ctx context.Context, repo string, desc Descriptor, r io.Reader) (Descriptor, error)

// PushBlobChunked starts to push a blob to the given repository.
// The returned BlobWriter can be used to stream the upload and resume on temporary errors.
// If id is non-zero, it should be the value returned from BlobWriter.ID
// from a previous PushBlobChunked call and will be used to resume that blob
// write.
// The returned [BlobWriter] can be used to stream the upload and resume on temporary errors.
//
// The chunkSize parameter provides a hint for the chunk size to use
// when writing to the registry. If it's zero, a suitable default will be chosen.
// It might be larger if the underlying registry requires that.
//
// The context remains active as long as the BlobWriter is around: if it's
// cancelled, it should cause any blocked BlobWriter operations to terminate.
PushBlobChunked(ctx context.Context, repo string, id string, chunkSize int) (BlobWriter, error)
PushBlobChunked(ctx context.Context, repo string, chunkSize int) (BlobWriter, error)

// If id is non-zero, it should be the value returned from [BlobWriter.ID]
// from a previous PushBlobChunked call and will be used to resume that blob write.
// When resuming a blob write, you should typically provide the previous [BlobWriter.Size] as the offset.
// If that's not possible, use an offset of -1 to continue where the last write left off.

// PushBlobChunkedResume resumes a previous push of a blob started with PushBlobChunked.
// The id should be the value returned from [BlobWriter.ID] from the previous push.
// and the offset should be the value returned from [BlobWriter.Size].
//
// The offset and chunkSize should similarly be obtained from the previous [BlobWriter]
// via the [BlobWriter.Size] and [BlobWriter.ChunkSize] methods.
// Alternatively, set offset to -1 to continue where the last write left off,
// and to only use chunkSize as a hint like in PushBlobChunked.
//
// The context remains active as long as the BlobWriter is around: if it's
// cancelled, it should cause any blocked BlobWriter operations to terminate.
PushBlobChunkedResume(ctx context.Context, repo, id string, offset int64, chunkSize int) (BlobWriter, error)

// MountBlob makes a blob with the given digest that's in fromRepo available
// in toRepo and returns its canonical descriptor.
Expand Down Expand Up @@ -216,7 +231,7 @@ type Lister interface {
Referrers(ctx context.Context, repo string, digest Digest, artifactType string) Iter[Descriptor]
}

// BlobWriter provides a handle for inserting data into a blob store.
// BlobWriter provides a handle for uploading a blob to a registry.
type BlobWriter interface {
// Write writes more data to the blob. When resuming, the
// caller must start writing data from Size bytes into the content.
Expand All @@ -229,6 +244,11 @@ type BlobWriter interface {
// Size returns the number of bytes written to this blob.
Size() int64

// ChunkSize returns the maximum number of bytes to upload at a single time.
// This number must meet the minimum given by the registry
// and should otherwise follow the hint given by the user.
ChunkSize() int64

// ID returns the opaque identifier for this writer. The returned value
// can be passed to PushBlobChunked to resume the write.
// It is only valid before Write has been called or after Close has
Expand Down
102 changes: 63 additions & 39 deletions ociregistry/ociclient/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,59 +133,77 @@ func (c *client) PushBlob(ctx context.Context, repo string, desc ociregistry.Des
// weigh up in-memory cost vs round-trip overhead.
const defaultChunkSize = 64 * 1024

func (c *client) PushBlobChunked(ctx context.Context, repo string, id string, chunkSize int) (ociregistry.BlobWriter, error) {
func (c *client) PushBlobChunked(ctx context.Context, repo string, chunkSize int) (ociregistry.BlobWriter, error) {
if chunkSize <= 0 {
chunkSize = defaultChunkSize
}
if id == "" {
resp, err := c.doRequest(ctx, &ocirequest.Request{
Kind: ocirequest.ReqBlobStartUpload,
Repo: repo,
}, http.StatusAccepted)
if err != nil {
return nil, err
}
resp.Body.Close()
location, err := locationFromResponse(resp)
if err != nil {
return nil, err
}
return &blobWriter{
ctx: ctx,
client: c,
chunkSize: chunkSizeFromResponse(resp, chunkSize),
chunk: make([]byte, 0, chunkSize),
location: location,
}, nil
}
// Try to find what offset we're meant to be writing at
// by doing a GET to the location.
req, err := http.NewRequest("GET", id, nil)
resp, err := c.doRequest(ctx, &ocirequest.Request{
Kind: ocirequest.ReqBlobStartUpload,
Repo: repo,
}, http.StatusAccepted)
if err != nil {
return nil, err
}
resp, err := c.do(req, http.StatusNoContent)
if err != nil {
return nil, fmt.Errorf("cannot recover chunk offset: %v", err)
}
resp.Body.Close()
location, err := locationFromResponse(resp)
if err != nil {
return nil, fmt.Errorf("cannot get location from response: %v", err)
return nil, err
}
rangeStr := resp.Header.Get("Range")
p0, p1, ok := parseRange(rangeStr)
if !ok {
return nil, fmt.Errorf("invalid range %q in response", rangeStr)
return &blobWriter{
ctx: ctx,
client: c,
chunkSize: chunkSizeFromResponse(resp, chunkSize),
chunk: make([]byte, 0, chunkSize),
location: location,
}, nil
}

func (c *client) PushBlobChunkedResume(ctx context.Context, repo string, id string, offset int64, chunkSize int) (ociregistry.BlobWriter, error) {
if id == "" {
return nil, fmt.Errorf("id must be set to resume a chunked upload")
}
if p0 != 0 {
return nil, fmt.Errorf("range %q does not start with 0", rangeStr)
var location *url.URL
switch {
case offset == -1:
// Try to find what offset we're meant to be writing at
// by doing a GET to the location.
req, err := http.NewRequest("GET", id, nil)
if err != nil {
return nil, err
}
resp, err := c.do(req, http.StatusNoContent)
if err != nil {
return nil, fmt.Errorf("cannot recover chunk offset: %v", err)
}
location, err = locationFromResponse(resp)
if err != nil {
return nil, fmt.Errorf("cannot get location from response: %v", err)
}
rangeStr := resp.Header.Get("Range")
p0, p1, ok := parseRange(rangeStr)
if !ok {
return nil, fmt.Errorf("invalid range %q in response", rangeStr)
}
if p0 != 0 {
return nil, fmt.Errorf("range %q does not start with 0", rangeStr)
}
chunkSize = chunkSizeFromResponse(resp, chunkSize)
offset = p1
case offset < 0:
return nil, fmt.Errorf("invalid offset; must be -1, 0, or positive")
default:
var err error
location, err = url.Parse(id) // Note that this mirrors [BlobWriter.ID].
if err != nil {
return nil, fmt.Errorf("provided ID is not a valid location URL")
}
}
return &blobWriter{
ctx: ctx,
client: c,
chunkSize: chunkSizeFromResponse(resp, chunkSize),
size: p1,
flushed: p1,
chunkSize: chunkSize,
size: offset,
flushed: offset,
location: location,
}, nil
}
Expand Down Expand Up @@ -287,6 +305,12 @@ func (w *blobWriter) Size() int64 {
return w.size
}

func (w *blobWriter) ChunkSize() int64 {
w.mu.Lock()
defer w.mu.Unlock()
return w.size
}

func (w *blobWriter) ID() string {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down
24 changes: 21 additions & 3 deletions ociregistry/ocidebug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,22 @@ func (r *logger) PushBlob(ctx context.Context, repoName string, desc ociregistry
return desc, err
}

func (r *logger) PushBlobChunked(ctx context.Context, repoName string, resumeID string, chunkSize int) (ociregistry.BlobWriter, error) {
func (r *logger) PushBlobChunked(ctx context.Context, repoName string, chunkSize int) (ociregistry.BlobWriter, error) {
bwid := fmt.Sprintf("bw%d", atomic.AddInt32(&blobWriterID, 1))
r.logf("PushBlobChunked %s resumeID=%q chunkSize=%d {", repoName, resumeID, chunkSize)
w, err := r.r.PushBlobChunked(ctx, repoName, resumeID, chunkSize)
r.logf("PushBlobChunked %s chunkSize=%d {", repoName, chunkSize)
w, err := r.r.PushBlobChunked(ctx, repoName, chunkSize)
r.logf("} -> %T(%s), %v", w, bwid, err)
return blobWriter{
id: bwid,
w: w,
r: r,
}, err
}

func (r *logger) PushBlobChunkedResume(ctx context.Context, repoName, id string, offset int64, chunkSize int) (ociregistry.BlobWriter, error) {
bwid := fmt.Sprintf("bw%d", atomic.AddInt32(&blobWriterID, 1))
r.logf("PushBlobChunkedResume %s id=%q chunkSize=%d {", repoName, id, chunkSize)
w, err := r.r.PushBlobChunkedResume(ctx, repoName, id, offset, chunkSize)
r.logf("} -> %T(%s), %v", w, bwid, err)
return blobWriter{
id: bwid,
Expand Down Expand Up @@ -209,6 +221,12 @@ func (w blobWriter) Size() int64 {
return size
}

func (w blobWriter) ChunkSize() int64 {
chunkSize := w.w.ChunkSize()
w.logf("ChunkSize -> %v", chunkSize)
return chunkSize
}

func (w blobWriter) Close() error {
w.logf("Close {")
err := w.w.Close()
Expand Down
8 changes: 6 additions & 2 deletions ociregistry/ocifilter/prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ func (r *subRegistry) PushBlob(ctx context.Context, repo string, desc ociregistr
return r.r.PushBlob(ctx, r.repo(repo), desc, rd)
}

func (r *subRegistry) PushBlobChunked(ctx context.Context, repo string, id string, chunkSize int) (ociregistry.BlobWriter, error) {
return r.r.PushBlobChunked(ctx, r.repo(repo), id, chunkSize)
func (r *subRegistry) PushBlobChunked(ctx context.Context, repo string, chunkSize int) (ociregistry.BlobWriter, error) {
return r.r.PushBlobChunked(ctx, r.repo(repo), chunkSize)
}

func (r *subRegistry) PushBlobChunkedResume(ctx context.Context, repo, id string, offset int64, chunkSize int) (ociregistry.BlobWriter, error) {
return r.r.PushBlobChunkedResume(ctx, r.repo(repo), id, offset, chunkSize)
}

func (r *subRegistry) MountBlob(ctx context.Context, fromRepo, toRepo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
Expand Down
11 changes: 9 additions & 2 deletions ociregistry/ocifilter/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,18 @@ func (r *selectRegistry) PushBlob(ctx context.Context, repo string, desc ociregi
return r.r.PushBlob(ctx, repo, desc, rd)
}

func (r *selectRegistry) PushBlobChunked(ctx context.Context, repo string, id string, chunkSize int) (ociregistry.BlobWriter, error) {
func (r *selectRegistry) PushBlobChunked(ctx context.Context, repo string, chunkSize int) (ociregistry.BlobWriter, error) {
if !r.allow(repo) {
return nil, ociregistry.ErrDenied
}
return r.r.PushBlobChunked(ctx, repo, id, chunkSize)
return r.r.PushBlobChunked(ctx, repo, chunkSize)
}

func (r *selectRegistry) PushBlobChunkedResume(ctx context.Context, repo, id string, offset int64, chunkSize int) (ociregistry.BlobWriter, error) {
if !r.allow(repo) {
return nil, ociregistry.ErrDenied
}
return r.r.PushBlobChunkedResume(ctx, repo, id, offset, chunkSize)
}

func (r *selectRegistry) MountBlob(ctx context.Context, fromRepo, toRepo string, digest ociregistry.Digest) (ociregistry.Descriptor, error) {
Expand Down
4 changes: 4 additions & 0 deletions ociregistry/ocimem/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (b *Buffer) Size() int64 {
return int64(len(b.buf))
}

func (b *Buffer) ChunkSize() int64 {
return 8 << 10 // 8KiB; not really important
}

// GetBlob returns any committed data and is descriptor. It returns an error
// if the data hasn't been committed or there was an error doing so.
func (b *Buffer) GetBlob() (ociregistry.Descriptor, []byte, error) {
Expand Down
Loading

0 comments on commit ef8bbf1

Please sign in to comment.