
feat(storage/transfermanager): prototype #10045

Merged 15 commits on Jun 5, 2024
83 changes: 83 additions & 0 deletions storage/transfermanager/doc.go
@@ -0,0 +1,83 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package transfermanager provides an easy way to parallelize downloads in Google
Cloud Storage.

More information about Google Cloud Storage is available at
https://cloud.google.com/storage/docs.

See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.

NOTE: This package is in alpha. It is not stable, and is likely to change.

# Example usage

// Pass in any client opts or set retry policy here.
Contributor:

Let's perhaps add something like https://github.com/googleapis/google-cloud-go/blob/main/storage/example_test.go instead, so it can be in compiled code?

Contributor Author:

Done

client, err := storage.NewClient(ctx) // can also use NewGRPCClient
if err != nil {
// handle error
}

// Create Downloader with desired options, including number of workers,
// part size, per operation timeout, etc.
d, err := transfermanager.NewDownloader(client, transfermanager.WithWorkers(16))
if err != nil {
// handle error
}

// Create local file writer for output.
f, err := os.Create("/path/to/localfile")
if err != nil {
// handle error
}

// Create download input
in := &transfermanager.DownloadObjectInput{
Bucket: "mybucket",
Object: "myblob",
Destination: f,
// Optionally specify params to apply to download.
EncryptionKey: []byte("mykey"),
}

// Can set timeout on this download using context. Note that this download
// may not start immediately if all workers are busy, so this may time out
// before the download even starts. To set a timeout that starts with the
// download, use transfermanager.WithPerOpTimeout(time.Duration).
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

// Add to Downloader.
d.DownloadObject(ctx, in)

// Repeat if desired.

// Wait for all downloads to complete.
d.WaitAndClose()

// Iterate through completed downloads and process results. This can
// also happen asynchronously in a goroutine as the downloads run.
results := d.Results()
for _, out := range results {
if out.Err != nil {
log.Printf("download of %v failed with error %v", out.Object, out.Err)
} else {
log.Printf("download of %v succeeded", out.Object)
}
}
*/
package transfermanager // import "cloud.google.com/go/storage/transfermanager"
296 changes: 296 additions & 0 deletions storage/transfermanager/downloader.go
@@ -0,0 +1,296 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transfermanager

import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

"cloud.google.com/go/storage"
)

// Downloader manages a set of parallelized downloads.
type Downloader struct {
client *storage.Client
config *transferManagerConfig
inputs []DownloadObjectInput
results []DownloadOutput
errors []error
inputsMu *sync.Mutex
resultsMu *sync.Mutex
errorsMu *sync.Mutex
work chan *DownloadObjectInput // Piece of work to be executed.
done chan bool // Indicates to finish up work; expecting no more inputs.
workers *sync.WaitGroup // Keeps track of the workers that are currently running.
}

// DownloadObject queues the download of a single object. This will initiate the
// download but is non-blocking; call Downloader.Results to process the result.
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) {
input.ctx = ctx
d.addInput(input)
}

// DownloadObjectWithCallback queues the download of a single object. This will
// initiate the download but is non-blocking; use the callback to process the
// result.
// The results of downloads initiated with this method will not be provided in
// the results given in Downloader.Results.
func (d *Downloader) DownloadObjectWithCallback(ctx context.Context, input *DownloadObjectInput, callback func(*DownloadOutput)) {
Contributor:

What is the expected behavior here if someone calls this without using the Callback option when creating a downloader? Or vice versa with DownloadObject? There are some corner cases to think through here.

I'm leaning towards just having one entry point function for DownloadObject and moving the callback to a field on the DownloadObjectInput -- what do you think?

Contributor Author:

I forgot to remove the callback Option - right now it's completely unused. We could enforce its usage (erroring in the output, or here, if the option does not match the call); as of now, callbacks and no callbacks can be used in the same downloader (and it should work, though it's untested).

I lean towards the two separate entry points. I think the distinction in behaviour is big enough that we should make a similar distinction in our surface - having it just as a field isn't as nice an experience and encourages mixing callback and non-callback usage. If we do add it as a field, I'd suggest then also adding the output to the results returned by Results(), in addition to calling the callback if available. That way the callback is more of an optional field that gets called, and less of something that causes different behaviour.

Contributor Author:

As discussed offline, we'll enforce usage of the option and set callback as a field.
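The agreed design (callback as a field on the input, with dispatch to either the callback or the results slice) can be sketched in isolation. The `input`, `result`, and `queue` types below are illustrative stand-ins for `DownloadObjectInput`, `DownloadOutput`, and the `Downloader`, not the PR's actual code:

```go
package main

import "fmt"

// result and input stand in for DownloadOutput and DownloadObjectInput;
// only the callback-dispatch logic is sketched here.
type result struct{ object string }

type input struct {
	object   string
	callback func(result) // nil means "collect in the results slice"
}

type queue struct{ results []result }

// process delivers the output through the callback when one is set,
// and collects it in results otherwise.
func (q *queue) process(in input) {
	out := result{object: in.object}
	if in.callback != nil {
		in.callback(out)
		return
	}
	q.results = append(q.results, out)
}

func main() {
	q := &queue{}
	var viaCallback []string
	q.process(input{object: "a", callback: func(r result) { viaCallback = append(viaCallback, r.object) }})
	q.process(input{object: "b"})
	fmt.Println(len(viaCallback), len(q.results)) // 1 1
}
```

With this shape, an input with a callback never shows up in `Results()`, which matches the behavior documented on `DownloadObjectWithCallback`.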

input.ctx = ctx
input.callback = &callback
d.addInput(input)
}

// WaitAndClose waits for all outstanding downloads to complete. The Downloader
// must not be used for any more downloads after this has been called.
Contributor:

Clearer to say that no new downloads can be added, perhaps? Also are we enforcing this somehow?

I think we should add unit tests for this behavior (and for stuff like the callback thing I mentioned).

Contributor Author:

We aren't enforcing it, but it should panic when trying to send on the closed channels. I can change the wording (and maybe also mention that it will panic) and add a test for it.

Contributor Author:

I changed this to error instead of panicking.

// WaitAndClose returns an error wrapping all errors that were encountered by
// the Downloader when downloading objects. These errors are also returned in
// the DownloadOutput for the failing download.
func (d *Downloader) WaitAndClose() error {
BrennaEpp marked this conversation as resolved.
d.done <- true
d.workers.Wait()

if len(d.errors) > 0 {
err := errors.Join(d.errors...)
return fmt.Errorf("transfermanager: at least one error encountered downloading objects:\n%w", err)
}
return nil
}

// Results returns all the results of the downloads completed since the last
Contributor:

I think we could probably enforce that this can only be called after the full job is complete (given that we are keeping WaitAndClose as a single func). Or we could even just return the slice from WaitAndClose. What do you think?

Contributor Author:

I was thinking of just returning it in WaitAndClose, but hesitated given the async case - seems cleaner in that case to not return the results in WaitAndClose (since presumably it'd be empty).

We could enforce that it only be called after the full job is complete by returning an error and/or an empty array if called before WaitAndClose... but then it may just be cleaner to return it in WaitAndClose. The way it is now should not cause any error if users call it several times or before WaitAndClose, but could be confusing for some users. We could also always return the whole slice, but that has the same problems.

I think that, if we weigh both async and sync equally (and we don't want users grabbing results part way through) returning results in WaitAndClose is better, especially if we can assume people using async would be more aware of what they are doing and reading the docs, that would mention that results are empty for their case.

Contributor Author:

As discussed, I'll change this to return results directly in WaitAndClose

// time it was called. Call WaitAndClose before calling Results to wait for all
// downloads to complete. The results are not guaranteed to be in any order.
// Results will not return results for downloads initiated with a callback.
func (d *Downloader) Results() []DownloadOutput {
d.resultsMu.Lock()
r := make([]DownloadOutput, len(d.results))
copy(r, d.results)
d.results = []DownloadOutput{}
d.resultsMu.Unlock()

return r
}

// sendInputsToWorkChan listens continuously to the inputs slice until d.done.
// It will send all items in inputs to the d.work chan.
// Once it receives from d.done, it drains the remaining items in the inputs
// (sending them to d.work) and then closes the d.work chan.
func (d *Downloader) sendInputsToWorkChan() {
for {
select {
case <-d.done:
d.drainInput()
close(d.work)
return
default:
d.drainInput()
}
}
}

// drainInput consumes everything in the inputs slice and sends it to the work chan.
// It will block if there are not enough workers to consume every input, until all
// inputs are received on the work chan (i.e. they're dispatched to an available worker).
func (d *Downloader) drainInput() {
for {
d.inputsMu.Lock()
if len(d.inputs) < 1 {
d.inputsMu.Unlock()
return
}
input := d.inputs[0]
d.inputs = d.inputs[1:]
d.inputsMu.Unlock()
d.work <- &input
}
}

func (d *Downloader) addInput(input *DownloadObjectInput) {
d.inputsMu.Lock()
d.inputs = append(d.inputs, *input)
d.inputsMu.Unlock()
}

func (d *Downloader) addResult(result *DownloadOutput) {
d.resultsMu.Lock()
d.results = append(d.results, *result)
d.resultsMu.Unlock()
}

func (d *Downloader) error(err error) {
d.errorsMu.Lock()
d.errors = append(d.errors, err)
d.errorsMu.Unlock()
}

// downloadWorker continuously processes downloads until the work channel is closed.
func (d *Downloader) downloadWorker() {
for {
input, ok := <-d.work
if !ok {
break // no more work; exit
}

// TODO: break down the input into smaller pieces if necessary; maybe as follows:
// Only request partSize data to begin with. If no error and we haven't finished
// reading the object, enqueue the remaining pieces of work and mark in the
// out var the amount of shards to wait for.
out := input.downloadShard(d.client, d.config.perOperationTimeout)

// Keep track of any error that occurred.
if out.Err != nil {
d.error(out.Err)
Contributor:

Can we wrap this error to include the bucket/object name? I don't see that added in downloadShard either.

Contributor Author:

Sure! Technically they would have the information since it's in the output, but I see no harm including it here as well.

Contributor:

Sounds good; yeah if it's in the top-level output then you can see which individual result to inspect as well.

}

// Either execute the callback, or append to results.
if input.callback != nil {
(*input.callback)(out)
} else {
d.addResult(out)
}
}
d.workers.Done()
}

// NewDownloader creates a new Downloader to add operations to.
// Choice of transport, etc. is configured on the client that's passed in.
func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error) {
d := &Downloader{
client: c,
config: initTransferManagerConfig(opts...),
inputs: []DownloadObjectInput{},
results: []DownloadOutput{},
errors: []error{},
inputsMu: &sync.Mutex{},
resultsMu: &sync.Mutex{},
errorsMu: &sync.Mutex{},
work: make(chan *DownloadObjectInput),
done: make(chan bool),
workers: &sync.WaitGroup{},
}

// Start a listener to send work through.
go d.sendInputsToWorkChan()

// Start workers.
for i := 0; i < d.config.numWorkers; i++ {
Contributor:

Presumably we could optimize this by spinning up workers as needed when there are objects enqueued? Doesn't have to be in this PR though.

Contributor Author:

Sure, though I'm not sure how much that would optimize this by... I guess it depends on the num of workers.

Contributor:

Yeah something we can test out later.

d.workers.Add(1)
go d.downloadWorker()
}

return d, nil
}

// DownloadRange specifies the object range.
type DownloadRange struct {
// Offset is the starting offset (inclusive) from which the object is read.
// If offset is negative, the object is read abs(offset) bytes from the end,
// and length must also be negative to indicate all remaining bytes will be read.
Offset int64
// Length is the number of bytes to read.
// If length is negative or larger than the object size, the object is read
// until the end.
Length int64
}
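The negative-offset and negative-length semantics documented on `DownloadRange` can be made concrete with a small helper that maps a range to an absolute (offset, length) given the object size. `resolveRange` is hypothetical, written only to illustrate the documented rules:

```go
package main

import "fmt"

// resolveRange maps DownloadRange semantics to an absolute offset and
// length for an object of the given size (illustrative helper).
func resolveRange(offset, length, size int64) (int64, int64) {
	if offset < 0 {
		// Negative offset: read abs(offset) bytes from the end.
		offset = size + offset
		if offset < 0 {
			offset = 0
		}
	}
	if length < 0 || offset+length > size {
		// Negative or oversized length: read until the end.
		length = size - offset
	}
	return offset, length
}

func main() {
	fmt.Println(resolveRange(-10, -1, 100)) // 90 10: last 10 bytes
	fmt.Println(resolveRange(20, 500, 100)) // 20 80: clamped to object size
	fmt.Println(resolveRange(0, -1, 100))   // 0 100: whole object
}
```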

// DownloadObjectInput is the input for a single object to download.
type DownloadObjectInput struct {
// Required fields
Bucket string
Object string
Destination io.WriterAt

// Optional fields
Generation *int64
Conditions *storage.Conditions
EncryptionKey []byte
Range *DownloadRange // if specified, reads only a range

ctx context.Context
callback *func(*DownloadOutput)
}

// downloadShard will read a specific object into in.Destination.
// If timeout is non-positive, no timeout is set.
// TODO: download a single shard instead of the entire object.
func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout time.Duration) (out *DownloadOutput) {
out = &DownloadOutput{Bucket: in.Bucket, Object: in.Object}

// Set timeout.
ctx := in.ctx
if timeout > 0 {
c, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
ctx = c
}

// Set options on the object.
o := client.Bucket(in.Bucket).Object(in.Object)

if in.Conditions != nil {
o = o.If(*in.Conditions)
}
if in.Generation != nil {
o = o.Generation(*in.Generation)
}
if len(in.EncryptionKey) > 0 {
o = o.Key(in.EncryptionKey)
}

var offset, length int64 = 0, -1 // get the entire object by default

if in.Range != nil {
offset, length = in.Range.Offset, in.Range.Length
}

// Read.
r, err := o.NewRangeReader(ctx, offset, length)
if err != nil {
out.Err = err
return
}

// TODO: write at a specific offset.
off := io.NewOffsetWriter(in.Destination, 0)
_, err = io.Copy(off, r)
if err != nil {
out.Err = err
r.Close()
return
}
Contributor:

We might need to look into whether we should be attempting to close the writer if possible -- seems annoying for end users to have to clean up. Though, I guess we don't guarantee that a close method exists.

Contributor Author:

Do you mean the writer provided through in.Destination? The io.WriterAt interface, and for that matter io.OffsetWriter, do not have a Close method, as you say. In any case, I feel like it'd be bad practice to close a writer we don't own?


if err = r.Close(); err != nil {
out.Err = err
return
}

out.Attrs = &r.Attrs
return
}

// DownloadOutput provides output for a single object download, including all
// errors received while downloading object parts. If the download was successful,
// Attrs will be populated.
type DownloadOutput struct {
Bucket string
Object string
Err error // error occurring during download
Attrs *storage.ReaderObjectAttrs // attributes of downloaded object, if successful
}