Download bundle metadata into bundle.json (#231)
jessebrennan committed Jul 11, 2019
1 parent 8805027 commit aac72c3
Showing 3 changed files with 124 additions and 31 deletions.
101 changes: 70 additions & 31 deletions hca/dss/__init__.py
@@ -7,7 +7,7 @@

import errno
import functools
-import itertools
+import json
from collections import defaultdict, namedtuple
import csv
import concurrent.futures
@@ -57,6 +57,16 @@ def from_dss_bundle_response(cls, file_dict, replica):
                   indexed=file_dict['indexed'],
                   replica=replica)

+    @classmethod
+    def from_bundle_json(cls, metadata_bytes, bundle_uuid, replica):
+        return cls(name='bundle.json',
+                   uuid=bundle_uuid,
+                   version=datetime.utcnow().strftime("%Y-%m-%dT%H%M%S.%fZ"),
+                   sha256=hashlib.sha256(metadata_bytes).hexdigest(),
+                   size=len(metadata_bytes),
+                   indexed=False,
+                   replica=replica)
+

class DSSClient(SwaggerClient):
    """
@@ -389,31 +399,20 @@ def _bundle_download_tasks(self,
        Returns an iterator of tasks that each download one of the files in a bundle.
        """
        logger.info('Downloading bundle %s version %s ...', bundle_uuid, version)
-        pages = self.get_bundle.paginate(uuid=bundle_uuid, replica=replica, version=version if version else None)
+        metadata = self._get_full_bundle_metadata(bundle_uuid, replica, version)
+        bundle_fqid = bundle_uuid + '.' + metadata['bundle']['version']
+        bundle_dir = os.path.join(download_dir, bundle_fqid)

-        files = {}
-        for page in pages:
-            for file_ in page['bundle']['files']:
-                # The file name collision check is case-insensitive even if the local file system we're running on is
-                # case-sensitive. We do this in order to get consistent download behavior on all operating systems and
-                # file systems. The case of file names downloaded to a case-sensitive system will still match exactly
-                # what's specified in the bundle manifest. We just don't want a bundle with files 'Foo' and 'foo' to
-                # create two files on one system and one file on the other. Allowing this to happen would, in the best
-                # case, overwrite Foo with foo locally. A resumed download could produce a local file called foo that
-                # contains a mix of data from Foo and foo.
-                filename = file_.get("name", file_["uuid"]).lower()
-                if files.setdefault(filename, file_) is not file_:
-                    raise ValueError("Bundle {bundle_uuid} version {version} contains multiple files named "
-                                     "'{filename}' or a case derivation thereof"
-                                     .format(filename=filename, bundle_uuid=bundle_uuid, version=version))
-        # there will always be one page (or else we would have gotten a 404)
-        # noinspection PyUnboundLocalVariable
-        bundle_fqid = bundle_uuid + '.' + page['bundle']['version']
+        # Download bundle.json (metadata for bundle as a file)
+        metadata_bytes = json.dumps(metadata, sort_keys=True).encode()
+        metadata_dss_file = DSSFile.from_bundle_json(metadata_bytes, bundle_uuid, replica)
+        yield (metadata_dss_file,
+               functools.partial(self._download_metadata, metadata_bytes, download_dir, bundle_dir, metadata_dss_file))

-        for file_ in files.values():
+        for file_ in metadata['bundle']['files'].values():
            dss_file = DSSFile.from_dss_bundle_response(file_, replica)
            filename = file_.get("name", dss_file.uuid)
-            walking_dir = os.path.join(download_dir, bundle_fqid)
+            walking_dir = bundle_dir

            globs = metadata_files if file_['indexed'] else data_files
            if not any(fnmatchcase(filename, glob) for glob in globs):
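
Note: the name-collision check (relocated into _get_full_bundle_metadata below) relies on dict.setdefault returning the value already stored under a key, so an identity comparison exposes a second, distinct file whose lowercased name collides. A standalone sketch of the idiom with hypothetical file dicts:

    files = {}
    for file_ in [{'name': 'Foo', 'uuid': 'uuid1'}, {'name': 'foo', 'uuid': 'uuid2'}]:
        filename = file_.get('name', file_['uuid']).lower()
        # setdefault returns the dict stored on the first pass, so the
        # identity test fails for the second, case-colliding file
        if files.setdefault(filename, file_) is not file_:
            raise ValueError('name collision on {!r}'.format(filename))
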
@@ -422,8 +421,6 @@
            intermediate_path, filename_base = os.path.split(filename)
            if intermediate_path:
                walking_dir = os.path.join(walking_dir, intermediate_path)
-            if not os.path.isdir(walking_dir):
-                os.makedirs(walking_dir)

            logger.info("File %s: Retrieving...", filename)
            file_path = os.path.join(walking_dir, filename_base)
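
Note: a bundle file's name may itself contain path separators, in which case the file is materialized in a nested directory under the bundle directory. A quick illustration with a hypothetical name:

    import os

    intermediate_path, filename_base = os.path.split('qc/metrics/summary.json')
    # intermediate_path == 'qc/metrics', filename_base == 'summary.json'
    file_path = os.path.join('a_uuid.1_version', intermediate_path, filename_base)
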
@@ -434,6 +431,44 @@
                                   num_retries=num_retries,
                                   min_delay_seconds=min_delay_seconds)

+    def _download_metadata(self, metadata, download_dir, bundle_dir, dss_file):
+        dest_path = self._file_path(dss_file.sha256, download_dir)
+        if os.path.exists(dest_path):
+            logger.info("Skipping download of '%s' because it already exists at '%s'.", dss_file.name, dest_path)
+        else:
+            self._make_path_if_necessary(dest_path)
+            with atomic_write(dest_path, mode="wb", overwrite=True) as fh:
+                fh.write(metadata)
+        file_path = os.path.join(bundle_dir, dss_file.name)
+        self._make_path_if_necessary(file_path)
+        hardlink(dest_path, file_path)
+
+    def _get_full_bundle_metadata(self, bundle_uuid, replica, version):
+        """
+        Takes care of paging through the bundle and checks for name collisions
+        """
+        pages = self.get_bundle.paginate(uuid=bundle_uuid, replica=replica, version=version if version else None)
+        files = {}
+        for page in pages:
+            for file_ in page['bundle']['files']:
+                # The file name collision check is case-insensitive even if the local file system we're running on is
+                # case-sensitive. We do this in order to get consistent download behavior on all operating systems and
+                # file systems. The case of file names downloaded to a case-sensitive system will still match exactly
+                # what's specified in the bundle manifest. We just don't want a bundle with files 'Foo' and 'foo' to
+                # create two files on one system and one file on the other. Allowing this to happen would, in the best
+                # case, overwrite Foo with foo locally. A resumed download could produce a local file called foo that
+                # contains a mix of data from Foo and foo.
+                filename = file_.get("name", file_["uuid"]).lower()
+                if files.setdefault(filename, file_) is not file_:
+                    raise ValueError("Bundle {bundle_uuid} version {version} contains multiple files named "
+                                     "'{filename}' or a case derivation thereof"
+                                     .format(filename=filename, bundle_uuid=bundle_uuid, version=version))
+            metadata = page
+        # there will always be one page (or else we would have gotten a 404)
+        # noinspection PyUnboundLocalVariable
+        metadata['bundle']['files'] = files
+        return metadata
+
    def _download_manifest_tasks(self, manifest, replica, num_retries, min_delay_seconds, download_dir):
        """
        Returns an iterator of tasks for downloading all of the files in a bundle. Note that these tasks all
@@ -474,6 +509,7 @@ def _download_and_link_to_filestore(self, download_dir, dss_file, file_path, num
                                                 dss_file,
                                                 num_retries=num_retries,
                                                 min_delay_seconds=min_delay_seconds)
+        self._make_path_if_necessary(file_path)
        hardlink(file_store_path, file_path)

    def _download_file(self, dss_file, dest_path, num_retries=10, min_delay_seconds=0.25):
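
Note: downloads land in a content-addressed filestore keyed by sha256 and are then hardlinked into each bundle directory, so a file shared by many bundles occupies disk space once. The exact layout of _file_path is not shown in this diff, but judging by the path the unit test below expects ('5e'/'ea2e'/&lt;full hash&gt;), it shards by hash prefix, roughly:

    import os

    def file_path(sha256, download_dir, version_dir):
        # version_dir corresponds to self.version_dir in the tests; its exact
        # value is not shown here. Sharding on sha256[:2] and sha256[2:6]
        # keeps any single directory from accumulating too many entries.
        return os.path.join(download_dir, version_dir,
                            sha256[:2], sha256[2:6], sha256)
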
@@ -485,13 +521,7 @@ def _download_file(self, dss_file, dest_path, num_retries=10, min_delay_seconds=
        If we can, we will attempt HTTP resume. However, we verify that the server supports HTTP resume. If the
        ranged get doesn't yield the correct header, then we start over.
        """
-        directory, _ = os.path.split(dest_path)
-        if directory:
-            try:
-                os.makedirs(directory)
-            except OSError as e:
-                if e.errno != errno.EEXIST:
-                    raise
+        self._make_path_if_necessary(dest_path)
        with atomic_write(dest_path, mode="wb", overwrite=True) as fh:
            if dss_file.size == 0:
                return
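
Note: atomic_write (presumably the atomicwrites package, given the mode= and overwrite= keywords) writes to a temporary file and renames it into place on success, so an interrupted download never leaves a truncated file at dest_path. A sketch of the equivalent dance done by hand:

    import os
    import tempfile

    def write_atomically(dest_path, data):
        # the temp file must live in the same directory, since a rename
        # across filesystems is not atomic
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(dest_path) or '.')
        try:
            with os.fdopen(fd, 'wb') as fh:
                fh.write(data)
            os.rename(tmp_path, dest_path)  # os.replace on Python 3 also overwrites on Windows
        except Exception:
            os.unlink(tmp_path)
            raise
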
@@ -504,6 +534,15 @@
                raise ValueError("Expected sha256 {} Received sha256 {}".format(
                    dss_file.sha256.lower(), download_hash.lower()))

+    def _make_path_if_necessary(self, dest_path):
+        directory, _ = os.path.split(dest_path)
+        if directory:
+            try:
+                os.makedirs(directory)
+            except OSError as e:
+                if e.errno != errno.EEXIST:
+                    raise
+
    def _do_download_file(self, dss_file, fh, num_retries, min_delay_seconds):
        """
        Abstracts away complications for downloading a file, handles retries and delays, and computes its hash
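
Note on the refactoring above: the makedirs boilerplate previously repeated in _download_file (and the racier os.path.isdir pre-check removed from _bundle_download_tasks) is now centralized in _make_path_if_necessary. The errno.EEXIST dance is the Python 2-compatible spelling of os.makedirs(..., exist_ok=True), presumably kept for Python 2 support, and it stays safe when concurrent download tasks race to create the same directory. On Python 3 alone the helper would collapse to:

    import os

    def make_path_if_necessary(dest_path):
        directory = os.path.dirname(dest_path)
        if directory:
            # exist_ok tolerates a concurrently created directory, just as
            # catching errno.EEXIST does in the Python 2-compatible version
            os.makedirs(directory, exist_ok=True)
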
1 change: 1 addition & 0 deletions test/integration/dss/test_dss_api.py
@@ -138,6 +138,7 @@ def test_python_upload_download(self):
        # but only if globs are non-empty
        assert not all(glob in [(), ('',)] for glob in [metadata_globs, data_globs])
        downloaded_files.remove('.hca')
+        downloaded_files.remove('bundle.json')
        self.assertEqual(expect_downloaded_files, downloaded_files)
        for file in downloaded_files:
            manifest_entry = next(entry for entry in manifest['files'] if entry['name'] == file)
53 changes: 53 additions & 0 deletions test/unit/test_dss_client.py
@@ -1,4 +1,5 @@
import errno
+import hashlib
import logging
import os
import platform
@@ -66,6 +67,9 @@ def _fake_get_bundle_paginate(*args, **kwargs):
            }
        ]
    }
+    # This ensures that each bundle.json is distinct
+    for f in bundle_dict['files']:
+        f['bundle_uuid'] = kwargs['uuid']
    yield {'bundle': bundle_dict}


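Note: tagging each fake file dict with the requested uuid matters because bundle.json is content-addressed; if every fake bundle serialized to identical metadata, all of their bundle.json entries would share one sha256 and collapse into a single filestore entry. A quick illustration with hypothetical payloads:

    import hashlib
    import json

    def bundle_json_hash(bundle_dict):
        payload = json.dumps({'bundle': bundle_dict}, sort_keys=True).encode()
        return hashlib.sha256(payload).hexdigest()

    a = {'files': [{'name': 'f', 'bundle_uuid': 'a_uuid'}]}
    b = {'files': [{'name': 'f', 'bundle_uuid': 'b_uuid'}]}
    # distinct payloads -> distinct hashes -> distinct filestore entries
    assert bundle_json_hash(a) != bundle_json_hash(b)
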
@@ -289,6 +293,7 @@ def data_files(self, prefix='.'):
            os.path.join(prefix, 'a_uuid.1_version', 'a_file_name'),
            os.path.join(prefix, 'b_uuid.1_version', 'b_file_name'),
            os.path.join(prefix, 'c_uuid.1_version', 'c_file_name'),
+            os.path.join(prefix, 'c_uuid.1_version', 'bundle.json'),
        }

    def metadata_files(self, prefix='.'):
@@ -393,6 +398,30 @@ def test_link_fail(self):
                patch('hca.dss.DSSClient._download_file', side_effect=_fake_download_file):
            self.assertRaises(RuntimeError, self.dss.download_manifest, self.manifest_file, 'aws', layout='bundle')

+    @patch('hca.dss.DSSClient.get_bundle')
+    @patch('hca.dss.DSSClient._download_file')
+    def test_bundle_json(self, mock_download_file, mock_get_bundle):
+        """
+        Assert that the correct content is written to bundle.json and the hashes match
+        """
+        mock_get_bundle.paginate = _fake_get_bundle_paginate
+        jobs = list(self.dss._bundle_download_tasks('a_uuid', 'aws'))
+        dss_file, task = jobs[0]
+        self.assertEqual(dss_file.name, 'bundle.json')
+        task()
+        actual_files = self._files_present()
+        expected_hash = '5eea2e2bf59b04758dd8265d8acaaef4d382a9cc899a0a01f7ed42d7b0591b94'
+        bundle_json_path = os.path.join('.', 'a_uuid.1_version', 'bundle.json')
+        expected_files = {
+            bundle_json_path,
+            os.path.join('.', 'manifest.tsv'),
+            os.path.join('.', self.version_dir, '5e', 'ea2e', expected_hash)
+        }
+        self.assertEqual(expected_files, actual_files)
+        with open(bundle_json_path, 'rb') as f:
+            actual_hash = hashlib.sha256(f.read()).hexdigest()
+        self.assertEqual(actual_hash, expected_hash)
+


class TestDownload(AbstractTestDSSClient):

@@ -439,5 +468,29 @@ def test_download_dir(self):
    def test_download_dir_dot_dir(self):
        self._test_download_dir(os.path.join('.', 'a_nested_dir'))

+    @patch('hca.dss.DSSClient.get_bundle')
+    @patch('hca.dss.DSSClient._download_file')
+    def test_bundle_json(self, mock_download_file, mock_get_bundle):
+        """
+        Assert that the correct content is written to bundle.json and the hashes match
+        """
+        mock_get_bundle.paginate = _fake_get_bundle_paginate
+        jobs = list(self.dss._bundle_download_tasks('a_uuid', 'aws'))
+        dss_file, task = jobs[0]
+        self.assertEqual(dss_file.name, 'bundle.json')
+        task()
+        actual_files = self._files_present()
+        expected_hash = '5eea2e2bf59b04758dd8265d8acaaef4d382a9cc899a0a01f7ed42d7b0591b94'
+        bundle_json_path = os.path.join('.', 'a_uuid.1_version', 'bundle.json')
+        expected_files = {
+            bundle_json_path,
+            os.path.join('.', 'manifest.tsv'),
+            os.path.join('.', self.version_dir, '5e', 'ea2e', expected_hash)
+        }
+        self.assertEqual(expected_files, actual_files)
+        with open(bundle_json_path, 'rb') as f:
+            actual_hash = hashlib.sha256(f.read()).hexdigest()
+        self.assertEqual(actual_hash, expected_hash)
+
if __name__ == "__main__":
    unittest.main()
