Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blob: Add Upload and Download methods that may be more efficient for some drivers #3248

Merged
merged 1 commit into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 48 additions & 26 deletions blob/azureblob/azureblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,9 +815,17 @@
client *blockblob.Client
uploadOpts *azblob.UploadStreamOptions

w *io.PipeWriter
donec chan struct{}
err error
// Ends of an io.Pipe, created when the first byte is written.
pw *io.PipeWriter
pr *io.PipeReader

// Alternatively, upload is set to true when Upload was
// used to upload data.
upload bool

donec chan struct{} // closed when done writing
// The following fields will be written before donec closes:
err error
}

// escapeKey does all required escaping for UTF-8 strings to work with Azure.
Expand Down Expand Up @@ -916,50 +924,64 @@
}, nil
}

// Write appends p to w. User must call Close to close the w after done writing.
// Write appends p to w.pw. User must call Close to close the w after done writing.
func (w *writer) Write(p []byte) (int, error) {
// Avoid opening the pipe for a zero-length write;
// the concrete can do these for empty blobs.
if len(p) == 0 {
return 0, nil
}
if w.w == nil {
pr, pw := io.Pipe()
w.w = pw
if err := w.open(pr); err != nil {
return 0, err
}
if w.pw == nil {
// We'll write into pw and use pr as an io.Reader for the
// Upload call to Azure.
w.pr, w.pw = io.Pipe()
w.open(w.pr, true)
}
return w.w.Write(p)
return w.pw.Write(p)
}

// Upload reads from r. Per the driver, it is guaranteed to be the only
// write call for this writer.
func (w *writer) Upload(r io.Reader) error {
w.upload = true
w.open(r, false)
return nil
}

func (w *writer) open(pr *io.PipeReader) error {
// r may be nil if we're Closing and no data was written.
// If closePipeOnError is true, w.pr will be closed if there's an
// error uploading to Azure.
func (w *writer) open(r io.Reader, closePipeOnError bool) {
go func() {
defer close(w.donec)

var body io.Reader
if pr == nil {
body = http.NoBody
} else {
body = pr
if r == nil {
r = http.NoBody
}
_, w.err = w.client.UploadStream(w.ctx, body, w.uploadOpts)
_, w.err = w.client.UploadStream(w.ctx, r, w.uploadOpts)
if w.err != nil {
if pr != nil {
pr.CloseWithError(w.err)
if closePipeOnError {
w.pr.CloseWithError(w.err)
w.pr = nil
}
return
}
}()
return nil
}

// Close completes the writer and closes it. Any error occurring during write will
// be returned. If a writer is closed before any Write is called, Close will
// create an empty file at the given key.
func (w *writer) Close() error {
if w.w == nil {
w.open(nil)
} else if err := w.w.Close(); err != nil {
return err
if !w.upload {
if w.pr != nil {
defer w.pr.Close()
}
if w.pw == nil {
// We never got any bytes written. We'll write an http.NoBody.
w.open(nil, false)
} else if err := w.pw.Close(); err != nil {
return err

Check warning on line 983 in blob/azureblob/azureblob.go

View check run for this annotation

Codecov / codecov/patch

blob/azureblob/azureblob.go#L983

Added line #L983 was not covered by tests
}
}
<-w.donec
return w.err
Expand Down
273 changes: 273 additions & 0 deletions blob/azureblob/testdata/TestConformance/TestUploadDownload.replay

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading