Retry streaming read errors in bundle download #98
@@ -6,6 +6,7 @@
from io import open

import requests
from requests.packages.urllib3.exceptions import ProtocolError, DecodeError, ReadTimeoutError

from ..util import SwaggerClient
from ..util.exceptions import SwaggerAPIException

@@ -37,24 +38,24 @@ def download(self, bundle_uuid, replica, version="", dest_name=""):
file_uuid = file_["uuid"]
filename = file_.get("name", file_uuid)

logger.info("File %s: Retrieving...", filename)
response = self.get_file._request(dict(uuid=file_uuid, replica=replica), stream=True)
logger.info("Retrieving file %s", filename)

file_path = os.path.join(dest_name, filename)

retries = self.get_session().adapters["https://"].max_retries
# When streaming response data, requests/urllib3 does not obey its usual retry logic, so we reenact it here.
try:
if response.ok:
file_path = os.path.join(dest_name, filename)
with self.get_file.stream(uuid=file_uuid, replica=replica) as fh, open(file_path, "wb") as dest_fh:
logger.info("%s", "File {}: GET SUCCEEDED. Writing to disk.".format(filename))
with open(file_path, "wb") as fh:
for chunk in response.iter_content(chunk_size=1024*1024):
if chunk:
fh.write(chunk)
while True:
chunk = fh.raw.read(1024*1024)
if len(chunk) == 0:
break
dest_fh.write(chunk)
logger.info("%s", "File {}: GET SUCCEEDED. Stored at {}.".format(filename, file_path))

else:
logger.error("%s", "File {}: GET FAILED.".format(filename))
logger.error("%s", "Response: {}".format(response.text))
finally:
Member
I think you still want the
Member, Author
This PR switches from using Lines 154 to 164 in b4ddfe2
With this logic in place, the
response.close()
except (ProtocolError, DecodeError, ReadTimeoutError) as e:
logger.error(e)
retries = retries.increment(method="GET", error=e)
return {}

def upload(self, src_dir, replica, staging_bucket, timeout_seconds=1200):
I'm not entirely sure I agree with this, but I don't know what's better.
This is retrieving the retry policy that the request logic ought to apply, and applying it manually. Arguably all this machinery might be better off in the client class, but we can move it there later. Do you have an issue with the policies being the same, or...?
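For illustration, the "retrieve the policy and apply it manually" idea can be sketched as a restart loop around the streaming read. This is a minimal, self-contained sketch rather than the code in this PR: `download_with_retries`, `CountdownRetry`, `ReadError`, `RetriesExhausted` and `make_flaky_opener` are hypothetical names, and the stand-in policy only mimics the shape of urllib3's `Retry.increment(method=..., error=...)`, which returns a new `Retry` with a reduced budget and raises `MaxRetryError` once the budget is exhausted.

```python
import io


class ReadError(Exception):
    """Hypothetical stand-in for urllib3's ProtocolError / ReadTimeoutError."""


class RetriesExhausted(Exception):
    """Hypothetical stand-in for urllib3's MaxRetryError."""


class CountdownRetry:
    """Hypothetical policy that mimics the shape of urllib3's Retry.increment."""

    def __init__(self, total):
        self.total = total

    def increment(self, method=None, error=None):
        # Like urllib3's Retry, hand back a *new* policy object with a smaller
        # budget, or raise once the budget is used up.
        if self.total <= 0:
            raise RetriesExhausted("{} failed: {}".format(method, error)) from error
        return CountdownRetry(self.total - 1)


def download_with_retries(open_stream, dest, retries, chunk_size=1024 * 1024):
    """Copy a stream into dest, restarting from byte zero after a read error.

    open_stream is a zero-argument callable returning a context manager with a
    read(size) method (assumption: it re-issues the request on every call).
    Returns the number of bytes written.
    """
    while True:
        try:
            # Discard any partial data left over from a failed attempt.
            dest.seek(0)
            dest.truncate()
            with open_stream() as src:
                while True:
                    chunk = src.read(chunk_size)
                    if len(chunk) == 0:
                        return dest.tell()
                    dest.write(chunk)
        except ReadError as e:
            # Raises RetriesExhausted when no attempts remain; otherwise loop.
            retries = retries.increment(method="GET", error=e)


def make_flaky_opener(payload, failures):
    """Test double: the first `failures` streams break after their first chunk."""
    state = {"failures": failures}

    class Stream(io.BytesIO):
        def read(self, size=-1):
            if self.tell() > 0 and state["failures"] > 0:
                state["failures"] -= 1
                raise ReadError("connection broken mid-stream")
            return super().read(size)

    return lambda: Stream(payload)


if __name__ == "__main__":
    out = io.BytesIO()
    written = download_with_retries(
        make_flaky_opener(b"abcdefgh", failures=1), out, CountdownRetry(2), chunk_size=4
    )
    print(written, out.getvalue())
```

Restarting from byte zero keeps the sketch simple. Resuming a partial download (for example with a `Range` header) would be a different design and is not something the diff above shows.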