From 9998a5e1c9e9e8920c4d40e13e39095585de657a Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 6 Dec 2022 15:21:15 -0800 Subject: [PATCH] feat: Add "transfer_manager" module for concurrent uploads and downloads, as a preview feature (#943) * checkpoint before design doc impl * checkpoint * more tests * code and tests for transfer manager complete * proactively close temp files when finished reading * respond to comments; destroy tmp files as they are consumed * Add system tests, docstrings, address feedback * Respond to review comments * verify md5 hash of downloaded file in test * lint * default empty strings for root arguments * fix bug with blob constructor * add warning about files not being deleted if their downloads fail * docs: Add samples to multithread branch (#918) * add samples, tests pending * add snippet tests * snippet and snippets_test.py linting * snippets; recursive directory creation; rename some params * Add directory upload snippet * fix: remove chunked downloads; change max_workers to threads * update snippets to add thread info * fix snippets test issue due to change in dependency * snippet nomenclature * fix samples for real this time --- google/cloud/storage/constants.py | 1 + google/cloud/storage/fileio.py | 6 +- google/cloud/storage/transfer_manager.py | 501 +++++++++++++++++++ samples/snippets/snippets_test.py | 118 ++++- samples/snippets/storage_transfer_manager.py | 184 +++++++ tests/system/test_transfer_manager.py | 84 ++++ tests/unit/test_transfer_manager.py | 335 +++++++++++++ 7 files changed, 1210 insertions(+), 19 deletions(-) create mode 100644 google/cloud/storage/transfer_manager.py create mode 100644 samples/snippets/storage_transfer_manager.py create mode 100644 tests/system/test_transfer_manager.py create mode 100644 tests/unit/test_transfer_manager.py diff --git a/google/cloud/storage/constants.py b/google/cloud/storage/constants.py index babbc5a42..5d6497295 100644 --- a/google/cloud/storage/constants.py +++ b/google/cloud/storage/constants.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + """Constants used across google.cloud.storage modules.""" # Storage classes diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index d3ae135bb..d09a3c885 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Support for file-like I/O.""" +"""Module for file-like access of blobs, usually invoked via Blob.open().""" import io import warnings @@ -101,10 +101,12 @@ class BlobReader(io.BufferedIOBase): - ``if_metageneration_match`` - ``if_metageneration_not_match`` - ``timeout`` + + Note that download_kwargs are also applied to blob.reload(), if a reload + is needed during seek(). 
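As a minimal sketch of the behavior noted above (the bucket and object names are purely illustrative and assume objects that already exist), a BlobReader is normally obtained via Blob.open("rb"), and keyword arguments such as timeout are forwarded as download_kwargs:

    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket("my-bucket").blob("my-object")
    # Keyword arguments accepted by BlobReader (e.g. timeout, listed above) pass through Blob.open().
    with blob.open("rb", timeout=60) as reader:
        first_kb = reader.read(1024)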
""" def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs): - """docstring note that download_kwargs also used for reload()""" for kwarg in download_kwargs: if kwarg not in VALID_DOWNLOAD_KWARGS: raise ValueError( diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py new file mode 100644 index 000000000..e87f0cc76 --- /dev/null +++ b/google/cloud/storage/transfer_manager.py @@ -0,0 +1,501 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Concurrent media operations. This is a PREVIEW FEATURE: API may change.""" + +import concurrent.futures + +import os +import warnings + +from google.api_core import exceptions + +warnings.warn( + "The module `transfer_manager` is a preview feature. Functionality and API " + "may change. This warning will be removed in a future release." +) + + +DEFAULT_CHUNK_SIZE = 200 * 1024 * 1024 + + +def upload_many( + file_blob_pairs, + skip_if_exists=False, + upload_kwargs=None, + threads=4, + deadline=None, + raise_exception=False, +): + """Upload many files concurrently via a worker pool. + + This function is a PREVIEW FEATURE: the API may change in a future version. + + :type file_blob_pairs: List(Tuple(IOBase or str, 'google.cloud.storage.blob.Blob')) + :param file_blob_pairs: + A list of tuples of a file or filename and a blob. Each file will be + uploaded to the corresponding blob by using blob.upload_from_file() or + blob.upload_from_filename() as appropriate. + + :type skip_if_exists: bool + :param skip_if_exists: + If True, blobs that already have a live version will not be overwritten. + This is accomplished by setting "if_generation_match = 0" on uploads. + Uploads so skipped will result in a 412 Precondition Failed response + code, which will be included in the return value but not raised + as an exception regardless of the value of raise_exception. + + :type upload_kwargs: dict + :param upload_kwargs: + A dictionary of keyword arguments to pass to the upload method. Refer + to the documentation for blob.upload_from_file() or + blob.upload_from_filename() for more information. The dict is directly + passed into the upload methods and is not validated by this function. + + :type threads: int + :param threads: + The number of threads to use in the worker pool. This is passed to + `concurrent.futures.ThreadPoolExecutor` as the `max_worker`; refer + to standard library documentation for details. + + The performance impact of this value depends on the use case, but + generally, smaller files benefit from more threads and larger files + don't benefit from more threads. Too many threads can slow operations, + especially with large files, due to contention over the Python GIL. + + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve. If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. 
This can be + left as the default of None (no deadline) for most use cases. + + :type raise_exception: bool + :param raise_exception: + If True, instead of adding exceptions to the list of return values, + they will be raised. Note that encountering an exception on one + operation will not prevent other operations from starting. Exceptions + are only processed and potentially raised after all operations are + complete in success or failure. + + If skip_if_exists is True, 412 Precondition Failed responses are + considered part of normal operation and are not raised as an exception. + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + + :rtype: list + :returns: A list of results corresponding to, in order, each item in the + input list. If an exception was received, it will be the result + for that operation. Otherwise, the return value from the successful + upload method is used (typically, None). + """ + if upload_kwargs is None: + upload_kwargs = {} + if skip_if_exists: + upload_kwargs["if_generation_match"] = 0 + + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + futures = [] + for path_or_file, blob in file_blob_pairs: + method = ( + blob.upload_from_filename + if isinstance(path_or_file, str) + else blob.upload_from_file + ) + futures.append(executor.submit(method, path_or_file, **upload_kwargs)) + results = [] + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + for future in futures: + exp = future.exception() + + # If raise_exception is False, don't call future.result() + if exp and not raise_exception: + results.append(exp) + # If skip_if_exists and the exception is PreconditionFailed, do same. + elif exp and skip_if_exists and isinstance(exp, exceptions.PreconditionFailed): + results.append(exp) + # Get the real result. If there was an exception not handled above, + # this will raise it. + else: + results.append(future.result()) + return results + + +def download_many( + blob_file_pairs, + download_kwargs=None, + threads=4, + deadline=None, + raise_exception=False, +): + """Download many blobs concurrently via a worker pool. + + This function is a PREVIEW FEATURE: the API may change in a future version. + + :type blob_file_pairs: List(Tuple('google.cloud.storage.blob.Blob', IOBase or str)) + :param blob_file_pairs: + A list of tuples of blob and a file or filename. Each blob will be + downloaded to the corresponding file by using blob.download_to_file() or + blob.download_to_filename() as appropriate. + + Note that blob.download_to_filename() does not delete the destination + file if the download fails. + + :type download_kwargs: dict + :param download_kwargs: + A dictionary of keyword arguments to pass to the download method. Refer + to the documentation for blob.download_to_file() or + blob.download_to_filename() for more information. The dict is directly + passed into the download methods and is not validated by this function. + + :type threads: int + :param threads: + The number of threads to use in the worker pool. This is passed to + `concurrent.futures.ThreadPoolExecutor` as the `max_workers` param; refer + to standard library documentation for details. + + The performance impact of this value depends on the use case, but + generally, smaller files benefit from more threads and larger files + don't benefit from more threads. Too many threads can slow operations, + especially with large files, due to contention over the Python GIL.
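As an aside, a minimal usage sketch of upload_many() and download_many() (the local paths and blob names are hypothetical, `bucket` is assumed to be an existing google.cloud.storage.bucket.Bucket, and the destination directory is assumed to exist):

    from google.cloud.storage import transfer_manager

    file_blob_pairs = [
        ("photos/a.jpg", bucket.blob("a.jpg")),
        ("photos/b.jpg", bucket.blob("b.jpg")),
    ]
    results = transfer_manager.upload_many(file_blob_pairs, skip_if_exists=True)
    # Each result is None on success, or the exception encountered for that item.

    blob_file_pairs = [(bucket.blob("a.jpg"), "restored/a.jpg")]
    transfer_manager.download_many(blob_file_pairs, raise_exception=True)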
+ + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve. If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. + + :type raise_exception: bool + :param raise_exception: + If True, instead of adding exceptions to the list of return values, + instead they will be raised. Note that encountering an exception on one + operation will not prevent other operations from starting. Exceptions + are only processed and potentially raised after all operations are + complete in success or failure. + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + + :rtype: list + :returns: A list of results corresponding to, in order, each item in the + input list. If an exception was received, it will be the result + for that operation. Otherwise, the return value from the successful + download method is used (typically, None). + """ + + if download_kwargs is None: + download_kwargs = {} + with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + futures = [] + for blob, path_or_file in blob_file_pairs: + method = ( + blob.download_to_filename + if isinstance(path_or_file, str) + else blob.download_to_file + ) + futures.append(executor.submit(method, path_or_file, **download_kwargs)) + results = [] + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + for future in futures: + if not raise_exception: + exp = future.exception() + if exp: + results.append(exp) + continue + results.append(future.result()) + return results + + +def upload_many_from_filenames( + bucket, + filenames, + source_directory="", + blob_name_prefix="", + skip_if_exists=False, + blob_constructor_kwargs=None, + upload_kwargs=None, + threads=4, + deadline=None, + raise_exception=False, +): + """Upload many files concurrently by their filenames. + + This function is a PREVIEW FEATURE: the API may change in a future version. + + The destination blobs are automatically created, with blob names based on + the source filenames and the blob_name_prefix. + + For example, if the `filenames` include "images/icon.jpg", + `source_directory` is "/home/myuser/", and `blob_name_prefix` is "myfiles/", + then the file at "/home/myuser/images/icon.jpg" will be uploaded to a blob + named "myfiles/images/icon.jpg". + + :type bucket: 'google.cloud.storage.bucket.Bucket' + :param bucket: + The bucket which will contain the uploaded blobs. + + :type filenames: list(str) + :param filenames: + A list of filenames to be uploaded. This may include part of the path. + The full path to the file must be source_directory + filename. + + :type source_directory: str + :param source_directory: + A string that will be prepended (with os.path.join()) to each filename + in the input list, in order to find the source file for each blob. + Unlike the filename itself, the source_directory does not affect the + name of the uploaded blob. + + For instance, if the source_directory is "/tmp/img/" and a filename is + "0001.jpg", with an empty blob_name_prefix, then the file uploaded will + be "/tmp/img/0001.jpg" and the destination blob will be "0001.jpg". + + This parameter can be an empty string. + + Note that this parameter allows directory traversal (e.g. "/", "../") + and is not intended for unsanitized end user input. 
+ + :type blob_name_prefix: str + :param blob_name_prefix: + A string that will be prepended to each filename in the input list, in + order to determine the name of the destination blob. Unlike the filename + itself, the prefix string does not affect the location the library will + look for the source data on the local filesystem. + + For instance, if the source_directory is "/tmp/img/", the + blob_name_prefix is "myuser/mystuff-" and a filename is "0001.jpg" then + the file uploaded will be "/tmp/img/0001.jpg" and the destination blob + will be "myuser/mystuff-0001.jpg". + + The blob_name_prefix can be blank (an empty string). + + :type skip_if_exists: bool + :param skip_if_exists: + If True, blobs that already have a live version will not be overwritten. + This is accomplished by setting "if_generation_match = 0" on uploads. + Uploads so skipped will result in a 412 Precondition Failed response + code, which will be included in the return value, but not raised + as an exception regardless of the value of raise_exception. + + :type blob_constructor_kwargs: dict + :param blob_constructor_kwargs: + A dictionary of keyword arguments to pass to the blob constructor. Refer + to the documentation for blob.Blob() for more information. The dict is + directly passed into the constructor and is not validated by this + function. `name` and `bucket` keyword arguments are reserved by this + function and will result in an error if passed in here. + + :type upload_kwargs: dict + :param upload_kwargs: + A dictionary of keyword arguments to pass to the upload method. Refer + to the documentation for blob.upload_from_file() or + blob.upload_from_filename() for more information. The dict is directly + passed into the upload methods and is not validated by this function. + + :type threads: int + :param threads: + The number of threads to use in the worker pool. This is passed to + `concurrent.futures.ThreadPoolExecutor` as the `max_worker`; refer + to standard library documentation for details. + + The performance impact of this value depends on the use case, but + generally, smaller files benefit from more threads and larger files + don't benefit from more threads. Too many threads can slow operations, + especially with large files, due to contention over the Python GIL. + + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve. If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. + + :type raise_exception: bool + :param raise_exception: + If True, instead of adding exceptions to the list of return values, + instead they will be raised. Note that encountering an exception on one + operation will not prevent other operations from starting. Exceptions + are only processed and potentially raised after all operations are + complete in success or failure. + + If skip_if_exists is True, 412 Precondition Failed responses are + considered part of normal operation and are not raised as an exception. + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + + :rtype: list + :returns: A list of results corresponding to, in order, each item in the + input list. If an exception was received, it will be the result + for that operation. Otherwise, the return value from the successful + upload method is used (typically, None). 
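To make the name mapping concrete, a minimal sketch mirroring the example above (the bucket name is hypothetical):

    from google.cloud.storage import Client, transfer_manager

    bucket = Client().bucket("my-bucket")
    results = transfer_manager.upload_many_from_filenames(
        bucket,
        ["images/icon.jpg"],
        source_directory="/home/myuser/",  # file is read from /home/myuser/images/icon.jpg
        blob_name_prefix="myfiles/",       # blob is created as myfiles/images/icon.jpg
        skip_if_exists=True,
    )
    # results[0] is None on success, or an exception (e.g. 412 Precondition Failed if skipped).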
+ """ + if blob_constructor_kwargs is None: + blob_constructor_kwargs = {} + + file_blob_pairs = [] + + for filename in filenames: + path = os.path.join(source_directory, filename) + blob_name = blob_name_prefix + filename + blob = bucket.blob(blob_name, **blob_constructor_kwargs) + file_blob_pairs.append((path, blob)) + + return upload_many( + file_blob_pairs, + skip_if_exists=skip_if_exists, + upload_kwargs=upload_kwargs, + threads=threads, + deadline=deadline, + raise_exception=raise_exception, + ) + + +def download_many_to_path( + bucket, + blob_names, + destination_directory="", + blob_name_prefix="", + download_kwargs=None, + threads=4, + deadline=None, + create_directories=True, + raise_exception=False, +): + """Download many files concurrently by their blob names. + + This function is a PREVIEW FEATURE: the API may change in a future version. + + The destination files are automatically created, with paths based on the + source blob_names and the destination_directory. + + The destination files are not automatically deleted if their downloads fail, + so please check the return value of this function for any exceptions, or + enable `raise_exception=True`, and process the files accordingly. + + For example, if the `blob_names` include "icon.jpg", `destination_directory` + is "/home/myuser/", and `blob_name_prefix` is "images/", then the blob named + "images/icon.jpg" will be downloaded to a file named + "/home/myuser/icon.jpg". + + :type bucket: 'google.cloud.storage.bucket.Bucket' + :param bucket: + The bucket which contains the blobs to be downloaded + + :type blob_names: list(str) + :param blob_names: + A list of blobs to be downloaded. The blob name in this string will be + used to determine the destination file path as well. + + The full name to the blob must be blob_name_prefix + blob_name. The + blob_name is separate from the blob_name_prefix because the blob_name + will also determine the name of the destination blob. Any shared part of + the blob names that need not be part of the destination path should be + included in the blob_name_prefix. + + :type destination_directory: str + :param destination_directory: + A string that will be prepended (with os.path.join()) to each blob_name + in the input list, in order to determine the destination path for that + blob. + + For instance, if the destination_directory string is "/tmp/img" and a + blob_name is "0001.jpg", with an empty blob_name_prefix, then the source + blob "0001.jpg" will be downloaded to destination "/tmp/img/0001.jpg" . + + This parameter can be an empty string. + + Note that this parameter allows directory traversal (e.g. "/", "../") + and is not intended for unsanitized end user input. + + :type blob_name_prefix: str + :param blob_name_prefix: + A string that will be prepended to each blob_name in the input list, in + order to determine the name of the source blob. Unlike the blob_name + itself, the prefix string does not affect the destination path on the + local filesystem. For instance, if the destination_directory is + "/tmp/img/", the blob_name_prefix is "myuser/mystuff-" and a blob_name + is "0001.jpg" then the source blob "myuser/mystuff-0001.jpg" will be + downloaded to "/tmp/img/0001.jpg". The blob_name_prefix can be blank + (an empty string). + + :type download_kwargs: dict + :param download_kwargs: + A dictionary of keyword arguments to pass to the download method. Refer + to the documentation for blob.download_to_file() or + blob.download_to_filename() for more information. 
The dict is directly + passed into the download methods and is not validated by this function. + + :type threads: int + :param threads: + The number of threads to use in the worker pool. This is passed to + `concurrent.futures.ThreadPoolExecutor` as the `max_worker` param; refer + to standard library documentation for details. + + The performance impact of this value depends on the use case, but + generally, smaller files benefit from more threads and larger files + don't benefit from more threads. Too many threads can slow operations, + especially with large files, due to contention over the Python GIL. + + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve. If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. + + :type create_directories: bool + :param create_directories: + If True, recursively create any directories that do not exist. For + instance, if downloading object "images/img001.png", create the + directory "images" before downloading. + + :type raise_exception: bool + :param raise_exception: + If True, instead of adding exceptions to the list of return values, + instead they will be raised. Note that encountering an exception on one + operation will not prevent other operations from starting. Exceptions + are only processed and potentially raised after all operations are + complete in success or failure. If skip_if_exists is True, 412 + Precondition Failed responses are considered part of normal operation + and are not raised as an exception. + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + + :rtype: list + :returns: A list of results corresponding to, in order, each item in the + input list. If an exception was received, it will be the result + for that operation. Otherwise, the return value from the successful + download method is used (typically, None). + """ + blob_file_pairs = [] + + for blob_name in blob_names: + full_blob_name = blob_name_prefix + blob_name + path = os.path.join(destination_directory, blob_name) + if create_directories: + directory, _ = os.path.split(path) + os.makedirs(directory, exist_ok=True) + blob_file_pairs.append((bucket.blob(full_blob_name), path)) + + return download_many( + blob_file_pairs, + download_kwargs=download_kwargs, + threads=threads, + deadline=deadline, + raise_exception=raise_exception, + ) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 9370ecbdd..4ad0dc1a0 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -72,6 +72,7 @@ import storage_set_bucket_default_kms_key import storage_set_client_endpoint import storage_set_metadata +import storage_transfer_manager import storage_upload_file import storage_upload_from_memory import storage_upload_from_stream @@ -124,8 +125,8 @@ def test_bucket(): def test_public_bucket(): # The new projects don't allow to make a bucket available to public, so # for some tests we need to use the old main project for now. 
- original_value = os.environ['GOOGLE_CLOUD_PROJECT'] - os.environ['GOOGLE_CLOUD_PROJECT'] = os.environ['MAIN_GOOGLE_CLOUD_PROJECT'] + original_value = os.environ["GOOGLE_CLOUD_PROJECT"] + os.environ["GOOGLE_CLOUD_PROJECT"] = os.environ["MAIN_GOOGLE_CLOUD_PROJECT"] bucket = None while bucket is None or bucket.exists(): storage_client = storage.Client() @@ -135,7 +136,7 @@ def test_public_bucket(): yield bucket bucket.delete(force=True) # Set the value back. - os.environ['GOOGLE_CLOUD_PROJECT'] = original_value + os.environ["GOOGLE_CLOUD_PROJECT"] = original_value @pytest.fixture(scope="module") @@ -255,7 +256,7 @@ def test_download_byte_range(test_blob): storage_download_byte_range.download_byte_range( test_blob.bucket.name, test_blob.name, 0, 4, dest_file.name ) - assert dest_file.read() == b'Hello' + assert dest_file.read() == b"Hello" def test_download_blob(test_blob): @@ -308,7 +309,8 @@ def test_delete_blob(test_blob): def test_make_blob_public(test_public_blob): storage_make_public.make_blob_public( - test_public_blob.bucket.name, test_public_blob.name) + test_public_blob.bucket.name, test_public_blob.name + ) r = requests.get(test_public_blob.public_url) assert r.text == "Hello, is it me you're looking for?" @@ -340,7 +342,9 @@ def test_generate_upload_signed_url_v4(test_bucket, capsys): ) requests.put( - url, data=content, headers={"content-type": "application/octet-stream"}, + url, + data=content, + headers={"content-type": "application/octet-stream"}, ) bucket = storage.Client().bucket(test_bucket.name) @@ -447,16 +451,20 @@ def test_get_set_autoclass(new_bucket_obj, test_bucket, capsys): def test_bucket_lifecycle_management(test_bucket, capsys): - bucket = storage_enable_bucket_lifecycle_management.enable_bucket_lifecycle_management( - test_bucket + bucket = ( + storage_enable_bucket_lifecycle_management.enable_bucket_lifecycle_management( + test_bucket + ) ) out, _ = capsys.readouterr() assert "[]" in out assert "Lifecycle management is enable" in out assert len(list(bucket.lifecycle_rules)) > 0 - bucket = storage_disable_bucket_lifecycle_management.disable_bucket_lifecycle_management( - test_bucket + bucket = ( + storage_disable_bucket_lifecycle_management.disable_bucket_lifecycle_management( + test_bucket + ) ) out, _ = capsys.readouterr() assert "[]" in out @@ -512,7 +520,8 @@ def test_get_service_account(capsys): def test_download_public_file(test_public_blob): storage_make_public.make_blob_public( - test_public_blob.bucket.name, test_public_blob.name) + test_public_blob.bucket.name, test_public_blob.name + ) with tempfile.NamedTemporaryFile() as dest_file: storage_download_public_file.download_public_file( test_public_blob.bucket.name, test_public_blob.name, dest_file.name @@ -522,8 +531,10 @@ def test_download_public_file(test_public_blob): def test_define_bucket_website_configuration(test_bucket): - bucket = storage_define_bucket_website_configuration.define_bucket_website_configuration( - test_bucket.name, "index.html", "404.html" + bucket = ( + storage_define_bucket_website_configuration.define_bucket_website_configuration( + test_bucket.name, "index.html", "404.html" + ) ) website_val = {"mainPageSuffix": "index.html", "notFoundPage": "404.html"} @@ -586,7 +597,7 @@ def test_change_default_storage_class(test_bucket, capsys): ) out, _ = capsys.readouterr() assert "Default storage class for bucket" in out - assert bucket.storage_class == 'COLDLINE' + assert bucket.storage_class == "COLDLINE" def test_change_file_storage_class(test_blob, capsys): @@ -595,7 +606,7 @@ def 
test_change_file_storage_class(test_blob, capsys): ) out, _ = capsys.readouterr() assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out - assert blob.storage_class == 'NEARLINE' + assert blob.storage_class == "NEARLINE" def test_copy_file_archived_generation(test_blob): @@ -629,7 +640,8 @@ def test_storage_configure_retries(test_blob, capsys): out, _ = capsys.readouterr() assert "The following library method is customized to be retried" in out assert "_should_retry" in out - assert "initial=1.5, maximum=45.0, multiplier=1.2, deadline=500.0" in out + assert "initial=1.5, maximum=45.0, multiplier=1.2" in out + assert "500" in out # "deadline" or "timeout" depending on dependency ver. def test_batch_request(test_bucket): @@ -647,7 +659,79 @@ def test_batch_request(test_bucket): def test_storage_set_client_endpoint(capsys): - storage_set_client_endpoint.set_client_endpoint('https://storage.googleapis.com') + storage_set_client_endpoint.set_client_endpoint("https://storage.googleapis.com") out, _ = capsys.readouterr() assert "client initiated with endpoint: https://storage.googleapis.com" in out + + +def test_transfer_manager_snippets(test_bucket, capsys): + BLOB_NAMES = [ + "test.txt", + "test2.txt", + "blobs/test.txt", + "blobs/nesteddir/test.txt", + ] + + with tempfile.TemporaryDirectory() as uploads: + # Create dirs and nested dirs + for name in BLOB_NAMES: + relpath = os.path.dirname(name) + os.makedirs(os.path.join(uploads, relpath), exist_ok=True) + + # Create files with nested dirs to exercise directory handling. + for name in BLOB_NAMES: + with open(os.path.join(uploads, name), "w") as f: + f.write(name) + + storage_transfer_manager.upload_many_blobs_with_transfer_manager( + test_bucket.name, + BLOB_NAMES, + source_directory="{}/".format(uploads), + threads=2, + ) + out, _ = capsys.readouterr() + + for name in BLOB_NAMES: + assert "Uploaded {}".format(name) in out + + with tempfile.TemporaryDirectory() as downloads: + # Download the files. + storage_transfer_manager.download_all_blobs_with_transfer_manager( + test_bucket.name, + destination_directory=os.path.join(downloads, ""), + threads=2, + ) + out, _ = capsys.readouterr() + + for name in BLOB_NAMES: + assert "Downloaded {}".format(name) in out + + +def test_transfer_manager_directory_upload(test_bucket, capsys): + BLOB_NAMES = [ + "dirtest/test.txt", + "dirtest/test2.txt", + "dirtest/blobs/test.txt", + "dirtest/blobs/nesteddir/test.txt", + ] + + with tempfile.TemporaryDirectory() as uploads: + # Create dirs and nested dirs + for name in BLOB_NAMES: + relpath = os.path.dirname(name) + os.makedirs(os.path.join(uploads, relpath), exist_ok=True) + + # Create files with nested dirs to exercise directory handling. + for name in BLOB_NAMES: + with open(os.path.join(uploads, name), "w") as f: + f.write(name) + + storage_transfer_manager.upload_directory_with_transfer_manager( + test_bucket.name, source_directory="{}/".format(uploads) + ) + out, _ = capsys.readouterr() + + assert "Found {}".format(len(BLOB_NAMES)) in out + for name in BLOB_NAMES: + assert "Uploaded {}".format(name) in out diff --git a/samples/snippets/storage_transfer_manager.py b/samples/snippets/storage_transfer_manager.py new file mode 100644 index 000000000..0a02b96e3 --- /dev/null +++ b/samples/snippets/storage_transfer_manager.py @@ -0,0 +1,184 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def upload_many_blobs_with_transfer_manager( + bucket_name, filenames, source_directory="", threads=4 +): + """Upload every file in a list to a bucket, concurrently in a thread pool. + + Each blob name is derived from the filename, not including the + `source_directory` parameter. For complete control of the blob name for each + file (and other aspects of individual blob metadata), use + transfer_manager.upload_many() instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # A list (or other iterable) of filenames to upload. + # filenames = ["file_1.txt", "file_2.txt"] + + # The directory on your computer that is the root of all of the files in the + # list of filenames. This string is prepended (with os.path.join()) to each + # filename to get the full path to the file. Relative paths and absolute + # paths are both accepted. This string is not included in the name of the + # uploaded blob; it is only used to find the source files. An empty string + # means "the current working directory". Note that this parameter allows + # directory traversal (e.g. "/", "../") and is not intended for unsanitized + # end user input. + # source_directory="" + + # The number of threads to use for the operation. The performance impact of + # this value depends on the use case, but generally, smaller files benefit + # from more threads and larger files don't benefit from more threads. Too + # many threads can slow operations, especially with large files, due to + # contention over the Python GIL. + # threads=4 + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + results = transfer_manager.upload_many_from_filenames( + bucket, filenames, source_directory=source_directory, threads=threads + ) + + for name, result in zip(filenames, results): + # The results list is either `None` or an exception for each filename in + # the input list, in order. + + if isinstance(result, Exception): + print("Failed to upload {} due to exception: {}".format(name, result)) + else: + print("Uploaded {} to {}.".format(name, bucket.name)) + + +def upload_directory_with_transfer_manager(bucket_name, source_directory, threads=4): + """Upload every file in a directory, including all files in subdirectories. + + Each blob name is derived from the filename, not including the `directory` + parameter itself. For complete control of the blob name for each file (and + other aspects of individual blob metadata), use + transfer_manager.upload_many() instead. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The directory on your computer to upload. Files in the directory and its + # subdirectories will be uploaded. An empty string means "the current + # working directory". + # source_directory="" + + # The number of threads to use for the operation. The performance impact of + # this value depends on the use case, but generally, smaller files benefit + # from more threads and larger files don't benefit from more threads. 
Too + # many threads can slow operations, especially with large files, due to + # contention over the Python GIL. + # threads=4 + + from pathlib import Path + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + # Generate a list of paths (in string form) relative to `source_directory`. + # This can be done in a single list comprehension, but is expanded into + # multiple lines here for clarity. + + # First, recursively get all files in `source_directory` as Path objects. + directory_as_path_obj = Path(source_directory) + paths = directory_as_path_obj.rglob("*") + + # Filter so the list only includes files, not directories themselves. + file_paths = [path for path in paths if path.is_file()] + + # These paths are relative to the current working directory. Next, make them + # relative to `source_directory`. + relative_paths = [path.relative_to(source_directory) for path in file_paths] + + # Finally, convert them all to strings. + string_paths = [str(path) for path in relative_paths] + + print("Found {} files.".format(len(string_paths))) + + # Start the upload. + results = transfer_manager.upload_many_from_filenames( + bucket, string_paths, source_directory=source_directory, threads=threads + ) + + for name, result in zip(string_paths, results): + # The results list is either `None` or an exception for each filename in + # the input list, in order. + + if isinstance(result, Exception): + print("Failed to upload {} due to exception: {}".format(name, result)) + else: + print("Uploaded {} to {}.".format(name, bucket.name)) + + +def download_all_blobs_with_transfer_manager( + bucket_name, destination_directory="", threads=4 +): + """Download all of the blobs in a bucket, concurrently in a thread pool. + + The filename of each blob once downloaded is derived from the blob name and + the `destination_directory` parameter. For complete control of the filename + of each blob, use transfer_manager.download_many() instead. + + Directories will be created automatically as needed, for instance to + accommodate blob names that include slashes. + """ + + # The ID of your GCS bucket + # bucket_name = "your-bucket-name" + + # The directory on your computer to which to download all of the files. This + # string is prepended (with os.path.join()) to the name of each blob to form + # the full path. Relative paths and absolute paths are both accepted. An + # empty string means "the current working directory". Note that this + # parameter allows directory traversal ("../" etc.) and is not + # intended for unsanitized end user input. + # destination_directory = "" + + # The number of threads to use for the operation. The performance impact of + # this value depends on the use case, but generally, smaller files benefit + # from more threads and larger files don't benefit from more threads. Too + # many threads can slow operations, especially with large files, due to + # contention over the Python GIL. + # threads=4 + + from google.cloud.storage import Client, transfer_manager + + storage_client = Client() + bucket = storage_client.bucket(bucket_name) + + blob_names = [blob.name for blob in bucket.list_blobs()] + + results = transfer_manager.download_many_to_path( + bucket, blob_names, destination_directory=destination_directory, threads=threads + ) + + for name, result in zip(blob_names, results): + # The results list is either `None` or an exception for each blob in + # the input list, in order.
+ + if isinstance(result, Exception): + print("Failed to download {} due to exception: {}".format(name, result)) + else: + print("Downloaded {} to {}.".format(name, destination_directory + name)) diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py new file mode 100644 index 000000000..0b639170d --- /dev/null +++ b/tests/system/test_transfer_manager.py @@ -0,0 +1,84 @@ +# coding=utf-8 +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import tempfile + +from google.cloud.storage import transfer_manager + +from google.api_core import exceptions + + +def test_upload_many(shared_bucket, file_data, blobs_to_delete): + FILE_BLOB_PAIRS = [ + (file_data["simple"]["path"], shared_bucket.blob("simple1")), + (file_data["simple"]["path"], shared_bucket.blob("simple2")), + ] + + results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + assert results == [None, None] + + blobs = shared_bucket.list_blobs() + for blob in blobs: + if blob.name.startswith("simple"): + blobs_to_delete.append(blob) + assert len(blobs_to_delete) == 2 + + +def test_upload_many_with_file_objs(shared_bucket, file_data, blobs_to_delete): + FILE_BLOB_PAIRS = [ + (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple1")), + (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple2")), + ] + + results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + assert results == [None, None] + + blobs = shared_bucket.list_blobs() + for blob in blobs: + if blob.name.startswith("simple"): + blobs_to_delete.append(blob) + assert len(blobs_to_delete) == 2 + + +def test_upload_many_skip_if_exists( + listable_bucket, listable_filenames, file_data, blobs_to_delete +): + FILE_BLOB_PAIRS = [ + (file_data["logo"]["path"], listable_bucket.blob(listable_filenames[0])), + (file_data["simple"]["path"], listable_bucket.blob("simple")), + ] + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, skip_if_exists=True, raise_exception=True + ) + assert isinstance(results[0], exceptions.PreconditionFailed) + assert results[1] is None + + blobs = listable_bucket.list_blobs() + for blob in blobs: + if blob.name.startswith("simple"): + blobs_to_delete.append(blob) + assert len(blobs_to_delete) == 1 + + +def test_download_many(listable_bucket): + blobs = list(listable_bucket.list_blobs()) + tempfiles = [tempfile.TemporaryFile(), tempfile.TemporaryFile()] + BLOB_FILE_PAIRS = zip(blobs[:2], tempfiles) + + results = transfer_manager.download_many(BLOB_FILE_PAIRS) + assert results == [None, None] + for fp in tempfiles: + assert fp.tell() != 0 diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py new file mode 100644 index 000000000..f52d5471b --- /dev/null +++ b/tests/unit/test_transfer_manager.py @@ -0,0 +1,335 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +with pytest.warns(UserWarning): + from google.cloud.storage import transfer_manager + +from google.api_core import exceptions + +import os +import tempfile +import unittest +import mock + + +class Test_Transfer_Manager(unittest.TestCase): + def test_upload_many_with_filenames(self): + FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] + FAKE_CONTENT_TYPE = "text/fake" + UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} + EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} + FAKE_RESULT = "nothing to see here" + + for _, blob_mock in FILE_BLOB_PAIRS: + blob_mock.upload_from_filename.return_value = FAKE_RESULT + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS + ) + for (filename, mock_blob) in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.assert_any_call( + filename, **EXPECTED_UPLOAD_KWARGS + ) + for result in results: + self.assertEqual(result, FAKE_RESULT) + + def test_upload_many_with_file_objs(self): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock()), + (tempfile.TemporaryFile(), mock.Mock()), + ] + FAKE_CONTENT_TYPE = "text/fake" + UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} + EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} + FAKE_RESULT = "nothing to see here" + + for _, blob_mock in FILE_BLOB_PAIRS: + blob_mock.upload_from_file.return_value = FAKE_RESULT + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS + ) + for (file, mock_blob) in FILE_BLOB_PAIRS: + mock_blob.upload_from_file.assert_any_call(file, **EXPECTED_UPLOAD_KWARGS) + for result in results: + self.assertEqual(result, FAKE_RESULT) + + def test_upload_many_passes_concurrency_options(self): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock()), + (tempfile.TemporaryFile(), mock.Mock()), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch( + "concurrent.futures.ThreadPoolExecutor" + ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + transfer_manager.upload_many( + FILE_BLOB_PAIRS, threads=MAX_WORKERS, deadline=DEADLINE + ) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with( + mock.ANY, timeout=DEADLINE, return_when=mock.ANY + ) + + def test_upload_many_suppresses_exceptions(self): + FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = ConnectionError() + + results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + for result in results: + self.assertEqual(type(result), ConnectionError) + + def test_upload_many_raises_exceptions(self): + FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = ConnectionError() + + with self.assertRaises(ConnectionError): + transfer_manager.upload_many(FILE_BLOB_PAIRS, raise_exception=True) + + def test_upload_many_suppresses_412_with_skip_if_exists(self): + FILE_BLOB_PAIRS 
= [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = exceptions.PreconditionFailed( + "412" + ) + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, skip_if_exists=True, raise_exception=True + ) + for result in results: + self.assertEqual(type(result), exceptions.PreconditionFailed) + + def test_download_many_with_filenames(self): + BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] + FAKE_ENCODING = "fake_gzip" + DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} + FAKE_RESULT = "nothing to see here" + + for blob_mock, _ in BLOB_FILE_PAIRS: + blob_mock.download_to_filename.return_value = FAKE_RESULT + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS + ) + for (mock_blob, file) in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.assert_any_call(file, **DOWNLOAD_KWARGS) + for result in results: + self.assertEqual(result, FAKE_RESULT) + + def test_download_many_with_file_objs(self): + BLOB_FILE_PAIRS = [ + (mock.Mock(), tempfile.TemporaryFile()), + (mock.Mock(), tempfile.TemporaryFile()), + ] + FAKE_ENCODING = "fake_gzip" + DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} + FAKE_RESULT = "nothing to see here" + + for blob_mock, _ in BLOB_FILE_PAIRS: + blob_mock.download_to_file.return_value = FAKE_RESULT + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS + ) + for (mock_blob, file) in BLOB_FILE_PAIRS: + mock_blob.download_to_file.assert_any_call(file, **DOWNLOAD_KWARGS) + for result in results: + self.assertEqual(result, FAKE_RESULT) + + def test_download_many_passes_concurrency_options(self): + BLOB_FILE_PAIRS = [ + (mock.Mock(), tempfile.TemporaryFile()), + (mock.Mock(), tempfile.TemporaryFile()), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch( + "concurrent.futures.ThreadPoolExecutor" + ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + transfer_manager.download_many( + BLOB_FILE_PAIRS, threads=MAX_WORKERS, deadline=DEADLINE + ) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with( + mock.ANY, timeout=DEADLINE, return_when=mock.ANY + ) + + def test_download_many_suppresses_exceptions(self): + BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.side_effect = ConnectionError() + + results = transfer_manager.download_many(BLOB_FILE_PAIRS) + for result in results: + self.assertEqual(type(result), ConnectionError) + + def test_download_many_raises_exceptions(self): + BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.side_effect = ConnectionError() + + transfer_manager.download_many(BLOB_FILE_PAIRS) + with self.assertRaises(ConnectionError): + transfer_manager.download_many(BLOB_FILE_PAIRS, raise_exception=True) + + def test_upload_many_from_filenames(self): + bucket = mock.Mock() + + FILENAMES = ["file_a.txt", "file_b.txt"] + ROOT = "mypath/" + PREFIX = "myprefix/" + KEY_NAME = "keyname" + BLOB_CONSTRUCTOR_KWARGS = {"kms_key_name": KEY_NAME} + UPLOAD_KWARGS = {"content-type": "text/fake"} + MAX_WORKERS = 7 + DEADLINE = 10 + + EXPECTED_FILE_BLOB_PAIRS = [ + (os.path.join(ROOT, filename), mock.ANY) for filename in FILENAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.upload_many" + ) as 
mock_upload_many: + transfer_manager.upload_many_from_filenames( + bucket, + FILENAMES, + source_directory=ROOT, + blob_name_prefix=PREFIX, + skip_if_exists=True, + blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, + upload_kwargs=UPLOAD_KWARGS, + threads=MAX_WORKERS, + deadline=DEADLINE, + raise_exception=True, + ) + + mock_upload_many.assert_called_once_with( + EXPECTED_FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + threads=MAX_WORKERS, + deadline=DEADLINE, + raise_exception=True, + ) + bucket.blob.assert_any_call(PREFIX + FILENAMES[0], **BLOB_CONSTRUCTOR_KWARGS) + bucket.blob.assert_any_call(PREFIX + FILENAMES[1], **BLOB_CONSTRUCTOR_KWARGS) + + def test_upload_many_from_filenames_minimal_args(self): + bucket = mock.Mock() + + FILENAMES = ["file_a.txt", "file_b.txt"] + + EXPECTED_FILE_BLOB_PAIRS = [(filename, mock.ANY) for filename in FILENAMES] + + with mock.patch( + "google.cloud.storage.transfer_manager.upload_many" + ) as mock_upload_many: + transfer_manager.upload_many_from_filenames( + bucket, + FILENAMES, + ) + + mock_upload_many.assert_called_once_with( + EXPECTED_FILE_BLOB_PAIRS, + skip_if_exists=False, + upload_kwargs=None, + threads=4, + deadline=None, + raise_exception=False, + ) + bucket.blob.assert_any_call(FILENAMES[0]) + bucket.blob.assert_any_call(FILENAMES[1]) + + def test_download_many_to_path(self): + bucket = mock.Mock() + + BLOBNAMES = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"] + PATH_ROOT = "mypath/" + BLOB_NAME_PREFIX = "myprefix/" + DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"} + MAX_WORKERS = 7 + DEADLINE = 10 + + EXPECTED_BLOB_FILE_PAIRS = [ + (mock.ANY, os.path.join(PATH_ROOT, blobname)) for blobname in BLOBNAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.download_many" + ) as mock_download_many: + transfer_manager.download_many_to_path( + bucket, + BLOBNAMES, + destination_directory=PATH_ROOT, + blob_name_prefix=BLOB_NAME_PREFIX, + download_kwargs=DOWNLOAD_KWARGS, + threads=MAX_WORKERS, + deadline=DEADLINE, + create_directories=False, + raise_exception=True, + ) + + mock_download_many.assert_called_once_with( + EXPECTED_BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + threads=MAX_WORKERS, + deadline=DEADLINE, + raise_exception=True, + ) + for blobname in BLOBNAMES: + bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname) + + def test_download_many_to_path_creates_directories(self): + bucket = mock.Mock() + + with tempfile.TemporaryDirectory() as tempdir: + DIR_NAME = "dir_a/dir_b" + BLOBNAMES = [ + "file_a.txt", + "file_b.txt", + os.path.join(DIR_NAME, "file_c.txt"), + ] + + EXPECTED_BLOB_FILE_PAIRS = [ + (mock.ANY, os.path.join(tempdir, blobname)) for blobname in BLOBNAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.download_many" + ) as mock_download_many: + transfer_manager.download_many_to_path( + bucket, + BLOBNAMES, + destination_directory=tempdir, + create_directories=True, + raise_exception=True, + ) + + mock_download_many.assert_called_once_with( + EXPECTED_BLOB_FILE_PAIRS, + download_kwargs=None, + threads=4, + deadline=None, + raise_exception=True, + ) + for blobname in BLOBNAMES: + bucket.blob.assert_any_call(blobname) + + assert os.path.isdir(os.path.join(tempdir, DIR_NAME))
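As a closing illustration of the remaining helper, a minimal usage sketch of download_many_to_path (bucket and blob names are hypothetical; intermediate directories are created because create_directories defaults to True):

    from google.cloud.storage import Client, transfer_manager

    bucket = Client().bucket("my-bucket")
    blob_names = ["images/img001.png", "images/img002.png"]
    results = transfer_manager.download_many_to_path(
        bucket,
        blob_names,
        destination_directory="/tmp/backup/",  # files land at /tmp/backup/images/img001.png, etc.
        raise_exception=True,  # surface failures instead of returning them in the results list
    )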