Add pipeline dataset property to track data lineage
jonmmease committed Sep 17, 2019
1 parent b20132f commit b32522a
Showing 10 changed files with 752 additions and 218 deletions.
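The sketch below is a minimal, hedged illustration of the workflow this commit enables: public Dataset operations record themselves on the result's pipeline property, the original data stays reachable through the dataset property, and execute_pipeline replays the recorded steps, optionally on new data. It assumes the public HoloViews API (hv.Dataset, sort, select) plus the members added here; the column names and values are made up.

```python
# Hedged usage sketch of the lineage tracking added by this commit.
import numpy as np
import holoviews as hv

ds = hv.Dataset({'x': np.arange(10), 'y': np.arange(10) ** 2},
                kdims=['x'], vdims=['y'])
derived = ds.sort(['x'], reverse=True).select(x=(2, 8))

print(derived.dataset)            # the original Dataset the result was derived from
for fn, args, kwargs in derived.pipeline:
    print(fn, args, kwargs)       # constructor step, then sort(...), then select(...)

# Replay the same sort + select on different data of the same shape
replayed = derived.execute_pipeline({'x': np.arange(10), 'y': -np.arange(10)})
```
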
136 changes: 122 additions & 14 deletions holoviews/core/data/__init__.py
@@ -5,16 +5,19 @@
except ImportError:
pass

import types
import copy
import numpy as np
import param
from param.parameterized import add_metaclass, ParameterizedMetaclass

from .. import util
from ..accessors import Redim
from ..dimension import Dimension, process_dimensions
from ..element import Element
from ..ndmapping import OrderedDict
from ..ndmapping import OrderedDict, MultiDimensionalMapping
from ..spaces import HoloMap, DynamicMap
from .interface import Interface, iloc, ndloc, DataError
from .interface import Interface, iloc, ndloc
from .array import ArrayInterface
from .dictionary import DictInterface
from .grid import GridInterface
@@ -155,6 +158,7 @@ def __call__(self, new_type, kdims=None, vdims=None, groupby=None,
if len(kdims) == selected.ndims or not groupby:
# Propagate dataset
params['dataset'] = self._element.dataset
params['pipeline'] = self._element._pipeline
element = new_type(selected, **params)
return element.sort() if sort else element
group = selected.groupby(groupby, container_type=HoloMap,
@@ -165,7 +169,52 @@ def __call__(self, new_type, kdims=None, vdims=None, groupby=None,
return group


class PipelineMeta(ParameterizedMetaclass):

# Methods that should never be wrapped by the pipeline machinery
blacklist = ['__init__', 'clone', 'execute_pipeline']

def __new__(cls, classname, bases, classdict):

for method_name in classdict:
method_fn = classdict[method_name]
if method_name in cls.blacklist or method_name.startswith('_'):
continue
elif isinstance(method_fn, types.FunctionType):
classdict[method_name] = cls.pipelined(method_fn)

inst = type.__new__(cls, classname, bases, classdict)
inst._in_method = False
return inst

@staticmethod
def pipelined(method):
def pipelined_fn(*a, **k):
inst = a[0]
in_method = inst._in_method
if not in_method:
inst._in_method = True

result = method(*a, **k)

if not in_method:
if isinstance(result, Dataset):
result._pipeline = inst._pipeline + [
(method, list(a[1:]), k)
]
elif isinstance(result, MultiDimensionalMapping):
for key, element in result.items():
element._pipeline = inst._pipeline + [
(method, list(a[1:]), k),
(getattr(type(result), '__getitem__'), [key], {})
]
inst._in_method = False
return result

return pipelined_fn


@add_metaclass(PipelineMeta)
class Dataset(Element):
"""
Dataset provides a general baseclass for Element types that
@@ -201,6 +250,8 @@ class Dataset(Element):
_kdim_reductions = {}

def __init__(self, data, kdims=None, vdims=None, **kwargs):
input_dataset = kwargs.pop('dataset', None)
input_pipeline = kwargs.pop('pipeline', [])
if isinstance(data, Element):
pvals = util.get_param_values(data)
kwargs.update([(l, pvals[l]) for l in ['group', 'label']
@@ -217,6 +268,65 @@ def __init__(self, data, kdims=None, vdims=None, **kwargs):

self.redim = Redim(self, mode='dataset')

# Handle _pipeline property
self._pipeline = input_pipeline + [(
type(self),
[],
kwargs, # includes kdims and vdims
)]

# Handle initializing the dataset property.
self._dataset = None
if input_dataset is not None:
self._dataset = input_dataset.clone(dataset=None, pipeline=[])

elif type(self) is Dataset:
self._dataset = self

@property
def dataset(self):
"""
The Dataset that this object was created from
"""
from . import Dataset
if self._dataset is None:
dataset = Dataset(self, _validate_vdims=False)
if hasattr(self, '_binned'):
dataset._binned = self._binned
return dataset
else:
return self._dataset

@property
def pipeline(self):
"""
List of (function, args, kwargs) tuples representing the sequence
of operations used to create this object, starting from the
Dataset stored in the dataset property
"""
return self._pipeline

def execute_pipeline(self, data=None):
"""
Create a new object of the same type by executing the sequence of
operations that was used to create this object.
Args:
data: Input data to the pipeline. If None, defaults to the value
of the dataset property, and the resulting object will be equal
to this object.
Returns:
An object with the same type as this object
"""
new_dataset = self.dataset.clone(data=data, dataset=None, pipeline=[])
result = new_dataset
for fn, a, kw in self._pipeline:
result = fn(result, *a, **kw)

result._pipeline = copy.copy(self._pipeline)
result._dataset = new_dataset
return result

def closest(self, coords=[], **kwargs):
"""Snaps coordinate(s) to closest coordinate in Dataset
@@ -880,20 +990,18 @@ def clone(self, data=None, shared_data=True, new_type=None, *args, **overrides):
datatypes = [self.interface.datatype] + self.datatype
overrides['datatype'] = list(util.unique_iterator(datatypes))

if 'dataset' in overrides:
dataset = overrides.pop('dataset')
else:
dataset = self.dataset
if 'dataset' not in overrides:
overrides['dataset'] = self.dataset

new_dataset = super(Dataset, self).clone(data, shared_data, new_type, *args, **overrides)
if 'pipeline' not in overrides:
overrides['pipeline'] = self._pipeline

if dataset is not None:
try:
new_dataset._dataset = dataset.clone(data=new_dataset.data, dataset=None)
except DataError:
# New dataset doesn't have the necessary dimensions to
# propagate dataset. Do nothing
pass
if data is None:
overrides['_validate_vdims'] = False

new_dataset = super(Dataset, self).clone(
data, shared_data, new_type, *args, **overrides
)

return new_dataset

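The PipelineMeta/pipelined machinery above is easiest to see in isolation. The following standalone toy (not HoloViews code; all names are hypothetical) mirrors the approach: a metaclass wraps every public method with a re-entrancy guard and appends a (method, args, kwargs) step to the returned object's _pipeline, which a replay method can later re-execute.

```python
import types


class RecordingMeta(type):
    # Methods that must never be wrapped (mirrors PipelineMeta.blacklist)
    blacklist = ['replay']

    def __new__(mcs, name, bases, classdict):
        for attr, fn in list(classdict.items()):
            if attr in mcs.blacklist or attr.startswith('_'):
                continue
            if isinstance(fn, types.FunctionType):
                classdict[attr] = mcs._wrap(fn)
        cls = super().__new__(mcs, name, bases, classdict)
        cls._in_method = False
        return cls

    @staticmethod
    def _wrap(method):
        def wrapper(self, *args, **kwargs):
            outermost = not self._in_method
            self._in_method = True
            try:
                result = method(self, *args, **kwargs)
            finally:
                if outermost:
                    self._in_method = False
            # Only the user-facing (outermost) call is recorded; calls made
            # internally by other wrapped methods are not.
            if outermost and isinstance(result, Box):
                result._pipeline = self._pipeline + [(method, list(args), kwargs)]
            return result
        return wrapper


class Box(metaclass=RecordingMeta):
    def __init__(self, value):
        self.value = value
        self._pipeline = []

    def add(self, n):
        return Box(self.value + n)

    def scale(self, n):
        return Box(self.value * n)

    def replay(self, value):
        # Analogous to Dataset.execute_pipeline: re-run the recorded steps
        result = Box(value)
        for fn, args, kwargs in self._pipeline:
            result = fn(result, *args, **kwargs)
        return result


b = Box(2).add(3).scale(10)
print(b.value)                                    # 50
print([fn.__name__ for fn, _, _ in b._pipeline])  # ['add', 'scale']
print(b.replay(1).value)                          # 40 -> (1 + 3) * 10
```
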
60 changes: 36 additions & 24 deletions holoviews/core/data/interface.py
@@ -40,19 +40,34 @@ def __init__(self, msg, interface=None):
super(DataError, self).__init__(msg)


class iloc(object):
class Accessor(object):
def __init__(self, dataset):
self.dataset = dataset

def __getitem__(self, index):
from ..data import Dataset
res = self._perform_getitem(self.dataset, index)
if isinstance(res, Dataset):
res._pipeline = self.dataset.pipeline + [
(getattr(type(self), '_perform_getitem'), [index], {})
]
return res

@classmethod
def _perform_getitem(cls, dataset, index):
raise NotImplementedError()


class iloc(Accessor):
"""
iloc is a small wrapper object that allows row, column based
indexing into a Dataset using the ``.iloc`` property. It supports
the usual numpy and pandas iloc indexing semantics including
integer indices, slices, lists and arrays of values. For more
information see the ``Dataset.iloc`` property docstring.
"""

def __init__(self, dataset):
self.dataset = dataset

def __getitem__(self, index):
@classmethod
def _perform_getitem(cls, dataset, index):
index = util.wrap_tuple(index)
if len(index) == 1:
index = (index[0], slice(None))
@@ -63,55 +78,52 @@ def __getitem__(self, index):
rows, cols = index
if rows is Ellipsis:
rows = slice(None)
data = self.dataset.interface.iloc(self.dataset.dataset, (rows, cols))
kdims = self.dataset.kdims
vdims = self.dataset.vdims
data = dataset.interface.iloc(dataset.dataset, (rows, cols))
kdims = dataset.kdims
vdims = dataset.vdims
if np.isscalar(data):
return data
elif cols == slice(None):
pass
else:
if isinstance(cols, slice):
dims = self.dataset.dimensions()[index[1]]
dims = dataset.dimensions()[index[1]]
elif np.isscalar(cols):
dims = [self.dataset.get_dimension(cols)]
dims = [dataset.get_dimension(cols)]
else:
dims = [self.dataset.get_dimension(d) for d in cols]
dims = [dataset.get_dimension(d) for d in cols]
kdims = [d for d in dims if d in kdims]
vdims = [d for d in dims if d in vdims]

datatype = [dt for dt in self.dataset.datatype
datatype = [dt for dt in dataset.datatype
if dt in Interface.interfaces and
not Interface.interfaces[dt].gridded]
if not datatype: datatype = ['dataframe', 'dictionary']
return self.dataset.clone(data, kdims=kdims, vdims=vdims,
datatype=datatype)
return dataset.clone(data, kdims=kdims, vdims=vdims,
datatype=datatype)


class ndloc(object):
class ndloc(Accessor):
"""
ndloc is a small wrapper object that allows ndarray-like indexing
for gridded Datasets using the ``.ndloc`` property. It supports
the standard NumPy ndarray indexing semantics including
integer indices, slices, lists and arrays of values. For more
information see the ``Dataset.ndloc`` property docstring.
"""

def __init__(self, dataset):
self.dataset = dataset

def __getitem__(self, indices):
ds = self.dataset
@classmethod
def _perform_getitem(cls, dataset, indices):
ds = dataset
indices = util.wrap_tuple(indices)
if not ds.interface.gridded:
raise IndexError('Cannot use ndloc on non nd-dimensional datastructure')
selected = self.dataset.interface.ndloc(ds, indices)
selected = dataset.interface.ndloc(ds, indices)
if np.isscalar(selected):
return selected
params = {}
if hasattr(ds, 'bounds'):
params['bounds'] = None
return self.dataset.clone(selected, datatype=[ds.interface.datatype]+ds.datatype, **params)
return dataset.clone(selected, datatype=[ds.interface.datatype]+ds.datatype, **params)


class Interface(param.Parameterized):
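With the Accessor base class above, indexing through .iloc (and .ndloc) is recorded as a lineage step just like a wrapped method call, because __getitem__ delegates to the _perform_getitem classmethod and appends it to the result's pipeline. A hedged usage sketch, assuming the public hv.Dataset API and the properties added in this commit (data values are made up):

```python
import numpy as np
import holoviews as hv

ds = hv.Dataset({'x': np.arange(10), 'y': np.arange(10) ** 2},
                kdims=['x'], vdims=['y'])
head = ds.iloc[:3]          # recorded roughly as (iloc._perform_getitem, [slice(None, 3)], {})
print(head.pipeline[-1])

# Replaying the pipeline on new data repeats the same row selection
replayed = head.execute_pipeline({'x': np.arange(10), 'y': -np.arange(10)})
print(len(replayed))        # 3
```
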
7 changes: 4 additions & 3 deletions holoviews/core/data/multipath.py
@@ -59,7 +59,7 @@ def validate(cls, dataset, vdims=True):
return

from holoviews.element import Polygons
ds = cls._inner_dataset_template(dataset)
ds = cls._inner_dataset_template(dataset, validate_vdims=vdims)
for d in dataset.data:
ds.data = d
ds.interface.validate(ds, vdims)
@@ -76,15 +76,16 @@ def validate(cls, dataset, vdims=True):


@classmethod
def _inner_dataset_template(cls, dataset):
def _inner_dataset_template(cls, dataset, validate_vdims=True):
"""
Returns a Dataset template used as a wrapper around the data
contained within the multi-interface dataset.
"""
from . import Dataset
vdims = dataset.vdims if getattr(dataset, 'level', None) is None else []
return Dataset(dataset.data[0], datatype=cls.subtypes,
kdims=dataset.kdims, vdims=vdims)
kdims=dataset.kdims, vdims=vdims,
_validate_vdims=validate_vdims)

@classmethod
def dimension_type(cls, dataset, dim):
37 changes: 0 additions & 37 deletions holoviews/core/dimension.py
@@ -486,41 +486,8 @@ def __init__(self, data, id=None, plot_id=None, **params):
This class also has an id instance attribute, which
may be set to associate some custom options with the object.
"""
from . import Dataset, DataError
self.data = data

# Handle initializing the dataset property.
self._dataset = None
input_dataset = params.pop('dataset', None)
if type(self) is Dataset:
self._dataset = self
elif input_dataset is not None:
# Clone dimension info from input dataset with reference to new
# data. This way we keep the metadata for all of the dimensions.
try:
self._dataset = input_dataset.clone(data=self.data)
except DataError:
# Dataset not compatible with input data
pass
if self._dataset is None:
# Create a default Dataset to wrap input data
try:
kdims = list(params.get('kdims', []))
vdims = list(params.get('vdims', []))
dims = kdims + vdims
dataset = Dataset(
self.data,
kdims=dims if dims else None
)
if len(dataset.dimensions()) == 0:
# No dimensions could be auto-detected in data
raise DataError("No dimensions detected")
self._dataset = dataset
except DataError:
# Data not supported by any storage backend. leave _dataset as
# None
pass

self._id = None
self.id = id
self._plot_id = plot_id or util.builtins.id(self)
@@ -542,10 +509,6 @@ def __init__(self, data, id=None, plot_id=None, **params):
raise ValueError("Supplied label %r contains invalid characters." %
self.label)

@property
def dataset(self):
return self._dataset

@property
def id(self):
return self._id
