
Commit

Merge branch 'member_data' into develop
dotsdl committed Jul 7, 2015
2 parents 24e1fe7 + 690560c commit 8670d5a
Showing 7 changed files with 676 additions and 209 deletions.
1 change: 1 addition & 0 deletions .coveragerc
@@ -8,6 +8,7 @@ exclude_lines =
exclude_lines =
pragma: no cover
def __repr__
def _repr_html_
def __str__
raise NotImplementedError
if __name__ == .__main__.:
21 changes: 19 additions & 2 deletions mdsynthesis/containers.py
@@ -26,6 +26,7 @@ class Container(object):
"""Core class for all Containers.
"""
_containertype = 'Container'

def __init__(self, container, location='.', coordinator=None,
categories=None, tags=None):
@@ -58,15 +59,21 @@ def __init__(self, container, location='.', coordinator=None,
"""
if os.path.exists(container):
self._regenerate('Container', container)
self._regenerate(self._containertype, container)
else:
self._generate('Container', container, location=location,
self._generate(self._containertype, container, location=location,
coordinator=coordinator, categories=categories,
tags=tags)

def __repr__(self):
return "<Container: '{}'>".format(self.name)

def __getstate__(self):
return self.filepath

def __setstate__(self, state):
self._regenerate(self._containertype, state)

def __eq__(self, other):
try:
return (self.name + self.uuid) == (other.name + other.uuid)
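The new __getstate__/__setstate__ pair makes Containers picklable: only the state-file path is stored, and the Container is regenerated from it on unpickling. A minimal sketch of the intended round trip, assuming mdsynthesis is importable; the 'WORK/polymer' path is hypothetical:

import pickle

import mdsynthesis as mds

# create a new Sim, or regenerate it if 'WORK/polymer' already exists
s = mds.Sim('WORK/polymer')

# __getstate__ stores only the absolute path of the state file
payload = pickle.dumps(s)

# __setstate__ calls _regenerate with that path on unpickling
s2 = pickle.loads(payload)
assert s2.uuid == s.uuid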
@@ -314,6 +321,13 @@ def basedir(self):
"""
return self._backend.get_location()

@property
def filepath(self):
"""Absolute path to the Container's state file.
"""
return self._backend.filename

@property
def coordinators(self):
"""The locations of the associated Coordinators.
@@ -427,6 +441,7 @@ class Sim(Container):
"""The Sim object is an interface to data for single simulations.
"""
_containertype = 'Sim'

def __init__(self, sim, universe=None, uname='main', location='.',
coordinator=None, categories=None, tags=None):
@@ -577,6 +592,8 @@ class Group(Container):
"""The Group object is a collection of Sims and Groups.
"""
_containertype = 'Group'

def __init__(self, group, members=None, location='.', coordinator=None,
categories=None, tags=None):
"""Generate a new or regenerate an existing (on disk) Group object.
150 changes: 115 additions & 35 deletions mdsynthesis/core/aggregators.py
@@ -745,20 +745,6 @@ def __str__(self):

return out

@property
def data(self):
"""The data of the Container.
Data are user-generated pandas objects (e.g. Series, DataFrames), numpy
arrays, or any pickleable python object that are stored in the
Container for easy recall later. Each data instance is given its own
directory in the Container's tree.
"""
if not self._data:
self._data = MemberData(self)
return self._data


class MemberAgg(object):
"""Core functionality for aggregators attached to the Members aggregator.
@@ -773,7 +759,38 @@ class MemberData(MemberAgg):
"""Manipulators for member data.
"""
def _list(self, mode='any'):
def __repr__(self):
return "<Data({})>".format(self.keys(mode='any'))

def __getitem__(self, handle):
"""Retrieve aggreggated dataset from all members.
Returns datasets indexed according to member uuids.
See :meth:`MemberData.retrieve` for more information.
Raises :exc:`KeyError` if dataset doesn't exist for any members.
:Arguments:
*handle*
name of data to retrieve; may also be a list of names
:Returns:
*data*
aggregated data, indexed by member name; if *handle* was a
list, will be a list of equal length with the aggregated
datasets as members
"""
if isinstance(handle, list):
out = list()
for item in handle:
out.append(self.retrieve(item, by='uuid'))
elif isinstance(handle, basestring):
out = self.retrieve(handle, by='uuid')

return out

def keys(self, mode='any'):
"""List available datasets.
:Arguments:
@@ -793,23 +810,51 @@ def _list(self, mode='any'):
elif mode == 'all':
out = set.intersection(*datasets)

return list(out)
out = list(out)
out.sort()

# TODO: needs to work for more than just dataframes, series
def retrieve(self, handle, **kwargs):
"""Retrieve aggregated dataset from all members.
return out

The stored data structure for each member is read from disk
and aggregated. The aggregation scheme is dependent on the
form of the data structure.
def retrieve(self, handle, by='uuid', **kwargs):
"""Retrieve aggregated dataset from all members.
See :meth:`Data.retrieve` for more information on keyword usage.
This is a convenience method. The stored data structure for each member
is read from disk and aggregated. The aggregation scheme is dependent
on the form of the data structures pulled from each member:
pandas DataFrames or Series
the structures are appended together, with a new level added
to the index giving the member (see *by*) each set of rows
came from
pandas Panel or Panel4D, numpy arrays, pickled python objects
the structures are returned as a dictionary, with keys giving
the member (see *by*) and each value giving the corresponding
data structure
This method tries to do smart things with the data it reads from each
member. In particular:
- members for which there is no data with the given handle are
skipped
- the lowest-common-denominator data structure is output; this
means that if all data structures read are pandas DataFrames,
then a multi-index DataFrame is returned; if some structures are
pandas DataFrames, while some are anything else, a dictionary is
returned
:Arguments:
*handle*
name of data to retrieve
:Keywords:
*by*
top-level index of output data structure; 'name' uses member
names, 'uuid' uses member uuids; if names are not unique,
it is better to go with 'uuid' ['uuid']
See :meth:`Data.retrieve` for more information on keyword usage.
:Keywords for pandas data structures:
*where*
conditions for what rows/columns to return
*start*
@@ -826,21 +871,56 @@ def retrieve(self, handle, **kwargs):
:Returns:
*data*
aggregated data
aggregated data structure
"""
agg = None
def dict2multiindex(agg):
agg_mi = None
for member in agg:
d = agg[member]
label = len(d.index)*[member]
index = pd.MultiIndex.from_arrays([label, d.index])
d.index = index

if agg_mi is not None:
agg_mi = agg_mi.append(d)
else:
agg_mi = d

return agg_mi

# first, check for existence in any member
if handle not in self.keys('any'):
raise KeyError(
"No dataset '{}' found in any member".format(handle))

# get indexer from *by* keyword
if by == 'uuid':
def get_index(member): return member.uuid
elif by == 'name':
def get_index(member): return member.name
names = [member.name for member in self._members]
if len(set(names)) != len(names):
self._members._logger.warning(
"Member names not unique; data structure may not" +
" look as expected. Set *by* to 'uuid' to avoid this.")
else:
raise ValueError(
"*by* keyword must be either 'name' or 'uuid'")

# first, collect all the data into a dictionary, the
# lowest-common-denominator aggregation structure
agg = dict()
for member in self._members:
d = member.data.retrieve(handle, **kwargs)
label = len(d.index)*[member.name]
index = pd.MultiIndex.from_arrays([label, d.index])
# FIXME: BROKEN!
d.index = index

if agg is not None:
agg = agg.append(d)
else:
agg = d
agg[get_index(member)] = member.data.retrieve(handle, **kwargs)

# if data are all Series or all DataFrames, we build a multi-index
# Series or DataFrame (respectively)
all_s = all([isinstance(d, pd.Series) for d in agg.values()])
all_df = all([isinstance(d, pd.DataFrame) for d in agg.values()])

if all_s or all_df:
agg = dict2multiindex(agg)

return agg
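Together, keys, __getitem__, and retrieve give dict-like access to datasets aggregated across members. A usage sketch against a Bundle; it assumes each member has previously stored a dataset under the hypothetical handle 'rgyr' (via the per-Container data interface, which is not part of this diff) and that Bundle accepts Container instances directly:

import mdsynthesis as mds

b = mds.Bundle(mds.Sim('WORK/run1'), mds.Sim('WORK/run2'))

# handles present in at least one member vs. present in every member
b.data.keys(mode='any')
b.data.keys(mode='all')

# aggregate 'rgyr' across members, indexed by member name; by='uuid'
# (the default) is safer when member names are not unique
rg = b.data.retrieve('rgyr', by='name')

# dict-style access is shorthand for retrieve(handle, by='uuid')
rg_by_uuid = b.data['rgyr']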

70 changes: 67 additions & 3 deletions mdsynthesis/core/bundle.py
@@ -10,6 +10,7 @@
import persistence
import filesystem
import numpy as np
import multiprocessing as mp
import mdsynthesis as mds


@@ -181,6 +182,62 @@ def _list(self):

return memberlist

@property
def data(self):
"""Access the data of each member, collectively.
"""
if not self._data:
self._data = aggregators.MemberData(self)
return self._data

def map(self, function, processes=1, **kwargs):
"""Apply a function to each member, perhaps in parallel.
A pool of processes is created for *processes* > 1; for example,
with 40 members and 'processes=4', 4 processes will be created,
each working on a single member at any given time. When each process
completes work on a member, it grabs another, until no members remain.
*kwargs* are passed to the given function when applied to each member
:Arguments:
*function*
function to apply to each member; must take only a single
container instance as input, but may take any number of keyword
arguments
:Keywords:
*processes*
how many processes to use; if 1, applies function to each
member in member order
:Returns:
*results*
list giving the result of the function for each member,
in member order; if the function returns ``None`` for each
member, then only ``None`` is returned instead of a list
"""
if processes > 1:
pool = mp.Pool(processes=processes)
results = dict()
for member in self:
results[member.uuid] = pool.apply_async(
function, args=(member,), kwds=kwargs).get()
pool.close()
pool.join()

# sort by member order
results = [results[uuid] for uuid in self.uuids]
else:
results = [function(member, **kwargs) for member in self]

# check if list is all ``None``: if so, we return ``None``
if all([(i is None) for i in results]):
results = None

return results
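A sketch of the new map method, reusing the Bundle b from the data example above; whoami is a hypothetical per-member function, and the parallel path relies on Containers now being picklable so that members can be shipped to worker processes:

def whoami(member, suffix=''):
    # any picklable callable taking a single Container works; extra
    # keyword arguments given to map are forwarded to it
    return member.name + suffix

# serial application, in member order
names = b.map(whoami)

# the same call with a small process pool; results come back in
# member order, keyed internally by uuid
names = b.map(whoami, processes=2, suffix='_done')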


class _BundleBackend():
"""Backend class for Bundle.
@@ -196,7 +253,13 @@
def __init__(self):
# our table will be a structured array matching the schema of the
# GroupFile _Members Table
self.table = None
self.table = np.array(
[],
dtype={'names': ['uuid', 'containertype', 'abspath'],
'formats': ['a{}'.format(persistence.uuidlength),
'a{}'.format(persistence.namelength),
'a{}'.format(persistence.pathlength)]
}).reshape(1, -1)
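The backend now starts from an empty structured array rather than None, so add_member and del_member can test the table's shape instead of testing for None. A small sketch of that sentinel, with fixed field widths standing in for persistence.uuidlength, namelength, and pathlength:

import numpy as np

empty = np.array(
    [],
    dtype={'names': ['uuid', 'containertype', 'abspath'],
           'formats': ['S36', 'S36', 'S255']}).reshape(1, -1)

# an empty table has shape (1, 0); this is the condition add_member
# checks before appending the first record
assert empty.shape == (1, 0)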

def _member2record(self, uuid, containertype, basedir):
"""Return a record array from a member's information.
@@ -227,7 +290,7 @@ def add_member(self, uuid, containertype, basedir):
basedir of the new member in the filesystem
"""
if self.table is None:
if self.table.shape == (1, 0):
self.table = self._member2record(uuid, containertype, basedir)
else:
# check if uuid already present
@@ -254,7 +317,7 @@ def del_member(self, *uuid, **kwargs):
purge = kwargs.pop('all', False)

if purge:
self.table = None
self.__init__()
else:
# remove redundant uuids from given list if present
uuids = set([str(uid) for uid in uuid])
@@ -359,6 +422,7 @@ def __init__(self, *containers, **kwargs):
"""
self._backend = _BundleBackend()
self._cache = dict()
self._data = None

self.add(*containers)

