In [None]:
#default_exp data.pipeline

In [None]:
#export
from local.imports import *
from local.test import *
from local.core import *
from local.notebook.showdoc import show_doc

# Transforms and Pipeline

> Low-level transform pipelines

## Convenience functions

In [None]:
# export core
def opt_call(f, fname='__call__', *args, **kwargs):
    "Call `f.{fname}(*args, **kwargs)`, or `noop` if not defined"
    # Look the attribute up once, substituting `noop` when `f` has no such attribute
    target = getattr(f, fname, noop)
    return target(*args, **kwargs)

In [None]:
# Attribute exists: `operator.neg.__call__` is looked up and invoked with 2
test_eq(opt_call(operator.neg, '__call__', 2), -2)
# Attribute missing: the `noop` fallback is called instead and the argument comes back unchanged
test_eq(opt_call(list, 'foobar', [2]), [2])

# Any attribute name works; here `list.sort` is fetched and applied to `a` in place
a=[2,1]
opt_call(list, 'sort', a)
test_eq(a, [1,2])

## Transform -

In [None]:
# export
@docs
class Transform():
    "A function that `encodes` if `filt` matches, and optionally `decodes`, with an optional `setup`"
    # `order` is the sort key read by `Pipeline.add`; `filt=None` means "apply to every subset"
    order,filt = 0,None

    def __init__(self, encodes=None, **kwargs):
        # Only override `encodes` when one is passed, so subclasses defining it as a method keep theirs
        if encodes is not None: self.encodes=encodes
        # Every other keyword (e.g. `decodes`, `shows`, `filt`) becomes an instance attribute
        for k,v in kwargs.items(): setattr(self, k, v)

    @classmethod
    def create(cls, f, filt=None):
        "classmethod: Turn `f` into a `Transform` unless it already is one"
        # Anything exposing `decode` is treated as transform-like and returned untouched
        if hasattr(f,'decode') or isinstance(f,Transform): return f
        tfm = cls(f)
        # `filt` used to be accepted and silently ignored; honour it for the newly created wrapper.
        # Left alone when None so a subclass-level `filt` default is not overwritten.
        if filt is not None: tfm.filt = filt
        return tfm

    # A transform applies when it has no `filt` of its own or when the requested `filt` equals it
    def _filt_match(self, filt): return self.filt is None or self.filt==filt
    # When the filter does not match, `o` is passed through unchanged
    def __call__(self, o, filt=None, **kwargs): return self.encodes(o, **kwargs) if self._filt_match(filt) else o
    # Indexing is an alias for calling (no `filt`, no kwargs)
    def __getitem__(self, x): return self(x)
    def decode  (self, o, filt=None, **kwargs): return self.decodes(o, **kwargs) if self._filt_match(filt) else o
    # Default decoding is the identity
    def decodes(self, o, *args, **kwargs): return o
    # Plain `Transform` instances display the wrapped function; subclasses display their class
    def __repr__(self): return str(self.encodes) if self.__class__==Transform else str(self.__class__)
    # `shows` is not defined on the base class: it must come from a keyword argument or a subclass
    def show(self, o, filt=None, **kwargs): return self.shows(self.decode(o, filt=filt), **kwargs)

    _docs=dict(__call__="Call `self.encodes` unless `filt` is passed and it doesn't match `self.filt`",
              decode="Call `self.decodes` unless `filt` is passed and it doesn't match `self.filt`",
              decodes="Override to implement custom decoding",
              show="Call `shows` with decoded `o`")

In a transformation pipeline some steps need to be reversible - for instance, if you turn a string (such as *dog*) into an int (such as *1*) for modeling, then for display purposes you'll want to turn it back to a string again (e.g. when you have a prediction). In addition, you may wish to only run the transformation for a particular data subset, such as the training set.

`Transform` provides all this functionality. `filt` is some dataset index (e.g. provided by `DataSource`), and you provide `encodes` and optional `decodes` functions for your code. You can pass `encodes` and `decodes` functions directly to the constructor for quickly creating simple transforms.

In [None]:
# Negation is its own inverse, so the same function is used for `encodes` and `decodes`
tneg = Transform(operator.neg,decodes=operator.neg)
# Extra keyword arguments are stored as attributes; `shows=print` gives this transform a `shows`
tfloat = Transform(float,decodes=int,shows=print)

