diff --git a/.changes/next-release/bugfix-copy-96913.json b/.changes/next-release/bugfix-copy-96913.json new file mode 100644 index 00000000..a20b9878 --- /dev/null +++ b/.changes/next-release/bugfix-copy-96913.json @@ -0,0 +1,5 @@ +{ + "type": "bugfix", + "category": "copy", + "description": "Added support for ``ChecksumAlgorithm`` when uploading copy data in parts." +} diff --git a/s3transfer/copies.py b/s3transfer/copies.py index a1dfdc8b..a42b848f 100644 --- a/s3transfer/copies.py +++ b/s3transfer/copies.py @@ -220,6 +220,8 @@ def _submit_multipart_request( num_parts, transfer_future.meta.size, ) + # Get the checksum algorithm of the multipart request. + checksum_algorithm = call_args.extra_args.get("ChecksumAlgorithm") part_futures.append( self._transfer_coordinator.submit( request_executor, @@ -234,6 +236,7 @@ def _submit_multipart_request( 'extra_args': extra_part_args, 'callbacks': progress_callbacks, 'size': size, + 'checksum_algorithm': checksum_algorithm, }, pending_main_kwargs={ 'upload_id': create_multipart_future @@ -331,6 +334,7 @@ def _main( extra_args, callbacks, size, + checksum_algorithm=None, ): """ :param client: The client to use when calling PutObject @@ -345,6 +349,8 @@ def _main( :param callbacks: List of callbacks to call after copy part :param size: The size of the transfer. This value is passed into the callbacks + :param checksum_algorithm: The algorithm that was used to create the multipart + upload :rtype: dict :returns: A dictionary representing a part:: @@ -352,7 +358,8 @@ def _main( {'Etag': etag_value, 'PartNumber': part_number} This value can be appended to a list to be used to complete - the multipart upload. + the multipart upload. If a checksum is in the response, + it will also be included. 
""" response = client.upload_part_copy( CopySource=copy_source, @@ -360,9 +367,16 @@ def _main( Key=key, UploadId=upload_id, PartNumber=part_number, - **extra_args + **extra_args, ) for callback in callbacks: callback(bytes_transferred=size) etag = response['CopyPartResult']['ETag'] - return {'ETag': etag, 'PartNumber': part_number} + part_metadata = {'ETag': etag, 'PartNumber': part_number} + if checksum_algorithm: + checksum_member = f'Checksum{checksum_algorithm.upper()}' + if checksum_member in response['CopyPartResult']: + part_metadata[checksum_member] = response['CopyPartResult'][ + checksum_member + ] + return part_metadata diff --git a/tests/functional/test_copy.py b/tests/functional/test_copy.py index 86f05cbf..38e19b71 100644 --- a/tests/functional/test_copy.py +++ b/tests/functional/test_copy.py @@ -200,6 +200,29 @@ def test_copy(self): future.result() self.stubber.assert_no_pending_responses() + def test_copy_with_checksum(self): + self.extra_args['ChecksumAlgorithm'] = 'crc32' + expected_head_params = { + 'Bucket': 'mysourcebucket', + 'Key': 'mysourcekey', + } + expected_copy_object = { + 'Bucket': self.bucket, + 'Key': self.key, + 'CopySource': self.copy_source, + 'ChecksumAlgorithm': 'crc32', + } + self.add_head_object_response(expected_params=expected_head_params) + self.add_successful_copy_responses( + expected_copy_params=expected_copy_object + ) + + call_kwargs = self.create_call_kwargs() + call_kwargs['extra_args'] = self.extra_args + future = self.manager.copy(**call_kwargs) + future.result() + self.stubber.assert_no_pending_responses() + def test_copy_with_extra_args(self): self.extra_args['MetadataDirective'] = 'REPLACE' @@ -302,6 +325,7 @@ def setUp(self): multipart_chunksize=4, ) self._manager = TransferManager(self.client, self.config) + self.multipart_id = 'my-upload-id' def create_stubbed_responses(self): return [ @@ -311,7 +335,7 @@ def create_stubbed_responses(self): }, { 'method': 'create_multipart_upload', - 'service_response': 
{'UploadId': 'my-upload-id'}, + 'service_response': {'UploadId': self.multipart_id}, }, { 'method': 'upload_part_copy', @@ -328,6 +352,84 @@ def create_stubbed_responses(self): {'method': 'complete_multipart_upload', 'service_response': {}}, ] + def add_get_head_response_with_default_expected_params( + self, extra_expected_params=None + ): + expected_params = { + 'Bucket': 'mysourcebucket', + 'Key': 'mysourcekey', + } + if extra_expected_params: + expected_params.update(extra_expected_params) + response = self.create_stubbed_responses()[0] + response['expected_params'] = expected_params + self.stubber.add_response(**response) + + def add_create_multipart_response_with_default_expected_params( + self, extra_expected_params=None + ): + expected_params = {'Bucket': self.bucket, 'Key': self.key} + if extra_expected_params: + expected_params.update(extra_expected_params) + response = self.create_stubbed_responses()[1] + response['expected_params'] = expected_params + self.stubber.add_response(**response) + + def add_upload_part_copy_responses_with_default_expected_params( + self, extra_expected_params=None + ): + ranges = [ + 'bytes=0-5242879', + 'bytes=5242880-10485759', + 'bytes=10485760-13107199', + ] + upload_part_responses = self.create_stubbed_responses()[2:-1] + for i, range_val in enumerate(ranges): + upload_part_response = upload_part_responses[i] + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + 'CopySource': self.copy_source, + 'UploadId': self.multipart_id, + 'PartNumber': i + 1, + 'CopySourceRange': range_val, + } + if extra_expected_params: + if 'ChecksumAlgorithm' in extra_expected_params: + name = extra_expected_params['ChecksumAlgorithm'] + checksum_member = 'Checksum%s' % name.upper() + response = upload_part_response['service_response'] + response['CopyPartResult'][checksum_member] = 'sum%s==' % ( + i + 1 + ) + else: + expected_params.update(extra_expected_params) + + upload_part_response['expected_params'] = expected_params + 
self.stubber.add_response(**upload_part_response) + + def add_complete_multipart_response_with_default_expected_params( + self, extra_expected_params=None + ): + expected_params = { + 'Bucket': self.bucket, + 'Key': self.key, + 'UploadId': self.multipart_id, + 'MultipartUpload': { + 'Parts': [ + {'ETag': 'etag-1', 'PartNumber': 1}, + {'ETag': 'etag-2', 'PartNumber': 2}, + {'ETag': 'etag-3', 'PartNumber': 3}, + ] + }, + } + if extra_expected_params: + expected_params.update(extra_expected_params) + + response = self.create_stubbed_responses()[-1] + response['expected_params'] = expected_params + self.stubber.add_response(**response) + def create_expected_progress_callback_info(self): # Note that last read is from the empty sentinel indicating # that the stream is done. @@ -341,8 +443,6 @@ def add_create_multipart_upload_response(self): self.stubber.add_response(**self.create_stubbed_responses()[1]) def _get_expected_params(self): - upload_id = 'my-upload-id' - # Add expected parameters to the head object expected_head_params = { 'Bucket': 'mysourcebucket', @@ -368,7 +468,7 @@ def _get_expected_params(self): 'Bucket': self.bucket, 'Key': self.key, 'CopySource': self.copy_source, - 'UploadId': upload_id, + 'UploadId': self.multipart_id, 'PartNumber': i + 1, 'CopySourceRange': range_val, } @@ -378,7 +478,7 @@ def _get_expected_params(self): expected_complete_mpu_params = { 'Bucket': self.bucket, 'Key': self.key, - 'UploadId': upload_id, + 'UploadId': self.multipart_id, 'MultipartUpload': { 'Parts': [ {'ETag': 'etag-1', 'PartNumber': 1}, @@ -441,6 +541,54 @@ def test_copy_with_extra_args(self): future.result() self.stubber.assert_no_pending_responses() + def test_copy_passes_checksums(self): + # ChecksumAlgorithm is only expected on the create multipart upload + # call; the head object and upload part copy stubs do not expect it. 
+ self.extra_args['ChecksumAlgorithm'] = 'sha256' + + self.add_get_head_response_with_default_expected_params() + + # ChecksumAlgorithm should be passed on the create_multipart call + self.add_create_multipart_response_with_default_expected_params( + self.extra_args, + ) + + # Each stubbed upload_part_copy response carries a per-part checksum + self.add_upload_part_copy_responses_with_default_expected_params( + self.extra_args, + ) + + # The checksums should be used in the complete call like etags + self.add_complete_multipart_response_with_default_expected_params( + extra_expected_params={ + 'MultipartUpload': { + 'Parts': [ + { + 'ETag': 'etag-1', + 'PartNumber': 1, + 'ChecksumSHA256': 'sum1==', + }, + { + 'ETag': 'etag-2', + 'PartNumber': 2, + 'ChecksumSHA256': 'sum2==', + }, + { + 'ETag': 'etag-3', + 'PartNumber': 3, + 'ChecksumSHA256': 'sum3==', + }, + ] + } + } + ) + + call_kwargs = self.create_call_kwargs() + call_kwargs['extra_args'] = self.extra_args + future = self.manager.copy(**call_kwargs) + future.result() + self.stubber.assert_no_pending_responses() + def test_copy_blacklists_args_to_create_multipart(self): # This argument can never be used for multipart uploads self.extra_args['MetadataDirective'] = 'COPY' @@ -528,7 +676,7 @@ def test_abort_on_failure(self): expected_params={ 'Bucket': self.bucket, 'Key': self.key, - 'UploadId': 'my-upload-id', + 'UploadId': self.multipart_id, }, ) diff --git a/tests/unit/test_copies.py b/tests/unit/test_copies.py index 946fe31c..8768db2c 100644 --- a/tests/unit/test_copies.py +++ b/tests/unit/test_copies.py @@ -98,6 +98,7 @@ def setUp(self): self.upload_id = 'myuploadid' self.part_number = 1 self.result_etag = 'my-etag' + self.checksum_sha1 = 'my-checksum_sha1' def get_copy_task(self, **kwargs): default_kwargs = { @@ -133,6 +134,35 @@ def test_main(self): ) self.stubber.assert_no_pending_responses() + def test_main_with_checksum(self): + self.stubber.add_response( + 'upload_part_copy', + service_response={ + 
'CopyPartResult': { + 'ETag': self.result_etag, + 'ChecksumSHA1': self.checksum_sha1, + } + }, + expected_params={ + 'Bucket': self.bucket, + 'Key': self.key, + 'CopySource': self.copy_source, + 'UploadId': self.upload_id, + 'PartNumber': self.part_number, + 'CopySourceRange': self.copy_source_range, + }, + ) + task = self.get_copy_task(checksum_algorithm="sha1") + self.assertEqual( + task(), + { + 'PartNumber': self.part_number, + 'ETag': self.result_etag, + 'ChecksumSHA1': self.checksum_sha1, + }, + ) + self.stubber.assert_no_pending_responses() + def test_extra_args(self): self.extra_args['RequestPayer'] = 'requester' self.stubber.add_response(