Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion datafs/core/data_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,35 @@ def create(
**res)

def get_archive(self, archive_name, default_version=None):
'''
Retrieve a data archive

Parameters
----------

archive_name : str

Name of the archive to retrieve

default_version : version

str or :py:class:`~distutils.StrictVersion` giving the default
version number to be used on read operations

Returns
-------
archive : object

New :py:class:`~datafs.core.data_archive.DataArchive` object

Raises
------

KeyError :

A KeyError is raised when the ``archive_name`` is not found

'''

res = self.manager.get_archive(archive_name)

Expand All @@ -169,6 +198,50 @@ def get_archive(self, archive_name, default_version=None):
default_version=default_version,
**res)

def batch_get_archive(self, archive_names, default_versions=None):
    '''
    Batch version of :py:meth:`~DataAPI.get_archive`

    Parameters
    ----------

    archive_names : list

        Iterable of archive names to retrieve

    default_versions : version or dict

        Default versions to assign to each returned archive. If
        ``default_versions`` is a dict (any object with a ``get``
        method), each ``archive_name`` is looked up in it, and names
        not present fall back to the API-level ``_default_versions``
        mapping. Any other non-``None`` value is used as the default
        version of every returned archive. If omitted, only the
        API-level ``_default_versions`` mapping is consulted.

    Returns
    -------

    archives : dict

        Mapping of archive name to the archive object built by
        ``self._ArchiveConstructor`` (a
        :py:class:`~datafs.core.data_archive.DataArchive`, per the
        original documentation). Only archives returned by the manager
        appear in the mapping; if an archive is not found, it is
        omitted (`batch_get_archive` does not raise a ``KeyError`` on
        invalid archive names).

    '''

    responses = self.manager.batch_get_archive(archive_names)

    if default_versions is None:
        default_versions = {}

    # Duck-type on ``get`` so any mapping-like object is treated as a
    # per-archive lookup and anything else as a single shared version.
    per_archive = hasattr(default_versions, 'get')

    archives = {}

    for res in responses:
        archive_name = res['archive_name']

        if per_archive:
            # Explicit per-call version wins; otherwise fall back to the
            # default registered on the API (or None).
            default_version = default_versions.get(
                archive_name,
                self._default_versions.get(archive_name, None))
        else:
            default_version = default_versions

        archives[archive_name] = self._ArchiveConstructor(
            api=self,
            default_version=default_version,
            **res)

    return archives

def listdir(self, location, authority_name=None):
'''
List archive path components at a given location
Expand Down Expand Up @@ -201,7 +274,6 @@ def listdir(self, location, authority_name=None):
only one authority is attached or if
:py:attr:`DefaultAuthorityName` is assigned).


Returns
-------

Expand Down
42 changes: 40 additions & 2 deletions datafs/managers/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,43 @@ def get_archive(self, archive_name):

try:
spec = self._get_archive_spec(archive_name)
spec['archive_name'] = archive_name
return spec

except KeyError:
raise KeyError('Archive "{}" not found'.format(archive_name))

def batch_get_archive(self, archive_names):
    '''
    Batched version of :py:meth:`get_archive`

    Fetches the listings for ``archive_names`` through
    ``self._batch_get_archive_listing`` and converts each one with
    ``self._format_archive_listing_as_constructor_spec``.

    .. note ::

        Invalid archive names will simply not be returned, so the response
        may not be the same length as the supplied `archive_names`.

    Parameters
    ----------

    archive_names : list

        List of archive names

    Returns
    -------

    archive_specs : list

        List with one formatted constructor spec per listing returned by
        the backend. A real list is built (rather than a lazy ``map``
        object) so the result can be measured, indexed and iterated more
        than once on both Python 2 and Python 3.

    '''

    return [
        self._format_archive_listing_as_constructor_spec(listing)
        for listing in self._batch_get_archive_listing(archive_names)]

def get_metadata(self, archive_name):
'''
Retrieve the metadata for a given archive
Expand Down Expand Up @@ -457,7 +488,14 @@ def _get_archive_spec(self, archive_name):
if res is None:
raise KeyError

spec = ['authority_name', 'archive_path', 'versioned']
return self._format_archive_listing_as_constructor_spec(res)

