Skip to content
This repository has been archived by the owner on May 10, 2024. It is now read-only.

Commit

Permalink
Incorporate marcacohen's fixes for computing checksum when downloadin…
Browse files Browse the repository at this point in the history
…g large files on Windows
  • Loading branch information
mfschwartz committed Dec 12, 2011
1 parent dfc9809 commit f83da62
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 117 deletions.
24 changes: 11 additions & 13 deletions boto/s3/key.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from boto.provider import Provider from boto.provider import Provider
from boto.s3.user import User from boto.s3.user import User
from boto import UserAgent from boto import UserAgent
from boto.utils import compute_md5
try: try:
from hashlib import md5 from hashlib import md5
except ImportError: except ImportError:
Expand Down Expand Up @@ -595,19 +596,16 @@ def compute_md5(self, fp):
as the first element and the base64 encoded version of the as the first element and the base64 encoded version of the
plain digest as the second element. plain digest as the second element.
""" """
m = md5() tup = compute_md5(fp)
fp.seek(0) # Returned values are MD5 hash, base64 encoded MD5 hash, and file size.
s = fp.read(self.BufferSize) # The internal implementation of compute_md5() needs to return the
while s: # file size but we don't want to return that value to the external
m.update(s) # caller because it changes the class interface (i.e. it might
s = fp.read(self.BufferSize) # break some code) so we consume the third tuple value here and
hex_md5 = m.hexdigest() # return the remainder of the tuple to the caller, thereby preserving
base64md5 = base64.encodestring(m.digest()) # the existing interface.
if base64md5[-1] == '\n': self.size = tup[2]
base64md5 = base64md5[0:-1] return tup[0:2]
self.size = fp.tell()
fp.seek(0)
return (hex_md5, base64md5)


def set_contents_from_stream(self, fp, headers=None, replace=True, def set_contents_from_stream(self, fp, headers=None, replace=True,
cb=None, num_cb=10, policy=None, cb=None, num_cb=10, policy=None,
Expand Down
26 changes: 4 additions & 22 deletions boto/s3/resumable_download_handler.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -212,27 +212,6 @@ def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
override_num_retries=0) override_num_retries=0)
fp.flush() fp.flush()


def _check_final_md5(self, key, file_name):
    """
    Checks that etag from server agrees with md5 computed after the
    download completes. This is important, since the download could
    have spanned a number of hours and multiple processes (e.g.,
    gsutil runs), and the user could change some of the file and not
    realize they have inconsistent data.

    :type key: boto Key
    :param key: key whose server-side etag is compared against the
                md5 of the local file.

    :type file_name: string
    :param file_name: path of the completed download to verify.

    :raises ResumableDownloadException: (disposition ABORT) if the
        computed md5 does not match the key's etag; the inconsistent
        local file is deleted before raising.
    """
    # Open in binary mode: text mode mangles newline bytes on Windows,
    # which would make the computed md5 differ from the server's etag
    # for any file containing newlines.
    fp = open(file_name, 'rb')
    try:
        if key.bucket.connection.debug >= 1:
            print('Checking md5 against etag.')
        hex_md5 = key.compute_md5(fp)[0]
    finally:
        # Always release the handle; previously it was leaked when the
        # checksums matched.
        fp.close()
    if hex_md5 != key.etag.strip('"\''):
        os.unlink(file_name)
        raise ResumableDownloadException(
            'File changed during download: md5 signature doesn\'t match '
            'etag (incorrect downloaded file deleted)',
            ResumableTransferDisposition.ABORT)

def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False, def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
version_id=None): version_id=None):
""" """
Expand Down Expand Up @@ -287,7 +266,10 @@ def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
torrent, version_id) torrent, version_id)
# Download succeeded, so remove the tracker file (if have one). # Download succeeded, so remove the tracker file (if have one).
self._remove_tracker_file() self._remove_tracker_file()
self._check_final_md5(key, fp.name) # Previously, check_final_md5() was called here to validate
# downloaded file's checksum, however, to be consistent with
# non-resumable downloads, this call was removed. Checksum
# validation of file contents should be done by the caller.
if debug >= 1: if debug >= 1:
print 'Resumable download complete.' print 'Resumable download complete.'
return return
Expand Down
37 changes: 37 additions & 0 deletions boto/utils.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
from email.Utils import formatdate from email.Utils import formatdate
from email import Encoders from email import Encoders
import gzip import gzip
import base64
try:
from hashlib import md5
except ImportError:
from md5 import md5