start = 4
t = tneg(start)
test_eq(t, -4)
# Indexing a transform is the same as calling it
test_eq(t, tneg[start])
# `decode` undoes the encoding
test_eq(tneg.decode(t), start)

More commonly, you'll subclass `Transform` and define `encodes` and `decodes`.

In [None]:
class _AddTfm(Transform):
    # `a` can be overridden through the `**kwargs` that `Transform.__call__`/`decode` forward
    def encodes(self, x, a=1): return x+a
    def decodes(self, x, a=1): return x-a
    
addt  = _AddTfm()
start = 4
t = addt(start)
test_eq(t, 5)
# Decoding subtracts what encoding added
test_eq(addt.decode(5), start)

### Methods

In [None]:
show_doc(Transform.__call__)

<h4 id="Transform.__call__" class="doc_header"><code>__call__</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Transform--" class="source_link" style="float:right">[source]</a></h4>

> <code>__call__</code>(**`o`**, **`filt`**=*`None`*, **\*\*`kwargs`**)

Call `self.encodes` unless `filt` is passed and it doesn't match `self.filt`

In [None]:
show_doc(Transform.decode)

<h4 id="Transform.decode" class="doc_header"><code>decode</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Transform--" class="source_link" style="float:right">[source]</a></h4>

> <code>decode</code>(**`o`**, **`filt`**=*`None`*, **\*\*`kwargs`**)

Call `self.decodes` unless `filt` is passed and it doesn't match `self.filt`

In [None]:
show_doc(Transform.create)

<h4 id="Transform.create" class="doc_header"><code>create</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Transform--" class="source_link" style="float:right">[source]</a></h4>

> <code>create</code>(**`f`**, **`filt`**=*`None`*)

