In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import logging

In [3]:
from src import paths
from src.utils import save_json, load_json
from src.log import logger
from src.data.datasets import cached_datasets

In [4]:
logger.setLevel(logging.DEBUG)

In [5]:
cp  = paths['catalog_path']

In [6]:
import doctest

## Dataset DAG Spec

A transformer function takes `input_datasets` and produces `output_datasets`.
Edges are directed and indicate a dependency: e.g. the `output_datasets` depend on the `input_datasets`.


```
{
    "hyperedge_1": {
        "input_datasets": [],
        "output_datasets": [],
        "transformations": [
            (function_1, kwargs_dict_1),
            (function_2, kwargs_dict_2),
            ...
        ],
        "suppress_output": False,  # optional; defaults to False
    },
    "sink_edge": {
        "datasource_name": "ds_name",
        "datasource_opts": {},
        "output_datasets": ["ds_name"],
    }
}
```



## Questions

* Are sink edges special?
  Right now they define a 1-1 map between a datasource and a dataset (i.e. they are not hyperedges).

* How does the list of transformers work with many-in/many-out edges? Should each transformer take/emit a dict of all of them?

* How should traversal be implemented? (`apply_transforms` should optionally take a dataset name and give me an ordered list of what to run.)

* Add a `force` flag to `add_transformer`, in case you are overwriting an existing key.

* Should it be `input_dataset` or `input_datasets`? The most common use case is probably the former.


In [7]:
def normalize_to_list(str_or_iterable):
    """Coerce a dataset-name argument into list form.

    A bare string becomes a one-element list and ``None`` becomes an empty
    list. Any other value (list, tuple, ...) is returned as-is, without
    being copied or converted.
    """
    if str_or_iterable is None:
        return []
    if isinstance(str_or_iterable, str):
        # Wrap rather than iterate: a string is iterable character-by-character.
        return [str_or_iterable]
    return str_or_iterable

def get_transformers(
        transformer_path=None,
        transformer_file=None,
        include_filename=False,
    ):
    """Get the dictionary of transformers (edges in the transformer graph).

    Parameters
    ----------
    transformer_path: path, default paths['catalog_path']
        Directory containing `transformer_file`. Strings are converted to
        `pathlib.Path`.
    transformer_file: str, default 'transformers.json'
        Name of the json file that contains the transformer pipeline.
    include_filename: boolean
        If True, also return the fully-qualified path of the transformer file.

    Returns
    -------
    If include_filename is True:
        A tuple: (transformer_dict, transformer_file_fq)
    else:
        transformer_dict

    A missing transformer file is treated as an empty catalog (``{}``).

    Raises
    ------
    Exception
        If the loaded file content is not a dict (obsolete file format).
    """
    import pathlib

    if transformer_path is None:
        transformer_path = paths['catalog_path']
    # Previously this referenced an undefined `dag_path`; accept str or Path.
    transformer_path = pathlib.Path(transformer_path)
    if transformer_file is None:
        transformer_file = 'transformers.json'

    transformer_file_fq = transformer_path / transformer_file
    try:
        transformer_dict = load_json(transformer_file_fq)
    except FileNotFoundError:
        # No file yet simply means no transformers have been registered.
        transformer_dict = {}

    if not isinstance(transformer_dict, dict):
        raise Exception(f"Obsolete file format: {transformer_file} must contain a dict.")

    if include_filename:
        return transformer_dict, transformer_file_fq
    return transformer_dict


