Commit 97ffc1e

Support uploading bundle from staged s3 bucket to DSS (#16)

* Basic functionality in place for uploading a full bundle from an s3 URL.

* Convert dict quotes to their appropriate form. Convert zip to a list so it can be re-iterated.

* Test getting uuids and file names from the cloud; remove an unnecessary variable.

* Add encrypted credential and AWS region variables to Travis.

* Made the list-equality assertion compatible with py3.

Mackey22 committed Jul 13, 2017
1 parent 4368761 commit 97ffc1e
Showing 5 changed files with 132 additions and 65 deletions.
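
For context, a minimal sketch of how the new from-cloud path in hca/upload_to_cloud.py (diffed below) might be invoked, assuming AWS credentials are configured and that the staged objects already carry their hca-dss-* checksum tags; the bucket names and prefix are placeholders, not values from this repository.

from hca.upload_to_cloud import upload_to_cloud

# Hypothetical call into the new code path added by this commit: an s3:// input with
# from_cloud=True lists the staged objects instead of uploading local files.
file_uuids, key_names = upload_to_cloud(
    ["s3://example-staged-bucket/bundles/bundle1/"],  # placeholder s3 URL
    "example-staging-bucket",                         # placeholder staging bucket
    "aws",                                            # replica (currently unused)
    from_cloud=True,
)
print(list(zip(file_uuids, key_names)))
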
27 changes: 13 additions & 14 deletions .travis.yml
@@ -1,22 +1,21 @@
language: python
cache: pip

python:
- 2.7
- 3.4
- 3.5
- 3.6

- 2.7
- 3.4
- 3.5
- 3.6
before_install:
- pip install --quiet coverage flake8 pyyaml

- pip install --quiet coverage flake8 pyyaml
install:
- make install

- make install
script:
- make test

- make test
after_success:
- bash <(curl -s https://codecov.io/bash)

- bash <(curl -s https://codecov.io/bash)
sudo: false
env:
global:
- secure: F4114AVsajBPgK2lhGA+oKUwICiq50ziJk7USgT587GRCXR3rt/ERdOzZd2lBfmb6i2jR6YKcZ1/W4qp5D7EC1qyOgfXUQbKFuJ00JsMD3C/PJXQWQxtaBfhFOaKxXPzpKKo2oJhkC2kqTpC6ch6gjMCupUoAgJZd7KVRS4pkkxSNF9FhObl6etUfod76XHnPKfMCzZQGVkLJsg1i5dbiwKz5TsCBfXcVb0xgFE0MgwmAuIkrou0RH+Ag/XMNEY8NFRd0Nuf3hC4CJ07K9U/rlfkxDuRHN8otd3dMAoZV8ZPO87wPgvJ+S62BKvWXQd2f0CzqTCrQsS9tJoHyQvfz8lVGpYd8+Qywfw7Ao6k+lGEm6bmeZjPQ0Pnk/s3AD255fCsBKaA9bVUQ423vwmiebYM/5sHCsThtqx61gJyjojFDmN5rFgPtdOlVJXU3ka20jpsa/wCyys9TzdaHvM7Q8NWkwfk090+WqgWLlvwsAIqpoZLm1ZwVm66V51Z1el+/zaw05BDoRrNFIaP1oV3dYyb8aki/fWSLcQ2DSP54a1SZkUCiiD/mOxalZZdG4yRL3wRV6bSKeWppt/dX0SHXmPEPWW930YVaLz5peCsHcB3Zvd7IOAGUWs7wCRAq26lYruvi6cqD3UtHJl+/ghv+oy/Hgpk2x8rcVKIlvMuD5A=
- secure: i4TnnjYv1u6ujWBNCZiO82uu1Ooti/Savgti+c5cCsWg1zus5QI1LMc5f16hx+NjYW4e9kWHz5nahcTFEZclm0ovx7FMLcLCHuUlaAiQhc89yqpWYR0arxBWBD0myZy0KG8PfVO/oXecKjJUQSFDUHL9OT+ELQ0Z0NYdNFiCZKzzt0Bsh+EMRcZ0r97SJHOohkGVFvX2NeKGvkcOVyxd20H7xMKTG+2Sx5IEMj8BI6EPrZ1yPjJ4AlUjF2f5nsf5Bry9z1tZkHQn8xQeYz+e1XoGKic2qxU8utpslefWAGE5HTBydk4JwOMpACge9PUoT09/9SEu8ma01iUinMvMVNu1MbfclK6lsaohjdqaQXOdWwvjy8SltdHKG7ws9AshhnCgupM/0znLX6Gj15AFV0vCuNqUVSNfKlGt/XqwOZE+lUb8+Ej9DPqnFNSAGU2RSFDEmrVu1OWkA1dz5cJ6R09+C6jFAuP4YKn3YB3S2LJ/iIqvx2SW3O8j0q6JDVxmoPVY/CO9y37JnP5QqeQQL6xepK+/Iymd2G+gQDV2VV3LZtJnVerZPAbNL0b8tGLxJhw8aMp5lSIt+WEla6LeV+D1wnPzrw9vzAG71qmGraU2R7IO/upYuH8RPuEFWDeATWQjsa7mxwWZy45JKltlbFDH8E1GkhO/9CysNH1g9bc=
- AWS_DEFAULT_REGION=us-east-1
70 changes: 43 additions & 27 deletions hca/full_upload.py
@@ -5,9 +5,10 @@
import sys
import uuid
import logging

from io import open

import boto3

from .constants import Constants
from .upload_to_cloud import upload_to_cloud

@@ -31,7 +32,13 @@ def add_parser(cls, subparsers):
subparser.add_argument(
cls.FILE_OR_DIR_ARGNAME,
nargs="+",
help="Relative or direct path to folder with all bundle files."
help="Relative or direct path to folder with all bundle files. Alternatively, user can provide a url \
to an s3 bucket containing a bundle. S3 files must have checksum tags already calculated and assigned."
)

subparser.add_argument(
"--staging-bucket",
help="Bucket within replica to upload to."
)

subparser.add_argument(
@@ -43,46 +50,50 @@ def add_parser(cls, subparsers):
return True

@classmethod
def _upload_files(cls, args):
filenames = []
def _upload_files(cls, args, staging_bucket):
files_to_upload = []
from_cloud = False
for path in args[cls.FILE_OR_DIR_ARGNAME]:
# Path is s3 url
if path[:5] == "s3://":
from_cloud = True
files_to_upload.append(path)

# If the path is a directory, add all files in the directory to the bundle
if os.path.isdir(path):
elif os.path.isdir(path):
for filename in os.listdir(path):
full_file_name = os.path.join(path, filename)
filenames.append(filename)
files_to_upload.append(open(full_file_name, "rb"))
else: # It's a file
filenames.append(os.path.basename(path))
files_to_upload.append(open(path, "rb"))

file_uuids, uploaded_keys = upload_to_cloud(files_to_upload, staging_bucket, args['replica'], from_cloud)
filenames = list(map(os.path.basename, uploaded_keys))

# Log the uploaded keys and return a list of (filename, file_uuid, key) tuples
logging.info("Uploading the following keys to aws:")
for file_ in filenames:
logging.info("\t", file_)
logging.info(file_)

uploaded_keys = upload_to_cloud(files_to_upload, args["staging_bucket"], args["replica"])

filename_key_list = zip(filenames, uploaded_keys)
logging.info("\nThe following keys were uploaded successfuly:")
for filename, key in filename_key_list:
logging.info('\t{:<12} {:<12}'.format(filename, key))
filename_key_list = list(zip(filenames, file_uuids, uploaded_keys))
logging.info("\nThe following keys were uploaded successfully:")
for filename, file_uuid, key in filename_key_list:
logging.info('{:<12} {:<12}'.format(filename, key))
return filename_key_list

@classmethod
def _put_files(cls, filename_key_list, staging_bucket, api):
"""Use the API class to make a put-files request on each of these files."""
bundle_uuid = str(uuid.uuid4())
files = []
for filename, key in filename_key_list:
for filename, file_uuid, key in filename_key_list:
logging.info("File {}: registering...".format(filename))

# Generating file data
creator_uid = os.environ.get(cls.CREATOR_ID_ENVIRONMENT_VARIABLE, "1")
source_url = "s3://{}/{}".format(staging_bucket, key)
file_uuid = key[:key.find("/")]
logging.info(source_url)
# file_uuid = key[:key.find("/")]
logging.info("File {}: assigned uuid {}".format(filename, file_uuid))

response = api.make_request([
@@ -94,13 +105,13 @@ def _put_files(cls, filename_key_list, staging_bucket, api):
], stream=True)

if response.ok:
version = response.json().get("version", "blank")
version = response.json().get('version', "blank")
logging.info("File {}: registered with uuid {}".format(filename, file_uuid))
files.append({
"name": filename,
"version": version,
"uuid": file_uuid,
"creator_uid": creator_uid
'name': filename,
'version': version,
'uuid': file_uuid,
'creator_uid': creator_uid
})
response.close()

@@ -119,9 +130,9 @@ def _put_bundle(cls, bundle_uuid, files, api, replica):
"""Use the API class to make a put-bundles request."""
file_args = [Constants.OBJECT_SPLITTER.join([
"True",
file["name"],
file["uuid"],
file["version"]]) for file in files]
file['name'],
file['uuid'],
file['version']]) for file in files]
creator_uid = os.environ.get(cls.CREATOR_ID_ENVIRONMENT_VARIABLE, "1")

logging.info("Bundle {}: registering...".format(bundle_uuid))
@@ -138,7 +149,7 @@ def _put_bundle(cls, bundle_uuid, files, api, replica):
version = None

if response.ok:
version = response.json().get("version", "blank")
version = response.json().get('version', None)
logging.info("Bundle {}: registered successfully".format(bundle_uuid))

else:
@@ -167,7 +178,12 @@ def run(cls, args, api):
Step 2: Put the files in the blue box with a shared bundle_uuid.
Step 3: Put all uploaded files into a bundle together.
"""
filename_key_list = cls._upload_files(args)
bundle_uuid, files = cls._put_files(filename_key_list, args["staging_bucket"], api)
first_url = args['file_or_dir'][0]
# If there's a staging bucket input, set staging bucket to that.
# Otherwise grab it from the s3 url.
staging_bucket = args.get('staging_bucket', first_url[5: first_url.find("/", 5)])

filename_key_list = cls._upload_files(args, staging_bucket)
bundle_uuid, files = cls._put_files(filename_key_list, staging_bucket, api)
final_return = cls._put_bundle(bundle_uuid, files, api, args["replica"])
return final_return
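
A short sketch of the bucket-parsing fallback used in run() above, with an invented URL: when --staging-bucket is not given, the bucket name is sliced out of the first s3 URL (characters 5 up to the first "/" after the scheme).

first_url = "s3://my-staged-bucket/bundles/bundle1/"   # placeholder URL
staging_bucket = first_url[5: first_url.find("/", 5)]
assert staging_bucket == "my-staged-bucket"
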
78 changes: 54 additions & 24 deletions hca/upload_to_cloud.py
@@ -32,38 +32,68 @@ def _mime_type(filename):
raise RuntimeError("Can't discern mime type")


def upload_to_cloud(files, staging_bucket, replica):
def _copy_from_s3(path, s3, tx_cfg):
bucket_end = path.find("/", 5)
bucket_name = path[5: bucket_end]
dir_path = path[bucket_end + 1:]

src_bucket = s3.Bucket(bucket_name)
file_uuids = []
key_names = []
logging.info("Key Names:")
for obj in src_bucket.objects.filter(Prefix=dir_path):
# Skip the placeholder object whose key equals the prefix; empty files with no name were throwing errors
if obj.key == dir_path:
continue

logging.info(obj.key)

file_uuids.append(str(uuid.uuid4()))
key_names.append(obj.key)

return file_uuids, key_names


def upload_to_cloud(files, staging_bucket, replica, from_cloud=False):
"""
Upload files to cloud.
:param files: A list of binary files to upload.
:param files: If from_cloud, a list containing an aws s3 directory path to files with the appropriate
metadata already uploaded. Otherwise, a list of binary files to upload.
:param staging_bucket: The aws bucket to upload the files to.
:param replica: The cloud replica to write to. One of 'aws', 'gc', or 'azure'. Currently unused.
:return: a tuple (file_uuids, key_names) with each file's generated uuid and unique key name.
"""
tx_cfg = TransferConfig(multipart_threshold=S3Etag.etag_stride,
multipart_chunksize=S3Etag.etag_stride)
s3 = boto3.resource("s3")
bucket = s3.Bucket(staging_bucket)
file_uuids = []
key_names = []
for raw_fh in files:
with ChecksummingBufferedReader(raw_fh) as fh:

key_name = "{}/{}".format(uuid.uuid4(), os.path.basename(fh.raw.name))
bucket.upload_fileobj(fh, key_name, Config=tx_cfg)
sums = fh.get_checksums()
metadata = {
"hca-dss-s3_etag": sums["s3_etag"],
"hca-dss-sha1": sums["sha1"],
"hca-dss-sha256": sums["sha256"],
"hca-dss-crc32c": sums["crc32c"],
"hca-dss-content-type": _mime_type(fh.raw.name)
}

s3.meta.client.put_object_tagging(Bucket=bucket.name,
Key=key_name,
Tagging=dict(TagSet=encode_tags(metadata))
)
key_names.append(key_name)

return key_names

if from_cloud:
file_uuids, key_names = _copy_from_s3(files[0], s3, tx_cfg)

else:
destination_bucket = s3.Bucket(staging_bucket)
for raw_fh in files:
with ChecksummingBufferedReader(raw_fh) as fh:
file_uuid = str(uuid.uuid4())
key_name = "{}/{}".format(file_uuid, os.path.basename(fh.raw.name))
destination_bucket.upload_fileobj(fh, key_name, Config=tx_cfg)
sums = fh.get_checksums()
metadata = {
"hca-dss-s3_etag": sums["s3_etag"],
"hca-dss-sha1": sums["sha1"],
"hca-dss-sha256": sums["sha256"],
"hca-dss-crc32c": sums["crc32c"],
"hca-dss-content-type": _mime_type(fh.raw.name)
}

s3.meta.client.put_object_tagging(Bucket=destination_bucket.name,
Key=key_name,
Tagging=dict(TagSet=encode_tags(metadata))
)
file_uuids.append(file_uuid)
key_names.append(key_name)

return file_uuids, key_names
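
To make the staged object layout concrete, a small sketch of the key scheme used by the local-upload branch above; the file path is a placeholder.

import os
import uuid

file_uuid = str(uuid.uuid4())
key_name = "{}/{}".format(file_uuid, os.path.basename("/tmp/assay.json"))  # placeholder path
# Each staged object is keyed "<file uuid>/<basename>" and tagged with the
# hca-dss-* checksum and content-type metadata shown in the diff above.
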
3 changes: 3 additions & 0 deletions setup.py
@@ -12,6 +12,9 @@
author_email='akislyuk@chanzuckerberg.com',
description='Human Cell Atlas Data Storage System Command Line Interface',
long_description=open('README.rst').read(),
tests_require=[
"six==1.10.0"
],
install_requires=[
"requests==2.17.3",
"jsonpointer==1.10",
19 changes: 19 additions & 0 deletions test/test.py
@@ -8,6 +8,8 @@
import unittest
import pprint

import six

pkg_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, pkg_root)

@@ -230,6 +232,23 @@ def test_json_input(self):
out = {"query": {"hello": "world", "goodbye": "earth"}}
self.assertEqual(out, parsed_args)

def test_upload_to_cloud_from_s3(self):
uuids, names = hca.upload_to_cloud.upload_to_cloud(
["s3://hca-dss-test-src/data-bundles-examples/import/10x/pbmc8k/bundles/bundle1/"],
"pointless-staging-bucket",
"aws",
True
)
out = [
"data-bundles-examples/import/10x/pbmc8k/bundles/bundle1/assay.json",
"data-bundles-examples/import/10x/pbmc8k/bundles/bundle1/project.json",
"data-bundles-examples/import/10x/pbmc8k/bundles/bundle1/sample.json"
]
self.assertEqual(len(uuids), len(names))
assert_list_items_equal = (self.assertCountEqual if six.PY3
else self.assertItemsEqual)
assert_list_items_equal(names, out)

def test_upload_files(self):
pass