try: try:
Expand Down Expand Up @@ -689,3 +694,35 @@ def guess_mime_type(content, deftype):
rtype = mimetype rtype = mimetype
break break
return(rtype) return(rtype)

def compute_md5(fp, buf_size=8192):
    """
    Compute MD5 hash on passed file and return results in a tuple of values.

    :type fp: file
    :param fp: File pointer to the file to MD5 hash.  The file pointer
               will be reset to the beginning of the file before the
               method returns.

    :type buf_size: integer
    :param buf_size: Number of bytes per read request.

    :rtype: tuple
    :return: A tuple containing the hex digest version of the MD5 hash
             as the first element, the base64 encoded version of the
             plain digest as the second element and the file size as
             the third element.
    """
    m = md5()
    fp.seek(0)
    # Read in buf_size chunks so arbitrarily large files can be hashed
    # without loading them entirely into memory.
    s = fp.read(buf_size)
    while s:
        m.update(s)
        s = fp.read(buf_size)
    hex_md5 = m.hexdigest()
    # base64.b64encode never appends a trailing newline, unlike the
    # deprecated base64.encodestring, so no strip-the-newline fixup is
    # needed afterwards.
    base64md5 = base64.b64encode(m.digest())
    # After the read loop the pointer sits at EOF, so tell() is the size.
    file_size = fp.tell()
    fp.seek(0)
    return (hex_md5, base64md5, file_size)
43 changes: 43 additions & 0 deletions tests/s3/mock_storage_service.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@


import copy import copy
import boto import boto
import base64
from boto.utils import compute_md5

try:
from hashlib import md5
except ImportError:
from md5 import md5


NOT_IMPL = None NOT_IMPL = None


Expand All @@ -53,10 +60,12 @@ def __init__(self, bucket=None, name=None):
self.bucket = bucket self.bucket = bucket
self.name = name self.name = name
self.data = None self.data = None
self.etag = None
self.size = None self.size = None
self.content_encoding = None self.content_encoding = None
self.content_type = None self.content_type = None
self.last_modified = 'Wed, 06 Oct 2010 05:11:54 GMT' self.last_modified = 'Wed, 06 Oct 2010 05:11:54 GMT'
self.BufferSize = 8192


