Commit 2b4adff

Merge remote-tracking branch 'upstream/develop' into develop

jamesls committed Mar 27, 2013
2 parents c74facf + 9f39550

Showing 14 changed files with 469 additions and 153 deletions.
6 changes: 3 additions & 3 deletions boto/auth.py
@@ -164,9 +164,9 @@ def add_auth(self, http_request, **kwargs):
         boto.log.debug('StringToSign:\n%s' % string_to_sign)
         b64_hmac = self.sign_string(string_to_sign)
         auth_hdr = self._provider.auth_header
-        headers['Authorization'] = ("%s %s:%s" %
-                                    (auth_hdr,
-                                     self._provider.access_key, b64_hmac))
+        auth = ("%s %s:%s" % (auth_hdr, self._provider.access_key, b64_hmac))
+        boto.log.debug('Signature:\n%s' % auth)
+        headers['Authorization'] = auth


 class HmacAuthV2Handler(AuthHandler, HmacKeys):
16 changes: 10 additions & 6 deletions boto/gs/acl.py
@@ -51,11 +51,12 @@
 SupportedPermissions = ['READ', 'WRITE', 'FULL_CONTROL']
 """A list of supported ACL permissions."""

-class ACL:
+
+class ACL(object):

     def __init__(self, parent=None):
         self.parent = parent
-        self.entries = []
+        self.entries = Entries(self)

     @property
     def acl(self):
@@ -125,7 +126,7 @@ def to_xml(self):
         return s


-class Entries:
+class Entries(object):

     def __init__(self, parent=None):
         self.parent = parent
@@ -154,15 +155,17 @@ def endElement(self, name, value, connection):
         setattr(self, name, value)

     def to_xml(self):
+        if not self.entry_list:
+            return ''
         s = '<%s>' % ENTRIES
         for entry in self.entry_list:
             s += entry.to_xml()
         s += '</%s>' % ENTRIES
         return s


 # Class that represents a single (Scope, Permission) entry in an ACL.
-class Entry:
+class Entry(object):

     def __init__(self, scope=None, type=None, id=None, name=None,
                  email_address=None, domain=None, permission=None):
@@ -219,7 +222,8 @@ def to_xml(self):
         s += '</%s>' % ENTRY
         return s

-class Scope:
+
+class Scope(object):

     # Map from Scope type.lower() to lower-cased list of allowed sub-elems.
     ALLOWED_SCOPE_TYPE_SUB_ELEMS = {
242 changes: 215 additions & 27 deletions boto/gs/key.py

Large diffs are not rendered by default.

74 changes: 51 additions & 23 deletions boto/gs/resumable_upload_handler.py
@@ -163,6 +163,22 @@ def get_tracker_uri(self):
"""
return self.tracker_uri

def get_upload_id(self):
"""
Returns the upload ID for the resumable upload, or None if the upload
has not yet started.
"""
# We extract the upload_id from the tracker uri. We could retrieve the
# upload_id from the headers in the response but this only works for
# the case where we get the tracker uri from the service. In the case
# where we get the tracker from the tracking file we need to do this
# logic anyway.
delim = '?upload_id='
if self.tracker_uri and delim in self.tracker_uri:
return self.tracker_uri[self.tracker_uri.index(delim) + len(delim):]
else:
return None

def _remove_tracker_file(self):
if (self.tracker_file_name and
os.path.exists(self.tracker_file_name)):
@@ -306,7 +322,7 @@ def _start_new_resumable_upload(self, key, headers=None):
         self._save_tracker_uri_to_file()

     def _upload_file_bytes(self, conn, http_conn, fp, file_length,
-                           total_bytes_uploaded, cb, num_cb, md5sum, headers):
+                           total_bytes_uploaded, cb, num_cb, headers):
         """
         Makes one attempt to upload file bytes, using an existing resumable
         upload connection.
@@ -360,7 +376,8 @@ def _upload_file_bytes(self, conn, http_conn, fp, file_length,
             http_conn.set_debuglevel(0)
             while buf:
                 http_conn.send(buf)
-                md5sum.update(buf)
+                for alg in self.digesters:
+                    self.digesters[alg].update(buf)
                 total_bytes_uploaded += len(buf)
                 if cb:
                     i += 1
@@ -400,7 +417,7 @@ def _upload_file_bytes(self, conn, http_conn, fp, file_length,
                 (resp.status, resp.reason), disposition)

     def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
-                                  num_cb, md5sum):
+                                  num_cb):
         """
         Attempts a resumable upload.
@@ -419,9 +436,9 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,

         if server_end:
             # If the server already has some of the content, we need to
-            # update the md5 with the bytes that have already been
+            # update the digesters with the bytes that have already been
             # uploaded to ensure we get a complete hash in the end.
-            print 'Catching up md5 for resumed upload'
+            print 'Catching up hash digest(s) for resumed upload'
             fp.seek(0)
             # Read local file's bytes through position server has. For
             # example, if server has (0, 3) we want to read 3-0+1=4 bytes.
@@ -430,13 +447,14 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
                 chunk = fp.read(min(key.BufferSize, bytes_to_go))
                 if not chunk:
                     raise ResumableUploadException(
-                        'Hit end of file during resumable upload md5 '
+                        'Hit end of file during resumable upload hash '
                         'catchup. This should not happen under\n'
                         'normal circumstances, as it indicates the '
                         'server has more bytes of this transfer\nthan'
                         ' the current file size. Restarting upload.',
                         ResumableTransferDisposition.START_OVER)
-                md5sum.update(chunk)
+                for alg in self.digesters:
+                    self.digesters[alg].update(chunk)
                 bytes_to_go -= len(chunk)

         if conn.debug >= 1:
@@ -476,7 +494,7 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
             # and can report that progress on next attempt.
             try:
                 return self._upload_file_bytes(conn, http_conn, fp, file_length,
-                                               total_bytes_uploaded, cb, num_cb, md5sum,
+                                               total_bytes_uploaded, cb, num_cb,
                                                headers)
             except (ResumableUploadException, socket.error):
                 resp = self._query_server_state(conn, file_length)
@@ -540,9 +558,9 @@ def track_progress_less_iterations(self, server_had_bytes_before_attempt,
         else:
             self.progress_less_iterations += 1
             if roll_back_md5:
-                # Rollback any potential md5sum updates, as we did not
+                # Rollback any potential hash updates, as we did not
                 # make any progress in this iteration.
-                self.md5sum = self.md5sum_before_attempt
+                self.digesters = self.digesters_before_attempt

         if self.progress_less_iterations > self.num_retries:
             # Don't retry any longer in the current process.
@@ -559,7 +577,7 @@ def track_progress_less_iterations(self, server_had_bytes_before_attempt,
                                (self.progress_less_iterations, sleep_time_secs))
                 time.sleep(sleep_time_secs)

-    def send_file(self, key, fp, headers, cb=None, num_cb=10):
+    def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
         """
         Upload a file to a key into a bucket on GS, using GS resumable upload
         protocol.
