From 2f71ade821ffee2bd111c957780dcda3fbb1b6be Mon Sep 17 00:00:00 2001 From: Tony Tung Date: Tue, 1 May 2018 08:13:33 -0700 Subject: [PATCH] Add HTTP resume to DCP CLI download. This allows us to do ranged gets rather than restart the entire download. Verify the resulting download for extra correctness. Test plan: Start a download, yank the ethernet cable, plug it back in, and watch the download succeed. --- hca/dss/__init__.py | 98 ++++++++++++++++++++++++++++++++++++-------- hca/util/__init__.py | 2 +- 2 files changed, 83 insertions(+), 17 deletions(-) diff --git a/hca/dss/__init__.py b/hca/dss/__init__.py index ac5d7ef8..7c1358a8 100644 --- a/hca/dss/__init__.py +++ b/hca/dss/__init__.py @@ -1,18 +1,21 @@ from __future__ import absolute_import, division, print_function, unicode_literals +import hashlib import os +import re import time import uuid from io import open import requests -from requests.packages.urllib3.exceptions import ProtocolError, DecodeError, ReadTimeoutError +from requests.exceptions import ChunkedEncodingError, ConnectionError, ReadTimeout from ..util import SwaggerClient from ..util.exceptions import SwaggerAPIException from .. import logger from .upload_to_cloud import upload_to_cloud + class DSSClient(SwaggerClient): """ Client for the Data Storage Service API. @@ -22,7 +25,7 @@ def __init__(self, *args, **kwargs): super(DSSClient, self).__init__(*args, **kwargs) self.commands += [self.download, self.upload] - def download(self, bundle_uuid, replica, version="", dest_name=""): + def download(self, bundle_uuid, replica, version="", dest_name="", initial_retries_left=10, min_delay_seconds=0.25): """ Download a bundle and save it to the local filesystem as a directory. """ @@ -38,24 +41,87 @@ def download(self, bundle_uuid, replica, version="", dest_name=""): file_uuid = file_["uuid"] filename = file_.get("name", file_uuid) - logger.info("Retrieving file %s", filename) - + logger.info("File %s: Retrieving...", filename) file_path = os.path.join(dest_name, filename) - retries = self.get_session().adapters["https://"].max_retries - # When streaming response data, requests/urllib3 does not obey its usual retry logic, so we reenact it here. - try: - with self.get_file.stream(uuid=file_uuid, replica=replica) as fh, open(file_path, "wb") as dest_fh: - logger.info("%s", "File {}: GET SUCCEEDED. Writing to disk.".format(filename)) - while True: - chunk = fh.raw.read(1024*1024) - if len(chunk) == 0: + # Attempt to download the data. If a retryable exception occurs, we wait a bit and retry again. The delay + # increases each time we fail and decreases each time we successfully read a block. We set a quota for the + # number of failures that goes up with every successful block read and down with each failure. + # + # If we can, we will attempt HTTP resume. However, we verify that the server supports HTTP resume. If the + # ranged get doesn't yield the correct header, then we start over. + delay = min_delay_seconds + retries_left = initial_retries_left + hasher = hashlib.sha256() + with open(file_path, "wb") as fh: + while True: + try: + response = self.get_file._request( + dict(uuid=file_uuid, replica=replica), + stream=True, + headers={ + 'Range': "bytes={}-".format(fh.tell()) + }, + ) + try: + if not response.ok: + logger.error("%s", "File {}: GET FAILED.".format(filename)) + logger.error("%s", "Response: {}".format(response.text)) + break + + consume_bytes = int(fh.tell()) + server_start = 0 + content_range_header = response.headers.get('Content-Range', None) + if content_range_header is not None: + cre = re.compile("bytes (\d+)-(\d+)") + mo = cre.search(content_range_header) + if mo is not None: + server_start = int(mo.group(1)) + + consume_bytes -= server_start + assert consume_bytes >= 0 + if server_start > 0 and consume_bytes == 0: + logger.info("%s", "File {}: Resuming at {}.".format( + filename, server_start)) + elif consume_bytes > 0: + logger.info("%s", "File {}: Resuming at {}. Dropping {} bytes to match".format( + filename, server_start, consume_bytes)) + + while consume_bytes > 0: + bytes_to_read = min(consume_bytes, 1024*1024) + content = response.iter_content(chunk_size=bytes_to_read) + chunk = next(content) + if chunk: + consume_bytes -= len(chunk) + + for chunk in response.iter_content(chunk_size=1024*1024): + if chunk: + fh.write(chunk) + hasher.update(chunk) + retries_left = min(retries_left + 1, initial_retries_left) + delay = max(delay / 2, min_delay_seconds) break - dest_fh.write(chunk) + finally: + response.close() + except (ChunkedEncodingError, ConnectionError, ReadTimeout): + if retries_left > 0: + # resume!! + logger.info("%s", "File {}: GET FAILED. Attempting to resume.".format( + filename, file_path)) + time.sleep(delay) + delay *= 2 + retries_left -= 1 + continue + raise + + if hasher.hexdigest().lower() != file_["sha256"].lower(): + os.remove(file_path) + logger.error("%s", "File {}: GET FAILED. Checksum mismatch.".format(filename)) + raise ValueError("Expected sha256 {} Received sha256 {}".format( + file_["sha256"].lower(), hasher.hexdigest().lower())) + else: logger.info("%s", "File {}: GET SUCCEEDED. Stored at {}.".format(filename, file_path)) - except (ProtocolError, DecodeError, ReadTimeoutError) as e: - logger.error(e) - retries = retries.increment(method="GET", error=e) + return {} def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200): diff --git a/hca/util/__init__.py b/hca/util/__init__.py index 6739a156..cabc9d11 100755 --- a/hca/util/__init__.py +++ b/hca/util/__init__.py @@ -108,7 +108,7 @@ def __init__(self, retry_after_status_codes={301}, **kwargs): class _ClientMethodFactory(object): retry_policy = RetryPolicy(read=10, status=10, status_forcelist=frozenset({500, 502, 503, 504})) - timeout_policy = timeout.Timeout(connect=60, read=120) + timeout_policy = timeout.Timeout(connect=60, read=10) def __init__(self, client, parameters, path_parameters, http_method, method_name, method_data, body_props): self.__dict__.update(locals())