
Merge pull request #1409 from yovadia12/compose

Support for GCS object composition and CRC32c hashes.
commit dcff946e220f22783d12d169dbbcbbdd7db80842 (2 parents: e1edf37 + 416c629)
Authored by Mike Schwartz (mfschwartz)
242 boto/gs/key.py
@@ -27,6 +27,7 @@
27 27 from boto.exception import BotoClientError
28 28 from boto.s3.key import Key as S3Key
29 29 from boto.s3.keyfile import KeyFile
  30 +from boto.utils import compute_hash
30 31
31 32 class Key(S3Key):
32 33 """
@@ -47,7 +48,7 @@ class Key(S3Key):
47 48 :ivar last_modified: The string timestamp representing the last
48 49 time this object was modified in GS.
49 50 :ivar owner: The ID of the owner of this object.
50   - :ivar storage_class: The storage class of the object. Currently, one of:
  51 + :ivar storage_class: The storage class of the object. Currently, one of:
51 52 STANDARD | DURABLE_REDUCED_AVAILABILITY.
52 53 :ivar md5: The MD5 hash of the contents of the object.
53 54 :ivar size: The size, in bytes, of the object.
@@ -55,9 +56,16 @@ class Key(S3Key):
55 56 :ivar metageneration: The generation number of the object metadata.
56 57 :ivar encrypted: Whether the object is encrypted while at rest on
57 58 the server.
  59 + :ivar cloud_hashes: Dictionary of checksums as supplied by the storage
  60 + provider.
58 61 """
59   - generation = None
60   - metageneration = None
  62 +
  63 + def __init__(self, bucket=None, name=None, generation=None):
  64 + super(Key, self).__init__(bucket=bucket, name=name)
  65 + self.generation = generation
  66 + self.metageneration = None
  67 + self.cloud_hashes = {}
  68 + self.component_count = None
61 69
62 70 def __repr__(self):
63 71 if self.generation and self.metageneration:
@@ -100,17 +108,164 @@ def handle_version_headers(self, resp, force=False):
100 108 self.metageneration = resp.getheader('x-goog-metageneration', None)
101 109 self.generation = resp.getheader('x-goog-generation', None)
102 110
  111 + def handle_addl_headers(self, headers):
  112 + for key, value in headers:
  113 + if key == 'x-goog-hash':
  114 + for hash_pair in value.split(','):
  115 + alg, b64_digest = hash_pair.strip().split('=', 1)
  116 + self.cloud_hashes[alg] = binascii.a2b_base64(b64_digest)
  117 + elif key == 'x-goog-component-count':
  118 + self.component_count = int(value)
  119 +
  120 +
103 121 def get_file(self, fp, headers=None, cb=None, num_cb=10,
104 122 torrent=False, version_id=None, override_num_retries=None,
105   - response_headers=None):
  123 + response_headers=None, hash_algs=None):
106 124 query_args = None
107 125 if self.generation:
108 126 query_args = ['generation=%s' % self.generation]
109 127 self._get_file_internal(fp, headers=headers, cb=cb, num_cb=num_cb,
110 128 override_num_retries=override_num_retries,
111 129 response_headers=response_headers,
  130 + hash_algs=hash_algs,
112 131 query_args=query_args)
113 132
  133 + def get_contents_to_file(self, fp, headers=None,
  134 + cb=None, num_cb=10,
  135 + torrent=False,
  136 + version_id=None,
  137 + res_download_handler=None,
  138 + response_headers=None,
  139 + hash_algs=None):
  140 + """
  141 + Retrieve an object from GCS using the name of the Key object as the
  142 + key in GCS. Write the contents of the object to the file pointed
  143 + to by 'fp'.
  144 +
  145 + :type fp: file-like object
  146 + :param fp: a file-like object to which the object's contents are written.
  147 +
  148 + :type headers: dict
  149 + :param headers: additional HTTP headers that will be sent with
  150 + the GET request.
  151 +
  152 + :type cb: function
  153 + :param cb: a callback function that will be called to report
  154 + progress on the download. The callback should accept two
  155 + integer parameters, the first representing the number of
  156 + bytes that have been successfully received from GCS and
  157 + the second representing the total size of the object being
  158 + downloaded.
  159 +
  160 + :type num_cb: int
  161 + :param num_cb: (optional) If a callback is specified with the
  162 + cb parameter this parameter determines the granularity of
  163 + the callback by defining the maximum number of times the
  164 + callback will be called during the file transfer.
  165 +
  166 + :type torrent: bool
  167 + :param torrent: If True, returns the contents of a torrent
  168 + file as a string.
  169 +
  170 + :type res_download_handler: ResumableDownloadHandler
  171 + :param res_download_handler: If provided, this handler will
  172 + perform the download.
  173 +
  174 + :type response_headers: dict
  175 + :param response_headers: A dictionary containing HTTP
  176 + headers/values that will override any headers associated
  177 + with the stored object in the response. See
  178 + http://goo.gl/sMkcC for details.
  179 + """
  180 + if self.bucket is not None:
  181 + if res_download_handler:
  182 + res_download_handler.get_file(self, fp, headers, cb, num_cb,
  183 + torrent=torrent,
  184 + version_id=version_id,
  185 + hash_algs=hash_algs)
  186 + else:
  187 + self.get_file(fp, headers, cb, num_cb, torrent=torrent,
  188 + version_id=version_id,
  189 + response_headers=response_headers,
  190 + hash_algs=hash_algs)
  191 +
  192 + def compute_hash(self, fp, algorithm, size=None):
  193 + """
  194 + :type fp: file
  195 + :param fp: File pointer to the file to hash. The file
  196 + pointer will be reset to the same position before the
  197 + method returns.
  198 +
  199 + :type algorithm: zero-argument constructor for hash objects that
  200 + implements update() and digest() (e.g. hashlib.md5)
  201 +
  202 + :type size: int
  203 + :param size: (optional) The maximum number of bytes to read
  204 + from the file pointer (fp). This is useful when uploading
  205 + a file in multiple parts where the file is being split
  206 + in place into different parts. Fewer bytes may be available.
  207 + """
  208 + hex_digest, b64_digest, data_size = compute_hash(
  209 + fp, size=size, hash_algorithm=algorithm)
  210 + # The internal implementation of compute_hash() needs to return the
  211 + # data size, but we don't want to return that value to the external
  212 + # caller because it changes the class interface (i.e. it might
  213 + # break some code), so we consume the third tuple value here and
  214 + # return the remainder of the tuple to the caller, thereby preserving
  215 + # the existing interface.
  216 + self.size = data_size
  217 + return (hex_digest, b64_digest)
  218 +
  219 + def send_file(self, fp, headers=None, cb=None, num_cb=10,
  220 + query_args=None, chunked_transfer=False, size=None,
  221 + hash_algs=None):
  222 + """
  223 + Upload a file to GCS.
  224 +
  225 + :type fp: file
  226 + :param fp: The file pointer to upload. The file pointer must
  227 + point at the offset from which you wish to upload,
  228 + i.e. if uploading the full file, it should point at the
  229 + start of the file. Normally when a file is opened for
  230 + reading, the fp will point at the first byte. See the
  231 + bytes parameter below for more info.
  232 +
  233 + :type headers: dict
  234 + :param headers: The headers to pass along with the PUT request
  235 +
  236 + :type num_cb: int
  237 + :param num_cb: (optional) If a callback is specified with the
  238 + cb parameter this parameter determines the granularity of
  239 + the callback by defining the maximum number of times the
  240 + callback will be called during the file
  241 + transfer. Providing a negative integer will cause your
  242 + callback to be called with each buffer read.
  243 +
  244 + :type query_args: string
  245 + :param query_args: Arguments to pass in the query string.
  246 +
  247 + :type chunked_transfer: boolean
  248 + :param chunked_transfer: (optional) If true, we use chunked
  249 + Transfer-Encoding.
  250 +
  251 + :type size: int
  252 + :param size: (optional) The maximum number of bytes to read
  253 + from the file pointer (fp). This is useful when uploading
  254 + a file in multiple parts where you are splitting the file
  255 + up into different ranges to be uploaded. If not specified,
  256 + the default behaviour is to read all bytes from the file
  257 + pointer. Fewer bytes may be available.
  258 +
  259 + :type hash_algs: dictionary
  260 + :param hash_algs: (optional) Dictionary mapping hash algorithm
  261 + names to hashing classes that implement update() and digest().
  262 + Defaults to {'md5': hashlib.md5}.
  263 + """
  264 + self._send_file_internal(fp, headers=headers, cb=cb, num_cb=num_cb,
  265 + query_args=query_args,
  266 + chunked_transfer=chunked_transfer, size=size,
  267 + hash_algs=hash_algs)
  268 +
114 269 def delete(self):
115 270 return self.bucket.delete_key(self.name, version_id=self.version_id,
116 271 generation=self.generation)
@@ -289,7 +444,8 @@ def set_contents_from_file(self, fp, headers=None, replace=True,
289 444 provider = self.bucket.connection.provider
290 445 if res_upload_handler and size:
291 446 # could use size instead of file_length if provided but...
292   - raise BotoClientError('"size" param not supported for resumable uploads.')
  447 + raise BotoClientError(
  448 + '"size" param not supported for resumable uploads.')
293 449 headers = headers or {}
294 450 if policy:
295 451 headers[provider.acl_header] = policy
@@ -431,22 +587,21 @@ def set_contents_from_filename(self, filename, headers=None, replace=True,
431 587 this value. If set to the value 0, the object will only be written
432 588 if it doesn't already exist.
433 589 """
434   - # Clear out any previously computed md5 hashes, since we are setting the content.
435   - self.md5 = None
436   - self.base64md5 = None
  590 + # Clear out any previously computed hashes, since we are setting the
  591 + # content.
  592 + self.local_hashes = {}
437 593
438   - fp = open(filename, 'rb')
439   - self.set_contents_from_file(fp, headers, replace, cb, num_cb,
440   - policy, md5, res_upload_handler,
441   - if_generation=if_generation)
442   - fp.close()
  594 + with open(filename, 'rb') as fp:
  595 + self.set_contents_from_file(fp, headers, replace, cb, num_cb,
  596 + policy, md5, res_upload_handler,
  597 + if_generation=if_generation)
443 598
444 599 def set_contents_from_string(self, s, headers=None, replace=True,
445 600 cb=None, num_cb=10, policy=None, md5=None,
446 601 if_generation=None):
447 602 """
448   - Store an object in S3 using the name of the Key object as the
449   - key in S3 and the string 's' as the contents.
  603 + Store an object in GCS using the name of the Key object as the
  604 + key in GCS and the string 's' as the contents.
450 605 See set_contents_from_file method for details about the
451 606 parameters.
452 607
@@ -460,10 +615,10 @@ def set_contents_from_string(self, s, headers=None, replace=True,
460 615
461 616 :type cb: function
462 617 :param cb: a callback function that will be called to report
463   - progress on the upload. The callback should accept
  618 + progress on the upload. The callback should accept
464 619 two integer parameters, the first representing the
465 620 number of bytes that have been successfully
466   - transmitted to S3 and the second representing the
  621 + transmitted to GCS and the second representing the
467 622 size of the to be transmitted object.
468 623
469 624 :type cb: int
@@ -473,19 +628,19 @@ def set_contents_from_string(self, s, headers=None, replace=True,
473 628 the maximum number of times the callback will
474 629 be called during the file transfer.
475 630
476   - :type policy: :class:`boto.s3.acl.CannedACLStrings`
  631 + :type policy: :class:`boto.gs.acl.CannedACLStrings`
477 632 :param policy: A canned ACL policy that will be applied to the
478   - new key in S3.
  633 + new key in GCS.
479 634
480 635 :type md5: A tuple containing the hexdigest version of the MD5
481 636 checksum of the file as the first element and the
482 637 Base64-encoded version of the plain checksum as the
483   - second element. This is the same format returned by
  638 + second element. This is the same format returned by
484 639 the compute_md5 method.
485 640 :param md5: If you need to compute the MD5 for any reason prior
486 641 to upload, it's silly to have to do it twice so this
487 642 param, if present, will be used as the MD5 values
488   - of the file. Otherwise, the checksum will be computed.
  643 + of the file. Otherwise, the checksum will be computed.
489 644
490 645 :type if_generation: int
491 646 :param if_generation: (optional) If set to a generation number, the
@@ -550,12 +705,6 @@ def set_contents_from_stream(self, *args, **kwargs):
550 705 :param policy: A canned ACL policy that will be applied to the new key
551 706 in GS.
552 707
553   - :type reduced_redundancy: bool
554   - :param reduced_redundancy: If True, this will set the storage
555   - class of the new Key to be REDUCED_REDUNDANCY. The Reduced
556   - Redundancy Storage (RRS) feature of S3, provides lower
557   - redundancy at lower storage cost.
558   -
559 708 :type size: int
560 709 :param size: (optional) The Maximum number of bytes to read from
561 710 the file pointer (fp). This is useful when uploading a
@@ -702,3 +851,42 @@ def set_canned_acl(self, acl_str, headers=None, generation=None,
702 851 if_generation=if_generation,
703 852 if_metageneration=if_metageneration
704 853 )
  854 +
  855 + def compose(self, components, content_type=None, headers=None):
  856 + """Create a new object from a sequence of existing objects.
  857 +
  858 + The content of the object representing this Key will be the
  859 + concatenation of the given object sequence. For more detail, visit
  860 +
  861 + https://developers.google.com/storage/docs/composite-objects
  862 +
  863 + :type components: list
  864 + :param components: List of gs.Key objects representing the component objects.
  865 +
  866 + :type content_type: string
  867 + :param content_type: (optional) Content type for the new composite object.
  868 + """
  869 + compose_req = []
  870 + for key in components:
  871 + if key.bucket.name != self.bucket.name:
  872 + raise BotoClientError(
  873 + 'GCS does not support inter-bucket composing')
  874 +
  875 + generation_tag = ''
  876 + if key.generation:
  877 + generation_tag = ('<Generation>%s</Generation>'
  878 + % str(key.generation))
  879 + compose_req.append('<Component><Name>%s</Name>%s</Component>' %
  880 + (key.name, generation_tag))
  881 + compose_req_xml = ('<ComposeRequest>%s</ComposeRequest>' %
  882 + ''.join(compose_req))
  883 + headers = headers or {}
  884 + if content_type:
  885 + headers['Content-Type'] = content_type
  886 + resp = self.bucket.connection.make_request('PUT', self.bucket.name,
  887 + self.name, headers=headers,
  888 + query_args='compose',
  889 + data=compose_req_xml)
  890 + if resp.status < 200 or resp.status > 299:
  891 + raise self.bucket.connection.provider.storage_response_error(
  892 + resp.status, resp.reason, resp.read())
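
The compose() method added above performs server-side concatenation, and handle_addl_headers() surfaces the server-reported checksums and component count on the Key. A minimal usage sketch, assuming an already authenticated Google Cloud Storage connection; the bucket and object names are purely illustrative:

    import boto

    conn = boto.connect_gs()
    bucket = conn.get_bucket('my-bucket')

    part1 = bucket.get_key('part-1')
    part2 = bucket.get_key('part-2')

    # Server-side concatenation of part-1 and part-2 into 'combined'.
    composite = bucket.new_key('combined')
    composite.compose([part1, part2], content_type='text/plain')

    # Re-fetching the key runs handle_addl_headers(), which fills in
    # cloud_hashes (from x-goog-hash) and component_count
    # (from x-goog-component-count).
    combined = bucket.get_key('combined')
    print combined.component_count, combined.cloud_hashes.keys()
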
58 boto/gs/resumable_upload_handler.py
@@ -322,7 +322,7 @@ def _start_new_resumable_upload(self, key, headers=None):
322 322 self._save_tracker_uri_to_file()
323 323
324 324 def _upload_file_bytes(self, conn, http_conn, fp, file_length,
325   - total_bytes_uploaded, cb, num_cb, md5sum, headers):
  325 + total_bytes_uploaded, cb, num_cb, headers):
326 326 """
327 327 Makes one attempt to upload file bytes, using an existing resumable
328 328 upload connection.
@@ -376,7 +376,8 @@ def _upload_file_bytes(self, conn, http_conn, fp, file_length,
376 376 http_conn.set_debuglevel(0)
377 377 while buf:
378 378 http_conn.send(buf)
379   - md5sum.update(buf)
  379 + for alg in self.digesters:
  380 + self.digesters[alg].update(buf)
380 381 total_bytes_uploaded += len(buf)
381 382 if cb:
382 383 i += 1
@@ -416,7 +417,7 @@ def _upload_file_bytes(self, conn, http_conn, fp, file_length,
416 417 (resp.status, resp.reason), disposition)
417 418
418 419 def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
419   - num_cb, md5sum):
  420 + num_cb):
420 421 """
421 422 Attempts a resumable upload.
422 423
@@ -435,9 +436,9 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
435 436
436 437 if server_end:
437 438 # If the server already has some of the content, we need to
438   - # update the md5 with the bytes that have already been
  439 + # update the digesters with the bytes that have already been
439 440 # uploaded to ensure we get a complete hash in the end.
440   - print 'Catching up md5 for resumed upload'
  441 + print 'Catching up hash digest(s) for resumed upload'
441 442 fp.seek(0)
442 443 # Read local file's bytes through position server has. For
443 444 # example, if server has (0, 3) we want to read 3-0+1=4 bytes.
@@ -446,13 +447,14 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
446 447 chunk = fp.read(min(key.BufferSize, bytes_to_go))
447 448 if not chunk:
448 449 raise ResumableUploadException(
449   - 'Hit end of file during resumable upload md5 '
  450 + 'Hit end of file during resumable upload hash '
450 451 'catchup. This should not happen under\n'
451 452 'normal circumstances, as it indicates the '
452 453 'server has more bytes of this transfer\nthan'
453 454 ' the current file size. Restarting upload.',
454 455 ResumableTransferDisposition.START_OVER)
455   - md5sum.update(chunk)
  456 + for alg in self.digesters:
  457 + self.digesters[alg].update(chunk)
456 458 bytes_to_go -= len(chunk)
457 459
458 460 if conn.debug >= 1:
@@ -492,7 +494,7 @@ def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
492 494 # and can report that progress on next attempt.
493 495 try:
494 496 return self._upload_file_bytes(conn, http_conn, fp, file_length,
495   - total_bytes_uploaded, cb, num_cb, md5sum,
  497 + total_bytes_uploaded, cb, num_cb,
496 498 headers)
497 499 except (ResumableUploadException, socket.error):
498 500 resp = self._query_server_state(conn, file_length)
@@ -556,9 +558,9 @@ def track_progress_less_iterations(self, server_had_bytes_before_attempt,
556 558 else:
557 559 self.progress_less_iterations += 1
558 560 if roll_back_md5:
559   - # Rollback any potential md5sum updates, as we did not
  561 + # Rollback any potential hash updates, as we did not
560 562 # make any progress in this iteration.
561   - self.md5sum = self.md5sum_before_attempt
  563 + self.digesters = self.digesters_before_attempt
562 564
563 565 if self.progress_less_iterations > self.num_retries:
564 566 # Don't retry any longer in the current process.
@@ -575,7 +577,7 @@ def track_progress_less_iterations(self, server_had_bytes_before_attempt,
575 577 (self.progress_less_iterations, sleep_time_secs))
576 578 time.sleep(sleep_time_secs)
577 579
578   - def send_file(self, key, fp, headers, cb=None, num_cb=10):
  580 + def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
579 581 """
580 582 Upload a file to a key into a bucket on GS, using GS resumable upload
581 583 protocol.
@@ -603,6 +605,12 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
603 605 during the file transfer. Providing a negative integer will cause
604 606 your callback to be called with each buffer read.
605 607
  608 + :type hash_algs: dictionary
  609 + :param hash_algs: (optional) Dictionary mapping hash algorithm
  610 + names to constructors of stateful hashing objects that
  611 + implement update(), digest(), and copy() (e.g. hashlib.md5).
  612 + Defaults to {'md5': md5()}.
  613 +
606 614 Raises ResumableUploadException if a problem occurs during the transfer.
607 615 """
608 616
@@ -613,22 +621,25 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
613 621 # that header.
614 622 CT = 'Content-Type'
615 623 if CT in headers and headers[CT] is None:
616   - del headers[CT]
  624 + del headers[CT]
617 625
618 626 headers['User-Agent'] = UserAgent
619 627
620 628 # Determine file size different ways for case where fp is actually a
621 629 # wrapper around a Key vs an actual file.
622 630 if isinstance(fp, KeyFile):
623   - file_length = fp.getkey().size
  631 + file_length = fp.getkey().size
624 632 else:
625   - fp.seek(0, os.SEEK_END)
626   - file_length = fp.tell()
627   - fp.seek(0)
  633 + fp.seek(0, os.SEEK_END)
  634 + file_length = fp.tell()
  635 + fp.seek(0)
628 636 debug = key.bucket.connection.debug
629 637
630 638 # Compute the hash checksum(s) on the fly.
631   - self.md5sum = md5()
  639 + if hash_algs is None:
  640 + hash_algs = {'md5': md5}
  641 + self.digesters = dict(
  642 + (alg, hash_algs[alg]()) for alg in hash_algs or {})
632 643
633 644 # Use num-retries from constructor if one was provided; else check
634 645 # for a value specified in the boto config file; else default to 5.
@@ -638,19 +649,20 @@ def send_file(self, key, fp, headers, cb=None, num_cb=10):
638 649
639 650 while True: # Retry as long as we're making progress.
640 651 server_had_bytes_before_attempt = self.server_has_bytes
641   - self.md5sum_before_attempt = self.md5sum.copy()
  652 + self.digesters_before_attempt = dict(
  653 + (alg, self.digesters[alg].copy())
  654 + for alg in self.digesters)
642 655 try:
643 656 # Save generation and metageneration in class state so caller
644 657 # can find these values, for use in preconditions of future
645 658 # operations on the uploaded object.
646 659 (etag, self.generation, self.metageneration) = (
647 660 self._attempt_resumable_upload(key, fp, file_length,
648   - headers, cb, num_cb,
649   - self.md5sum))
  661 + headers, cb, num_cb))
650 662
651   - # Get the final md5 for the uploaded content.
652   - hd = self.md5sum.hexdigest()
653   - key.md5, key.base64md5 = key.get_md5_from_hexdigest(hd)
  663 + # Get the final digests for the uploaded content.
  664 + for alg in self.digesters:
  665 + key.local_hashes[alg] = self.digesters[alg].digest()
654 666
655 667 # Upload succeeded, so remove the tracker file (if we have one).
656 668 self._remove_tracker_file()
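
The md5sum bookkeeping above has been generalized into a digesters dict: one stateful hash object per algorithm, snapshotted before every attempt so a progress-less retry can roll back. A condensed sketch of that pattern, assuming an illustrative payload file; any hashlib-style constructor can be added to hash_algs:

    from hashlib import md5

    hash_algs = {'md5': md5}  # callers may add more, e.g. a CRC32C factory
    digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs)

    # Snapshot taken before an upload attempt; restored if no progress is made.
    digesters_before_attempt = dict(
        (alg, digesters[alg].copy()) for alg in digesters)

    with open('payload.bin', 'rb') as fp:
        for chunk in iter(lambda: fp.read(8192), ''):
            for alg in digesters:
                digesters[alg].update(chunk)

    final_hashes = dict((alg, digesters[alg].digest()) for alg in digesters)
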
3  boto/s3/bucket.py
@@ -209,6 +209,7 @@ def _get_key_internal(self, key_name, headers, query_args_l):
209 209 k.handle_version_headers(response)
210 210 k.handle_encryption_headers(response)
211 211 k.handle_restore_headers(response)
  212 + k.handle_addl_headers(response.getheaders())
212 213 return k, response
213 214 else:
214 215 if response.status == 404:
@@ -622,6 +623,7 @@ def _delete_key_internal(self, key_name, headers=None, version_id=None,
622 623 k = self.key_class(self)
623 624 k.name = key_name
624 625 k.handle_version_headers(response)
  626 + k.handle_addl_headers(response.getheaders())
625 627 return k
626 628
627 629 def copy_key(self, new_key_name, src_bucket_name,
@@ -715,6 +717,7 @@ def copy_key(self, new_key_name, src_bucket_name,
715 717 if hasattr(key, 'Error'):
716 718 raise provider.storage_copy_error(key.Code, key.Message, body)
717 719 key.handle_version_headers(response)
  720 + key.handle_addl_headers(response.getheaders())
718 721 if preserve_acl:
719 722 self.set_xml_acl(acl, new_key_name)
720 723 return key
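
Each of the bucket operations above now forwards the raw response header pairs to the key's handle_addl_headers() hook (added as a no-op on the base S3 Key further down; the GS override above parses x-goog-hash and x-goog-component-count). A sketch of what that override consumes, with illustrative header values:

    from boto.gs.key import Key

    k = Key()
    k.handle_addl_headers([
        ('x-goog-hash', 'crc32c=AAAAAA==, md5=XrY7u+Ae7tCTyyK7j1rNww=='),
        ('x-goog-component-count', '2'),
    ])
    # k.cloud_hashes now maps 'crc32c' and 'md5' to the raw digest bytes
    # decoded from base64, and k.component_count == 2.
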
154 boto/s3/key.py
@@ -109,8 +109,6 @@ def __init__(self, bucket=None, name=None):
109 109 self.last_modified = None
110 110 self.owner = None
111 111 self.storage_class = 'STANDARD'
112   - self.md5 = None
113   - self.base64md5 = None
114 112 self.path = None
115 113 self.resp = None
116 114 self.mode = None
@@ -126,6 +124,7 @@ def __init__(self, bucket=None, name=None):
126 124 # restored object.
127 125 self.ongoing_restore = None
128 126 self.expiry_date = None
  127 + self.local_hashes = {}
129 128
130 129 def __repr__(self):
131 130 if self.bucket:
@@ -133,18 +132,6 @@ def __repr__(self):
133 132 else:
134 133 return '<Key: None,%s>' % self.name
135 134
136   - def __getattr__(self, name):
137   - if name == 'key':
138   - return self.name
139   - else:
140   - raise AttributeError
141   -
142   - def __setattr__(self, name, value):
143   - if name == 'key':
144   - self.__dict__['name'] = value
145   - else:
146   - self.__dict__[name] = value
147   -
148 135 def __iter__(self):
149 136 return self
150 137
@@ -155,6 +142,38 @@ def provider(self):
155 142 provider = self.bucket.connection.provider
156 143 return provider
157 144
  145 + @property
  146 + def key(self):
  147 + return self.name
  148 +
  149 + @key.setter
  150 + def key(self, value):
  151 + self.name = value
  152 +
  153 + @property
  154 + def md5(self):
  155 + if 'md5' in self.local_hashes and self.local_hashes['md5']:
  156 + return binascii.b2a_hex(self.local_hashes['md5'])
  157 +
  158 + @md5.setter
  159 + def md5(self, value):
  160 + if value:
  161 + self.local_hashes['md5'] = binascii.a2b_hex(value)
  162 + elif 'md5' in self.local_hashes:
  163 + self.local_hashes.pop('md5', None)
  164 +
  165 + @property
  166 + def base64md5(self):
  167 + if 'md5' in self.local_hashes and self.local_hashes['md5']:
  168 + return binascii.b2a_base64(self.local_hashes['md5']).rstrip('\n')
  169 +
  170 + @base64md5.setter
  171 + def base64md5(self, value):
  172 + if value:
  173 + self.local_hashes['md5'] = binascii.a2b_base64(value)
  174 + elif 'md5' in self.local_hashes:
  175 + del self.local_hashes['md5']
  176 +
158 177 def get_md5_from_hexdigest(self, md5_hexdigest):
159 178 """
160 179 A utility function to create the 2-tuple (md5hexdigest, base64md5)
@@ -169,7 +188,8 @@ def get_md5_from_hexdigest(self, md5_hexdigest):
169 188 def handle_encryption_headers(self, resp):
170 189 provider = self.bucket.connection.provider
171 190 if provider.server_side_encryption_header:
172   - self.encrypted = resp.getheader(provider.server_side_encryption_header, None)
  191 + self.encrypted = resp.getheader(
  192 + provider.server_side_encryption_header, None)
173 193 else:
174 194 self.encrypted = None
175 195
@@ -202,6 +222,13 @@ def handle_restore_headers(self, response):
202 222 elif key == 'expiry-date':
203 223 self.expiry_date = val
204 224
  225 + def handle_addl_headers(self, headers):
  226 + """
  227 + Used by Key subclasses to do additional, provider-specific
  228 + processing of response headers. No-op for this base class.
  229 + """
  230 + pass
  231 +
205 232 def open_read(self, headers=None, query_args='',
206 233 override_num_retries=None, response_headers=None):
207 234 """
@@ -265,6 +292,7 @@ def open_read(self, headers=None, query_args='',
265 292 self.content_disposition = value
266 293 self.handle_version_headers(self.resp)
267 294 self.handle_encryption_headers(self.resp)
  295 + self.handle_addl_headers(self.resp.getheaders())
268 296
269 297 def open_write(self, headers=None, override_num_retries=None):
270 298 """
@@ -646,20 +674,12 @@ def send_file(self, fp, headers=None, cb=None, num_cb=10,
646 674 point at the offset from which you wish to upload,
647 675 i.e. if uploading the full file, it should point at the
648 676 start of the file. Normally when a file is opened for
649   - reading, the fp will point at the first byte. See the
  677 + reading, the fp will point at the first byte. See the
650 678 bytes parameter below for more info.
651 679
652 680 :type headers: dict
653 681 :param headers: The headers to pass along with the PUT request
654 682
655   - :type cb: function
656   - :param cb: a callback function that will be called to report
657   - progress on the upload. The callback should accept two
658   - integer parameters, the first representing the number of
659   - bytes that have been successfully transmitted to S3 and
660   - the second representing the size of the to be transmitted
661   - object.
662   -
663 683 :type num_cb: int
664 684 :param num_cb: (optional) If a callback is specified with the
665 685 cb parameter this parameter determines the granularity of
@@ -668,6 +688,13 @@ def send_file(self, fp, headers=None, cb=None, num_cb=10,
668 688 transfer. Providing a negative integer will cause your
669 689 callback to be called with each buffer read.
670 690
  691 + :type query_args: string
  692 + :param query_args: (optional) Arguments to pass in the query string.
  693 +
  694 + :type chunked_transfer: boolean
  695 + :param chunked_transfer: (optional) If true, we use chunked
  696 + Transfer-Encoding.
  697 +
671 698 :type size: int
672 699 :param size: (optional) The Maximum number of bytes to read
673 700 from the file pointer (fp). This is useful when uploading
@@ -676,6 +703,13 @@ def send_file(self, fp, headers=None, cb=None, num_cb=10,
676 703 the default behaviour is to read all bytes from the file
677 704 pointer. Less bytes may be available.
678 705 """
  706 + self._send_file_internal(fp, headers=headers, cb=cb, num_cb=num_cb,
  707 + query_args=query_args,
  708 + chunked_transfer=chunked_transfer, size=size)
  709 +
  710 + def _send_file_internal(self, fp, headers=None, cb=None, num_cb=10,
  711 + query_args=None, chunked_transfer=False, size=None,
  712 + hash_algs=None):
679 713 provider = self.bucket.connection.provider
680 714 try:
681 715 spos = fp.tell()
@@ -683,6 +717,12 @@ def send_file(self, fp, headers=None, cb=None, num_cb=10,
683 717 spos = None
684 718 self.read_from_stream = False
685 719
  720 + # If hash_algs is unset and the MD5 hasn't already been computed,
  721 + # default to an MD5 hash_alg to hash the data on-the-fly.
  722 + if hash_algs is None and not self.md5:
  723 + hash_algs = {'md5': md5}
  724 + digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
  725 +
686 726 def sender(http_conn, method, path, data, headers):
687 727 # This function is called repeatedly for temporary retries
688 728 # so we must be sure the file pointer is pointing at the
@@ -701,12 +741,6 @@ def sender(http_conn, method, path, data, headers):
701 741 http_conn.putheader(key, headers[key])
702 742 http_conn.endheaders()
703 743
704   - # Calculate all MD5 checksums on the fly, if not already computed
705   - if not self.base64md5:
706   - m = md5()
707   - else:
708   - m = None
709   -
710 744 save_debug = self.bucket.connection.debug
711 745 self.bucket.connection.debug = 0
712 746 # If the debuglevel < 4 we don't want to show connection
@@ -729,7 +763,8 @@ def sender(http_conn, method, path, data, headers):
729 763 # of data transferred, except when we know size.
730 764 cb_count = (1024 * 1024) / self.BufferSize
731 765 elif num_cb > 1:
732   - cb_count = int(math.ceil(cb_size / self.BufferSize / (num_cb - 1.0)))
  766 + cb_count = int(
  767 + math.ceil(cb_size / self.BufferSize / (num_cb - 1.0)))
733 768 elif num_cb < 0:
734 769 cb_count = -1
735 770 else:
@@ -754,8 +789,8 @@ def sender(http_conn, method, path, data, headers):
754 789 http_conn.send('\r\n')
755 790 else:
756 791 http_conn.send(chunk)
757   - if m:
758   - m.update(chunk)
  792 + for alg in digesters:
  793 + digesters[alg].update(chunk)
759 794 if bytes_togo:
760 795 bytes_togo -= chunk_len
761 796 if bytes_togo <= 0:
@@ -772,10 +807,8 @@ def sender(http_conn, method, path, data, headers):
772 807
773 808 self.size = data_len
774 809
775   - if m:
776   - # Use the chunked trailer for the digest
777   - hd = m.hexdigest()
778   - self.md5, self.base64md5 = self.get_md5_from_hexdigest(hd)
  810 + for alg in digesters:
  811 + self.local_hashes[alg] = digesters[alg].digest()
779 812
780 813 if chunked_transfer:
781 814 http_conn.send('0\r\n')
@@ -846,6 +879,7 @@ def sender(http_conn, method, path, data, headers):
846 879 sender=sender,
847 880 query_args=query_args)
848 881 self.handle_version_headers(resp, force=True)
  882 + self.handle_addl_headers(resp.getheaders())
849 883
850 884 def compute_md5(self, fp, size=None):
851 885 """
@@ -858,14 +892,9 @@ def compute_md5(self, fp, size=None):
858 892 :param size: (optional) The Maximum number of bytes to read
859 893 from the file pointer (fp). This is useful when uploading
860 894 a file in multiple parts where the file is being split
861   - inplace into different parts. Less bytes may be available.
862   -
863   - :rtype: tuple
864   - :return: A tuple containing the hex digest version of the MD5
865   - hash as the first element and the base64 encoded version
866   - of the plain digest as the second element.
  895 + in place into different parts. Fewer bytes may be available.
867 896 """
868   - tup = compute_md5(fp, size=size)
  897 + hex_digest, b64_digest, data_size = compute_md5(fp, size=size)
869 898 # Returned values are MD5 hash, base64 encoded MD5 hash, and data size.
870 899 # The internal implementation of compute_md5() needs to return the
871 900 # data size but we don't want to return that value to the external
@@ -873,8 +902,8 @@ def compute_md5(self, fp, size=None):
873 902 # break some code) so we consume the third tuple value here and
874 903 # return the remainder of the tuple to the caller, thereby preserving
875 904 # the existing interface.
876   - self.size = tup[2]
877   - return tup[0:2]
  905 + self.size = data_size
  906 + return (hex_digest, b64_digest)
878 907
879 908 def set_contents_from_stream(self, fp, headers=None, replace=True,
880 909 cb=None, num_cb=10, policy=None,
@@ -1203,14 +1232,11 @@ class of the new Key to be REDUCED_REDUNDANCY. The Reduced
1203 1232 :rtype: int
1204 1233 :return: The number of bytes written to the key.
1205 1234 """
1206   - fp = open(filename, 'rb')
1207   - try:
1208   - return self.set_contents_from_file(fp, headers, replace,
1209   - cb, num_cb, policy,
1210   - md5, reduced_redundancy,
  1235 + with open(filename, 'rb') as fp:
  1236 + return self.set_contents_from_file(fp, headers, replace, cb,
  1237 + num_cb, policy, md5,
  1238 + reduced_redundancy,
1211 1239 encrypt_key=encrypt_key)
1212   - finally:
1213   - fp.close()
1214 1240
1215 1241 def set_contents_from_string(self, s, headers=None, replace=True,
1216 1242 cb=None, num_cb=10, policy=None, md5=None,
@@ -1321,11 +1347,12 @@ def get_file(self, fp, headers=None, cb=None, num_cb=10,
1321 1347 torrent=torrent, version_id=version_id,
1322 1348 override_num_retries=override_num_retries,
1323 1349 response_headers=response_headers,
  1350 + hash_algs=None,
1324 1351 query_args=None)
1325 1352
1326 1353 def _get_file_internal(self, fp, headers=None, cb=None, num_cb=10,
1327 1354 torrent=False, version_id=None, override_num_retries=None,
1328   - response_headers=None, query_args=None):
  1355 + response_headers=None, hash_algs=None, query_args=None):
1329 1356 if headers is None:
1330 1357 headers = {}
1331 1358 save_debug = self.bucket.connection.debug
@@ -1335,9 +1362,11 @@ def _get_file_internal(self, fp, headers=None, cb=None, num_cb=10,
1335 1362 query_args = query_args or []
1336 1363 if torrent:
1337 1364 query_args.append('torrent')
1338   - m = None
1339   - else:
1340   - m = md5()
  1365 +
  1366 + if hash_algs is None and not torrent:
  1367 + hash_algs = {'md5': md5}
  1368 + digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs or {})
  1369 +
1341 1370 # If a version_id is passed in, use that. If not, check to see
1342 1371 # if the Key object has an explicit version_id and, if so, use that.
1343 1372 # Otherwise, don't pass a version_id query param.
@@ -1347,7 +1376,8 @@ def _get_file_internal(self, fp, headers=None, cb=None, num_cb=10,
1347 1376 query_args.append('versionId=%s' % version_id)
1348 1377 if response_headers:
1349 1378 for key in response_headers:
1350   - query_args.append('%s=%s' % (key, urllib.quote(response_headers[key])))
  1379 + query_args.append('%s=%s' % (
  1380 + key, urllib.quote(response_headers[key])))
1351 1381 query_args = '&'.join(query_args)
1352 1382 self.open('r', headers, query_args=query_args,
1353 1383 override_num_retries=override_num_retries)
@@ -1373,8 +1403,8 @@ def _get_file_internal(self, fp, headers=None, cb=None, num_cb=10,
1373 1403 for bytes in self:
1374 1404 fp.write(bytes)
1375 1405 data_len += len(bytes)
1376   - if m:
1377   - m.update(bytes)
  1406 + for alg in digesters:
  1407 + digesters[alg].update(bytes)
1378 1408 if cb:
1379 1409 if cb_size > 0 and data_len >= cb_size:
1380 1410 break
@@ -1384,8 +1414,8 @@ def _get_file_internal(self, fp, headers=None, cb=None, num_cb=10,
1384 1414 i = 0
1385 1415 if cb and (cb_count <= 1 or i > 0) and data_len > 0:
1386 1416 cb(data_len, cb_size)
1387   - if m:
1388   - self.md5 = m.hexdigest()
  1417 + for alg in digesters:
  1418 + self.local_hashes[alg] = digesters[alg].digest()
1389 1419 if self.size is None and not torrent and "Range" not in headers:
1390 1420 self.size = data_len
1391 1421 self.close()
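
With this change md5 and base64md5 are no longer plain attributes; they are properties computed from the raw digest stored in local_hashes['md5'], which is also where send_file() and _get_file_internal() record their on-the-fly digests. A small round-trip illustration (the digest shown is the well-known MD5 of 'hello'):

    import binascii
    from boto.s3.key import Key

    k = Key()
    k.md5 = '5d41402abc4b2a76b9719d911017c592'  # hex MD5 of 'hello'
    assert k.local_hashes['md5'] == binascii.a2b_hex(k.md5)
    assert k.base64md5 == binascii.b2a_base64(
        binascii.a2b_hex(k.md5)).rstrip('\n')
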
27 boto/s3/resumable_download_handler.py
@@ -90,7 +90,7 @@ class ResumableDownloadHandler(object):
90 90 Handler for resumable downloads.
91 91 """
92 92
93   - ETAG_REGEX = '([a-z0-9]{32})\n'
  93 + MIN_ETAG_LEN = 5
94 94
95 95 RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
96 96 socket.gaierror)
@@ -127,11 +127,11 @@ def _load_tracker_file_etag(self):
127 127 f = None
128 128 try:
129 129 f = open(self.tracker_file_name, 'r')
130   - etag_line = f.readline()
131   - m = re.search(self.ETAG_REGEX, etag_line)
132   - if m:
133   - self.etag_value_for_current_download = m.group(1)
134   - else:
  130 + self.etag_value_for_current_download = f.readline().rstrip('\n')
  131 + # We used to match an MD5-based regex to ensure that the etag was
  132 + # read correctly. Since ETags need not be MD5s, we now do a simple
  133 + # length sanity check instead.
  134 + if len(self.etag_value_for_current_download) < self.MIN_ETAG_LEN:
135 135 print('Couldn\'t read etag in tracker file (%s). Restarting '
136 136 'download from scratch.' % self.tracker_file_name)
137 137 except IOError, e:
@@ -173,7 +173,7 @@ def _remove_tracker_file(self):
173 173 os.unlink(self.tracker_file_name)
174 174
175 175 def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
176   - torrent, version_id):
  176 + torrent, version_id, hash_algs):
177 177 """
178 178 Attempts a resumable download.
179 179
@@ -213,11 +213,11 @@ def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
213 213 # Disable AWSAuthConnection-level retry behavior, since that would
214 214 # cause downloads to restart from scratch.
215 215 key.get_file(fp, headers, cb, num_cb, torrent, version_id,
216   - override_num_retries=0)
  216 + override_num_retries=0, hash_algs=hash_algs)
217 217 fp.flush()
218 218
219 219 def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
220   - version_id=None):
  220 + version_id=None, hash_algs=None):
221 221 """
222 222 Retrieves a file from a Key
223 223 :type key: :class:`boto.s3.key.Key` or subclass
@@ -249,6 +249,11 @@ def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
249 249 :type version_id: string
250 250 :param version_id: The version ID (optional)
251 251
  252 + :type hash_algs: dictionary
  253 + :param hash_algs: (optional) Dictionary mapping hash algorithm
  254 + names to hashing classes that implement update() and digest().
  255 + Defaults to {'md5': hashlib.md5}.
  256 +
252 257 Raises ResumableDownloadException if a problem occurs during
253 258 the transfer.
254 259 """
@@ -267,7 +272,7 @@ def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
267 272 had_file_bytes_before_attempt = get_cur_file_size(fp)
268 273 try:
269 274 self._attempt_resumable_download(key, fp, headers, cb, num_cb,
270   - torrent, version_id)
  275 + torrent, version_id, hash_algs)
272 277 # Download succeeded, so remove the tracker file (if we have one).
272 277 self._remove_tracker_file()
273 278 # Previously, check_final_md5() was called here to validate
@@ -286,7 +291,7 @@ def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
286 291 # so we need to close and reopen the key before resuming
287 292 # the download.
288 293 key.get_file(fp, headers, cb, num_cb, torrent, version_id,
289   - override_num_retries=0)
  294 + override_num_retries=0, hash_algs=hash_algs)
290 295 except ResumableDownloadException, e:
291 296 if (e.disposition ==
292 297 ResumableTransferDisposition.ABORT_CUR_PROCESS):
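
ResumableDownloadHandler.get_file() now threads hash_algs through to Key.get_file(), so digests other than MD5 can be accumulated across resumed downloads. A hedged usage sketch; the bucket, object, and tracker-file paths are illustrative:

    import boto
    from hashlib import md5
    from boto.s3.resumable_download_handler import ResumableDownloadHandler

    conn = boto.connect_gs()
    key = conn.get_bucket('my-bucket').get_key('big-object')

    handler = ResumableDownloadHandler(
        tracker_file_name='/tmp/download.tracker', num_retries=3)
    with open('/tmp/big-object', 'wb') as fp:
        handler.get_file(key, fp, headers={}, hash_algs={'md5': md5})
    # key.local_hashes['md5'] now holds the digest accumulated client-side.
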
23 boto/storage_uri.py
@@ -195,12 +195,20 @@ def get_contents_to_stream(self, fp, headers=None, version_id=None):
195 195
196 196 def get_contents_to_file(self, fp, headers=None, cb=None, num_cb=10,
197 197 torrent=False, version_id=None,
198   - res_download_handler=None, response_headers=None):
  198 + res_download_handler=None, response_headers=None,
  199 + hash_algs=None):
199 200 self._check_object_uri('get_contents_to_file')
200 201 key = self.get_key(None, headers)
201 202 self.check_response(key, 'key', self.uri)
202   - key.get_contents_to_file(fp, headers, cb, num_cb, torrent, version_id,
203   - res_download_handler, response_headers)
  203 + if hash_algs:
  204 + key.get_contents_to_file(fp, headers, cb, num_cb, torrent,
  205 + version_id, res_download_handler,
  206 + response_headers,
  207 + hash_algs=hash_algs)
  208 + else:
  209 + key.get_contents_to_file(fp, headers, cb, num_cb, torrent,
  210 + version_id, res_download_handler,
  211 + response_headers)
204 212
205 213 def get_contents_as_string(self, validate=False, headers=None, cb=None,
206 214 num_cb=10, torrent=False, version_id=None):
@@ -742,6 +750,15 @@ def set_metadata(self, metadata_plus, metadata_minus, preserve_acl,
742 750 preserve_acl,
743 751 headers=headers)
744 752
  753 + def compose(self, components, content_type=None, headers=None):
  754 + self._check_object_uri('compose')
  755 + component_keys = []
  756 + for suri in components:
  757 + component_keys.append(suri.new_key())
  758 + component_keys[-1].generation = suri.generation
  759 + self.new_key().compose(
  760 + component_keys, content_type=content_type, headers=headers)
  761 +
745 762 def exists(self, headers=None):
746 763 """Returns True if the object exists or False if it doesn't"""
747 764 if not self.object_name:
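
StorageUri.compose() above builds a gs.Key for each component URI (propagating any generation) and delegates to Key.compose(). A hedged sketch mirroring the new integration test below; the URIs are illustrative:

    from boto import storage_uri

    bucket_uri = storage_uri('gs://my-bucket')
    parts = [bucket_uri.clone_replace_name('component1'),
             bucket_uri.clone_replace_name('component2')]
    dest = bucket_uri.clone_replace_name('composite')
    dest.compose(parts, content_type='text/plain')
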
4 boto/utils.py
@@ -90,7 +90,9 @@
90 90 # GET bucket?storageClass is not part of the S3 API.)
91 91 'storageClass',
92 92 # websiteConfig is a QSA for buckets in Google Cloud Storage.
93   - 'websiteConfig']
  93 + 'websiteConfig',
  94 + # compose is a QSA for objects in Google Cloud Storage.
  95 + 'compose']
94 96
95 97
96 98 _first_cap_regex = re.compile('(.)([A-Z][a-z]+)')
4 tests/integration/gs/test_resumable_downloads.py
@@ -113,9 +113,7 @@ def test_failed_download_with_persistent_tracker(self):
113 113 self.assertTrue(os.path.exists(tracker_file_name))
114 114 f = open(tracker_file_name)
115 115 etag_line = f.readline()
116   - m = re.search(ResumableDownloadHandler.ETAG_REGEX, etag_line)
117   - f.close()
118   - self.assertTrue(m)
  116 + self.assertEquals(etag_line.rstrip('\n'), small_src_key.etag.strip('"\''))
119 117
120 118 def test_retryable_exception_recovery(self):
121 119 """
38 tests/integration/gs/test_storage_uri.py
@@ -23,10 +23,12 @@
23 23
24 24 """Unit tests for StorageUri interface."""
25 25
  26 +import binascii
26 27 import re
27 28 import StringIO
28 29
29 30 from boto import storage_uri
  31 +from boto.exception import BotoClientError
30 32 from boto.gs.acl import SupportedPermissions as perms
31 33 from tests.integration.gs.testcase import GSTestCase
32 34
@@ -79,7 +81,7 @@ def testSetAclXml(self):
79 81 "<Permission>READ</Permission></Entry>")
80 82 acl_string = re.sub(r"</Entries>",
81 83 all_users_read_permission + "</Entries>",
82   - bucket_acl.to_xml())
  84 + bucket_acl.to_xml())
83 85
84 86 # Test-generated owner IDs are not currently valid for buckets
85 87 acl_no_owner_string = re.sub(r"<Owner>.*</Owner>", "", acl_string)
@@ -123,3 +125,37 @@ def testPropertiesUpdated(self):
123 125 k = b.get_key("obj")
124 126 self.assertEqual(k.generation, key_uri.generation)
125 127 self.assertEquals(k.get_contents_as_string(), "data3")
  128 +
  129 + def testCompose(self):
  130 + data1 = 'hello '
  131 + data2 = 'world!'
  132 + expected_crc = 1238062967
  133 +
  134 + b = self._MakeBucket()
  135 + bucket_uri = storage_uri("gs://%s" % b.name)
  136 + key_uri1 = bucket_uri.clone_replace_name("component1")
  137 + key_uri1.set_contents_from_string(data1)
  138 + key_uri2 = bucket_uri.clone_replace_name("component2")
  139 + key_uri2.set_contents_from_string(data2)
  140 +
  141 + # Simple compose.
  142 + key_uri_composite = bucket_uri.clone_replace_name("composite")
  143 + components = [key_uri1, key_uri2]
  144 + key_uri_composite.compose(components, content_type='text/plain')
  145 + self.assertEquals(key_uri_composite.get_contents_as_string(),
  146 + data1 + data2)
  147 + composite_key = key_uri_composite.get_key()
  148 + cloud_crc32c = binascii.hexlify(
  149 + composite_key.cloud_hashes['crc32c'])
  150 + self.assertEquals(cloud_crc32c, hex(expected_crc)[2:])
  151 + self.assertEquals(composite_key.content_type, 'text/plain')
  152 +
  153 + # Compose disallowed between buckets.
  154 + key_uri1.bucket_name += '2'
  155 + try:
  156 + key_uri_composite.compose(components)
  157 + self.fail('Composing between buckets didn\'t fail as expected.')
  158 + except BotoClientError as err:
  159 + self.assertEquals(
  160 + err.reason, 'GCS does not support inter-bucket composing')
  161 +
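
The test above asserts that the server-reported CRC32C for the composite contents 'hello world!' is 1238062967. For a local cross-check of that value one could use the third-party crcmod package; crcmod is an assumption of this sketch, not a dependency added by the commit:

    import crcmod.predefined

    crc32c = crcmod.predefined.mkCrcFun('crc-32c')
    assert crc32c('hello world!') == 1238062967
    # The cloud_hashes entry holds raw digest bytes, which is why the test
    # compares binascii.hexlify(digest) against hex(1238062967)[2:].
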
