Skip to content

Commit

Permalink
Update copy to discard over truncate
Browse files Browse the repository at this point in the history
Prevents the copy method from calling discard on the
writer when the reader is not seekable. Instead,
the copy method will discard up to the offset.
Truncate is a more expensive operation since any
bytes that are truncated already have their hash calculated
and are stored on disk in the backend. Re-writing bytes
which were truncated requires transfering the data over
GRPC again and re-computing the hash up to the point of
truncation.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
  • Loading branch information
dmcgowan committed Jan 29, 2018
1 parent d7a0e70 commit 45e7aa5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
33 changes: 17 additions & 16 deletions content/helpers.go
Expand Up @@ -3,6 +3,7 @@ package content
import (
"context"
"io"
"io/ioutil"
"sync"

"github.com/containerd/containerd/errdefs"
Expand Down Expand Up @@ -76,14 +77,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
if ws.Offset > 0 {
r, err = seekReader(r, ws.Offset, size)
if err != nil {
if !isUnseekable(err) {
return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
}

// reader is unseekable, try to move the writer back to the start.
if err := cw.Truncate(0); err != nil {
return errors.Wrapf(err, "content writer truncate failed")
}
return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
}
}

Expand All @@ -103,14 +97,9 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
return nil
}

var errUnseekable = errors.New("seek not supported")

func isUnseekable(err error) bool {
return errors.Cause(err) == errUnseekable
}

// seekReader attempts to seek the reader to the given offset, either by
// resolving `io.Seeker` or by detecting `io.ReaderAt`.
// resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding
// up to the given offset.
func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
// attempt to resolve r as a seeker and setup the offset.
seeker, ok := r.(io.Seeker)
Expand All @@ -134,5 +123,17 @@ func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
return sr, nil
}

return r, errors.Wrapf(errUnseekable, "seek to offset %v failed", offset)
// well then, let's just discard up to the offset
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)

n, err := io.CopyBuffer(ioutil.Discard, io.LimitReader(r, offset), *buf)
if err != nil {
return nil, errors.Wrap(err, "failed to discard to offset")
}
if n != offset {
return nil, errors.Errorf("unable to discard to offset")
}

return r, nil
}
6 changes: 3 additions & 3 deletions content/helpers_test.go
Expand Up @@ -42,9 +42,9 @@ func TestCopy(t *testing.T) {
},
{
name: "copy with offset from unseekable source",
source: copySource{reader: bytes.NewBufferString("foo"), size: 3},
writer: fakeWriter{status: Status{Offset: 8}},
expected: "foo",
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
writer: fakeWriter{status: Status{Offset: 3}},
expected: "bar",
},
{
name: "commit already exists",
Expand Down
1 change: 0 additions & 1 deletion content/testsuite/testsuite.go
Expand Up @@ -403,7 +403,6 @@ func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64,
if err := rf(ctx, w, b, limit, size, d); err != nil {
t.Fatalf("Resume failed: %+v", err)
}

postCommit := time.Now()

if err := w.Close(); err != nil {
Expand Down

0 comments on commit 45e7aa5

Please sign in to comment.