
Adding support for incremental md5 computation for resumed uploads
Catching md5sum up with previously uploaded content to seed it for
the actual transfer, and then incrementally updating the md5sum as
bytes are uploaded. Also adding md5sum rollback support in the event
of a retryable exception. In addition to speeding up resumed uploads
(especially where the previously uploaded portion is small relative
to the file size), this also simplifies the md5 logic, as we now use
the incremental md5 in all cases.
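The technique the commit message describes can be sketched with Python's `hashlib` (a minimal Python 3 illustration of the idea, not boto's actual code; `seed_md5` and the buffer size are made up for the example):

```python
import hashlib
import io

def seed_md5(fp, server_has_bytes, buffer_size=8192):
    # Catch the md5 up with the portion the server already holds,
    # so the rest of the upload can update it incrementally.
    md5sum = hashlib.md5()
    fp.seek(0)
    bytes_to_go = server_has_bytes
    while bytes_to_go:
        chunk = fp.read(min(buffer_size, bytes_to_go))
        md5sum.update(chunk)
        bytes_to_go -= len(chunk)
    return md5sum

data = b"0123456789" * 100
fp = io.BytesIO(data)
md5sum = seed_md5(fp, 400)      # pretend the server already has 400 bytes
md5sum.update(data[400:])       # hash the remainder as it is uploaded
assert md5sum.hexdigest() == hashlib.md5(data).hexdigest()
```

Seeding only hashes the already-uploaded prefix, which is cheap when that prefix is small relative to the file.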
Evan Worley committed Apr 26, 2012
1 parent 89fa7b2 commit 60cc631
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions boto/gs/resumable_upload_handler.py
@@ -87,7 +87,6 @@ def __init__(self, tracker_file_name=None, num_retries=None):
self.num_retries = num_retries
self.server_has_bytes = 0 # Byte count at last server check.
self.tracker_uri = None
self.incremental_md5 = False
if tracker_file_name:
self._load_tracker_uri_from_file()
# Save upload_start_point in instance state so caller can find how
@@ -251,7 +250,6 @@ def _start_new_resumable_upload(self, key, headers=None):
if conn.debug >= 1:
print 'Starting new resumable upload.'
self.server_has_bytes = 0
self.incremental_md5 = True

# Start a new resumable upload by sending a POST request with an
# empty body and the "X-Goog-Resumable: start" header. Include any
@@ -399,9 +397,17 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
self._query_server_pos(conn, file_length))
self.server_has_bytes = server_start

# Cannot use incremental md5 calculation if the server already has some of the data.
if server_end:
self.incremental_md5 = False
# If the server already has some of the content, we need to update the md5 with
# the bytes that have already been uploaded to ensure we get a complete hash in
# the end.
print 'Catching up md5 for resumed upload'
fp.seek(0)
bytes_to_go = server_end + 1
while bytes_to_go:
chunk = fp.read(min(key.BufferSize, bytes_to_go))
md5sum.update(chunk)
bytes_to_go -= len(chunk)

key=key

mfschwartz Apr 26, 2012

could you please delete "key=key"?
It's leftover from some debugging I did a while back; I often do something like:

if condition_being_debugged:
    import pdb; pdb.set_trace()
    key=key

so I'll break at that point.

Thanks.

if conn.debug >= 1:
@@ -540,20 +546,14 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):

while True: # Retry as long as we're making progress.
server_had_bytes_before_attempt = self.server_has_bytes
md5sum_before_attempt = md5sum.copy()
try:
etag = self._attempt_resumable_upload(key, fp, file_length,
headers, cb, num_cb, md5sum)

# If this is a resumed upload we need to recompute the MD5 from scratch.
if self.incremental_md5:
hd = md5sum.hexdigest()
key.md5, key.base64md5 = key.get_md5_from_hexdigest(hd)
else:
print 'Recomputing MD5 from scratch for resumed upload'
fp.seek(0)
hd = key.compute_md5(fp)
key.md5 = hd[0]
key.base64md5 = hd[1]
# Get the final md5 for the uploaded content.
hd = md5sum.hexdigest()
key.md5, key.base64md5 = key.get_md5_from_hexdigest(hd)

# Upload succeeded, so remove the tracker file (if we have one).
self._remove_tracker_file()
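For context on `get_md5_from_hexdigest` above: it turns the incremental digest into the (hex, base64) pair boto stores on the key. A rough Python 3 equivalent (the helper name here is mine, not boto's API; it is an assumption based on how the method is used in this diff):

```python
import base64
import binascii
import hashlib

def md5_from_hexdigest(hexdigest):
    # Decode the hex digest back to raw bytes, then base64-encode those
    # bytes, yielding the (hex, base64) pair the key object expects.
    digest = binascii.unhexlify(hexdigest)
    return hexdigest, base64.b64encode(digest).decode("ascii")

hd = hashlib.md5(b"hello").hexdigest()
md5, b64md5 = md5_from_hexdigest(hd)
```

The base64 form is what GCS/S3 expect in the `Content-MD5` header, while the hex form matches the object's ETag for non-multipart uploads.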
@@ -591,11 +591,14 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
if debug >= 1:
print('Caught ResumableUploadException (%s) - will '
'retry' % e.message)

# At this point we had a retryable failure; see if we made progress.
if self.server_has_bytes > server_had_bytes_before_attempt:
progress_less_iterations = 0
else:
# Rollback any potential md5sum updates, as we did not
# make any progress in this iteration.
md5sum = md5sum_before_attempt

progress_less_iterations += 1

if progress_less_iterations > self.num_retries:
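The rollback in the `else` branch relies on `hashlib` digest objects being snapshot-able via `.copy()`. A standalone Python 3 sketch of the pattern (the byte strings are invented for illustration):

```python
import hashlib

md5sum = hashlib.md5(b"previously hashed upload bytes")
md5sum_before_attempt = md5sum.copy()   # cheap snapshot of the hash state

# A failed attempt may have fed bytes into the hash that the server
# never actually kept.
md5sum.update(b"bytes from a failed, progress-less attempt")

# No progress was made, so discard those updates by restoring the snapshot.
md5sum = md5sum_before_attempt

# The retry re-sends those bytes, updating the hash exactly once.
md5sum.update(b" plus the retried bytes")
expected = hashlib.md5(b"previously hashed upload bytes plus the retried bytes")
assert md5sum.hexdigest() == expected.hexdigest()
```

Note that when the server did accept some bytes, the snapshot is not restored: the hash already reflects data the server kept, and only the unsent remainder will be hashed on retry.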

3 comments on commit 60cc631

@mfschwartz
Evan,

Nice improvement, making separate-process resume use incremental md5 computation.

I think the else at line 597 is sufficiently important that it would be good to add a test for it. Can you please take a look at tests/s3/test_resumable_uploads.py? I implemented a CallbackTestHarnass that lets you force exceptions to occur partway through an upload - so it shouldn't be too hard to exercise this case.

Thanks again.

@evanworley
Owner


Hi Mike,

I agree that else is important. There is already a test which exposes this feature (https://github.com/boto/boto/blob/develop/tests/s3/test_resumable_uploads.py#L354).

That test was failing until I added the "md5sum rollback" support.

Is that sufficient, or would you like some other type of test added?

Thanks,
Evan

@mfschwartz

You're right, test_upload_with_inital_partial_upload_before_failure does test that case.

Code looks good, and all tests pass for me too. I just added one tiny comment - once you do that I'll commit this change.

Thanks again.
