Skip to content

Commit

Permalink
ckpt
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent committed May 9, 2023
1 parent aeb26ed commit c33435a
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 21 deletions.
74 changes: 74 additions & 0 deletions blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,33 @@ func (r *Reader) WriteTo(w io.Writer) (int64, error) {
return nw, err
}

// downloadAndClose is similar to WriteTo, but ensures it's the only read.
// This pattern is more optimal for some drivers.
func (r *Reader) downloadAndClose(w io.Writer) (err error) {
if r.bytesRead != 0 {

Check warning on line 243 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L242-L243

Added lines #L242 - L243 were not covered by tests
// Shouldn't happen.
return gcerr.Newf(gcerr.Internal, nil, "blob: downloadAndClose isn't the first read")

Check warning on line 245 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L245

Added line #L245 was not covered by tests
}
driverDownloader, ok := r.r.(driver.Downloader)
if ok {
err = driverDownloader.Download(w)
} else {
_, err = r.WriteTo(w)

Check warning on line 251 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L247-L251

Added lines #L247 - L251 were not covered by tests
}
cerr := r.Close()
if err == nil && cerr != nil {
err = cerr

Check warning on line 255 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L253-L255

Added lines #L253 - L255 were not covered by tests
}
return err

Check warning on line 257 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L257

Added line #L257 was not covered by tests
}

