Skip to content

Commit

Permalink
Don't reuse errgroups, propagate contexts better (#1128)
Browse files Browse the repository at this point in the history
When you Wait() on an errgroup, the underlying context gets cancelled,
so calling Wait() twice will break if you correctly propagate contexts.
We weren't propagating contexts correctly, so this kind of worked
before, but if we actually want to leverage contexts better we should
fix this :)
  • Loading branch information
jonjohnsonjr committed Sep 21, 2021
1 parent b5cf9c4 commit c71ca9b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 39 deletions.
21 changes: 12 additions & 9 deletions pkg/v1/remote/multi_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package remote

import (
"context"
"fmt"
"net/http"

Expand Down Expand Up @@ -133,12 +134,13 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {

// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*o.jobs)
g, ctx := errgroup.WithContext(o.context)
ctx := o.context
g, gctx := errgroup.WithContext(o.context)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
g.Go(func() error {
for b := range blobChan {
if err := w.uploadOne(b); err != nil {
if err := w.uploadOne(gctx, b); err != nil {
return err
}
}
Expand All @@ -150,8 +152,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
for _, b := range blobs {
select {
case blobChan <- b:
case <-ctx.Done():
return ctx.Err()
case <-gctx.Done():
return gctx.Err()
}
}
return nil
Expand All @@ -160,7 +162,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
return err
}

commitMany := func(m map[name.Reference]Taggable) error {
commitMany := func(ctx context.Context, m map[name.Reference]Taggable) error {
g, ctx := errgroup.WithContext(ctx)
// With all of the constituent elements uploaded, upload the manifests
// to commit the images and indexes, and collect any errors.
type task struct {
Expand All @@ -172,7 +175,7 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
// Start N workers consuming tasks to upload manifests.
g.Go(func() error {
for t := range taskChan {
if err := w.commitManifest(t.i, t.ref); err != nil {
if err := w.commitManifest(ctx, t.i, t.ref); err != nil {
return err
}
}
Expand All @@ -189,19 +192,19 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
}
// Push originally requested image manifests. These have no
// dependencies.
if err := commitMany(images); err != nil {
if err := commitMany(ctx, images); err != nil {
return err
}
// Push new manifests from lowest levels up.
for i := len(newManifests) - 1; i >= 0; i-- {
if err := commitMany(newManifests[i]); err != nil {
if err := commitMany(ctx, newManifests[i]); err != nil {
return err
}
}
// Push originally requested index manifests, which might depend on
// newly discovered manifests.

return commitMany(indexes)
return commitMany(ctx, indexes)
}

// addIndexBlobs adds blobs to the set of blobs we intend to upload, and
Expand Down
47 changes: 21 additions & 26 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ func Write(ref name.Reference, img v1.Image, options ...Option) (rerr error) {
defer close(o.updates)
defer func() { sendError(o.updates, rerr) }()
}
return writeImage(ref, img, o, lastUpdate)
return writeImage(o.context, ref, img, o, lastUpdate)
}

func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Update) error {
func writeImage(ctx context.Context, ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Update) error {
ls, err := img.Layers()
if err != nil {
return err
Expand All @@ -77,19 +77,19 @@ func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Upd
w := writer{
repo: ref.Context(),
client: &http.Client{Transport: tr},
context: o.context,
context: ctx,
updates: o.updates,
lastUpdate: lastUpdate,
}

// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*o.jobs)
g, ctx := errgroup.WithContext(o.context)
g, gctx := errgroup.WithContext(ctx)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
g.Go(func() error {
for b := range blobChan {
if err := w.uploadOne(b); err != nil {
if err := w.uploadOne(gctx, b); err != nil {
return err
}
}
Expand Down Expand Up @@ -128,15 +128,12 @@ func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Upd
}
select {
case blobChan <- l:
case <-ctx.Done():
return ctx.Err()
case <-gctx.Done():
return gctx.Err()
}
}
return nil
})
if err := g.Wait(); err != nil {
return err
}

if l, err := partial.ConfigLayer(img); err != nil {
// We can't read the ConfigLayer, possibly because of streaming layers,
Expand All @@ -151,13 +148,13 @@ func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Upd
if err != nil {
return err
}
if err := w.uploadOne(l); err != nil {
if err := w.uploadOne(ctx, l); err != nil {
return err
}
} else {
// We *can* read the ConfigLayer, so upload it concurrently with the layers.
g.Go(func() error {
return w.uploadOne(l)
return w.uploadOne(gctx, l)
})

// Wait for the layers + config.
Expand All @@ -168,7 +165,7 @@ func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Upd

// With all of the constituent elements uploaded, upload the manifest
// to commit the image.
return w.commitManifest(img, ref)
return w.commitManifest(ctx, img, ref)
}

// writer writes the elements of an image to a remote image reference.
Expand Down Expand Up @@ -428,7 +425,7 @@ var backoff = retry.Backoff{
}

// uploadOne performs a complete upload of a single layer.
func (w *writer) uploadOne(l v1.Layer) error {
func (w *writer) uploadOne(ctx context.Context, l v1.Layer) error {
var from, mount string
if h, err := l.Digest(); err == nil {
// If we know the digest, this isn't a streaming layer. Do an existence
Expand All @@ -455,8 +452,6 @@ func (w *writer) uploadOne(l v1.Layer) error {
}
}

ctx := w.context

tryUpload := func() error {
location, mounted, err := w.initiateUpload(from, mount)
if err != nil {
Expand Down Expand Up @@ -515,7 +510,7 @@ type withLayer interface {
Layer(v1.Hash) (v1.Layer, error)
}

func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error {
func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.ImageIndex, options ...Option) error {
index, err := ii.IndexManifest()
if err != nil {
return err
Expand Down Expand Up @@ -544,15 +539,15 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
if err != nil {
return err
}
if err := w.writeIndex(ref, ii); err != nil {
if err := w.writeIndex(ctx, ref, ii); err != nil {
return err
}
case types.OCIManifestSchema1, types.DockerManifestSchema2:
img, err := ii.Image(desc.Digest)
if err != nil {
return err
}
if err := writeImage(ref, img, o, w.lastUpdate); err != nil {
if err := writeImage(ctx, ref, img, o, w.lastUpdate); err != nil {
return err
}
default:
Expand All @@ -562,7 +557,7 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
if err != nil {
return err
}
if err := w.uploadOne(layer); err != nil {
if err := w.uploadOne(ctx, layer); err != nil {
return err
}
}
Expand All @@ -571,7 +566,7 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt

// With all of the constituent elements uploaded, upload the manifest
// to commit the image.
return w.commitManifest(ii, ref)
return w.commitManifest(ctx, ii, ref)
}

type withMediaType interface {
Expand Down Expand Up @@ -617,7 +612,7 @@ func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) {
}

// commitManifest does a PUT of the image's manifest.
func (w *writer) commitManifest(t Taggable, ref name.Reference) error {
func (w *writer) commitManifest(ctx context.Context, t Taggable, ref name.Reference) error {
tryUpload := func() error {
raw, desc, err := unpackTaggable(t)
if err != nil {
Expand All @@ -633,7 +628,7 @@ func (w *writer) commitManifest(t Taggable, ref name.Reference) error {
}
req.Header.Set("Content-Type", string(desc.MediaType))

resp, err := w.client.Do(req.WithContext(w.context))
resp, err := w.client.Do(req.WithContext(ctx))
if err != nil {
return err
}
Expand Down Expand Up @@ -708,7 +703,7 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr e
defer func() { sendError(o.updates, rerr) }()
}

return w.writeIndex(ref, ii, options...)
return w.writeIndex(o.context, ref, ii, options...)
}

// countImage counts the total size of all layers + config blob + manifest for
Expand Down Expand Up @@ -851,7 +846,7 @@ func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr e
}
w.lastUpdate = &v1.Update{Total: size}
}
return w.uploadOne(layer)
return w.uploadOne(o.context, layer)
}

// Tag adds a tag to the given Taggable via PUT /v2/.../manifests/<tag>
Expand Down Expand Up @@ -903,5 +898,5 @@ func Put(ref name.Reference, t Taggable, options ...Option) error {
context: o.context,
}

return w.commitManifest(t, ref)
return w.commitManifest(o.context, t, ref)
}
11 changes: 7 additions & 4 deletions pkg/v1/remote/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ func TestUploadOne(t *testing.T) {
initiatePath := fmt.Sprintf("/v2/%s/blobs/uploads/", expectedRepo)
streamPath := "/path/to/upload"
commitPath := "/path/to/commit"
ctx := context.Background()

uploaded := false
w, closer, err := setupWriter(expectedRepo, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -705,11 +706,11 @@ func TestUploadOne(t *testing.T) {
Layer: l,
Reference: w.repo.Digest(h.String()),
}
if err := w.uploadOne(ml); err != nil {
if err := w.uploadOne(ctx, ml); err != nil {
t.Errorf("uploadOne() = %v", err)
}
// Hit the existing blob path.
if err := w.uploadOne(l); err != nil {
if err := w.uploadOne(ctx, l); err != nil {
t.Errorf("uploadOne() = %v", err)
}
}
Expand All @@ -719,6 +720,7 @@ func TestUploadOneStreamedLayer(t *testing.T) {
initiatePath := fmt.Sprintf("/v2/%s/blobs/uploads/", expectedRepo)
streamPath := "/path/to/upload"
commitPath := "/path/to/commit"
ctx := context.Background()

w, closer, err := setupWriter(expectedRepo, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
Expand Down Expand Up @@ -754,7 +756,7 @@ func TestUploadOneStreamedLayer(t *testing.T) {
wantDigest := "sha256:3d7c465be28d9e1ed810c42aeb0e747b44441424f566722ba635dc93c947f30e"
wantDiffID := "sha256:27dd1f61b867b6a0f6e9d8a41c43231de52107e53ae424de8f847b821db4b711"
l := stream.NewLayer(newBlob())
if err := w.uploadOne(l); err != nil {
if err := w.uploadOne(ctx, l); err != nil {
t.Fatalf("uploadOne: %v", err)
}

Expand All @@ -777,6 +779,7 @@ func TestUploadOneStreamedLayer(t *testing.T) {

func TestCommitImage(t *testing.T) {
img := setupImage(t)
ctx := context.Background()

expectedRepo := "foo/bar"
expectedPath := fmt.Sprintf("/v2/%s/manifests/latest", expectedRepo)
Expand Down Expand Up @@ -813,7 +816,7 @@ func TestCommitImage(t *testing.T) {
}
defer closer.Close()

if err := w.commitManifest(img, w.repo.Tag("latest")); err != nil {
if err := w.commitManifest(ctx, img, w.repo.Tag("latest")); err != nil {
t.Error("commitManifest() = ", err)
}
}
Expand Down

0 comments on commit c71ca9b

Please sign in to comment.