def add_transformer(
    name=None,
    datasource_name=None,
    datasource_opts=None,
    input_datasets=None,
    suppress_output=False,
    output_datasets=None,
    transformations=None,
    dag_path=None,
    edge_file=None,
    node_file=None,
    write_to_catalog=True,
    ):
    """Create and add a dataset transformation pipeline to the workflow.

    Transformer pipelines apply a sequence of transformer functions to a Dataset (or DataSource),
    to produce new Dataset objects. All arguments are validated before the catalog
    file is read, so an invalid call never touches disk.

    Parameters
    ----------
    name: string
        Name for this transformer instance (must be unique; an existing entry
        with the same name is overwritten).
        By default, one is created by joining the output dataset names with
        '_' and prefixing '_'; e.g. _output_ds1_output_ds2
    input_datasets: string or iterable
        Upstream data dependencies. These must be present
    output_datasets: string or Iterable
        These datasets will be generated
    datasource_name: string
        Name of a DataSource to use to generate the output
        Setting this option will create a source node in the dataset flow graph
        (or a sink node in the data dependency graph).
        Transformers of this type must specify at most one entry in `output_datasets`;
        if none is given, the output dataset defaults to `datasource_name`.
    datasource_opts: dict
        Options to use when generating a Dataset from this DataSource
    suppress_output: boolean
        If True, the terminal dataset object is not written to disk, and
        `output_datasets` is not recorded in the transformer entry.
        This is useful when one of the intervening transformers handles the writing; e.g. train/test split.
    transformations: list of tuples
        Sequence of transformer functions to apply. tuples consist of:
        (transformer_name, transformer_opts)
    dag_path: path. (default: paths['catalog_path'])
        Location of `edge_file`
    edge_file: string, default 'transformers.json'
        Name of json file that contains the transformer pipeline
    node_file: string, default 'datasets.json'
        Name of json file that contains the dataset metadata (currently unused)
    write_to_catalog: Boolean, Default True
        If False, don't actually write this entry to the catalog.

    Returns
    -------
    dict: {name: transformer_entry}

    Examples
    --------

    If you only have one input or output, it may be specified simply as a string;
    i.e. these are identical
    >>> add_transformer(input_datasets='other', output_datasets='p_other', write_to_catalog=False)
    {'_p_other': {'input_datasets': ['other'], 'output_datasets': ['p_other']}}
    >>> add_transformer(input_datasets=['other'], output_datasets='p_other', write_to_catalog=False)
    {'_p_other': {'input_datasets': ['other'], 'output_datasets': ['p_other']}}

    >>> add_transformer(input_datasets=['cc-by', 'cc-by-nc'], output_datasets='cc', write_to_catalog=False)
    {'_cc': {'input_datasets': ['cc-by', 'cc-by-nc'], 'output_datasets': ['cc']}}
    >>> add_transformer(input_datasets=['cc-by', 'cc-by-nc'], output_datasets=['cc'], write_to_catalog=False)
    {'_cc': {'input_datasets': ['cc-by', 'cc-by-nc'], 'output_datasets': ['cc']}}

    With several outputs the default name joins them; names can also be given explicitly:

    >>> add_transformer(input_datasets=['cc'], output_datasets=['cc_train','cc_test'], write_to_catalog=False)
    {'_cc_train_cc_test': {'input_datasets': ['cc'], 'output_datasets': ['cc_train', 'cc_test']}}
    >>> add_transformer(input_datasets=['cc'], output_datasets=['cc_train','cc_test'], name='tts', write_to_catalog=False)
    {'tts': {'input_datasets': ['cc'], 'output_datasets': ['cc_train', 'cc_test']}}


    Invalid use cases:

    >>> add_transformer(datasource_name="foo", output_datasets=['bar', 'baz'])
    Traceback (most recent call last):
    ...
    Exception: Edges from data sources must have only one output_dataset.

    >>> add_transformer(datasource_name="foo", input_datasets='bar')
    Traceback (most recent call last):
    ...
    Exception: Cannot set both `datasource_name` and `input_datasets`

    >>> add_transformer(datasource_opts={'foo':'bar'})
    Traceback (most recent call last):
    ...
    Exception: Must specify `datasource_name` when using `datasource_opts`

    >>> add_transformer(output_datasets="foo")
    Traceback (most recent call last):
    ...
    Exception: Must specify one of from `datasource_name` or `input_datasets`

    >>> add_transformer(input_datasets="foo")
    Traceback (most recent call last):
    ...
    Exception: Must specify `output_dataset` (or use `suppress_output`)
    """
    input_datasets = normalize_to_list(input_datasets)
    output_datasets = normalize_to_list(output_datasets)

    # --- Validate everything before reading the catalog from disk ---
    if datasource_name is not None:
        if input_datasets:
            raise Exception('Cannot set both `datasource_name` and `input_datasets`')
        if len(output_datasets) > 1:
            raise Exception("Edges from data sources must have only one output_dataset.")
    if datasource_name is None and datasource_opts is not None:
        raise Exception('Must specify `datasource_name` when using `datasource_opts`')
    if not datasource_name and not input_datasets:
        raise Exception("Must specify one of from `datasource_name` or `input_datasets`")

    if datasource_name and not output_datasets and not suppress_output:
        # A source edge with no explicit output produces a dataset named after the source.
        output_datasets = [datasource_name]
    if not suppress_output and not output_datasets:
        raise Exception("Must specify `output_dataset` (or use `suppress_output`)")

    # --- Build the entry; key insertion order is part of the (doctested) output ---
    transformer = {}
    if datasource_name:
        transformer['datasource_name'] = datasource_name
    else:
        transformer['input_datasets'] = input_datasets
    if datasource_opts:
        transformer['datasource_opts'] = datasource_opts
    if transformations:
        transformer['transformations'] = transformations
    if not suppress_output:
        transformer['output_datasets'] = output_datasets

    if name is None:
        name = f"_{'_'.join(output_datasets)}"

    if write_to_catalog:
        ds_dag, ds_dag_fq = get_transformers(transformer_path=dag_path,
                                             transformer_file=edge_file,
                                             include_filename=True)
        ds_dag[name] = transformer
        save_json(ds_dag_fq, ds_dag)
    return {name: transformer}