def get_contents_as_string(self, headers=NOT_IMPL, def get_contents_as_string(self, headers=NOT_IMPL,
cb=NOT_IMPL, num_cb=NOT_IMPL, cb=NOT_IMPL, num_cb=NOT_IMPL,
Expand Down Expand Up @@ -93,13 +102,15 @@ def set_contents_from_file(self, fp, headers=None, replace=NOT_IMPL,
policy=NOT_IMPL, md5=NOT_IMPL, policy=NOT_IMPL, md5=NOT_IMPL,
res_upload_handler=NOT_IMPL): res_upload_handler=NOT_IMPL):
self.data = fp.read() self.data = fp.read()
self.set_etag()
self.size = len(self.data) self.size = len(self.data)
self._handle_headers(headers) self._handle_headers(headers)


def set_contents_from_string(self, s, headers=NOT_IMPL, replace=NOT_IMPL, def set_contents_from_string(self, s, headers=NOT_IMPL, replace=NOT_IMPL,
cb=NOT_IMPL, num_cb=NOT_IMPL, policy=NOT_IMPL, cb=NOT_IMPL, num_cb=NOT_IMPL, policy=NOT_IMPL,
md5=NOT_IMPL, reduced_redundancy=NOT_IMPL): md5=NOT_IMPL, reduced_redundancy=NOT_IMPL):
self.data = copy.copy(s) self.data = copy.copy(s)
self.set_etag()
self.size = len(s) self.size = len(s)
self._handle_headers(headers) self._handle_headers(headers)


Expand All @@ -118,6 +129,38 @@ def copy(self, dst_bucket_name, dst_key, metadata=NOT_IMPL,
return dst_bucket.copy_key(dst_key, self.bucket.name, return dst_bucket.copy_key(dst_key, self.bucket.name,
self.name, metadata) self.name, metadata)


def set_etag(self):
    """
    Set etag attribute by generating hex MD5 checksum on current
    contents of mock key.
    """
    # md5() accepts the initial payload directly, so there is no need
    # for a separate update() call.
    self.etag = md5(self.data).hexdigest()

def compute_md5(self, fp):
    """
    :type fp: file
    :param fp: File pointer to the file to MD5 hash.  The file pointer
               will be reset to the beginning of the file before the
               method returns.

    :rtype: tuple
    :return: A tuple containing the hex digest version of the MD5 hash
             as the first element and the base64 encoded version of the
             plain digest as the second element.
    """
    # Delegate to boto.utils.compute_md5, which also reports the file
    # size as a third tuple element.  Record that size on this mock key
    # but drop it from the return value so the public two-element
    # (hex_md5, base64_md5) interface is preserved for callers.
    hex_md5, base64_md5, file_size = compute_md5(fp)
    self.size = file_size
    return (hex_md5, base64_md5)


class MockBucket(object): class MockBucket(object):


Expand Down
18 changes: 18 additions & 0 deletions tests/s3/test_gsconnection.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import unittest import unittest
import time import time
import os import os
import re
from boto.gs.connection import GSConnection from boto.gs.connection import GSConnection
from boto import storage_uri from boto import storage_uri


Expand Down Expand Up @@ -218,12 +219,25 @@ def test_2_copy_key(self):


def test_3_default_object_acls(self): def test_3_default_object_acls(self):
"""test default object acls""" """test default object acls"""
# regexp for matching project-private default object ACL
project_private_re = '\s*<AccessControlList>\s*<Entries>\s*<Entry>' \
'\s*<Scope type="GroupById"><ID>[0-9a-fA-F]+</ID></Scope>' \
'\s*<Permission>FULL_CONTROL</Permission>\s*</Entry>\s*<Entry>' \
'\s*<Scope type="GroupById"><ID>[0-9a-fA-F]+</ID></Scope>' \
'\s*<Permission>FULL_CONTROL</Permission>\s*</Entry>\s*<Entry>' \
'\s*<Scope type="GroupById"><ID>[0-9a-fA-F]+</ID></Scope>' \
'\s*<Permission>READ</Permission></Entry>\s*</Entries>' \
'\s*</AccessControlList>\s*'
c = GSConnection() c = GSConnection()
# create a new bucket # create a new bucket
bucket_name = 'test-%d' % int(time.time()) bucket_name = 'test-%d' % int(time.time())
bucket = c.create_bucket(bucket_name) bucket = c.create_bucket(bucket_name)
# now call get_bucket to see if it's really there # now call get_bucket to see if it's really there
bucket = c.get_bucket(bucket_name) bucket = c.get_bucket(bucket_name)
# get default acl and make sure it's project-private
acl = bucket.get_def_acl()
assert re.search(project_private_re, acl.to_xml())
# set default acl to a canned acl and verify it gets set
bucket.set_def_acl('public-read') bucket.set_def_acl('public-read')
acl = bucket.get_def_acl() acl = bucket.get_def_acl()
# save public-read acl for later test # save public-read acl for later test
Expand Down Expand Up @@ -252,6 +266,10 @@ def test_3_default_object_acls(self):
bucket_name = 'test-%d' % int(time.time()) bucket_name = 'test-%d' % int(time.time())
uri = storage_uri('gs://' + bucket_name) uri = storage_uri('gs://' + bucket_name)
uri.create_bucket() uri.create_bucket()
# get default acl and make sure it's project-private
acl = uri.get_def_acl()
assert re.search(project_private_re, acl.to_xml())
# set default acl to a canned acl and verify it gets set
uri.set_def_acl('public-read') uri.set_def_acl('public-read')
acl = uri.get_def_acl() acl = uri.get_def_acl()
# save public-read acl for later test # save public-read acl for later test
Expand Down
82 changes: 0 additions & 82 deletions tests/s3/test_resumable_downloads.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -379,88 +379,6 @@ def test_zero_length_object_download(self):
self.dst_fp, res_download_handler=res_download_handler) self.dst_fp, res_download_handler=res_download_handler)
self.assertEqual(0, get_cur_file_size(self.dst_fp)) self.assertEqual(0, get_cur_file_size(self.dst_fp))


