Support for GCS object composition and CRC32c hashes.

1 parent e1edf37 · commit 416c629b61a548f3aa34c902397d66211e40f47c · yovadia committed on Feb 12, 2013

@@ -322,7 +322,7 @@ def _start_new_resumable_upload(self, key, headers=None):
self._save_tracker_uri_to_file()
def _upload_file_bytes(self, conn, http_conn, fp, file_length,
- total_bytes_uploaded, cb, num_cb, md5sum, headers):
+ total_bytes_uploaded, cb, num_cb, headers):
"""
Makes one attempt to upload file bytes, using an existing resumable
upload connection.
@@ -376,7 +376,8 @@ def _upload_file_bytes(self, conn, http_conn, fp, file_length,
http_conn.set_debuglevel(0)
while buf:
http_conn.send(buf)
- md5sum.update(buf)
+ for alg in self.digesters:
+ self.digesters[alg].update(buf)
total_bytes_uploaded += len(buf)
if cb:
i += 1
@@ -416,7 +417,7 @@ def _upload_file_bytes(self, conn, http_conn, fp, file_length,
(resp.status, resp.reason), disposition)
def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
- num_cb, md5sum):
+ num_cb):
"""
Attempts a resumable upload.
@@ -435,9 +436,9 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
if server_end:
# If the server already has some of the content, we need to
- # update the md5 with the bytes that have already been
+ # update the digesters with the bytes that have already been
# uploaded to ensure we get a complete hash in the end.
- print 'Catching up md5 for resumed upload'
+ print 'Catching up hash digest(s) for resumed upload'
fp.seek(0)
# Read local file's bytes through position server has. For
# example, if server has (0, 3) we want to read 3-0+1=4 bytes.
@@ -446,13 +447,14 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
chunk = fp.read(min(key.BufferSize, bytes_to_go))
if not chunk:
raise ResumableUploadException(
- 'Hit end of file during resumable upload md5 '
+ 'Hit end of file during resumable upload hash '
'catchup. This should not happen under\n'
'normal circumstances, as it indicates the '
'server has more bytes of this transfer\nthan'
' the current file size. Restarting upload.',
ResumableTransferDisposition.START_OVER)
- md5sum.update(chunk)
+ for alg in self.digesters:
+ self.digesters[alg].update(chunk)
bytes_to_go -= len(chunk)
if conn.debug >= 1:
@@ -492,7 +494,7 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
# and can report that progress on next attempt.
try:
return self._upload_file_bytes(conn, http_conn, fp, file_length,
- total_bytes_uploaded, cb, num_cb, md5sum,
+ total_bytes_uploaded, cb, num_cb,
headers)
except (ResumableUploadException, socket.error):
resp = self._query_server_state(conn, file_length)
@@ -556,9 +558,9 @@ def track_progress_less_iterations(self, server_had_bytes_before_attempt,
else:
self.progress_less_iterations += 1
if roll_back_md5:
- # Rollback any potential md5sum updates, as we did not
+ # Rollback any potential hash updates, as we did not
# make any progress in this iteration.
- self.md5sum = self.md5sum_before_attempt
+ self.digesters = self.digesters_before_attempt
if self.progress_less_iterations > self.num_retries:
# Don't retry any longer in the current process.
@@ -575,7 +577,7 @@ def track_progress_less_iterations(self, server_had_bytes_before_attempt,
(self.progress_less_iterations, sleep_time_secs))
time.sleep(sleep_time_secs)
- def send_file(self, key, fp, headers, cb=None, num_cb=10):
+ def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
"""
Upload a file to a key into a bucket on GS, using GS resumable upload
protocol.
@@ -603,6 +605,12 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
during the file transfer. Providing a negative integer will cause
your callback to be called with each buffer read.
+ :type hash_algs: dictionary
+ :param hash_algs: (optional) Dictionary mapping hash algorithm
+ descriptions to constructors for state-ful hashing objects that
+ implement update(), digest(), and copy() (e.g. hashlib.md5).
+ Defaults to {'md5': md5}.
+
Raises ResumableUploadException if a problem occurs during the transfer.
"""
@@ -613,22 +621,25 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
# that header.
CT = 'Content-Type'
if CT in headers and headers[CT] is None:
- del headers[CT]
+ del headers[CT]
headers['User-Agent'] = UserAgent
# Determine file size different ways for case where fp is actually a
# wrapper around a Key vs an actual file.
if isinstance(fp, KeyFile):
- file_length = fp.getkey().size
+ file_length = fp.getkey().size
else:
- fp.seek(0, os.SEEK_END)
- file_length = fp.tell()
- fp.seek(0)
+ fp.seek(0, os.SEEK_END)
+ file_length = fp.tell()
+ fp.seek(0)
debug = key.bucket.connection.debug
# Compute the MD5 checksum on the fly.
- self.md5sum = md5()
+ if hash_algs is None:
+ hash_algs = {'md5': md5}
+ self.digesters = dict(
+ (alg, hash_algs[alg]()) for alg in hash_algs or {})
# Use num-retries from constructor if one was provided; else check
# for a value specified in the boto config file; else default to 5.
@@ -638,19 +649,20 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
while True: # Retry as long as we're making progress.
server_had_bytes_before_attempt = self.server_has_bytes
- self.md5sum_before_attempt = self.md5sum.copy()
+ self.digesters_before_attempt = dict(
+ (alg, self.digesters[alg].copy())
+ for alg in self.digesters)
try:
# Save generation and metageneration in class state so caller
# can find these values, for use in preconditions of future
# operations on the uploaded object.
(etag, self.generation, self.metageneration) = (
self._attempt_resumable_upload(key, fp, file_length,
- headers, cb, num_cb,
- self.md5sum))
+ headers, cb, num_cb))
- # Get the final md5 for the uploaded content.
- hd = self.md5sum.hexdigest()
- key.md5, key.base64md5 = key.get_md5_from_hexdigest(hd)
+ # Get the final digests for the uploaded content.
+ for alg in self.digesters:
+ key.local_hashes[alg] = self.digesters[alg].digest()
# Upload succceded, so remove the tracker file (if have one).
self._remove_tracker_file()
@@ -209,6 +209,7 @@ def _get_key_internal(self, key_name, headers, query_args_l):
k.handle_version_headers(response)
k.handle_encryption_headers(response)
k.handle_restore_headers(response)
+ k.handle_addl_headers(response.getheaders())
return k, response
else:
if response.status == 404:
@@ -622,6 +623,7 @@ def _delete_key_internal(self, key_name, headers=None, version_id=None,
k = self.key_class(self)
k.name = key_name
k.handle_version_headers(response)
+ k.handle_addl_headers(response.getheaders())
return k
def copy_key(self, new_key_name, src_bucket_name,
@@ -715,6 +717,7 @@ def copy_key(self, new_key_name, src_bucket_name,
if hasattr(key, 'Error'):
raise provider.storage_copy_error(key.Code, key.Message, body)
key.handle_version_headers(response)
+ key.handle_addl_headers(response.getheaders())
if preserve_acl:
self.set_xml_acl(acl, new_key_name)
return key