In [8]:
doctest.testmod()

TestResults(failed=0, attempted=11)

In [9]:
for source in ['cc-by', 'cc-by-nc', 'other']:
    add_transformer(datasource_name=source)

In [10]:
add_transformer(input_datasets=['cc-by', 'cc-by-nc'],
                output_datasets=['cc'],
                transformations=[('merge', {})])

{'_cc': {'input_datasets': ['cc-by', 'cc-by-nc'],
  'transformations': [('merge', {})],
  'output_datasets': ['cc']}}

In [11]:
add_transformer(input_datasets='cc',
                output_datasets='p_cc',
                transformations=[('process_1',{'opts_1':None}), ('process_2', {'opts_2':None})])

{'_p_cc': {'input_datasets': ['cc'],
  'transformations': [('process_1', {'opts_1': None}),
   ('process_2', {'opts_2': None})],
  'output_datasets': ['p_cc']}}

In [12]:
add_transformer(input_datasets='p_cc',
                output_datasets=['p_cc_train', 'p_cc_test'],
                transformations=[('ttsplit',{'test_percent':15})])

{'_p_cc_train_p_cc_test': {'input_datasets': ['p_cc'],
  'transformations': [('ttsplit', {'test_percent': 15})],
  'output_datasets': ['p_cc_train', 'p_cc_test']}}

In [13]:
add_transformer(input_datasets=['cc', 'other'],
                output_datasets="p_all",
                transformations=[('join', {'right':'cc', 'left':'other'})])

{'_p_all': {'input_datasets': ['cc', 'other'],
  'transformations': [('join', {'right': 'cc', 'left': 'other'})],
  'output_datasets': ['p_all']}}

In [14]:
add_transformer(input_datasets='p_all',
                output_datasets=['p_all_train', 'p_all_test'],
                transformations=[('ttsplit', {'test_percent':10})])

{'_p_all_train_p_all_test': {'input_datasets': ['p_all'],
  'transformations': [('ttsplit', {'test_percent': 10})],
  'output_datasets': ['p_all_train', 'p_all_test']}}

In [15]:
dag = get_transformers()

In [16]:
# List every dataset produced by the catalog's transformers.
# Entries created with `suppress_output` have no 'output_datasets' key.
for he in dag.values():
    for out in he.get('output_datasets', []):
        print(out)

cc
cc-by
cc-by-nc
other
p_all
p_all_train
p_all_test
p_cc
p_cc_train
p_cc_test


In [17]:
from collections import Counter

