feat: support configurable retries in upload_chunks_concurrently (#1120)
* feat: support configurable retries in upload_chunks_concurrently

* lint

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
andrewsg and gcf-owl-bot[bot] committed Sep 19, 2023
1 parent 14a1909 commit 1271686
Showing 2 changed files with 46 additions and 1 deletion.
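As a caller-facing illustration of what this commit enables, here is a minimal usage sketch of the new retry argument. The bucket and object names are placeholders, not part of this change:

from google.cloud import storage
from google.cloud.storage import transfer_manager

client = storage.Client()
blob = client.bucket("my-bucket").blob("large-file.bin")  # placeholder names

# Default behavior: chunk uploads retry according to DEFAULT_RETRY.
transfer_manager.upload_chunks_concurrently("large-file.bin", blob)

# Pass retry=None to disable retries for the MPU container and every part.
transfer_manager.upload_chunks_concurrently("large-file.bin", blob, retry=None)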
22 changes: 22 additions & 0 deletions google/cloud/storage/transfer_manager.py
@@ -28,6 +28,8 @@
from google.cloud.storage import Blob
from google.cloud.storage.blob import _get_host_name
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry
from google.cloud.storage.retry import DEFAULT_RETRY

from google.resumable_media.requests.upload import XMLMPUContainer
from google.resumable_media.requests.upload import XMLMPUPart
@@ -871,6 +873,7 @@ def upload_chunks_concurrently(
*,
checksum="md5",
timeout=_DEFAULT_TIMEOUT,
retry=DEFAULT_RETRY,
):
"""Upload a single file in chunks, concurrently.
@@ -966,6 +969,20 @@ def upload_chunks_concurrently(
(Optional) The amount of time, in seconds, to wait
for the server response. See: :ref:`configuring_timeouts`
:type retry: google.api_core.retry.Retry
:param retry: (Optional) How to retry the RPC. A None value will disable
retries. A google.api_core.retry.Retry value will enable retries,
and the object will configure backoff and timeout options. Custom
predicates (customizable error codes) are not supported for media
operations such as this one.
This function does not accept ConditionalRetryPolicy values because
preconditions are not supported by the underlying API call.
See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.
:raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
"""

@@ -995,6 +1012,8 @@ def upload_chunks_concurrently(
headers["x-goog-encryption-kms-key-name"] = blob.kms_key_name

container = XMLMPUContainer(url, filename, headers=headers)
container._retry_strategy = _api_core_retry_to_resumable_media_retry(retry)

container.initiate(transport=transport, content_type=content_type)
upload_id = container.upload_id

@@ -1025,6 +1044,7 @@ def upload_chunks_concurrently(
part_number=part_number,
checksum=checksum,
headers=headers,
retry=retry,
)
)

@@ -1054,6 +1074,7 @@ def _upload_part(
part_number,
checksum,
headers,
retry,
):
"""Helper function that runs inside a thread or subprocess to upload a part.
@@ -1075,6 +1096,7 @@ def _upload_part(
checksum=checksum,
headers=headers,
)
part._retry_strategy = _api_core_retry_to_resumable_media_retry(retry)
part.upload(client._http)
return (part_number, part.etag)

25 changes: 24 additions & 1 deletion tests/unit/test_transfer_manager.py
@@ -658,6 +658,11 @@ def test_upload_chunks_concurrently():
container_mock.register_part.assert_any_call(1, ETAG)
container_mock.register_part.assert_any_call(2, ETAG)
container_mock.finalize.assert_called_once_with(bucket.client._http)

assert container_mock._retry_strategy.max_sleep == 60.0
assert container_mock._retry_strategy.max_cumulative_retry == 120.0
assert container_mock._retry_strategy.max_retries is None

part_mock.upload.assert_called_with(transport)
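The three assertions above pin down how DEFAULT_RETRY is translated for the resumable-media layer, and the retry=None case further below expects max_retries == 0. The _api_core_retry_to_resumable_media_retry helper itself is not part of this diff; inferred purely from those test expectations, its mapping plausibly resembles the following sketch (the attribute accesses and constructor arguments are assumptions, not the library's actual implementation):

from google.resumable_media import common

def _retry_to_strategy_sketch(retry):
    """Hypothetical conversion from a google.api_core.retry.Retry to a
    resumable-media RetryStrategy, inferred from the test expectations."""
    if retry is None:
        # retry=None: no retries at all.
        return common.RetryStrategy(max_retries=0)
    # For DEFAULT_RETRY (maximum=60.0, timeout/deadline=120.0) the tests expect
    # max_sleep == 60.0 and max_cumulative_retry == 120.0, with max_retries left None.
    return common.RetryStrategy(
        max_sleep=retry._maximum,          # private attribute; assumed
        max_cumulative_retry=retry.deadline,
    )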


@@ -693,12 +698,15 @@ def test_upload_chunks_concurrently_passes_concurrency_options():
worker_type=transfer_manager.THREAD,
max_workers=MAX_WORKERS,
deadline=DEADLINE,
retry=None,
)
except ValueError:
pass # The futures don't actually work, so we expect this to abort.
# Conveniently, that gives us a chance to test the auto-delete
# exception handling feature.
container_mock.cancel.assert_called_once_with(transport)
assert container_mock._retry_strategy.max_retries == 0

pool_patch.assert_called_with(max_workers=MAX_WORKERS)
wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY)

@@ -905,6 +913,8 @@ def test__download_and_write_chunk_in_place():


def test__upload_part():
from google.cloud.storage.retry import DEFAULT_RETRY

pickled_mock = pickle.dumps(_PickleableMockClient())
FILENAME = "file_a.txt"
UPLOAD_ID = "abcd"
@@ -916,9 +926,22 @@ def test__upload_part():
"google.cloud.storage.transfer_manager.XMLMPUPart", return_value=part
):
result = transfer_manager._upload_part(
pickled_mock, URL, UPLOAD_ID, FILENAME, 0, 256, 1, None, {"key", "value"}
pickled_mock,
URL,
UPLOAD_ID,
FILENAME,
0,
256,
1,
None,
{"key", "value"},
retry=DEFAULT_RETRY,
)
part.upload.assert_called_once()
assert part._retry_strategy.max_sleep == 60.0
assert part._retry_strategy.max_cumulative_retry == 120.0
assert part._retry_strategy.max_retries is None

assert result == (1, ETAG)


