Skip to content

Commit

Permalink
Set GetBody on blob uploads (#1391)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonjohnsonjr committed Jun 18, 2022
1 parent 9006ebf commit 2a21d4f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
25 changes: 19 additions & 6 deletions pkg/v1/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,16 +337,29 @@ func (r *progressReader) Close() error { return r.rc.Close() }
// streamBlob streams the contents of the blob to the specified location.
// On failure, this will return an error. On success, this will return the location
// header indicating how to commit the streamed blob.
func (w *writer) streamBlob(ctx context.Context, blob io.ReadCloser, streamLocation string) (commitLocation string, rerr error) {
func (w *writer) streamBlob(ctx context.Context, layer v1.Layer, streamLocation string) (commitLocation string, rerr error) {
reset := func() {}
defer func() {
if rerr != nil {
reset()
}
}()
blob, err := layer.Compressed()
if err != nil {
return "", err
}

getBody := layer.Compressed
if w.updates != nil {
var count int64
blob = &progressReader{rc: blob, updates: w.updates, lastUpdate: w.lastUpdate, count: &count}
getBody = func() (io.ReadCloser, error) {
blob, err := layer.Compressed()
if err != nil {
return nil, err
}
return &progressReader{rc: blob, updates: w.updates, lastUpdate: w.lastUpdate, count: &count}, nil
}
reset = func() {
atomic.AddInt64(&w.lastUpdate.Complete, -count)
w.updates <- *w.lastUpdate
Expand All @@ -357,6 +370,10 @@ func (w *writer) streamBlob(ctx context.Context, blob io.ReadCloser, streamLocat
if err != nil {
return "", err
}
if _, ok := layer.(*stream.Layer); !ok {
// We can't retry streaming layers.
req.GetBody = getBody
}
req.Header.Set("Content-Type", "application/octet-stream")

resp, err := w.client.Do(req.WithContext(ctx))
Expand Down Expand Up @@ -467,11 +484,7 @@ func (w *writer) uploadOne(ctx context.Context, l v1.Layer) error {
ctx = redact.NewContext(ctx, "omitting binary blobs from logs")
}

blob, err := l.Compressed()
if err != nil {
return err
}
location, err = w.streamBlob(ctx, blob, location)
location, err = w.streamBlob(ctx, l, location)
if err != nil {
return err
}
Expand Down
12 changes: 2 additions & 10 deletions pkg/v1/remote/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,12 +586,8 @@ func TestStreamBlob(t *testing.T) {
if err != nil {
t.Fatalf("ConfigLayer: %v", err)
}
blob, err := l.Compressed()
if err != nil {
t.Fatalf("layer.Compressed: %v", err)
}

commitLocation, err := w.streamBlob(context.Background(), blob, streamLocation.String())
commitLocation, err := w.streamBlob(context.Background(), l, streamLocation.String())
if err != nil {
t.Errorf("streamBlob() = %v", err)
}
Expand Down Expand Up @@ -638,12 +634,8 @@ func TestStreamLayer(t *testing.T) {

streamLocation := w.url(expectedPath)
sl := stream.NewLayer(newBlob())
blob, err := sl.Compressed()
if err != nil {
t.Fatalf("layer.Compressed: %v", err)
}

commitLocation, err := w.streamBlob(context.Background(), blob, streamLocation.String())
commitLocation, err := w.streamBlob(context.Background(), sl, streamLocation.String())
if err != nil {
t.Errorf("streamBlob: %v", err)
}
Expand Down

0 comments on commit 2a21d4f

Please sign in to comment.