@staticmethod
def _format_archive_listing_as_constructor_spec(res):
    '''
    Convert a raw archive listing into archive constructor arguments

    Parameters
    ----------

    res : dict

        Archive listing. Must contain an ``'_id'`` key, whose value
        becomes ``'archive_name'`` in the result.

    Returns
    -------

    spec : dict

        New dict holding ``'archive_name'`` plus whichever of
        ``'authority_name'``, ``'archive_path'`` and ``'versioned'`` are
        present in ``res``. All other keys are dropped.

    Raises
    ------

    KeyError :

        If ``res`` has no ``'_id'`` key

    '''

    spec = ('archive_name', 'authority_name', 'archive_path', 'versioned')

    # Build a fresh dict instead of popping '_id' from ``res``, so the
    # caller's listing is left untouched and can be formatted again.
    formatted = {k: v for k, v in res.items() if k in spec}
    formatted['archive_name'] = res['_id']

    return formatted

Expand Down
59 changes: 59 additions & 0 deletions datafs/managers/manager_dynamo.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,65 @@ def _get_archive_listing(self, archive_name):
'''
return self._table.get_item(Key={'_id': archive_name})['Item']

def _batch_get_archive_listing(self, archive_names):
    '''
    Batched version of :py:meth:`~DynamoDBManager._get_archive_listing`

    Returns a list of full archive listings from an iterable of archive
    names

    .. note ::

        Invalid archive names will simply not be returned, so the response
        may not be the same length as the supplied `archive_names`.

    Parameters
    ----------

    archive_names : list

        Iterable of archive names. Repeated names are requested only
        once.

    Returns
    -------

    archive_listings : list

        List of archive listings, accumulated from every
        ``batch_get_item`` response

    '''

    # BatchGetItem rejects a request whose key list contains duplicates,
    # so collapse repeated names while keeping first-seen order.
    seen = set()
    unique_names = []
    for name in archive_names:
        if name not in seen:
            seen.add(name)
            unique_names.append(name)

    # Upper bound on keys sent in a single batch_get_item request.
    MAX_QUERY_LENGTH = 100

    table_name = self._table_name
    archives = []

    for start in range(0, len(unique_names), MAX_QUERY_LENGTH):
        chunk = unique_names[start:start + MAX_QUERY_LENGTH]
        pending = {'Keys': [{'_id': name} for name in chunk]}

        # Re-submit whatever the service reports as unprocessed until
        # nothing is left for this chunk.
        # NOTE(review): there is no back-off or retry cap here (same as
        # before); persistent throttling would keep this loop spinning.
        while pending:
            res = self._resource.batch_get_item(
                RequestItems={table_name: pending})

            archives.extend(res.get('Responses', {}).get(table_name, []))

            pending = res.get('UnprocessedKeys', {}).get(table_name)

    return archives

def _delete_archive_record(self, archive_name):

return self._table.delete_item(Key={'_id': archive_name})
Expand Down
35 changes: 35 additions & 0 deletions datafs/managers/manager_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,41 @@ def _get_archive_listing(self, archive_name):

return res

def _batch_get_archive_listing(self, archive_names):
    '''
    Batched version of :py:meth:`~MongoDBManager._get_archive_listing`

    Returns a list of full archive listings from an iterable of archive
    names

    .. note ::

        Invalid archive names will simply not be returned, so the response
        may not be the same length as the supplied `archive_names`.

    Parameters
    ----------

    archive_names : list

        Iterable of archive names, matched against ``_id`` with a single
        ``$in`` query

    Returns
    -------

    archive_listings : list

        List of archive listings. The query result is materialised into
        a list so the documented return type holds and the result can be
        measured or iterated more than once.

    '''

    cursor = self.collection.find({'_id': {'$in': list(archive_names)}})

    # ``find`` is expected to hand back an iterable cursor; the None guard
    # is kept only so an unusual collection object cannot cause a crash.
    if cursor is None:
        return []

    return list(cursor)

def _delete_archive_record(self, archive_name):

return self.collection.remove({'_id': archive_name})
Expand Down
14 changes: 14 additions & 0 deletions tests/test_archive_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ def test_get_all_archives(api_with_diverse_archives):
assert len(variables) == total


def test_batch_get_all_archives(api_with_diverse_archives):

    # Batch-fetch every archive name reported by an unfiltered ``filter()``
    api = api_with_diverse_archives
    fetched = api.batch_get_archive(api.filter())

    # The fixture advertises how many archives of each kind it holds;
    # the batch result must contain exactly that many in total
    counts = api.TEST_ATTRS
    expected_count = (
        counts['archives.variable']
        + counts['archives.parameter']
        + counts['archives.config'])

    assert len(fetched) == expected_count


def test_substr_search(api_with_diverse_archives):

# Test the total number of "variable" archives
Expand Down