Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 82 additions & 16 deletions hca/dss/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import hashlib
import os
import re
import time
import uuid
from io import open

import requests
from requests.packages.urllib3.exceptions import ProtocolError, DecodeError, ReadTimeoutError
from requests.exceptions import ChunkedEncodingError, ConnectionError, ReadTimeout

from ..util import SwaggerClient
from ..util.exceptions import SwaggerAPIException
from .. import logger
from .upload_to_cloud import upload_to_cloud


class DSSClient(SwaggerClient):
"""
Client for the Data Storage Service API.
Expand All @@ -22,7 +25,7 @@ def __init__(self, *args, **kwargs):
super(DSSClient, self).__init__(*args, **kwargs)
self.commands += [self.download, self.upload]

def download(self, bundle_uuid, replica, version="", dest_name=""):
def download(self, bundle_uuid, replica, version="", dest_name="", initial_retries_left=10, min_delay_seconds=0.25):
"""
Download a bundle and save it to the local filesystem as a directory.
"""
Expand All @@ -38,24 +41,87 @@ def download(self, bundle_uuid, replica, version="", dest_name=""):
file_uuid = file_["uuid"]
filename = file_.get("name", file_uuid)

logger.info("Retrieving file %s", filename)

logger.info("File %s: Retrieving...", filename)
file_path = os.path.join(dest_name, filename)

retries = self.get_session().adapters["https://"].max_retries
# When streaming response data, requests/urllib3 does not obey its usual retry logic, so we reenact it here.
try:
with self.get_file.stream(uuid=file_uuid, replica=replica) as fh, open(file_path, "wb") as dest_fh:
logger.info("%s", "File {}: GET SUCCEEDED. Writing to disk.".format(filename))
while True:
chunk = fh.raw.read(1024*1024)
if len(chunk) == 0:
# Attempt to download the data. If a retryable exception occurs, we wait a bit and retry again. The delay
# increases each time we fail and decreases each time we successfully read a block. We set a quota for the
# number of failures that goes up with every successful block read and down with each failure.
#
# If we can, we will attempt HTTP resume. However, we verify that the server supports HTTP resume. If the
# ranged get doesn't yield the correct header, then we start over.
delay = min_delay_seconds
retries_left = initial_retries_left
hasher = hashlib.sha256()
with open(file_path, "wb") as fh:
while True:
try:
response = self.get_file._request(
dict(uuid=file_uuid, replica=replica),
stream=True,
headers={
'Range': "bytes={}-".format(fh.tell())
},
)
try:
if not response.ok:
logger.error("%s", "File {}: GET FAILED.".format(filename))
logger.error("%s", "Response: {}".format(response.text))
break

consume_bytes = int(fh.tell())
server_start = 0
content_range_header = response.headers.get('Content-Range', None)
if content_range_header is not None:
cre = re.compile("bytes (\d+)-(\d+)")
mo = cre.search(content_range_header)
if mo is not None:
server_start = int(mo.group(1))

consume_bytes -= server_start
assert consume_bytes >= 0
if server_start > 0 and consume_bytes == 0:
logger.info("%s", "File {}: Resuming at {}.".format(
filename, server_start))
elif consume_bytes > 0:
logger.info("%s", "File {}: Resuming at {}. Dropping {} bytes to match".format(
filename, server_start, consume_bytes))

while consume_bytes > 0:
bytes_to_read = min(consume_bytes, 1024*1024)
content = response.iter_content(chunk_size=bytes_to_read)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you use response.iter_content instead of response.raw.read?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a documented API. response.raw.read is not.

Also, the implementation of response.iter_content seems to branch on whether the backend is urllib3 or not, and in the case of urllib3, it doesn't even call response.raw.read.

Copy link
Copy Markdown
Member

@kislyuk kislyuk May 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm ok. I think it is documented (http://docs.python-requests.org/en/master/api/#requests.Response.raw and http://urllib3.readthedocs.io/en/latest/reference/index.html#urllib3.response.HTTPResponse.read) and I'm vaguely unhappy with the extra indirection happening here for no obvious benefit, but I guess it's OK.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So as an example, iter_content(..) in requests calls raw.stream(decode_content=True) for urllib3. If I just call read(..), it will not call with decode_content=True.

chunk = next(content)
if chunk:
consume_bytes -= len(chunk)

for chunk in response.iter_content(chunk_size=1024*1024):
if chunk:
fh.write(chunk)
hasher.update(chunk)
retries_left = min(retries_left + 1, initial_retries_left)
delay = max(delay / 2, min_delay_seconds)
break
dest_fh.write(chunk)
finally:
response.close()
except (ChunkedEncodingError, ConnectionError, ReadTimeout):
if retries_left > 0:
# resume!!
logger.info("%s", "File {}: GET FAILED. Attempting to resume.".format(
filename, file_path))
time.sleep(delay)
delay *= 2
retries_left -= 1
continue
raise

if hasher.hexdigest().lower() != file_["sha256"].lower():
os.remove(file_path)
logger.error("%s", "File {}: GET FAILED. Checksum mismatch.".format(filename))
raise ValueError("Expected sha256 {} Received sha256 {}".format(
file_["sha256"].lower(), hasher.hexdigest().lower()))
else:
logger.info("%s", "File {}: GET SUCCEEDED. Stored at {}.".format(filename, file_path))
except (ProtocolError, DecodeError, ReadTimeoutError) as e:
logger.error(e)
retries = retries.increment(method="GET", error=e)

return {}

def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
Expand Down
2 changes: 1 addition & 1 deletion hca/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def __init__(self, retry_after_status_codes={301}, **kwargs):

class _ClientMethodFactory(object):
retry_policy = RetryPolicy(read=10, status=10, status_forcelist=frozenset({500, 502, 503, 504}))
timeout_policy = timeout.Timeout(connect=60, read=120)
timeout_policy = timeout.Timeout(connect=60, read=10)

def __init__(self, client, parameters, path_parameters, http_method, method_name, method_data, body_props):
self.__dict__.update(locals())
Expand Down