@@ -587,6 +605,12 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
             during the file transfer. Providing a negative integer will cause
             your callback to be called with each buffer read.

+        :type hash_algs: dictionary
+        :param hash_algs: (optional) Dictionary mapping hash algorithm
+            descriptions to the corresponding hash constructors; each
+            constructor must return a stateful object implementing update(),
+            digest(), and copy() (e.g. hashlib.md5). Defaults to {'md5': md5}.
+
         Raises ResumableUploadException if a problem occurs during the transfer.
         """

@@ -597,22 +621,25 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
         # that header.
         CT = 'Content-Type'
         if CT in headers and headers[CT] is None:
-          del headers[CT]
+            del headers[CT]

         headers['User-Agent'] = UserAgent

         # Determine file size different ways for case where fp is actually a
         # wrapper around a Key vs an actual file.
         if isinstance(fp, KeyFile):
-          file_length = fp.getkey().size
+            file_length = fp.getkey().size
         else:
-          fp.seek(0, os.SEEK_END)
-          file_length = fp.tell()
-          fp.seek(0)
+            fp.seek(0, os.SEEK_END)
+            file_length = fp.tell()
+            fp.seek(0)
         debug = key.bucket.connection.debug

-        # Compute the MD5 checksum on the fly.
-        self.md5sum = md5()
+        if hash_algs is None:
+            hash_algs = {'md5': md5}
+        self.digesters = dict(
+            (alg, hash_algs[alg]()) for alg in hash_algs or {})

         # Use num-retries from constructor if one was provided; else check
         # for a value specified in the boto config file; else default to 5.
@@ -622,19 +649,20 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):

         while True:  # Retry as long as we're making progress.
             server_had_bytes_before_attempt = self.server_has_bytes
-            self.md5sum_before_attempt = self.md5sum.copy()
+            self.digesters_before_attempt = dict(
+                (alg, self.digesters[alg].copy())
+                for alg in self.digesters)
             try:
                 # Save generation and metageneration in class state so caller
                 # can find these values, for use in preconditions of future
                 # operations on the uploaded object.
                 (etag, self.generation, self.metageneration) = (
                     self._attempt_resumable_upload(key, fp, file_length,
-                                                   headers, cb, num_cb,
-                                                   self.md5sum))
+                                                   headers, cb, num_cb))

-                # Get the final md5 for the uploaded content.
-                hd = self.md5sum.hexdigest()
-                key.md5, key.base64md5 = key.get_md5_from_hexdigest(hd)
+                # Get the final digests for the uploaded content.
+                for alg in self.digesters:
+                    key.local_hashes[alg] = self.digesters[alg].digest()

                 # Upload succeeded, so remove the tracker file (if we have one).
                 self._remove_tracker_file()
3 changes: 3 additions & 0 deletions boto/s3/bucket.py
@@ -209,6 +209,7 @@ def _get_key_internal(self, key_name, headers, query_args_l):
             k.handle_version_headers(response)
             k.handle_encryption_headers(response)
             k.handle_restore_headers(response)
+            k.handle_addl_headers(response.getheaders())
             return k, response
         else:
             if response.status == 404:
@@ -622,6 +623,7 @@ def _delete_key_internal(self, key_name, headers=None, version_id=None,
         k = self.key_class(self)
         k.name = key_name
         k.handle_version_headers(response)
+        k.handle_addl_headers(response.getheaders())
         return k

     def copy_key(self, new_key_name, src_bucket_name,
@@ -715,6 +717,7 @@ def copy_key(self, new_key_name, src_bucket_name,
             if hasattr(key, 'Error'):
                 raise provider.storage_copy_error(key.Code, key.Message, body)
             key.handle_version_headers(response)
+            key.handle_addl_headers(response.getheaders())
             if preserve_acl:
                 self.set_xml_acl(acl, new_key_name)
             return key
