chore: Amend Transfer Manager samples (#1113)
* chore: Amend Transfer Manager samples

* tests

* tests again

* respond to feedback
andrewsg committed Aug 31, 2023
1 parent 9a189da commit bdd7c6c
Showing 5 changed files with 111 additions and 18 deletions.
23 changes: 19 additions & 4 deletions samples/snippets/snippets_test.py
@@ -72,10 +72,11 @@
 import storage_set_bucket_default_kms_key
 import storage_set_client_endpoint
 import storage_set_metadata
-import storage_transfer_manager_download_all_blobs
+import storage_transfer_manager_download_bucket
 import storage_transfer_manager_download_chunks_concurrently
+import storage_transfer_manager_download_many
 import storage_transfer_manager_upload_directory
-import storage_transfer_manager_upload_many_blobs
+import storage_transfer_manager_upload_many
 import storage_upload_file
 import storage_upload_from_memory
 import storage_upload_from_stream
@@ -689,7 +690,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):
             with open(os.path.join(uploads, name), "w") as f:
                 f.write(name)

-        storage_transfer_manager_upload_many_blobs.upload_many_blobs_with_transfer_manager(
+        storage_transfer_manager_upload_many.upload_many_blobs_with_transfer_manager(
             test_bucket.name,
             BLOB_NAMES,
             source_directory="{}/".format(uploads),
@@ -702,10 +703,24 @@

     with tempfile.TemporaryDirectory() as downloads:
         # Download the files.
-        storage_transfer_manager_download_all_blobs.download_all_blobs_with_transfer_manager(
+        storage_transfer_manager_download_bucket.download_bucket_with_transfer_manager(
             test_bucket.name,
             destination_directory=os.path.join(downloads, ""),
             processes=8,
+            max_results=10000,
         )
         out, _ = capsys.readouterr()

+        for name in BLOB_NAMES:
+            assert "Downloaded {}".format(name) in out
+
+    with tempfile.TemporaryDirectory() as downloads:
+        # Download the files.
+        storage_transfer_manager_download_many.download_many_blobs_with_transfer_manager(
+            test_bucket.name,
+            blob_names=BLOB_NAMES,
+            destination_directory=os.path.join(downloads, ""),
+            processes=8,
+        )
+        out, _ = capsys.readouterr()
74 changes: 74 additions & 0 deletions samples/snippets/storage_transfer_manager_download_bucket.py
@@ -0,0 +1,74 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START storage_transfer_manager_download_bucket]
def download_bucket_with_transfer_manager(
    bucket_name, destination_directory="", processes=8, max_results=1000
):
    """Download all of the blobs in a bucket, concurrently in a process pool.

    The filename of each blob once downloaded is derived from the blob name and
    the `destination_directory` parameter. For complete control of the filename
    of each blob, use transfer_manager.download_many() instead.

    Directories will be created automatically as needed, for instance to
    accommodate blob names that include slashes.
    """

    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The directory on your computer to which to download all of the files. This
    # string is prepended (with os.path.join()) to the name of each blob to form
    # the full path. Relative paths and absolute paths are both accepted. An
    # empty string means "the current working directory". Note that this
    # parameter accepts directory traversal ("../" etc.) and is not
    # intended for unsanitized end user input.
    # destination_directory = ""

    # The maximum number of processes to use for the operation. The performance
    # impact of this value depends on the use case, but smaller files usually
    # benefit from a higher number of processes. Each additional process occupies
    # some CPU and memory resources until finished.
    # processes=8

    # The maximum number of results to fetch from bucket.list_blobs(). This
    # sample code fetches all of the blobs up to max_results and queues them all
    # for download at once. Though they will still be executed in batches up to
    # the processes limit, queueing them all at once can be taxing on system
    # memory if buckets are very large. Adjust max_results as needed for your
    # system environment, or set it to None if you are sure the bucket is not
    # too large to hold in memory easily.
    # max_results=1000

    from google.cloud.storage import Client, transfer_manager

    storage_client = Client()
    bucket = storage_client.bucket(bucket_name)

    blob_names = [blob.name for blob in bucket.list_blobs(max_results=max_results)]

    results = transfer_manager.download_many_to_path(
        bucket, blob_names, destination_directory=destination_directory, max_workers=processes
    )

    for name, result in zip(blob_names, results):
        # The results list is either `None` or an exception for each blob in
        # the input list, in order.

        if isinstance(result, Exception):
            print("Failed to download {} due to exception: {}".format(name, result))
        else:
            print("Downloaded {} to {}.".format(name, destination_directory + name))
# [END storage_transfer_manager_download_bucket]
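As a quick usage sketch: the bucket name and destination directory below are hypothetical placeholders, not part of the sample.

    # Hypothetical invocation, assuming the function above is importable.
    download_bucket_with_transfer_manager(
        "my-bucket", destination_directory="downloads/", processes=8, max_results=1000
    )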
samples/snippets/storage_transfer_manager_download_chunks_concurrently.py
@@ -14,7 +14,7 @@

 # [START storage_transfer_manager_download_chunks_concurrently]
 def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
-    """Download a single file in chunks, concurrently."""
+    """Download a single file in chunks, concurrently in a process pool."""

     # The ID of your GCS bucket
     # bucket_name = "your-bucket-name"
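The rest of this file is collapsed in the diff. For orientation, a rough sketch of what the function body plausibly contains, assuming it follows the structure of the other samples and calls transfer_manager.download_chunks_concurrently:

    from google.cloud.storage import Client, transfer_manager

    storage_client = Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    # Split the single blob into chunks and fetch them in parallel worker processes.
    transfer_manager.download_chunks_concurrently(blob, filename, max_workers=processes)

    print("Downloaded {} to {}.".format(blob_name, filename))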
samples/snippets/storage_transfer_manager_download_all_blobs.py → samples/snippets/storage_transfer_manager_download_many.py
@@ -1,4 +1,4 @@
-# Copyright 2022 Google LLC
+# Copyright 2023 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the 'License');
 # you may not use this file except in compliance with the License.
@@ -12,23 +12,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-# [START storage_transfer_manager_download_all_blobs]
-def download_all_blobs_with_transfer_manager(
-    bucket_name, destination_directory="", processes=8
+# [START storage_transfer_manager_download_many]
+def download_many_blobs_with_transfer_manager(
+    bucket_name, blob_names, destination_directory="", processes=8
 ):
-    """Download all of the blobs in a bucket, concurrently in a thread pool.
+    """Download blobs in a list by name, concurrently in a process pool.

     The filename of each blob once downloaded is derived from the blob name and
     the `destination_directory` parameter. For complete control of the filename
     of each blob, use transfer_manager.download_many() instead.

-    Directories will be created automatically as needed, for instance to
-    accommodate blob names that include slashes.
+    Directories will be created automatically as needed to accommodate blob
+    names that include slashes.
     """

     # The ID of your GCS bucket
     # bucket_name = "your-bucket-name"

+    # The list of blob names to download. The name of each blob will also
+    # be the name of its destination file (use transfer_manager.download_many()
+    # instead to control each destination file name). If there is a "/" in the
+    # blob name, then corresponding directories will be created on download.
+    # blob_names = ["myblob", "myblob2"]
+
     # The directory on your computer to which to download all of the files. This
     # string is prepended (with os.path.join()) to the name of each blob to form
     # the full path. Relative paths and absolute paths are both accepted. An
@@ -48,8 +54,6 @@ def download_all_blobs_with_transfer_manager(
     storage_client = Client()
     bucket = storage_client.bucket(bucket_name)

-    blob_names = [blob.name for blob in bucket.list_blobs()]
-
     results = transfer_manager.download_many_to_path(
         bucket, blob_names, destination_directory=destination_directory, max_workers=processes
     )
@@ -62,4 +66,4 @@ def download_all_blobs_with_transfer_manager(
             print("Failed to download {} due to exception: {}".format(name, result))
         else:
             print("Downloaded {} to {}.".format(name, destination_directory + name))
-# [END storage_transfer_manager_download_all_blobs]
+# [END storage_transfer_manager_download_many]
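A minimal usage sketch for the renamed function; all values below are placeholders.

    download_many_blobs_with_transfer_manager(
        "my-bucket",
        ["myblob", "myblob2"],
        destination_directory="downloads/",
        processes=8,
    )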
samples/snippets/storage_transfer_manager_upload_many_blobs.py → samples/snippets/storage_transfer_manager_upload_many.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-# [START storage_transfer_manager_upload_many_blobs]
+# [START storage_transfer_manager_upload_many]
 def upload_many_blobs_with_transfer_manager(
     bucket_name, filenames, source_directory="", processes=8
 ):
-    """Upload every file in a list to a bucket, concurrently in a thread pool.
+    """Upload every file in a list to a bucket, concurrently in a process pool.

     Each blob name is derived from the filename, not including the
     `source_directory` parameter. For complete control of the blob name for each
@@ -63,4 +63,4 @@ def upload_many_blobs_with_transfer_manager(
             print("Failed to upload {} due to exception: {}".format(name, result))
         else:
             print("Uploaded {} to {}.".format(name, bucket.name))
-# [END storage_transfer_manager_upload_many_blobs]
+# [END storage_transfer_manager_upload_many]
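The collapsed middle of this sample is untouched by the commit; its core presumably remains a call to transfer_manager.upload_many_from_filenames, roughly:

    from google.cloud.storage import Client, transfer_manager

    storage_client = Client()
    bucket = storage_client.bucket(bucket_name)

    # Upload all files in parallel worker processes; each result lines up
    # with the corresponding entry in `filenames`.
    results = transfer_manager.upload_many_from_filenames(
        bucket, filenames, source_directory=source_directory, max_workers=processes
    )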
