feat(storage/transfermanager): prototype #10045

Merged: 15 commits merged on Jun 5, 2024

BrennaEpp
Contributor

No description provided.

product-auto-label bot added the api: storage label (Issues related to the Cloud Storage API.) on Apr 25, 2024
Contributor

tritone left a comment

A few initial comments, overall looks like a good start

type Downloader struct {
client *storage.Client
config *transferManagerConfig
work chan *DownloadObjectInput // Piece of work to be executed.
Contributor

Presumably this should be send-only and output should be receive-only?

Contributor Author

We are sending and receiving from both channels in different places in the downloader. Unidirectional channels could be used in subcomponents or if we were providing the channel to the user, but I don't see how we could implement this with unidirectional channels - if we only received from output, who would send us the output (and vice-versa for work)?
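For reference, the "unidirectional channels in subcomponents" option mentioned above could look like this: the owning struct keeps both channels bidirectional (it both sends work and receives output), while a worker function takes directional views, and Go converts `chan T` to `<-chan T` or `chan<- T` implicitly at the call site. This is a minimal sketch; `job`, `result`, `pool`, `worker`, and `run` are hypothetical names, not code from the PR.

```go
package main

import "sync"

// job and result are hypothetical stand-ins for DownloadObjectInput and
// DownloadOutput.
type job struct{ name string }
type result struct{ name string }

// pool owns both channels as bidirectional fields because it sends on work
// and receives from output.
type pool struct {
	work   chan *job
	output chan *result
}

// worker only receives from work and only sends on output; the directional
// parameter types let the compiler enforce that.
func worker(work <-chan *job, output chan<- *result, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range work {
		output <- &result{name: j.name}
	}
}

// run pushes every name through a pool of n workers (n must be at least 1)
// and returns the collected results.
func run(names []string, n int) []*result {
	p := &pool{
		work:   make(chan *job),
		output: make(chan *result, len(names)), // buffered so workers never block
	}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		// The bidirectional fields convert implicitly to directional parameters.
		go worker(p.work, p.output, &wg)
	}
	for _, name := range names {
		p.work <- &job{name: name}
	}
	close(p.work)
	wg.Wait()
	close(p.output)
	results := make([]*result, 0, len(names))
	for r := range p.output {
		results = append(results, r)
	}
	return results
}
```

The direction restriction lives on the worker's signature, so the struct itself still answers the question of "who would send us the output".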

storage/transfermanager/downloader.go (six outdated, resolved threads)
return crc32cHash.Sum32(), w.Close()
}

type testWriter struct {
Contributor

doesn't have to be in this PR, but we should add a version of this (well, some kind of DownloaderBuffer that implements WriterAt) to the library.

Contributor Author

Yeah, I'd have to look more into that. This is a very barebones implementation that is likely not at all efficient (and doesn't really work as a WriterAt yet).

Contributor

Yeah I think that's fine for now.
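As a rough illustration of the "DownloaderBuffer that implements WriterAt" idea, here is a minimal in-memory `io.WriterAt` guarded by a mutex so that concurrent shard writers can fill disjoint ranges in any order. The `downloadBuffer` type is hypothetical, not the library's implementation; a real version would likely preallocate from the known object size instead of growing on demand.

```go
package main

import (
	"errors"
	"io"
	"sync"
)

// downloadBuffer is a hypothetical in-memory io.WriterAt. Writers (for
// example, one per shard) may write disjoint ranges concurrently.
type downloadBuffer struct {
	mu  sync.Mutex
	buf []byte
}

// Compile-time check that downloadBuffer satisfies io.WriterAt.
var _ io.WriterAt = (*downloadBuffer)(nil)

// WriteAt copies p into the buffer at offset off, growing the buffer if needed.
func (b *downloadBuffer) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, errors.New("downloadBuffer: negative offset")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	start := int(off)
	end := start + len(p)
	if end > len(b.buf) {
		grown := make([]byte, end)
		copy(grown, b.buf)
		b.buf = grown
	}
	copy(b.buf[start:], p)
	return len(p), nil
}

// Bytes returns the underlying slice (not a copy) of everything written so far.
func (b *downloadBuffer) Bytes() []byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf
}
```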

storage/transfermanager/option.go (outdated, resolved)
}

// Start workers in background.
for i := 0; i < d.config.numWorkers; i++ {
Contributor

Presumably we could optimize this by spinning up workers as needed when there are objects enqueued? Doesn't have to be in this PR though.

Contributor Author

Sure, though I'm not sure how much that would optimize this... I guess it depends on the number of workers.

Contributor

Yeah something we can test out later.

BrennaEpp marked this pull request as ready for review May 13, 2024 23:59
BrennaEpp requested review from a team as code owners May 14, 2024 00:01
BrennaEpp changed the title from draft(storage/transfermanager): prototype to feat(storage/transfermanager): prototype May 15, 2024
Contributor

tritone left a comment

Looks really good overall. We can chat tomorrow about a few of the remaining sync/async API changes perhaps?


# Example usage

// Pass in any client opts or set retry policy here.
Contributor

Let's add something like https://github.com/googleapis/google-cloud-go/blob/main/storage/example_test.go instead perhaps so it can be in compiled code?

Contributor Author

Done

// result.
// The results of downloads initiated with this method will not be provided in
// the results given in Downloader.Results.
func (d *Downloader) DownloadObjectWithCallback(ctx context.Context, input *DownloadObjectInput, callback func(*DownloadOutput)) {
Contributor

What is the expected behavior here if someone calls this without using the Callback option when creating a downloader? Or vice versa with DownloadObject? There are some corner cases to think through here.

I'm leaning towards just having one entry point function for DownloadObject and moving the callback to a field on the DownloadObjectInput -- what do you think?

Contributor Author

I forgot to remove the callback Option; right now it's completely unused. We could enforce its usage (returning an error in the output, or here, if the option does not match the call). As of now, calls with and without callbacks could be mixed in the same downloader (and it should work, though it's untested).

I lean towards the two separate entry points. I think the distinction in behaviour is big enough that we should make a similar distinction in our surface; having it as just a field isn't as nice an experience for users and encourages mixing callback and non-callback downloads. If we do add it as a field, I'd suggest also adding the output to the results returned by Results(), in addition to calling the callback if one is set. That way the callback is more of an optional hook that gets called, and less of something that changes behaviour.

Contributor Author

As discussed offline, we'll enforce usage of the option and set callback as a field.

// Waits for all outstanding downloads to complete. The Downloader must not be
// used to download more objects or directories after this has been called.
// WaitAndClose waits for all outstanding downloads to complete. The Downloader
// must not be used for any more downloads after this has been called.
Contributor

Clearer to say that no new downloads can be added, perhaps? Also are we enforcing this somehow?

I think we should add unit tests for this behavior (and for stuff like the callback thing I mentioned).

Contributor Author

We aren't enforcing it, but it should panic when trying to send on the closed channels. I can change the wording (and maybe also mention that it will panic) and add a test for it.

Contributor Author

I changed this to error instead of panicking.

func (d *Downloader) Results() *DownloadOutputIterator {
return &DownloadOutputIterator{
output: d.output,
// Results returns all the results of the downloads completed since the last
Contributor

I think we could probably enforce that this can only be called after the full job is complete (given that we are keeping WaitAndClose as a single func). Or we could even just return the slice from WaitAndClose. What do you think?

Contributor Author

I was thinking of just returning it in WaitAndClose, but hesitated because of the async case: there it seems cleaner not to return the results from WaitAndClose, since the slice would presumably be empty.

We could enforce that it only be called once the full job is complete by returning an error and/or an empty slice if it is called before WaitAndClose... but then it may be cleaner to just return the results from WaitAndClose. As it is now, calling it several times or before WaitAndClose does not cause an error, but could confuse some users. We could also always return the whole slice, but that has the same problems.

I think that, if we weigh async and sync equally (and don't want users grabbing results partway through), returning results in WaitAndClose is better, especially since we can assume people using async are more aware of what they are doing and will read the docs, which would mention that the results are empty in their case.

Contributor Author

As discussed, I'll change this to return results directly in WaitAndClose
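Returning results directly from WaitAndClose, and making repeated calls safe (as noted later in the thread), might look like the following. `collector`, `submit`, `waitAndClose`, and the string results are hypothetical simplifications: a drainer goroutine collects outputs as they arrive so workers never block, and `sync.Once` makes the close-and-collect step run only once, with later calls returning the same slice.

```go
package main

import "sync"

// collector is a hypothetical simplification of the Downloader's output handling.
type collector struct {
	wg      sync.WaitGroup
	output  chan string
	done    chan struct{}
	once    sync.Once
	results []string
}

func newCollector() *collector {
	c := &collector{output: make(chan string), done: make(chan struct{})}
	// Drain results as they arrive so workers never block on a full channel.
	go func() {
		for r := range c.output {
			c.results = append(c.results, r)
		}
		close(c.done)
	}()
	return c
}

// submit runs fn in a goroutine and sends its result to the output channel.
// It must not be called after waitAndClose.
func (c *collector) submit(fn func() string) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		c.output <- fn()
	}()
}

// waitAndClose waits for all submitted work and returns every result.
// Later calls return the same results instead of closing the channel again.
func (c *collector) waitAndClose() []string {
	c.once.Do(func() {
		c.wg.Wait()     // all workers have sent their results
		close(c.output) // lets the drainer goroutine exit
		<-c.done        // drainer has finished appending
	})
	return c.results
}
```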


// Keep track of any error that occurred.
if out.Err != nil {
d.error(out.Err)
Contributor

Can we wrap this error to include the bucket/object name? I don't see that added in downloadShard either.

Contributor Author

Sure! Technically they would already have the information since it's in the output, but I see no harm in including it here as well.

Contributor

Sounds good; yeah if it's in the top-level output then you can see which individual result to inspect as well.

storage/transfermanager/integration_test.go (resolved)
storage/transfermanager/option.go (outdated, resolved)
storage/transfermanager/option.go (resolved)
// errorIs is equivalent to errors.Is, except that it additionally will return
// true if err and targetErr are googleapi.Errors with identical error codes,
// or if both errors have the same gRPC status code.
func errorIs(err error, targetErr error) bool {
Contributor

Clever idea, we should probably do something like this for other tests which force errors.

Contributor Author

You mean in the storage package? Yeah, I think we mostly do this type of check in the test functions themselves or don't check the exact returned err.

If we make a shared testutil we can add this to it!

BrennaEpp requested a review from tritone May 22, 2024 18:26
Contributor

tritone left a comment

A few docs notes; otherwise I think we should be able to submit this soon.

storage/transfermanager/downloader.go (outdated, resolved)
storage/transfermanager/example_test.go (outdated, resolved)
d.addInput(input)
}
// download but is non-blocking; call Downloader.Results or use the callback to
// process the result.
Contributor

We should explicitly note that this is intended to be thread-safe (can share the Downloader across goroutines). And add the note about timeouts/timing that you have in the example.

Contributor Author

Done. Note I also made WaitAndClose callable more than once (later calls return the same results instead of panicking).

BrennaEpp requested a review from tritone May 24, 2024 05:45
danielduhh
Contributor

Is this ready to merge, @BrennaEpp @tritone?

}
ctx := context.Background()

tb.bucket = prefix + uidSpace.New()
Contributor

I think we should talk about adding some more objects (at least more than the thread count) and of a wider variety of sizes. Can be in a follow up PR though.
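A hypothetical helper for generating such integration-test fixtures: more objects than workers, cycling through a range of sizes (for example empty, tiny, and multi-megabyte). The `testObject` type, the naming scheme, and the "twice the worker count" rule are illustrative assumptions, not the PR's test setup.

```go
package main

import "fmt"

// testObject describes a hypothetical fixture to upload before download tests.
type testObject struct {
	Name string
	Size int64
}

// makeTestObjects returns max(2*numWorkers, len(sizes)) objects cycling through
// sizes, so every worker handles several objects and every size appears.
// sizes must be non-empty.
func makeTestObjects(numWorkers int, sizes []int64) []testObject {
	n := 2 * numWorkers
	if n < len(sizes) {
		n = len(sizes)
	}
	objs := make([]testObject, n)
	for i := range objs {
		size := sizes[i%len(sizes)]
		objs[i] = testObject{Name: fmt.Sprintf("obj-%03d-%dB", i, size), Size: size}
	}
	return objs
}
```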

BrennaEpp enabled auto-merge (squash) June 4, 2024 23:54
BrennaEpp merged commit cde5cbb into googleapis:main Jun 5, 2024
8 checks passed
gcf-merge-on-green bot pushed a commit that referenced this pull request Jun 10, 2024
🤖 I have created a release *beep* *boop*
---


## [1.42.0](https://togithub.com/googleapis/google-cloud-go/compare/storage/v1.41.0...storage/v1.42.0) (2024-06-10)


### Features

* **storage:** Add new package transfermanager. This package is intended for parallel uploads and downloads, and is in preview. It is not stable, and is likely to change. ([#10045](https://togithub.com/googleapis/google-cloud-go/issues/10045)) ([cde5cbb](https://togithub.com/googleapis/google-cloud-go/commit/cde5cbba3145d5a702683656a42158621234fe71))
* **storage:** Add bucket HierarchicalNamespace ([#10315](https://togithub.com/googleapis/google-cloud-go/issues/10315)) ([b92406c](https://togithub.com/googleapis/google-cloud-go/commit/b92406ccfadfdcee379e86d6f78c901d772401a9)), refs [#10146](https://togithub.com/googleapis/google-cloud-go/issues/10146)
* **storage:** Add BucketName to BucketHandle ([#10127](https://togithub.com/googleapis/google-cloud-go/issues/10127)) ([203cc59](https://togithub.com/googleapis/google-cloud-go/commit/203cc599e5e2f2f821dc75b47c5a4c9073333f05))


### Bug Fixes

* **storage:** Set invocation headers on xml reads ([#10250](https://togithub.com/googleapis/google-cloud-go/issues/10250)) ([c87e1ab](https://togithub.com/googleapis/google-cloud-go/commit/c87e1ab6f9618b8b3f4d0005ac159abd87b0daaf))


### Documentation

* **storage:** Update autoclass doc ([#10135](https://togithub.com/googleapis/google-cloud-go/issues/10135)) ([e4b2737](https://togithub.com/googleapis/google-cloud-go/commit/e4b2737ddc16d3bf8139a6def7326ac905f62acd))

---
This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
husam-e pushed a commit to husam-e/google-cloud-go that referenced this pull request Jun 11, 2024
3 participants