def test_download_with_object_size_change_between_starts(self):
    """
    Tests resumable download on an object that changes sizes between initial
    download start and restart.

    Expects a soft abort (ABORT_CUR_PROCESS) on the harness-forced
    failure, then a hard abort (ABORT) when the retried download is fed
    a differently-sized source object.
    """
    # NOTE: Python 2 only — uses the `except Exception, e:` syntax.
    harnass = CallbackTestHarnass(
        fail_after_n_bytes=self.larger_src_key_size/2, num_times_to_fail=2)
    # Set up first process' ResumableDownloadHandler not to do any
    # retries (initial download request will establish expected size to
    # download server).
    res_download_handler = ResumableDownloadHandler(
        tracker_file_name=self.tracker_file_name, num_retries=0)
    try:
        self.larger_src_key.get_contents_to_file(
            self.dst_fp, cb=harnass.call,
            res_download_handler=res_download_handler)
        self.fail('Did not get expected ResumableDownloadException')
    except ResumableDownloadException, e:
        # First abort (from harnass-forced failure) should be
        # ABORT_CUR_PROCESS.
        self.assertEqual(e.disposition, ResumableTransferDisposition.ABORT_CUR_PROCESS)
        # Ensure a tracker file survived.
        self.assertTrue(os.path.exists(self.tracker_file_name))
    # Try it again, this time with different src key (simulating an
    # object that changes sizes between downloads).
    try:
        self.small_src_key.get_contents_to_file(
            self.dst_fp, res_download_handler=res_download_handler)
        self.fail('Did not get expected ResumableDownloadException')
    except ResumableDownloadException, e:
        # This abort should be a hard abort (object size changing during
        # transfer).
        self.assertEqual(e.disposition, ResumableTransferDisposition.ABORT)
        # The exception message should identify the md5/etag mismatch.
        self.assertNotEqual(
            e.message.find('md5 signature doesn\'t match etag'), -1)

def test_download_with_file_content_change_during_download(self):
    """
    Tests resumable download on an object where the file content changes
    without changing length while download in progress.

    Corrupts one already-downloaded byte between attempts and expects a
    hard abort (ABORT) plus deletion of the corrupted destination file.
    """
    # NOTE: Python 2 only — uses the `except Exception, e:` syntax.
    harnass = CallbackTestHarnass(
        fail_after_n_bytes=self.larger_src_key_size/2, num_times_to_fail=2)
    # Set up first process' ResumableDownloadHandler not to do any
    # retries (initial download request will establish expected size to
    # download server).
    res_download_handler = ResumableDownloadHandler(
        tracker_file_name=self.tracker_file_name, num_retries=0)
    # Remember the path now; the fp may be closed/unlinked by the abort.
    dst_filename = self.dst_fp.name
    try:
        self.larger_src_key.get_contents_to_file(
            self.dst_fp, cb=harnass.call,
            res_download_handler=res_download_handler)
        self.fail('Did not get expected ResumableDownloadException')
    except ResumableDownloadException, e:
        # First abort (from harnass-forced failure) should be
        # ABORT_CUR_PROCESS.
        self.assertEqual(e.disposition,
                         ResumableTransferDisposition.ABORT_CUR_PROCESS)
        # Ensure a tracker file survived.
        self.assertTrue(os.path.exists(self.tracker_file_name))
    # Before trying again change the first byte of the file fragment
    # that was already downloaded.
    orig_size = get_cur_file_size(self.dst_fp)
    self.dst_fp.seek(0, os.SEEK_SET)
    self.dst_fp.write('a')
    # Ensure the file size didn't change.
    self.assertEqual(orig_size, get_cur_file_size(self.dst_fp))
    try:
        self.larger_src_key.get_contents_to_file(
            self.dst_fp, cb=harnass.call,
            res_download_handler=res_download_handler)
        self.fail('Did not get expected ResumableDownloadException')
    except ResumableDownloadException, e:
        # This abort should be a hard abort (file content changing during
        # transfer).
        self.assertEqual(e.disposition, ResumableTransferDisposition.ABORT)
        # The exception message should identify the md5/etag mismatch.
        self.assertNotEqual(
            e.message.find('md5 signature doesn\'t match etag'), -1)
    # Ensure the bad data wasn't left around.
    self.assertFalse(os.path.exists(dst_filename))

def test_download_with_invalid_tracker_etag(self): def test_download_with_invalid_tracker_etag(self):
""" """
Tests resumable download with a tracker file containing an invalid etag Tests resumable download with a tracker file containing an invalid etag
Expand Down

0 comments on commit f83da62

Please sign in to comment.