Skip to content

Commit

Permalink
fix(upload): change to use exponential backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
philloooo committed Oct 30, 2015
1 parent a55399c commit 4550e94
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 27 deletions.
59 changes: 34 additions & 25 deletions gdc_client/upload/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import time
import copy
from ..log import get_logger
from ..defaults import part_size

MAX_RETRIES = 10
MAX_TIMEOUT = 60

OS_WINDOWS = platform.system() == 'Windows'

Expand All @@ -29,11 +30,11 @@
# needed for forking to work
freeze_support()


from multiprocessing.pool import ThreadPool as Pool

# Fake multiprocessing manager namespace
class FakeNamespace(object):

def __init__(self):
self.completed = 0

Expand Down Expand Up @@ -66,7 +67,7 @@ def read(self, num):

def upload_multipart(filename, offset, bytes, url, upload_id, part_number,
headers, verify=True, pbar=None, ns=None):
tries = 10
tries = MAX_RETRIES
while tries > 0:
try:
log.debug("Start upload part {}".format(part_number))
Expand Down Expand Up @@ -99,28 +100,36 @@ def upload_multipart(filename, offset, bytes, url, upload_id, part_number,
log.debug("Finish upload part {}".format(part_number))
return True
else:
time.sleep(2)
time.sleep(get_sleep_time(tries))

tries -= 1
log.debug("Retry upload part {}, {}".format(part_number, res.text))
log.debug(
"Retry upload part {}, {}".format(part_number, res.text))

except:
time.sleep(2)
time.sleep(get_sleep_time(tries))
tries -= 1
return False


def get_sleep_time(tries):
    """Return a jittered exponential-backoff delay for a retry loop.

    ``tries`` is the caller's remaining-attempt counter (the retry loops in
    this module count down from ``MAX_RETRIES``).  The base delay is
    ``2 ** (MAX_RETRIES - tries)``, capped at ``MAX_TIMEOUT``; the returned
    value is that base scaled by a random factor between 0.5 and 1.0 so
    that concurrent workers do not all retry at the same instant.
    """
    attempts_used = MAX_RETRIES - tries
    # Double the delay for every attempt already spent, but never exceed
    # the configured ceiling.
    capped_delay = min(MAX_TIMEOUT, 2 ** attempts_used)
    jitter = 0.5 + random.random() / 2
    return capped_delay * jitter


class GDCUploadClient(object):

def __init__(self, token, processes, server,
multipart=True, debug=False, part_size=part_size,
def __init__(self, token, processes, server, part_size,
multipart=True, debug=False,
files={}, verify=True, manifest_name=None):
self.headers = {'X-Auth-Token': token}
self.manifest_name = manifest_name
self.verify = verify
if OS_WINDOWS:
try:
# this only works in executable built by pyinstaller
self.verify = os.path.join(sys._MEIPASS, 'cacert.pem') if verify else verify
self.verify = os.path.join(
sys._MEIPASS, 'cacert.pem') if verify else verify
except:
print 'Using system default CA'
self.files = files
Expand All @@ -131,11 +140,9 @@ def __init__(self, token, processes, server,
self.debug = debug
self.processes = processes
self.part_size = (part_size/PAGESIZE+1)*PAGESIZE
self.retries = 10
self._metadata = None
self.resume_path = None


@property
def metadata(self):
return self._metadata or self.get_metadata(self.node_id)
Expand All @@ -147,13 +154,13 @@ def get_metadata(self, id):
try:
self._metadata = None
query = {'query':
"""query Files { node (id: "%s")
{ project_id, file_name }}""" %id}
"""query Files { node (id: "%s")
{ project_id, file_name }}""" % id}
r = requests.post(
urljoin(self.server, "v0/submission/graphql"),
headers=self.headers,
data=json.dumps(query),
verify=self.verify)
urljoin(self.server, "v0/submission/graphql"),
headers=self.headers,
data=json.dumps(query),
verify=self.verify)
if r.status_code == 404:
raise Exception("File with id {} not found".format(id))
elif r.status_code == 200:
Expand Down Expand Up @@ -186,12 +193,12 @@ def get_file(self, f, action='download'):

self.path = f.get('path') or '.'
self.filename = f.get('file_name') or self.metadata['file_name']
self.file_path = os.path.join(self.path, self.filename)
self.file_path = os.path.join(self.path, self.filename)
self.file = open(self.file_path, 'rb')

self.file_size = os.fstat(self.file.fileno()).st_size
self.upload_id = f.get('upload_id')

except KeyError as e:
raise KeyError(
"Please provide {} from manifest or as an argument"
Expand Down Expand Up @@ -243,7 +250,7 @@ def delete(self):

def _upload(self):
'''Simple S3 PUT'''

with open(self.file_path, 'rb') as f:
try:
# r = requests.put(self.url+"/_dry_run", headers=self.headers, verify=self.verify)
Expand Down Expand Up @@ -300,12 +307,13 @@ def handle_multipart(self):
print "Failure:", e.message

def check_multipart(self):
tries = self.retries
tries = MAX_RETRIES

while tries:
if self.list_parts() is None:
tries -= 1
time.sleep(2)
time.sleep(get_sleep_time(tries))

else:
return
raise Exception(
Expand Down Expand Up @@ -380,11 +388,11 @@ def complete(self):
raise Exception(
"""Multipart upload failed for file {}:
completed parts:{}, total parts: {}, please try to resume"""
.format(self.node_id,self.ns.completed, self.total_parts))
.format(self.node_id, self.ns.completed, self.total_parts))

self.pbar.finish()
url = self.url+"?uploadId={}".format(self.upload_id)
tries = self.retries
tries = MAX_RETRIES
tries = 1
while tries > 0:
r = requests.post(url,
Expand All @@ -393,7 +401,8 @@ def complete(self):
verify=self.verify)
if r.status_code != 200:
tries -= 1
time.sleep(2)
time.sleep(get_sleep_time(tries))

else:
print "Multipart upload finished for file {}".format(self.node_id)
return
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
package_data={},
install_requires=[
'parcel',
'lxml',
'pyyaml',
'lxml==3.5.0b1',
'PyYAML==3.11',
],
dependency_links=[
'git+ssh://git@github.com/LabAdvComp/parcel.git@c51523de7088208ac6a559283644035f3ea1ea7b#egg=parcel',
Expand Down

0 comments on commit 4550e94

Please sign in to comment.