classmethod: Turn `f` into a [`Transform`](/data.pipeline.html#Transform) unless it already is one

## Pipeline -

In [None]:
#export
@docs
class Pipeline():
    "A pipeline of composed (for encode/decode) transforms, setup one at a time"
    def __init__(self, tfms):
        # Wrapped transforms wait in `_tfms`; `tfms` stays empty until `setup` moves them over
        pending = [Transform.create(t) for t in L(tfms)]
        self.tfms,self._tfms = [],pending

    def setup(self, items=None):
        "Transform setup"
        self.add(self._tfms, items)
        # The pending list has now been handed to `add`
        self._tfms = None

    def add(self, tfms, items=None):
        "Call `setup` on all `tfms` and append them to this pipeline"
        def _order(o): return getattr(o, 'order', 0)
        for tfm in sorted(L(tfms), key=_order):
            # Appended before its own `setup` runs, so later transforms are not yet in `self.tfms`
            self.tfms.append(tfm)
            if hasattr(tfm, 'setup'): tfm.setup(items)

    def composed(self, x, rev=False, fname='__call__', **kwargs):
        "Compose `{fname}` of all `self.tfms` (reversed if `rev`) on `x`"
        seq = self.tfms
        if rev: seq = reversed(seq)
        # `opt_call` skips (via `noop`) any transform lacking the `fname` attribute
        for tfm in seq: x = opt_call(tfm, fname, x, **kwargs)
        return x

    def __call__(self, x, **kwargs):
        return self.composed(x, **kwargs)

    def __getitem__(self, x):
        # Indexing is an alias for calling the pipeline on `x`
        return self(x)

    def decode(self, x, **kwargs):
        return self.composed(x, rev=True, fname='decode', **kwargs)

    def decode_at(self, idx):
        return self.decode(self[idx])

    def show_at(self, idx):
        return self.show(self[idx])

    def __repr__(self):
        return str(self.tfms)

    def delete(self, idx):
        del self.tfms[idx]

    def remove(self, tfm):
        self.tfms.remove(tfm)

    def show(self, o, *args, **kwargs):
        "Find last transform that supports `shows` and call it"
        for tfm in reversed(self.tfms):
            if not hasattr(tfm, 'shows'):
                # Not showable: undo this transform (if it can decode) and keep searching earlier ones
                o = getattr(tfm, 'decode', noop)(o)
                continue
            return tfm.show(o, *args, **kwargs)

    _docs = dict(__call__="Compose `__call__` of all `tfms` on `x`",
                decode="Compose `decode` of all `tfms` on `x`",
                decode_at="Decoded item at `idx`",
                show_at="Show item at `idx`",
                delete="Delete transform `idx` from pipeline",
                remove="Remove `tfm` from pipeline")

A list of transforms is often applied in a particular order, and decoded by applying them in the reverse order. `Pipeline` provides this functionality, and also ensures that any `setup` methods are called, without including later transforms in those calls. NB: `setup` must be run before encoding/decoding.

Here are some simple examples:

In [None]:
tfms = [tneg,tfloat]
pipe = Pipeline(tfms)
# Transforms only become active once `setup` has moved them into `pipe.tfms`
pipe.setup()

start = 2
# Use `start` throughout so the round-trip check below is tied to the value actually encoded
t = pipe(start)
test_eq(t, -2.0)
test_eq(type(t), float)
# Indexing a pipeline is the same as calling it
test_eq(t, pipe[start])
test_eq(pipe.decode(t), start)
# `show` is on `tfloat` so `show_at` decodes that tfm only
test_stdout(lambda:pipe.show_at(1), '-1')

### Methods

In [None]:
show_doc(Pipeline.__call__)

<h4 id="Pipeline.__call__" class="doc_header"><code>__call__</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Pipeline--" class="source_link" style="float:right">[source]</a></h4>

> <code>__call__</code>(**`x`**, **\*\*`kwargs`**)

Compose `__call__` of all `tfms` on `x`

In [None]:
show_doc(Pipeline.decode)

<h4 id="Pipeline.decode" class="doc_header"><code>decode</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Pipeline--" class="source_link" style="float:right">[source]</a></h4>

> <code>decode</code>(**`x`**, **\*\*`kwargs`**)

Compose `decode` of all `tfms` on `x`

In [None]:
show_doc(Pipeline.delete)

<h4 id="Pipeline.delete" class="doc_header"><code>delete</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Pipeline--" class="source_link" style="float:right">[source]</a></h4>

> <code>delete</code>(**`idx`**)

Delete transform `idx` from pipeline

In [None]:
show_doc(Pipeline.remove)

<h4 id="Pipeline.remove" class="doc_header"><code>remove</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Pipeline--" class="source_link" style="float:right">[source]</a></h4>

> <code>remove</code>(**`tfm`**)

Remove `tfm` from pipeline

In [None]:
show_doc(Pipeline.add)

<h4 id="Pipeline.add" class="doc_header"><code>add</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Pipeline--" class="source_link" style="float:right">[source]</a></h4>

> <code>add</code>(**`tfms`**, **`items`**=*`None`*)

Call `setup` on all `tfms` and append them to this pipeline

In [None]:
show_doc(Pipeline.show_at)

<h4 id="Pipeline.show_at" class="doc_header"><code>show_at</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Pipeline--" class="source_link" style="float:right">[source]</a></h4>

> <code>show_at</code>(**`idx`**)

Show item at `idx`

In [None]:
show_doc(Pipeline.decode_at)

<h4 id="Pipeline.decode_at" class="doc_header"><code>decode_at</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#Pipeline--" class="source_link" style="float:right">[source]</a></h4>

> <code>decode_at</code>(**`idx`**)

Decoded item at `idx`

## PipedList -

In [None]:
#export
@docs
class PipedList(GetAttr):
    "A `Pipeline` of transforms applied to a collection of `items`"
    # NOTE(review): presumably the attribute names `GetAttr` forwards to `self.default` - confirm in `local.core`
    _xtra = ['decode', '__call__', 'show']

    def __init__(self, items, tfms):
        self.items = L(items)
        pipeline = Pipeline(tfms)
        self.default = pipeline
        self.tfm = pipeline
        # The list itself is passed as the `items` that each transform's `setup` receives
        pipeline.setup(self)

    def __getitem__(self, i):
        "Transformed item(s) at `i`"
        picked = self.items[i]
        # Several indices: transform each selected item; single index: transform the one item
        if is_iter(i): return picked.mapped(self.tfm)
        return self.tfm(picked)

    def decode_batch(self, b, **kwargs):
        "Decode `b`, a list of lists of pipeline outputs (i.e. output of a `DataLoader`)"
        # Transpose to one entry per sample, decode each, then transpose back
        samples = L(zip(*L(b)))
        decode_one = partial(self.decode, **kwargs)
        return samples.mapped(decode_one).zipped()

    def decode_at(self, idx):
        return self.decode(self[idx])

    def show_at(self, idx):
        return self.show(self[idx])

    def __eq__(self, b):
        return all_equal(self, b)

    def __len__(self):
        return len(self.items)

    def __iter__(self):
        return (self[idx] for idx in range_of(self))

    def __repr__(self):
        return f"{self.__class__.__name__}: {self.items}\ntfms - {self.tfm}"

    _docs = dict(decode_at="Decoded item at `idx`",
                 show_at  ="Show item at `idx`")

In [None]:
# Reuses `tfms` (`[tneg, tfloat]`) from the `Pipeline` example; `PipedList` runs `setup` itself
pipe = PipedList([1,2,3], tfms)
# Indexing returns the transformed item
t = pipe[1]
test_eq(t, -2.0)
test_eq(type(t), float)
# `decode_at(i)` is `decode(pipe[i])`
test_eq(pipe.decode_at(1), 2)
test_eq(pipe.decode(t), 2)
# As in the `Pipeline` example, showing only decodes `tfloat`, so the sign is kept
test_stdout(lambda: pipe.show_at(2), '-3')
pipe

PipedList: (#3) [1,2,3]
tfms - [<built-in function neg>, <class 'float'>]

Here's how we can use a transform's `setup` method (which `PipedList` calls through its pipeline, passing itself as the items) to implement a simple category list, getting labels from a mock file list:

In [None]:
class _Cat(Transform):
    # Sorted after `_lbl` (default order 0) by `Pipeline.add`, even though it is listed first below
    order=1
    # Until `setup` has created `o2i`, items pass through unchanged
    def encodes(self, o): return self.o2i[o] if hasattr(self,'o2i') else o
    def decodes(self, o): return self.vocab[o]
    # NOTE(review): `uniqueify(..., bidir=True)` presumably returns (sorted unique values, value->index map) - confirm
    def setup(self, items): self.vocab,self.o2i = uniqueify(items, sort=True, bidir=True)
    def shows(self, o): print(f"I'm a {o}")

# Label is the part of the file name before the first underscore
def _lbl(o): return o.split('_')[0]

test_fns = ['dog_0.jpg','cat_0.jpg','cat_2.jpg','cat_1.jpg','dog_1.jpg']
tcat = _Cat()
pipe = PipedList(test_fns, [tcat,_lbl])

test_eq(tcat.vocab, ['cat','dog'])
test_eq([1,0,0,0,1], pipe)
test_eq(1, pipe[-1])
# Indexing with several indices returns several transformed items
test_eq([1,0], pipe[0,1])
t = list(pipe)
test_eq([1,0,0,0,1], t)
test_eq(['dog','cat','cat','cat','dog'], map(pipe.decode,t))
test_stdout(lambda:pipe.show_at(0), "I'm a dog")
pipe

PipedList: (#5) [dog_0.jpg,cat_0.jpg,cat_2.jpg,cat_1.jpg,dog_1.jpg]
tfms - [<function _lbl at 0x7f7156e259d8>, <class '__main__._Cat'>]

### Methods

In [None]:
show_doc(PipedList.__getitem__)

<h4 id="PipedList.__getitem__" class="doc_header"><code>__getitem__</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#PipedList--" class="source_link" style="float:right">[source]</a></h4>

> <code>__getitem__</code>(**`i`**)

Transformed item(s) at `i`

In [None]:
pipe.decode(pipe[1])

'cat'

In [None]:
test_eq(pipe.decode_at(1),'cat')

In [None]:
show_doc(PipedList.show_at)

<h4 id="PipedList.show_at" class="doc_header"><code>show_at</code><a href="https://nbviewer.jupyter.org/github/fastai/fastai_docs/blob/master/dev/02_data_pipeline.ipynb#PipedList--" class="source_link" style="float:right">[source]</a></h4>

> <code>show_at</code>(**`idx`**)

Show item at `idx`

In [None]:
pipe.show_at(1)

I'm a cat


## Pipelines -

In [None]:
#export
class Pipelines(Transform):
    "Create a `Pipeline` for each tfm in `tfms`. Generally used inside a `PipedList`"
    def __init__(self, tfms):
        # `activ` is only non-None while `setup` is running (see `encodes`)
        self.activ,self.tfms = None,[Pipeline(t) for t in L(tfms)]
    def __repr__(self): return f'Pipelines({self.tfms})'

    def encodes(self, o, *args, **kwargs):
        "List of output of each of `tfms` on `o`"
        # During `setup` only the pipeline being set up is applied, and its bare output is returned
        if self.activ is not None: return self.activ(o, *args, **kwargs)
        return [t(o, *args, **kwargs) for t in self.tfms]

    def decodes(self, o, **kwargs):
        "Decode each element of `o` with the pipeline at the same position in `tfms`"
        return [t.decode(p, **kwargs) for p,t in zip(o,self.tfms)]

    def show(self, o, ctx=None, **kwargs):
        "Show result of `show` from each of `tfms`"
        # Each pipeline receives the `ctx` returned by the previous one
        for p,t in zip(o,self.tfms): ctx = t.show(p, ctx=ctx, **kwargs)
        # Hand the final `ctx` back to the caller instead of discarding it
        return ctx
    def shows(self): pass # needed for `Pipeline` method search for `show`

    def setup(self, o):
        "Setup each of `tfms` independently"
        try:
            for tfm in self.tfms:
                # While `activ` is set, `encodes` applies this pipeline alone
                self.activ = tfm
                tfm.setup(o)
        finally:
            # Always leave setup mode, even if a transform's `setup` raised
            self.activ=None

    @classmethod
    def create(cls, items, tfms, xtra=None):
        "PipedList over `items` with `tfms` `Pipelines` as first tfm optionally followed by any `xtra` tfms"
        return PipedList(items, cls(tfms)+L(xtra))

    # NOTE(review): `add_props` presumably builds properties so `xt`/`yt` give `tfms[0]`/`tfms[1]` - confirm in `local.core`
    xt,yt = add_props(lambda i,x:x.tfms[i])

In [None]:
class _TNorm(Transform):
    # Starts as the identity (mean 0, std 1) until `setup` computes the real statistics
    def __init__(self): self.m,self.s = 0,1
    def encodes(self, o): return (o-self.m)/self.s
    def decodes(self, o): return (o*self.s)+self.m
    # Accepts and ignores extra keywords such as the `ctx` that `Pipelines.show` passes
    def shows(self, o, **kwargs): print(o)
    def setup(self, items):
        its = tensor(items)
        self.m,self.s = its.mean(),its.std()

tnrm = _TNorm()
items = [1,2,3,4]
# Two pipelines per item: plain negation, and negation followed by normalization
pl = Pipelines.create(items, [tneg, [tneg,tnrm]])
x,y = zip(*pl)
# The normalized output has mean 0 and std 1
test_close(tensor(y).mean(), 0)
test_close(tensor(y).std(), 1)
test_eq(x, [-1,-2,-3,-4])
# Only `tnrm` has `shows`; the printed value has the normalization undone but not the negation
test_stdout(lambda:pl.show_at(1), 'tensor(-2.)')

In [None]:
# Create a "batch": transpose the samples in `pl` into one tuple per pipeline output
b = list(zip(*pl)); b

[(-1, -2, -3, -4),
 (tensor(1.1619), tensor(0.3873), tensor(-0.3873), tensor(-1.1619))]

In [None]:
# Decoding the batch recovers the original items for both pipeline outputs
bd = pl.decode_batch(b)

test_eq(len(bd),2)
test_eq(bd[0],items)
test_eq(bd[1],items)
# The second output stays a tensor after decoding
test_eq(type(bd[1][0]),Tensor)

In [None]:
def show_batch(b, tfm, **kwargs):
    "Show `b`, a list of lists of pipeline outputs (i.e. output of a `DataLoader`)"
    # Transpose so each `o` holds one sample's entries; `kwargs` are forwarded to `tfm.show`
    # (they were previously accepted but dropped)
    for o in zip(*L(b)): tfm.show(o, **kwargs)

## Export -

In [None]:
#hide
from local.notebook.export import notebook2script
notebook2script(all_fs=True)

Converted 00_test.ipynb.
Converted 01_core.ipynb.
Converted 02_data_pipeline.ipynb.
Converted 03_data_external.ipynb.
Converted 04_data_core.ipynb.
Converted 05_data_source.ipynb.
Converted 06_vision_core.ipynb.
Converted 07_pets_tutorial.ipynb.
Converted 90_notebook_core.ipynb.
Converted 91_notebook_export.ipynb.
Converted 92_notebook_showdoc.ipynb.
Converted 93_notebook_export2html.ipynb.
Converted 94_index.ipynb.
