diff --git a/hca/dss/upload_to_cloud.py b/hca/dss/upload_to_cloud.py
index 3f1237ca..9bc5b99f 100755
--- a/hca/dss/upload_to_cloud.py
+++ b/hca/dss/upload_to_cloud.py
@@ -11,7 +11,8 @@
 import boto3
 from boto3.s3.transfer import TransferConfig
 
-from dcplib.checksumming_io import ChecksummingBufferedReader, S3Etag
+from dcplib import s3_multipart
+from dcplib.checksumming_io import ChecksummingBufferedReader
 
 
 def encode_tags(tags):
@@ -27,7 +28,7 @@ def _mime_type(filename):
     return "application/octet-stream"
 
 
-def _copy_from_s3(path, s3, tx_cfg):
+def _copy_from_s3(path, s3):
     bucket_end = path.find("/", 5)
     bucket_name = path[5: bucket_end]
     dir_path = path[bucket_end + 1:]
@@ -59,19 +60,20 @@ def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False):
     :param replica: The cloud replica to write to. One of 'aws', 'gc', or 'azure'. No functionality now.
     :return: a list of each file's unique key name.
     """
-    tx_cfg = TransferConfig(multipart_threshold=S3Etag.etag_stride,
-                            multipart_chunksize=S3Etag.etag_stride)
     s3 = boto3.resource("s3")
 
     file_uuids = []
     key_names = []
     if from_cloud:
-        file_uuids, key_names = _copy_from_s3(file_handles[0], s3, tx_cfg)
-
+        file_uuids, key_names = _copy_from_s3(file_handles[0], s3)
     else:
         destination_bucket = s3.Bucket(staging_bucket)
         for raw_fh in file_handles:
-            with ChecksummingBufferedReader(raw_fh) as fh:
+            file_size = os.path.getsize(raw_fh.name)
+            multipart_chunksize = s3_multipart.get_s3_multipart_chunk_size(file_size)
+            tx_cfg = TransferConfig(multipart_threshold=s3_multipart.MULTIPART_THRESHOLD,
+                                    multipart_chunksize=multipart_chunksize)
+            with ChecksummingBufferedReader(raw_fh, multipart_chunksize) as fh:
                 file_uuid = str(uuid.uuid4())
                 key_name = "{}/{}".format(file_uuid, os.path.basename(fh.raw.name))
                 destination_bucket.upload_fileobj(
@@ -89,11 +91,9 @@ def upload_to_cloud(file_handles, staging_bucket, replica, from_cloud=False):
                     "hca-dss-sha256": sums["sha256"],
                     "hca-dss-crc32c": sums["crc32c"],
                 }
-                s3.meta.client.put_object_tagging(Bucket=destination_bucket.name, Key=key_name,
-                                                  Tagging=dict(TagSet=encode_tags(metadata))
-                                                  )
+                s3.meta.client.put_object_tagging(Bucket=destination_bucket.name, Key=key_name,
+                                                  Tagging=dict(TagSet=encode_tags(metadata)))
                 file_uuids.append(file_uuid)
                 key_names.append(key_name)
diff --git a/hca/upload/s3_agent.py b/hca/upload/s3_agent.py
index 132b1443..0109bcec 100644
--- a/hca/upload/s3_agent.py
+++ b/hca/upload/s3_agent.py
@@ -7,8 +7,7 @@
 from botocore.credentials import CredentialResolver
 from botocore.session import get_session
 
-KB = 1024
-MB = KB * KB
+from dcplib import s3_multipart
 
 
 def sizeof_fmt(num, suffix='B'):
@@ -48,7 +47,7 @@ def upload_progress_callback(self, bytes_transferred):
         sys.stdout.flush()
 
     def upload_file(self, local_path, target_bucket, target_key, content_type, report_progress=False):
-        self.file_size = os.stat(local_path).st_size
+        self.file_size = os.path.getsize(local_path)
         bucket = self.s3.Bucket(target_bucket)
         obj = bucket.Object(target_key)
         upload_fileobj_args = {
@@ -69,16 +68,5 @@ def list_bucket_by_page(self, bucket_name, key_prefix):
 
     @classmethod
     def transfer_config(cls, file_size):
-        etag_stride = cls._s3_chunk_size(file_size)
-        return TransferConfig(multipart_threshold=etag_stride,
-                              multipart_chunksize=etag_stride)
-
-    @staticmethod
-    def _s3_chunk_size(file_size):
-        if file_size <= 10000 * 64 * MB:
-            return 64 * MB
-        else:
-            div = file_size // 10000
-            if div * 10000 < file_size:
-                div += 1
-            return ((div + (MB - 1)) // MB) * MB
+        return TransferConfig(multipart_threshold=s3_multipart.MULTIPART_THRESHOLD,
+                              multipart_chunksize=s3_multipart.get_s3_multipart_chunk_size(file_size))
diff --git a/requirements.txt b/requirements.txt
index 4e69d821..b2890630 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,6 +8,6 @@ jsonschema >= 2.6, < 3
 requests >= 2.17, < 3
 six >= 1.10, < 2
 tweak >= 0.6.7, < 1
-dcplib >= 1.1.0, < 2
+dcplib >= 1.3.2, < 2
 argcomplete >= 1.9.3, < 2
 commonmark >= 0.7.4, < 1
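For reviewers unfamiliar with the dcplib helpers, here is a minimal sketch of how the new pieces are wired together. The `get_s3_multipart_chunk_size` body below is inferred from the `_s3_chunk_size` helper this diff removes from `s3_agent.py`; it is not dcplib's actual implementation, and the `upload()` wrapper, bucket/key arguments, and constant names are illustrative stand-ins rather than part of this change.

```python
# Sketch only: get_s3_multipart_chunk_size() here mirrors the removed
# _s3_chunk_size() logic and is an assumption about what dcplib provides;
# upload() and the constants are illustrative, not part of the diff.
import os

import boto3
from boto3.s3.transfer import TransferConfig

MB = 1024 * 1024
MAX_PARTS = 10000        # S3 caps a multipart upload at 10,000 parts
DEFAULT_CHUNK = 64 * MB  # chunk size used until the part cap forces a larger one


def get_s3_multipart_chunk_size(file_size):
    """Smallest whole-MiB chunk size that keeps the upload within MAX_PARTS parts."""
    if file_size <= MAX_PARTS * DEFAULT_CHUNK:
        return DEFAULT_CHUNK
    per_part = file_size // MAX_PARTS
    if per_part * MAX_PARTS < file_size:
        per_part += 1                        # round the per-part size up
    return ((per_part + MB - 1) // MB) * MB  # then round up to a whole MiB


def upload(local_path, bucket_name, key):
    # The same chunk size drives both the transfer manager and (in the real
    # code) ChecksummingBufferedReader, so both split the file identically.
    file_size = os.path.getsize(local_path)
    chunk = get_s3_multipart_chunk_size(file_size)
    tx_cfg = TransferConfig(multipart_threshold=DEFAULT_CHUNK,
                            multipart_chunksize=chunk)
    with open(local_path, "rb") as fh:
        boto3.resource("s3").Bucket(bucket_name).upload_fileobj(fh, key, Config=tx_cfg)
```

Computing the chunk size per file and passing the same value to both `TransferConfig` and `ChecksummingBufferedReader` is presumably what keeps the locally computed S3 ETag in step with the ETag S3 reports for the multipart upload, which the old module-level `TransferConfig` based on `S3Etag.etag_stride` could not guarantee for very large files.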