In [18]:
class TransformerGraph:
    """Transformer side of the bipartite Data Dependency Graph.

    Nodes are dataset names; (hyper)edges are the transformer entries loaded
    by `get_transformers`. Each edge maps its `input_datasets` (or, for source
    edges, a `datasource_name`) to its `output_datasets`.
    """

    def __init__(self, **kwargs):
        """Load the transformer catalog and compute per-node degrees.

        Initialization parameters are the same as for `get_transformers`.
        """
        self._dag = get_transformers(**kwargs)
        self._count_degrees()

    def _count_degrees(self):
        """Populate `edges_in` / `edges_out` (per-node edge counts) from `self._dag`.

        Edges from a DataSource do not count toward `edges_in`, so datasets
        generated directly from a source have in-degree 0 and appear in `sources`.
        """
        nodes = self.nodes
        self.edges_in = {n: 0 for n in nodes}
        self.edges_out = {n: 0 for n in nodes}
        for he_name, he in self._dag.items():
            for node in he.get('input_datasets', []):
                if node not in self.edges_out:
                    raise KeyError(f"Edge '{he_name}' depends on dataset '{node}', "
                                   "which no edge produces")
                self.edges_out[node] += 1
            # is_source() also validates the edge (raises if it has neither a
            # datasource_name nor input_datasets).
            if self.is_source(he_name):
                continue
            for node in he.get('output_datasets', []):
                self.edges_in[node] += 1

    @property
    def nodes(self):
        """Set of every dataset name that some edge lists in `output_datasets`."""
        return {node
                for he in self._dag.values()
                for node in he.get('output_datasets', [])}

    @property
    def edges(self):
        """The raw edge dictionary: {edge_name: transformer_entry}."""
        return self._dag

    @property
    def edge(self):
        """Alias of `edges`."""
        return self._dag

    @property
    def sources(self):
        """Nodes with no incoming (non-source) edges."""
        return [n for (n, count) in self.edges_in.items() if count < 1]

    @property
    def sinks(self):
        """Nodes that no edge consumes as input."""
        return [n for (n, count) in self.edges_out.items() if count < 1]

    def find_child(self, node):
        """Find its parents, siblings and the edge that produced a given child node.

        Parameters
        ----------
        node: String
            name of an output node

        Returns
        -------
        (parents, edge, siblings), or None if no edge produces `node`, where

        parents: Set(str)
            parents needed to generate this child node
        edge: str
            name of the edge that generated this node
        siblings: Set(str)
            set of all the output nodes generated by this edge
        """
        for hename, he in self._dag.items():
            outputs = he.get('output_datasets', [])
            if node in outputs:
                return set(he.get('input_datasets', [])), hename, set(outputs)
        return None

    def _producer(self, node):
        """Like `find_child`, but raise a descriptive error for unknown nodes."""
        found = self.find_child(node)
        if found is None:
            raise Exception(f"No transformer generates dataset '{node}'")
        return found

    def _walk(self, start, kind, expand):
        """Walk dependencies back from `start`; return (nodes, edges) in build order.

        `expand(edge, parents)` decides whether a visited node's parents are
        queued. `kind` picks breadth- or depth-first visiting. Reversed visit
        order alone is not a valid build order when dependencies form a
        diamond, so the visited nodes are re-ordered so each one follows every
        visited parent. The order is left unchanged when it is already valid.
        Each edge is listed once even if several of its outputs were visited.
        """
        from collections import deque

        queue = deque([start])
        if kind == 'breadth-first':
            take = queue.popleft
        elif kind == 'depth-first':
            take = queue.pop
        else:
            raise Exception(f"Unknown kind: {kind}")

        visited = []
        seen = set()
        parents_of = {}
        edge_of = {}
        while queue:
            vertex = take()
            if vertex in seen:
                continue
            seen.add(vertex)
            visited.append(vertex)
            parents, edge, _ = self._producer(vertex)
            parents_of[vertex] = parents
            edge_of[vertex] = edge
            if expand(edge, parents):
                # sorted(): set iteration order varies between runs
                queue.extend(sorted(parents - seen))

        ordered = []
        placed = set()

        def place(node):
            # Emit `node` only after all of its visited parents.
            if node in placed:
                return
            placed.add(node)
            for parent in sorted(parents_of[node] & seen):
                place(parent)
            ordered.append(node)

        for node in reversed(visited):
            place(node)
        edges = list(dict.fromkeys(edge_of[node] for node in ordered))
        return ordered, edges

    def traverse(self, start, kind="breadth-first"):
        """Given a start node, trace the graph back to source nodes.

        Parameters
        ----------
        start: string
            Name of start node. Dependencies will be traced from this node back to sources

        kind: {'depth-first', 'breadth-first'}. Default 'breadth-first'

        Returns
        -------
        (nodes, edges) where:
        nodes: List(str)
            node names traversed in the dependency graph, ordered so every
            node follows the nodes it depends on
        edges: List(str)
            edge names traversed, in the same dependency order, each listed once

        Raises
        ------
        Exception
            for an unknown `kind`, or if a traversed node has no producing edge
        """
        return self._walk(start, kind, expand=lambda edge, parents: True)

    def is_source(self, edge):
        """Is this a source?

        Source edges terminate at a DataSource, and are identified
        by the presence of a `datasource_name` field. Returns that
        field's value (truthy) for sources, otherwise False.
        """
        he = self._dag[edge]
        if not he.get('datasource_name', False) and not he.get('input_datasets', False):
            raise Exception("Invalid Edge: missing both `datasource_name` and `input_datasets`")
        return he.get('datasource_name', False)

    def traverse2(self, node, kind="breadth-first", force=False):
        """Find the path needed to regenerate the given node.

        Traverse the graph as far as necessary to regenerate `node`.
        This stops at the first upstream edges whose inputs are fully
        satisfied, or goes all the way to source nodes, depending on `force`.

        Parameters
        ----------
        node: string
            Name of start node. Dependencies will be traced from this node back.
        kind: {'depth-first', 'breadth-first'}. Default 'breadth-first'
        force: Boolean
            if True, always traverse all the way to source nodes.
            if False, stop expanding an edge once all its dependencies are satisfied

        Returns
        -------
        (nodes, edges) with the same ordering guarantees as `traverse`.
        """
        def expand(edge, parents):
            if force:
                return True
            if self.fully_satisfied(edge):
                return False
            logger.debug(f"Parent dependencies {parents} not satisfied for edge={edge}.")
            return True

        return self._walk(node, kind, expand)

    def _cached_nodes(self):
        """Lazily load the dataset catalog metadata used for hash checks.

        NOTE(review): assumes `cached_datasets(keys_only=False)` returns
        {dataset_name: metadata_dict} (the shape stored in datasets.json) — confirm.
        """
        if getattr(self, '_nodes', None) is None:
            self._nodes = cached_datasets(keys_only=False)
        return self._nodes

    @staticmethod
    def _normalize_hashes(hashes):
        """Make hash dicts comparable regardless of JSON round-tripping.

        The JSON catalog stores (algorithm, digest) pairs as lists, while
        freshly loaded metadata may hold tuples; compare both as tuples.
        """
        if not hashes:
            return {}
        return {k: tuple(v) if isinstance(v, (list, tuple)) else v
                for k, v in hashes.items()}

    def fully_satisfied(self, edge, update_meta=False):
        """Determine whether all dependencies of the given edge (transformer) are satisfied.

        Satisfied here means every input dataset has metadata on disk whose hashes
        match its cached catalog entry. Sources are always considered satisfied.

        Parameters
        ----------
        edge: str
            Name of the edge (transformer) to check.
        update_meta: Boolean
            if True, a missing or mismatched cached entry is replaced with
            whatever metadata is currently on disk and treated as satisfied
            (in the metadata dict held by this graph; this method does not save it).
            if False, a missing or mismatched entry makes the edge unsatisfied.

        Returns
        -------
        bool
        """
        if self.is_source(edge):
            return True

        cached = self._cached_nodes()
        for ds_name in self._dag[edge].get('input_datasets', []):
            ds_meta = Dataset.load(ds_name, metadata_only=True, errors=False)
            if not ds_meta:  # does not exist
                logger.debug(f"No dataset on-disk for dataset={ds_name}")
                return False
            disk_hashes = self._normalize_hashes(ds_meta.get('hashes'))
            cached_meta = cached.get(ds_name)
            cached_hashes = (self._normalize_hashes(cached_meta.get('hashes'))
                             if cached_meta else None)
            if cached_hashes == disk_hashes:
                continue
            if update_meta:
                logger.debug(f"Updating hashes for dataset={ds_name}: {ds_meta.get('hashes')}")
                cached[ds_name] = ds_meta
                continue
            logger.warning(f"Hashes invalid for on-disk dataset={ds_name}: "
                           f"{cached_hashes} != {disk_hashes}")
            return False
        return True
            
        
        