// readFromWriteTo is a helper for ReadFrom and WriteTo.
// It reads data from r and writes to w, until EOF or a read/write error.
// It returns the number of bytes read from r and the number of bytes
// written to w.
func readFromWriteTo(r io.Reader, w io.Writer) (int64, int64, error) {
// Note: can't use io.Copy because it will try to use r.WriteTo
// or w.WriteTo, which is recursive in this context.
buf := make([]byte, 1024)
var totalRead, totalWritten int64
for {
Expand Down Expand Up @@ -458,6 +480,26 @@ func (w *Writer) ReadFrom(r io.Reader) (int64, error) {
return nr, err
}

// uploadAndClose is similar to ReadFrom, but ensures it's the only write.
// This pattern is more optimal for some drivers.
func (w *Writer) uploadAndClose(r io.Reader) (err error) {
if w.bytesWritten != 0 {

Check warning on line 486 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L485-L486

Added lines #L485 - L486 were not covered by tests
// Shouldn't happen.
return gcerr.Newf(gcerr.Internal, nil, "blob: uploadAndClose must be the first write")

Check warning on line 488 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L488

Added line #L488 was not covered by tests
}
driverUploader, ok := w.w.(driver.Uploader)
if ok {
err = driverUploader.Upload(r)
} else {
_, err = w.ReadFrom(r)

Check warning on line 494 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L490-L494

Added lines #L490 - L494 were not covered by tests
}
cerr := w.Close()
if err == nil && cerr != nil {
err = cerr

Check warning on line 498 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L496-L498

Added lines #L496 - L498 were not covered by tests
}
return err

Check warning on line 500 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L500

Added line #L500 was not covered by tests
}

// ListOptions sets options for listing blobs via Bucket.List.
type ListOptions struct {
// Prefix indicates that only blobs with a key starting with this prefix
Expand Down Expand Up @@ -644,6 +686,8 @@ func (b *Bucket) ErrorAs(err error, i interface{}) bool {

// ReadAll is a shortcut for creating a Reader via NewReader with nil
// ReaderOptions, and reading the entire blob.
//
// Using Download may be more efficient.
func (b *Bucket) ReadAll(ctx context.Context, key string) (_ []byte, err error) {
b.mu.RLock()
defer b.mu.RUnlock()
Expand All @@ -658,6 +702,20 @@ func (b *Bucket) ReadAll(ctx context.Context, key string) (_ []byte, err error)
return ioutil.ReadAll(r)
}

// Download writes the content of a blob into an io.Writer w.
func (b *Bucket) Download(ctx context.Context, key string, w io.Writer, opts *ReaderOptions) error {
b.mu.RLock()
defer b.mu.RUnlock()
if b.closed {
return errClosed

Check warning on line 710 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L706-L710

Added lines #L706 - L710 were not covered by tests
}
r, err := b.NewReader(ctx, key, opts)
if err != nil {
return err

Check warning on line 714 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L712-L714

Added lines #L712 - L714 were not covered by tests
}
return r.downloadAndClose(w)

Check warning on line 716 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L716

Added line #L716 was not covered by tests
}

// List returns a ListIterator that can be used to iterate over blobs in a
// bucket, in lexicographical order of UTF-8 encoded keys. The underlying
// implementation fetches results in pages.
Expand Down Expand Up @@ -934,6 +992,8 @@ func (b *Bucket) newRangeReader(ctx context.Context, key string, offset, length
//
// If opts.ContentMD5 is not set, WriteAll will compute the MD5 of p and use it
// as the ContentMD5 option for the Writer it creates.
//
// Using Upload may be more efficient.
func (b *Bucket) WriteAll(ctx context.Context, key string, p []byte, opts *WriterOptions) (err error) {
realOpts := new(WriterOptions)
if opts != nil {
Expand All @@ -954,6 +1014,20 @@ func (b *Bucket) WriteAll(ctx context.Context, key string, p []byte, opts *Write
return w.Close()
}

// Upload reads from an io.Reader r and writes into a blob.
//
// opts.ContentType is required.
func (b *Bucket) Upload(ctx context.Context, key string, r io.Reader, opts *WriterOptions) error {
if opts == nil || opts.ContentType == "" {
return gcerr.Newf(gcerr.InvalidArgument, nil, "blob: Upload requires WriterOptions.ContentType")

Check warning on line 1022 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L1020-L1022

Added lines #L1020 - L1022 were not covered by tests
}
w, err := b.NewWriter(ctx, key, opts)
if err != nil {
return err

Check warning on line 1026 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L1024-L1026

Added lines #L1024 - L1026 were not covered by tests
}
return w.uploadAndClose(r)

Check warning on line 1028 in blob/blob.go

View check run for this annotation

Codecov / codecov/patch

blob/blob.go#L1028

Added line #L1028 was not covered by tests
}

// NewWriter returns a Writer that writes to the blob stored at key.
// A nil WriterOptions is treated the same as the zero value.
//
Expand Down
22 changes: 22 additions & 0 deletions blob/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,24 @@ type Reader interface {
As(interface{}) bool
}

// Downloader has an optional extra method for readers.
// It is similar to io.WriteTo, but without the count of bytes returned.
type Downloader interface {
// Download is similar to io.WriteTo, but without the count of bytes returned.
Download(w io.Writer) error
}

// Writer writes an object to the blob.
type Writer interface {
io.WriteCloser
}

// Uploader has an optional extra method for writers.
type Uploader interface {
// Upload is similar to io.ReadFrom, but without the count of bytes returned.
Upload(r io.Reader) error
}

// WriterOptions controls behaviors of Writer.
type WriterOptions struct {
// BufferSize changes the default size in byte of the maximum part Writer can
Expand Down Expand Up @@ -263,6 +276,11 @@ type Bucket interface {
// exist, NewRangeReader must return an error for which ErrorCode returns
// gcerrors.NotFound.
// opts is guaranteed to be non-nil.
//
// The returned Reader *may* also implement Downloader if the underlying
// implementation can take advantage of that. The Download call is guaranteed
// to be the only call to the Reader. For such readers, offset will always
// be 0 and length will always be -1.
NewRangeReader(ctx context.Context, key string, offset, length int64, opts *ReaderOptions) (Reader, error)

// NewTypedWriter returns Writer that writes to an object associated with key.
Expand All @@ -279,6 +297,10 @@ type Bucket interface {
//
// Implementations should abort an ongoing write if ctx is later canceled,
// and do any necessary cleanup in Close. Close should then return ctx.Err().
//
// The returned Writer *may* also implement Uploader if the underlying
// implementation can take advantage of that. The Upload call is guaranteed
// to be the only non-Close call to the Writer..
NewTypedWriter(ctx context.Context, key, contentType string, opts *WriterOptions) (Writer, error)

// Copy copies the object associated with srcKey to dstKey.
Expand Down
8 changes: 8 additions & 0 deletions blob/fileblob/fileblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,14 @@ func (r *reader) Read(p []byte) (int, error) {
return r.r.Read(p)
}

func (r *reader) Download(w io.Writer) error {

Check warning on line 625 in blob/fileblob/fileblob.go

View check run for this annotation

Codecov / codecov/patch

blob/fileblob/fileblob.go#L625

Added line #L625 was not covered by tests
// This should always work because r.r was created from a File.
// It's only not a WriterTo when we wrap it with a LimitReader,
// which is guaranteed not to happen by the driver interface.
_, err := r.r.(io.WriterTo).WriteTo(w)
return err

Check warning on line 630 in blob/fileblob/fileblob.go

View check run for this annotation

Codecov / codecov/patch

blob/fileblob/fileblob.go#L629-L630

Added lines #L629 - L630 were not covered by tests
}

func (r *reader) Close() error {
if r.c == nil {
return nil
Expand Down
8 changes: 8 additions & 0 deletions blob/memblob/memblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ func (r *reader) Read(p []byte) (int, error) {
return r.r.Read(p)
}

func (r *reader) WriteTo(w io.Writer) error {

Check warning on line 259 in blob/memblob/memblob.go

View check run for this annotation

Codecov / codecov/patch

blob/memblob/memblob.go#L259

Added line #L259 was not covered by tests
// This should always work because r.r was created from a bytes.Reader.
// It's only not a WriterTo when we wrap it with a LimitReader,
// which is guaranteed not to happen by the driver interface.
_, err := r.r.(io.WriterTo).WriteTo(w)
return err

Check warning on line 264 in blob/memblob/memblob.go

View check run for this annotation

Codecov / codecov/patch

blob/memblob/memblob.go#L263-L264

Added lines #L263 - L264 were not covered by tests
}

func (r *reader) Close() error {
return nil
}
Expand Down
61 changes: 40 additions & 21 deletions blob/s3blob/s3blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ func (r *reader) Attributes() *driver.ReaderAttributes {

// writer writes an S3 object, it implements io.WriteCloser.
type writer struct {
w *io.PipeWriter // created when the first byte is written
pw *io.PipeWriter // created when the first byte is written
pr *io.PipeReader
upload bool

ctx context.Context
useV2 bool
Expand All @@ -287,12 +289,11 @@ func (w *writer) Write(p []byte) (int, error) {
if len(p) == 0 {
return 0, nil
}
if w.w == nil {
if w.pw == nil {
// We'll write into pw and use pr as an io.Reader for the
// Upload call to S3.
pr, pw := io.Pipe()
w.w = pw
if err := w.open(pr); err != nil {
w.pr, w.pw = io.Pipe()
if err := w.open(w.pr, true); err != nil {
return 0, err
}
}
Expand All @@ -301,33 +302,46 @@ func (w *writer) Write(p []byte) (int, error) {
return 0, w.err
default:
}
return w.w.Write(p)
return w.pw.Write(p)
}

// pr may be nil if we're Closing and no data was written.
func (w *writer) open(pr *io.PipeReader) error {
// ReadFrom reads from r. Per the driver, it is guaranteed to be the only
// write call for this writer.
func (w *writer) Upload(r io.Reader) error {
w.upload = true
if err := w.open(r, false); err != nil {
return err

Check warning on line 313 in blob/s3blob/s3blob.go

View check run for this annotation

Codecov / codecov/patch

blob/s3blob/s3blob.go#L310-L313

Added lines #L310 - L313 were not covered by tests
}
select {
case <-w.donec:
return w.err
default:

Check warning on line 318 in blob/s3blob/s3blob.go

View check run for this annotation

Codecov / codecov/patch

blob/s3blob/s3blob.go#L315-L318

Added lines #L315 - L318 were not covered by tests
}
return nil

Check warning on line 320 in blob/s3blob/s3blob.go

View check run for this annotation

Codecov / codecov/patch

blob/s3blob/s3blob.go#L320

Added line #L320 was not covered by tests
}

// pr may be nil if we're Closing and no data was written.
func (w *writer) open(r io.Reader, closeReaderOnError bool) error {
go func() {
defer close(w.donec)

body := io.Reader(pr)
if pr == nil {
if r == nil {
// AWS doesn't like a nil Body.
body = http.NoBody
r = http.NoBody
}
var err error
if w.useV2 {
w.reqV2.Body = body
w.reqV2.Body = r
_, err = w.uploaderV2.Upload(w.ctx, w.reqV2)
} else {
w.req.Body = body
w.req.Body = r
_, err = w.uploader.UploadWithContext(w.ctx, w.req)
}
if err != nil {
w.err = err
if pr != nil {
pr.CloseWithError(err)
if closeReaderOnError {
w.pr.CloseWithError(err)
}
w.err = err
return
}
}()
Expand All @@ -338,11 +352,16 @@ func (w *writer) open(pr *io.PipeReader) error {
// will be returned. If a writer is closed before any Write is called, Close
// will create an empty file at the given key.
func (w *writer) Close() error {
if w.w == nil {
// We never got any bytes written. We'll write an http.NoBody.
w.open(nil)
} else if err := w.w.Close(); err != nil {
return err
if !w.upload {
if w.pr != nil {
// defer w.pr.Close()
}
if w.pw == nil {
// We never got any bytes written. We'll write an http.NoBody.
w.open(nil, false)
} else if err := w.pw.Close(); err != nil {
return err

Check warning on line 363 in blob/s3blob/s3blob.go

View check run for this annotation

Codecov / codecov/patch

blob/s3blob/s3blob.go#L363

Added line #L363 was not covered by tests
}
}
<-w.donec
return w.err
Expand Down

0 comments on commit c33435a

Please sign in to comment.