Add back support for retries in storage uploads. #3378

Merged · 1 commit · May 10, 2017
63 changes: 47 additions & 16 deletions storage/google/cloud/storage/blob.py
@@ -71,12 +71,13 @@
'storageClass',
)
_NUM_RETRIES_MESSAGE = (
-    'num_retries is no longer supported. When a transient error occurs, '
-    'such as a 429 Too Many Requests or 500 Internal Server Error, upload '
-    'requests will be automatically retried. Subsequent retries will be '
-    'done after waiting 1, 2, 4, 8, etc. seconds (exponential backoff) until '
-    '10 minutes of wait time have elapsed. At that point, there will be no '
-    'more attempts to retry.')
+    '`num_retries` has been deprecated and will be removed in a future '
+    'release. The default behavior (when `num_retries` is not specified) when '
+    'a transient error (e.g. 429 Too Many Requests or 500 Internal Server '
+    'Error) occurs will be as follows: upload requests will be automatically '
+    'retried. Subsequent retries will be sent after waiting 1, 2, 4, 8, etc. '
+    'seconds (exponential backoff) until 10 minutes of wait time have '
+    'elapsed. At that point, there will be no more attempts to retry.')
_READ_LESS_THAN_SIZE = (
'Size {:d} was specified but the file-like object only had '
'{:d} bytes remaining.')
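A note for readers skimming the diff: the new message describes what happens when `num_retries` is omitted, while the code further down installs an attempt-capped strategy when it is passed. Below is a minimal sketch of the two configurations, assuming the `RetryStrategy` defaults that this PR's unit tests assert (64-second sleep cap, roughly 600 seconds of cumulative wait):

```python
from google import resumable_media

# Default (num_retries omitted): transient errors such as 429 or 500 are
# retried with exponential backoff. Sleeps grow as 1, 2, 4, 8, ... seconds,
# each capped at 64s, until about 600s of cumulative waiting has elapsed.
default_strategy = resumable_media.RetryStrategy()

# Deprecated path (num_retries given): the retry budget is a count of
# attempts rather than a window of wall-clock time.
legacy_strategy = resumable_media.RetryStrategy(max_retries=8)
```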
@@ -583,7 +584,8 @@ def _get_upload_arguments(self, content_type):
content_type = self._get_content_type(content_type)
return headers, object_metadata, content_type

-    def _do_multipart_upload(self, client, stream, content_type, size):
+    def _do_multipart_upload(self, client, stream, content_type,
+                             size, num_retries):
"""Perform a multipart upload.

Assumes ``chunk_size`` is :data:`None` on the current blob.
@@ -610,6 +612,10 @@ def _do_multipart_upload(self, client, stream, content_type, size):
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).

+        :type num_retries: int
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)

:rtype: :class:`~requests.Response`
:returns: The "200 OK" response object returned after the multipart
upload request.
@@ -631,13 +637,19 @@ def _do_multipart_upload(self, client, stream, content_type, size):
upload_url = _MULTIPART_URL_TEMPLATE.format(
bucket_path=self.bucket.path)
upload = MultipartUpload(upload_url, headers=headers)

+        if num_retries is not None:
+            upload._retry_strategy = resumable_media.RetryStrategy(
+                max_retries=num_retries)

response = upload.transmit(
transport, data, object_metadata, content_type)

return response

    def _initiate_resumable_upload(self, client, stream, content_type,
-                                   size, extra_headers=None, chunk_size=None):
+                                   size, num_retries, extra_headers=None,
+                                   chunk_size=None):
"""Initiate a resumable upload.

The content type of the upload will be determined in order
@@ -662,6 +674,10 @@ def _initiate_resumable_upload(self, client, stream, content_type,
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).

+        :type num_retries: int
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)

:type extra_headers: dict
:param extra_headers: (Optional) Extra headers to add to standard
headers.
@@ -693,13 +709,19 @@ def _initiate_resumable_upload(self, client, stream, content_type,
upload_url = _RESUMABLE_URL_TEMPLATE.format(
bucket_path=self.bucket.path)
upload = ResumableUpload(upload_url, chunk_size, headers=headers)

+        if num_retries is not None:
+            upload._retry_strategy = resumable_media.RetryStrategy(
+                max_retries=num_retries)

upload.initiate(
transport, stream, object_metadata, content_type,
total_bytes=size, stream_final=False)

return upload, transport

-    def _do_resumable_upload(self, client, stream, content_type, size):
+    def _do_resumable_upload(self, client, stream, content_type,
+                             size, num_retries):
"""Perform a resumable upload.

Assumes ``chunk_size`` is not :data:`None` on the current blob.
@@ -726,19 +748,23 @@ def _do_resumable_upload(self, client, stream, content_type, size):
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).

+        :type num_retries: int
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)

:rtype: :class:`~requests.Response`
:returns: The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
-            client, stream, content_type, size)
+            client, stream, content_type, size, num_retries)

while not upload.finished:
response = upload.transmit_next_chunk(transport)

return response

-    def _do_upload(self, client, stream, content_type, size):
+    def _do_upload(self, client, stream, content_type, size, num_retries):
"""Determine an upload strategy and then perform the upload.

If the current blob has a ``chunk_size`` set, then a resumable upload
@@ -767,17 +793,21 @@ def _do_upload(self, client, stream, content_type, size):
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).

+        :type num_retries: int
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)

:rtype: dict
:returns: The parsed JSON from the "200 OK" response. This will be the
**only** response in the multipart case and it will be the
**final** response in the resumable case.
"""
if self.chunk_size is None:
response = self._do_multipart_upload(
-                client, stream, content_type, size)
+                client, stream, content_type, size, num_retries)
else:
response = self._do_resumable_upload(
-                client, stream, content_type, size)
+                client, stream, content_type, size, num_retries)

return response.json()
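To make the dispatch above concrete, here is a hedged usage sketch from the public API (bucket, object, and file names are hypothetical; the 256 KB multiple reflects the chunk-size constraint assumed from the library):

```python
from google.cloud import storage

client = storage.Client()
blob = client.bucket('my-bucket').blob('my-object')  # hypothetical names

# chunk_size is None by default, so _do_upload takes the multipart path:
# metadata and payload travel in a single request, retried as a whole.
with open('data.bin', 'rb') as stream:
    blob.upload_from_file(stream, num_retries=8)

# Setting a chunk size (a multiple of 256 KB) routes the same call
# through the resumable path, transmitting the file chunk by chunk.
blob.chunk_size = 1024 * 1024
with open('data.bin', 'rb') as stream:
    blob.upload_from_file(stream, num_retries=8)
```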

@@ -831,7 +861,8 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
:param content_type: Optional type of content being uploaded.

:type num_retries: int
-        :param num_retries: Number of upload retries. (Deprecated.)
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)

:type client: :class:`~google.cloud.storage.client.Client`
:param client: (Optional) The client to use. If not passed, falls back
@@ -846,7 +877,7 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
_maybe_rewind(file_obj, rewind=rewind)
try:
created_json = self._do_upload(
-                client, file_obj, content_type, size)
+                client, file_obj, content_type, size, num_retries)
self._set_properties(created_json)
except resumable_media.InvalidResponse as exc:
_raise_from_invalid_response(exc)
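Once the retry budget is exhausted, the `except` clause above converts `resumable_media.InvalidResponse` into a `google.cloud` exception. A sketch of what a caller would handle (the exact exception class raised by `_raise_from_invalid_response` is an assumption):

```python
from google.cloud import exceptions

try:
    blob.upload_from_file(stream, num_retries=2)
except exceptions.GoogleCloudError as exc:
    # Reached when the retries are used up and the final response still
    # carries an error status.
    print('upload failed after retries:', exc)
```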
@@ -1004,7 +1035,7 @@ def create_resumable_upload_session(
# to the `ResumableUpload` constructor. The chunk size only
# matters when **sending** bytes to an upload.
upload, _ = self._initiate_resumable_upload(
-                client, dummy_stream, content_type, size,
+                client, dummy_stream, content_type, size, None,
extra_headers=extra_headers,
chunk_size=self._CHUNK_SIZE_MULTIPLE)
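Note that `create_resumable_upload_session` passes `None` for `num_retries` explicitly, so upload sessions always use the default time-based retry strategy. For context, a sketch of the (unchanged) public call, with hypothetical values:

```python
# The returned URL can be handed to another client, which can upload the
# bytes without having google-cloud-python installed.
session_url = blob.create_resumable_upload_session(
    content_type='application/octet-stream', size=1024)
```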

2 changes: 1 addition & 1 deletion storage/setup.py
@@ -53,7 +53,7 @@
REQUIREMENTS = [
'google-cloud-core >= 0.24.1, < 0.25dev',
'google-auth >= 1.0.0',
-    'google-resumable-media >= 0.1.0',
+    'google-resumable-media >= 0.1.1',
'requests >= 2.0.0',
]

52 changes: 39 additions & 13 deletions storage/tests/unit/test_blob.py
@@ -762,7 +762,8 @@ def _mock_transport(self, status_code, headers, content=b''):
fake_transport.request.return_value = fake_response
return fake_transport

-    def _do_multipart_success(self, mock_get_boundary, size=None):
+    def _do_multipart_success(self, mock_get_boundary, size=None,
+                              num_retries=None):
bucket = mock.Mock(path='/b/w00t', spec=[u'path'])
blob = self._make_one(u'blob-name', bucket=bucket)
self.assertIsNone(blob.chunk_size)
@@ -777,7 +778,7 @@ def _do_multipart_success(self, mock_get_boundary, size=None):
stream = io.BytesIO(data)
content_type = u'application/xml'
response = blob._do_multipart_upload(
-            client, stream, content_type, size)
+            client, stream, content_type, size, num_retries)

# Check the mocks and the returned value.
self.assertIs(response, fake_transport.request.return_value)
@@ -817,6 +818,11 @@ def test__do_multipart_upload_no_size(self, mock_get_boundary):
def test__do_multipart_upload_with_size(self, mock_get_boundary):
self._do_multipart_success(mock_get_boundary, size=10)

+    @mock.patch(u'google.resumable_media._upload.get_boundary',
+                return_value=b'==0==')
+    def test__do_multipart_upload_with_retry(self, mock_get_boundary):
+        self._do_multipart_success(mock_get_boundary, num_retries=8)

def test__do_multipart_upload_bad_size(self):
blob = self._make_one(u'blob-name', bucket=None)

@@ -826,15 +832,15 @@ def test__do_multipart_upload_bad_size(self):
self.assertGreater(size, len(data))

with self.assertRaises(ValueError) as exc_info:
-            blob._do_multipart_upload(None, stream, None, size)
+            blob._do_multipart_upload(None, stream, None, size, None)

exc_contents = str(exc_info.exception)
self.assertIn(
'was specified but the file-like object only had', exc_contents)
self.assertEqual(stream.tell(), len(data))

    def _initiate_resumable_helper(self, size=None, extra_headers=None,
-                                   chunk_size=None):
+                                   chunk_size=None, num_retries=None):
from google.resumable_media.requests import ResumableUpload

bucket = mock.Mock(path='/b/whammy', spec=[u'path'])
@@ -862,7 +868,7 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None,
stream = io.BytesIO(data)
content_type = u'text/plain'
upload, transport = blob._initiate_resumable_upload(
-            client, stream, content_type, size,
+            client, stream, content_type, size, num_retries,
extra_headers=extra_headers, chunk_size=chunk_size)

# Check the returned values.
@@ -890,6 +896,14 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None,
self.assertEqual(upload._total_bytes, size)
self.assertEqual(upload._content_type, content_type)
self.assertEqual(upload.resumable_url, resumable_url)
+        retry_strategy = upload._retry_strategy
+        self.assertEqual(retry_strategy.max_sleep, 64.0)
+        if num_retries is None:
+            self.assertEqual(retry_strategy.max_cumulative_retry, 600.0)
+            self.assertIsNone(retry_strategy.max_retries)
+        else:
+            self.assertIsNone(retry_strategy.max_cumulative_retry)
+            self.assertEqual(retry_strategy.max_retries, num_retries)
self.assertIs(transport, fake_transport)
# Make sure we never read from the stream.
self.assertEqual(stream.tell(), 0)
@@ -923,6 +937,9 @@ def test__initiate_resumable_upload_with_extra_headers(self):
extra_headers = {'origin': 'http://not-in-kansas-anymore.invalid'}
self._initiate_resumable_helper(extra_headers=extra_headers)

+    def test__initiate_resumable_upload_with_retry(self):
+        self._initiate_resumable_helper(num_retries=11)

def _make_resumable_transport(self, headers1, headers2,
headers3, total_bytes):
from google import resumable_media
@@ -990,7 +1007,7 @@ def _do_resumable_upload_call2(blob, content_type, data,
return mock.call(
'PUT', resumable_url, data=payload, headers=expected_headers)

-    def _do_resumable_helper(self, use_size=False):
+    def _do_resumable_helper(self, use_size=False, num_retries=None):
bucket = mock.Mock(path='/b/yesterday', spec=[u'path'])
blob = self._make_one(u'blob-name', bucket=bucket)
blob.chunk_size = blob._CHUNK_SIZE_MULTIPLE
@@ -1017,7 +1034,7 @@ def _do_resumable_helper(self, use_size=False):
stream = io.BytesIO(data)
content_type = u'text/html'
response = blob._do_resumable_upload(
-            client, stream, content_type, size)
+            client, stream, content_type, size, num_retries)

# Check the returned values.
self.assertIs(response, responses[2])
@@ -1039,7 +1056,10 @@ def test__do_resumable_upload_no_size(self):
def test__do_resumable_upload_with_size(self):
self._do_resumable_helper(use_size=True)

-    def _do_upload_helper(self, chunk_size=None):
+    def test__do_resumable_upload_with_retry(self):
+        self._do_resumable_helper(num_retries=6)
+
+    def _do_upload_helper(self, chunk_size=None, num_retries=None):
blob = self._make_one(u'blob-name', bucket=None)

# Create a fake response.
@@ -1061,17 +1081,18 @@ def _do_upload_helper(self, chunk_size=None):
size = 12345654321

# Make the request and check the mocks.
-        created_json = blob._do_upload(client, stream, content_type, size)
+        created_json = blob._do_upload(
+            client, stream, content_type, size, num_retries)
self.assertIs(created_json, mock.sentinel.json)
response.json.assert_called_once_with()
if chunk_size is None:
blob._do_multipart_upload.assert_called_once_with(
-                client, stream, content_type, size)
+                client, stream, content_type, size, num_retries)
blob._do_resumable_upload.assert_not_called()
else:
blob._do_multipart_upload.assert_not_called()
blob._do_resumable_upload.assert_called_once_with(
-                client, stream, content_type, size)
+                client, stream, content_type, size, num_retries)

def test__do_upload_without_chunk_size(self):
self._do_upload_helper()
@@ -1080,6 +1101,9 @@ def test__do_upload_with_chunk_size(self):
chunk_size = 1024 * 1024 * 1024 # 1GB
self._do_upload_helper(chunk_size=chunk_size)

+    def test__do_upload_with_retry(self):
+        self._do_upload_helper(num_retries=20)

def _upload_from_file_helper(self, side_effect=None, **kwargs):
from google.cloud._helpers import UTC

@@ -1109,8 +1133,9 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs):
self.assertEqual(blob.updated, new_updated)

# Check the mock.
+        num_retries = kwargs.get('num_retries')
blob._do_upload.assert_called_once_with(
-            client, stream, content_type, len(data))
+            client, stream, content_type, len(data), num_retries)

return stream

@@ -1151,10 +1176,11 @@ def _do_upload_mock_call_helper(self, blob, client, content_type, size):
mock_call = blob._do_upload.mock_calls[0]
call_name, pos_args, kwargs = mock_call
self.assertEqual(call_name, '')
-        self.assertEqual(len(pos_args), 4)
+        self.assertEqual(len(pos_args), 5)
self.assertEqual(pos_args[0], client)
self.assertEqual(pos_args[2], content_type)
self.assertEqual(pos_args[3], size)
+        self.assertIsNone(pos_args[4])  # num_retries
self.assertEqual(kwargs, {})

return pos_args[1]