In [19]:
from src.workflow import available_datasets
from src.data.datasets import cached_datasets
from src.data.datasets import Dataset

In [20]:
!rm ../catalog/datasets.json

In [21]:
available_datasets()



set()

In [22]:
cached_datasets()

{'beer_review_all', 'foo'}

In [23]:
ds = Dataset('foo')
ds.update_catalog()

2020-04-07 15:32:17,285 - datasets - DEBUG - Updating dataset catalog with 'foo' metadata


In [24]:
cached_datasets()

{'beer_review_all', 'foo'}

In [25]:
available_datasets()

{'foo'}

In [26]:
ds.dump(force=True)

2020-04-07 15:32:17,386 - datasets - DEBUG - Wrote Dataset Metadata: foo.metadata
2020-04-07 15:32:17,388 - datasets - DEBUG - Updating dataset catalog with 'foo' metadata
2020-04-07 15:32:17,389 - datasets - DEBUG - Wrote Dataset: foo.dataset


In [27]:
!cat ../catalog/datasets.json

{
  "foo": {
    "dataset_name": "foo",
    "hashes": {
      "data": [
        "sha1",
        "38f65f3b11da4851aaaccc19b1f0cf4d3806f83b"
      ],
      "target": [
        "sha1",
        "38f65f3b11da4851aaaccc19b1f0cf4d3806f83b"
      ]
    }
  }
}

