### Pipeline outside of ML

This notebook shows some trial and error to create a `Pipeline` that can be used with `xarray_filters`.  

This is a continuation of goals in [Elm issue #149](https://github.com/ContinuumIO/elm/issues/149) to separate ML from GIS utils.

The goal is to be able to run something like this:
```
from xarray_filters.pipeline import Pipeline
from xarray_filters.steps import Generic, Serialize
def step_1(dset, **kw):
    return kw['a'] * dset.mean(dim=('x', 'y')) ** kw['b']

def step_2(dset, **kw):
    return kw['a'] + dset * kw['b']
    
steps = (('s1', Generic(step_1)),
         ('s2', Generic(step_2)),
         ('s3', Serialize('two_step_pipeline_out.nc')))
pipe = Pipeline(steps=steps)
pipe.set_params(s1__a=2,
                s1__b=3,
                s2__a=0,
                s2__b=0,
                s3__fname='file_with_zeros.nc')
pipe.fit_transform(X)
```
 * The example above uses scikit-learn `set_params` style of setting parameters where:
   * Steps in the `Pipeline` are named, `s1`, `s2`, and `s3` in this case
   * Double underscore notation is used to pass parameters to the `set_params` method of a given step.  Here:
     * `a` and `b` are parameters accepted by `step_1` and `step_2`
     * `fname` is accepted by `Serialize`
   * The `Dataset` or `MLDataset` `X` is run through the 3 steps
   * Note that the `xarray_filters` import statements at the top of the snippet show the module layout that still needs to be implemented, based on the code prototyped in this notebook
* Classes formerly part of `elm.pipeline.steps` will now inherit from `sklearn.base.BaseEstimator`


In [1]:
from __future__ import absolute_import, division, print_function, unicode_literals

import warnings
from abc import ABCMeta, abstractmethod

import six
import sklearn
from sklearn.pipeline import Pipeline as _Pipeline

from xarray_filters import MLDataset
from xarray_filters.tests.test_data import new_test_dataset

class Step(six.with_metaclass(ABCMeta,
                              sklearn.base.BaseEstimator,
                              sklearn.base.TransformerMixin)):
    """Base class representing a schedulable / runnable unit of a Pipeline.

    A step is a stateless transformer: there is nothing to learn from the
    data, so ``fit`` is a no-op and ``fit_transform`` simply transforms.
    Subclasses must implement ``transform``.
    """
    def __init__(self, *args, **kwargs):
        """Store the wrapped callable.

        Parameters
        ----------
        *args
            The last positional argument, if any, is stored as ``self.func``.
            When no positional argument is given ``self.func`` is ``None``.
            Remaining arguments are forwarded to the parent initializer.
        **kwargs
            Forwarded to the parent initializer.
        """
        args_copy = list(args)
        # Tolerate subclasses that have no callable to wrap instead of
        # failing with IndexError on an empty argument list.
        self.func = args_copy.pop() if args_copy else None
        super(Step, self).__init__(*args_copy, **kwargs)

    def fit(self, X, y=None, **fit_params):
        """Do nothing and return ``self``; steps hold no fitted state."""
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Transform ``X``; equivalent to ``transform`` because fit is a no-op.

        Defined on the class (rather than assigned per instance in
        ``__init__``) so that subclasses which do not call ``Step.__init__``
        still get it. ``y`` is only forwarded when it is not ``None``, so
        ``transform`` implementations that take no ``y`` argument keep
        working when a caller passes ``y=None`` explicitly.
        """
        if y is None:
            return self.transform(X, **fit_params)
        return self.transform(X, y, **fit_params)

    @abstractmethod
    def transform(self, X, y=None, **fit_params):
        """Apply the step to ``X``. Overridden by subclasses.

        The default body calls ``self.func(X, y, **fit_params)`` and returns
        its result; subclasses may reach it through ``super()``.
        """
        return self.func(X, y, **fit_params)

    
class Pipeline(_Pipeline):
    """An abstraction for scheduling and running Step objects.

    Thin subclass of ``sklearn.pipeline.Pipeline``. The ``_transform`` and
    ``_inverse_transform`` loops are spelled out here as the places where
    Dask-style computation / scheduling is meant to be introduced; as written
    they run the steps sequentially.
    """
    def __init__(self, steps, memory=None):
        """Build the pipeline from ``(name, step)`` pairs.

        Parameters
        ----------
        steps : iterable of (str, step or None)
            Named steps in execution order. The iterable is copied, so the
            caller's sequence is never modified.
        memory : optional
            Passed through unchanged to ``sklearn.pipeline.Pipeline``.

        Notes
        -----
        Unless the last entry is already ``None``, a trailing
        ``('iden', None)`` entry is appended (presumably so that every
        user-supplied step is treated as an intermediate transformer rather
        than as the final estimator -- TODO confirm). An empty ``steps`` also
        receives this placeholder instead of raising ``IndexError``.
        """
        steps_copy = list(steps)
        if not steps_copy or steps_copy[-1][1] is not None:
            steps_copy.append(('iden', None))
        # `memory` is passed by keyword: recent scikit-learn releases make it
        # keyword-only, and older releases accept the keyword form as well.
        super(Pipeline, self).__init__(steps_copy, memory=memory)

    def _transform(self, X):
        """Run ``X`` through every non-``None`` step's ``transform``, in order.

        Here we can introduce dask-style computation / scheduling.
        """
        Xt = X
        for name, step in self.steps:
            if step is not None:
                Xt = step.transform(Xt)
        return Xt

    def _inverse_transform(self, X):
        """Run ``X`` through every non-``None`` step's ``inverse_transform``,
        in reverse step order.

        Here we can introduce dask-style computation / scheduling.
        """
        Xt = X
        for name, step in self.steps[::-1]:
            if step is not None:
                Xt = step.inverse_transform(Xt)
        return Xt

In [2]:
class Generic(Step):
    """Step that applies a user-supplied function to a dataset.

    Parameters
    ----------
    func : callable, optional
        Called as ``func(dset=dset, a=a, b=b)`` by ``transform``.
    a, b : optional
        Parameters forwarded to ``func`` as keyword arguments; they can be
        changed later through ``set_params``.
    """
    def __init__(self, func=None, a=None, b=None):
        self.a = a
        self.b = b
        # Step.__init__ stores its last positional argument as self.func.
        super(Generic, self).__init__(func)

    def transform(self, dset, **kw):
        """Return ``self.func`` applied to ``dset`` with the current parameters.

        Extra keyword arguments in ``kw`` are accepted but ignored.

        Raises
        ------
        TypeError
            If no callable was supplied (``func`` is ``None``).
        """
        if self.func is None:
            raise TypeError('Generic step has no function to apply; '
                            'pass func= when constructing it')
        # deep=False: only this step's own constructor parameters are wanted,
        # not the expanded parameters of any nested estimator.
        params = self.get_params(deep=False)
        # The wrapped callable is not one of its own arguments; forwarding it
        # would force every user function to accept a stray `func` keyword.
        params.pop('func', None)
        return self.func(dset=dset, **params)


class Serialize(Step):
    """Pass-through step that writes the dataset to disk.

    Parameters
    ----------
    fname : str
        Destination file name handed to ``dset.to_netcdf``.
    as_netcdf : bool, default True
        Write the dataset as netCDF. No other serializer is implemented yet.
    """
    def __init__(self, fname, as_netcdf=True):
        # This step wraps no callable, so Step.__init__ (which stores one as
        # self.func) is not called.
        self.fname = fname
        self.as_netcdf = as_netcdf

    def fit(self, X, y=None, **fit_params):
        """Do nothing and return ``self``; serialization has nothing to fit."""
        return self

    def transform(self, dset, y=None):
        """Write ``dset`` to ``self.fname`` and return ``dset`` unchanged.

        When ``as_netcdf`` is false nothing is written and a warning is
        issued; the dataset is still returned so that later pipeline steps
        receive it instead of ``None``.
        """
        if self.as_netcdf:
            dset.to_netcdf(self.fname)
        else:
            # TODO other serializers?
            warnings.warn('Serialize: as_netcdf is False and no other '
                          'serializer is implemented; dataset was not '
                          'written')
        return dset

In [3]:
# Build a test dataset with three data variables to push through the pipeline.
X = new_test_dataset(('wind', 'pressure', 'temperature',))
# Bare expression so the notebook displays the dataset's repr.
X

<xarray.MLDataset>
Dimensions:      (t: 48, x: 20, y: 15, z: 8)
Coordinates:
  * x            (x) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
  * y            (y) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
  * z            (z) int64 0 1 2 3 4 5 6 7
  * t            (t) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
Data variables:
    wind         (x, y, z, t) float64 0.4445 0.7416 0.6008 0.2171 0.7075 ...
    pressure     (x, y, z, t) float64 0.489 0.9108 0.7866 0.653 0.8514 ...
    temperature  (x, y, z, t) float64 0.1573 0.833 0.07375 0.8194 0.7863 ...

In [4]:
def step_1(dset, **kw):
    """Return ``a * mean(dset over x, y) ** b``.

    ``a`` and ``b`` are read from the keyword arguments; ``dset`` must
    provide a ``mean(dim=...)`` method.
    """
    scale = kw['a']
    spatial_mean = dset.mean(dim=('x', 'y'))
    exponent = kw['b']
    # `**` binds tighter than `*`: the mean is raised to `b` before scaling.
    return scale * spatial_mean ** exponent

def step_2(dset, **kw):
    """Return ``a + dset * b``, with ``a`` and ``b`` read from the keyword arguments."""
    offset = kw['a']
    scaled = dset * kw['b']
    return offset + scaled

# Named (label, step) pairs in execution order; the labels are the prefixes
# used later with set_params (e.g. s1__a).
steps = (
    ('s1', Generic(step_1)),
    ('s2', Generic(step_2)),
    ('s3', Serialize('two_step_pipeline_out.nc')),
)

In [5]:
# Grab the first step object by index; this avoids rebinding `_`, which
# IPython uses for the most recent cell output.
s1 = steps[0][1]

In [6]:
# Exercise a single step on its own. set_params mutates s1 in place, and s1 is
# the same object that is stored in `steps`.
s1.set_params(a=0, b=0)
# NOTE(review): with a=0, step_1 evaluates 0 * (mean ** 0), so `ones` holds
# all zeros despite its name.
ones = s1.transform(X)
s1.set_params(a=2, b=2)
other = s1.transform(X)
# Bare expression so the notebook displays the difference for one variable.
other.temperature - ones.temperature

<xarray.DataArray 'temperature' (z: 8, t: 48)>
array([[ 0.423261,  0.413031,  0.528824, ...,  0.485638,  0.502162,  0.53497 ],
       [ 0.441905,  0.527673,  0.496324, ...,  0.547213,  0.517117,  0.529949],
       [ 0.529223,  0.537221,  0.493496, ...,  0.518275,  0.509642,  0.488531],
       ..., 
       [ 0.495862,  0.525155,  0.475573, ...,  0.495561,  0.567319,  0.481611],
       [ 0.506853,  0.522342,  0.500386, ...,  0.491232,  0.515668,  0.498686],
       [ 0.495552,  0.568766,  0.487428, ...,  0.562324,  0.462024,  0.577248]])
Coordinates:
  * t        (t) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ...
  * z        (z) int64 0 1 2 3 4 5 6 7

In [7]:
# Assemble the named steps into a Pipeline; the constructor appends a trailing
# ('iden', None) entry after the last step.
pipe = Pipeline(steps=steps)

In [8]:
# Display the pipeline repr to inspect each step's current parameters.
pipe

Pipeline(memory=None,
     steps=[('s1', Generic(a=2, b=2, func=<function step_1 at 0x115d36ae8>)), ('s2', Generic(a=None, b=None, func=<function step_2 at 0x115d362f0>)), ('s3', Serialize(as_netcdf=True, fname='two_step_pipeline_out.nc')), ('iden', None)])

In [9]:
# Route parameters to individual steps using the <step name>__<param> form.
# With s2's a=0 and b=0, step_2 computes 0 + dset * 0.
pipe.set_params(s1__a=2, s1__b=3, s2__a=0, s2__b=0, s3__fname='file_with_zeros.nc')
# Runs all steps; the Serialize step writes the result to the fname set above.
pipe.transform(X)

<xarray.MLDataset>
Dimensions:      (t: 48, z: 8)
Coordinates:
  * t            (t) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
  * z            (z) int64 0 1 2 3 4 5 6 7
Data variables:
    wind         (z, t) float64 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    pressure     (z, t) float64 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    temperature  (z, t) float64 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...

In [10]:
# Same pipeline with s2's a=1 and b=1 (step_2 computes 1 + dset * 1), written
# to a different output file.
pipe.set_params(s1__a=2, s1__b=3, s2__a=1, s2__b=1, s3__fname='file_nonzero.nc')
pipe.transform(X)

<xarray.MLDataset>
Dimensions:      (t: 48, z: 8)
Coordinates:
  * t            (t) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...
  * z            (z) int64 0 1 2 3 4 5 6 7
Data variables:
    wind         (z, t) float64 1.194 1.258 1.239 1.272 1.272 1.253 1.227 ...
    pressure     (z, t) float64 1.263 1.24 1.255 1.239 1.28 1.248 1.261 ...
    temperature  (z, t) float64 1.195 1.188 1.272 1.269 1.247 1.297 1.282 ...

In [11]:
# Shell escape: list the .nc files in the working directory.
! ls -l *.nc

-rw-r--r--  1 gbrener  staff  18977 Sep 11 10:39 file_nonzero.nc
-rw-r--r--  1 gbrener  staff  18977 Sep 11 10:39 file_with_zeros.nc


In [12]:
# Display the pipeline again; the set_params calls above changed the step
# parameters in place.
pipe

Pipeline(memory=None,
     steps=[('s1', Generic(a=2, b=3, func=<function step_1 at 0x115d36ae8>)), ('s2', Generic(a=1, b=1, func=<function step_2 at 0x115d362f0>)), ('s3', Serialize(as_netcdf=True, fname='file_nonzero.nc')), ('iden', None)])