In [28]:
Dataset.load('foo',metadata_only=True)

KeyError: 'hashes'

In [None]:
%debug

> [0;32m/Users/kjell/Documents/devel/git/timc/reproallthethings/src/data/datasets.py[0m(329)[0;36mload[0;34m()[0m
[0;32m    327 [0;31m        [0;32mif[0m [0mcheck_hashes[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m    328 [0;31m            [0;32mif[0m [0mdataset_cache[0m[0;34m[[0m[0mdataset_name[0m[0;34m][0m[0;34m[[0m[0;34m'hashes'[0m[0;34m][0m [0;34m!=[0m [0mmeta[0m[0;34m[[0m[0;34m'hashes'[0m[0;34m][0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m--> 329 [0;31m                [0;32mraise[0m [0mException[0m[0;34m([0m[0;34mf"Hash mismatch for {dataset_name}: {meta['hashes']} != cache: {dataset_cache['hashes']}."[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m    330 [0;31m        [0;32mif[0m [0mmetadata_only[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m    331 [0;31m            [0;32mreturn[0m [0mmeta[0m[0;34m[0m[0;34m[0m[0m
[0m
ipdb> meta
{'dataset_name': 'foo', 'hashes': {'data': ('sha1', '38f65f3b1

In [None]:
xxx

In [None]:
cached_datasets()

In [None]:
dd = TransformerGraph()

In [None]:
dd.nodes

In [None]:
dd.edges

In [None]:
dd.edges_in

In [None]:
dd.edges_out

In [None]:
dd.sinks

In [None]:
dd.sources

In [None]:
dd.traverse('p_all_train', kind="depth-first")

In [None]:
dd.traverse('p_all_train', kind="breadth-first")

In [None]:
dd.traverse2('p_all_train', kind="breadth-first")

In [None]:
def get_node_metadata(node):
    """Placeholder lookup of catalog metadata for dataset `node`.

    Not implemented yet: the argument is ignored and None is always returned.
    """
    # TODO: wire this up to the dataset catalog.
    metadata = None
    return metadata

In [None]:
def kwdict_to_list(kw):
    """Render a keyword-argument dict as a list of ``"key=value"`` strings.

    Entries keep the dict's iteration order; keys and values are formatted
    exactly as an f-string would format them.
    """
    rendered = []
    for key, value in kw.items():
        rendered.append(f"{key}={value}")
    return rendered

In [None]:
from src.data import Dataset

In [None]:
def datasets_on_disk(dataset_path=None, keys_only=True):
    """Get the datasets whose ``.metadata`` files are present on disk.

    Parameters
    ----------
    dataset_path: path, default paths['processed_data_path']
        Directory to scan for ``*.metadata`` files. Strings are accepted.
    keys_only: boolean, default True
        If True, return a list of dataset names.
        If False, return a dict mapping each dataset name to the metadata
        returned by ``Dataset.load(..., metadata_only=True)``.
    """
    import pathlib

    if dataset_path is None:
        dataset_path = paths['processed_data_path']
    dataset_path = pathlib.Path(dataset_path)

    ds_dict = {}
    # sorted() so results don't depend on filesystem listing order
    for dsfile in sorted(dataset_path.glob("*.metadata")):
        ds_stem = dsfile.stem
        ds_dict[ds_stem] = Dataset.load(ds_stem, data_path=dataset_path, metadata_only=True)

    if keys_only:
        return list(ds_dict)
    return ds_dict
dod = datasets_on_disk(); dod

In [None]:
def apply_transformers(self, ds_name, force=False, update_hashes=False):
    """Apply all transformers needed to generate `ds_name`.

    This walks the dataset dependency graph backwards, applying transformers
    until it is able to successfully generate `ds_name`.

    XXX: WIP. Nothing is executed yet; each step is only logged at DEBUG level.

    self: TransformerGraph
        The graph to walk (written as a free function; call it as
        ``apply_transformers(graph, ds_name)``).
    ds_name: str
        Name of dataset to be generated
    force: Boolean
        if True, always apply transformer, even if cached copy exists.
        If False, source datasets already in the dataset cache are skipped.
    update_hashes: Boolean
        If True, the dataset catalog will be updated with the new hash
        if False, process will stop with an error if the hash changes on any step.
        (Not implemented yet; currently only logged.)

    Raises
    ------
    Exception
        If a source edge does not have exactly one output dataset.
    """
    # Use the graph we were given, not a notebook-global instance.
    _, edge_list = self.traverse(ds_name, kind="breadth-first")

    cached_nodes = cached_datasets(keys_only=False)
    for e in edge_list:
        logger.debug(f"Checking Transformer: '{e}'")
        he = self.edge[e]
        output_datasets = he.get('output_datasets', [])
        if he.get('datasource_name', False):  # Source node?
            if len(output_datasets) != 1:
                raise Exception(f"Invalid Source Node: {e}. Expected exactly one output "
                                f"dataset, got {len(output_datasets)}")
            output_node = output_datasets[0]
            logger.debug(f"Source Node: '{output_node}'")
            if output_node in cached_nodes and not force:
                logger.error(f"TODO: Check hashes on {output_node}")
                continue
            dsdict = {output_node: get_node_metadata(output_node)}
            kwargs = [f"'{he['datasource_name']}'"] + kwdict_to_list(he.get('datasource_opts', {}))
            kwarg_str = ", ".join(kwargs)
            logger.debug(f"Generating Dataset: {dsdict} <= process_dataset({kwarg_str})")
        else:  # intermediate or sink node
            dsdict = {k: get_node_metadata(k) for k in he.get('input_datasets', [])}
            # Computed up front so it is defined even when there are no transformations.
            output_dsdict = {k: get_node_metadata(k) for k in output_datasets}
            for t, topts in he.get('transformations', ()):
                kwarg_str = ", ".join([str(dsdict)] + kwdict_to_list(topts))
                logger.debug(f"{output_dsdict} <= {t}({kwarg_str})")
                dsdict = output_dsdict
            # TODO: check if generated hashes match the expected hashes
            logger.debug(f"Writing {output_dsdict} to disk.")
            if update_hashes:
                logger.debug("Updating hashes in dataset dictionary")


In [None]:
apply_transformers(dd, 'p_cc_test')

In [None]:
dd.cached_nodes