# Define and run pipelines

Notebook 2 covers pipelines. A pipeline is a sequence of actions that we want to apply to a batch. First we describe what should be done with a batch. Later, at some point in the code, we pass a dataset to the pipeline and the calculations actually run. This lazy execution keeps the code compact and clear.
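
To make the idea of lazy execution concrete, here is a minimal, self-contained sketch. It is not how the `dataset` library implements pipelines; `LazyPipeline`, `scale` and `shift` are hypothetical names used only for illustration. The sketch records actions first and applies them only when batches are supplied:

```python
class LazyPipeline:
    """Toy stand-in for a pipeline: records actions and runs them later."""

    def __init__(self):
        self._actions = []  # list of (function, args, kwargs)

    def add(self, func, *args, **kwargs):
        # Only remember the action; nothing is computed here.
        self._actions.append((func, args, kwargs))
        return self  # returning self allows chaining calls

    def run(self, batches):
        # Computation happens only now, one batch at a time.
        for batch in batches:
            for func, args, kwargs in self._actions:
                batch = func(batch, *args, **kwargs)
            yield batch


def scale(batch, factor):
    return [x * factor for x in batch]


def shift(batch, offset):
    return [x + offset for x in batch]


toy_pipeline = LazyPipeline().add(scale, 2).add(shift, 1)  # nothing runs yet
results = list(toy_pipeline.run([[1, 2], [3]]))            # actions are applied here
print(results)  # [[3, 5], [7]]
```

Separating the description of the actions from their execution means the same pipeline definition can be reused with different data.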

## Define pipeline

The following example shows how to define a pipeline. It simply lists actions:

In [1]:
import sys
import os
sys.path.append(os.path.join("..", "..", ".."))

import ecg.dataset as ds
from ecg.batch import EcgBatch

preprocess_pipeline = (ds.Pipeline()
                       .load(fmt="wfdb", components=["signal", "meta"])
                       .random_resample_signals("normal", loc=300, scale=10)
                       .drop_short_signals(4000)
                       .segment_signals(3000, 3000)
                       .run(batch_size=300, shuffle=False, drop_last=False, n_epochs=1, lazy=True))

Using TensorFlow backend.


The only thing that needs clarification is the last action, ```run```. We set ```lazy=True``` because we want to run the pipeline later. When the pipeline actually runs, it loads batches of size ```batch_size``` and applies the pipeline actions to each batch. Iteration stops when no ECGs are left.
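
The batching behaviour described above can be illustrated with a small sketch. It assumes a single epoch without shuffling, and assumes that `drop_last` discards an incomplete final batch; `iter_batches` and the record names are hypothetical and not part of the library:

```python
def iter_batches(items, batch_size, drop_last=False):
    """Yield consecutive batches until no items are left (one epoch, no shuffle)."""
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        if drop_last and len(batch) < batch_size:
            break  # assumption: an incomplete final batch is discarded
        yield batch


ecg_ids = ["A{:05d}".format(i) for i in range(1, 8)]  # 7 hypothetical record names
batches = list(iter_batches(ecg_ids, batch_size=3))
print([len(b) for b in batches])  # [3, 3, 1]
```

With `drop_last=True` the same call would yield only the two full batches.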

## Run pipeline

Let's create an ECG dataset to pass to the pipeline (see [Notebook 1](https://github.com/analysiscenter/ecg/blob/unify_models/doc/ecg_tutorial_part_1.ipynb) for details):

In [2]:
index = ds.FilesIndex(path="/notebooks/data/ECG/training2017/*.hea", no_ext=True, sort=True)
eds = ds.Dataset(index, batch_class=EcgBatch)

To start the calculation, we pass the ECG dataset to the pipeline and call the ```run``` action:

In [3]:
processed = (eds >> preprocess_pipeline).run()

Note that the transformed ECGs are NOT assigned to ```processed```; it is again a pipeline. To keep the results, add a ```dump``` action to the pipeline or save them into a pipeline variable. Working with pipeline variables is explained in [Notebook 3](https://github.com/analysiscenter/ecg/blob/unify_models/doc/ecg_tutorial_part_3.ipynb) of the tutorial. More pipeline features are described [here](https://github.com/analysiscenter/dataset/blob/master/doc/pipeline.md).

## Add custom action

Suppose you want to add to the pipeline a new action called ```add_value``` that adds a given value to each signal.
A new action requires two decorators:
* ```@action``` makes the method usable as an action in a pipeline
* ```@inbatch_parallel``` splits the batch into individual ECGs and processes each ECG in a separate thread.

The decorated function thus receives the index of an ECG and every parameter passed from the pipeline. From the index we obtain the position of the corresponding signal in the batch and add the value to that signal. Combining these steps, we obtain the desired action:

```python
@ds.action
@ds.inbatch_parallel(init="indices", target="threads")
def add_value(self, index, value):
    # Position of the signal with this index inside the batch
    i = self.get_pos(None, "signal", index)
    # Add the value to that signal in place
    self.signal[i] += value
```
The ```add_value``` action can now be included in a pipeline:
```python
ppl = (eds.pipeline()
       .do_some_actions_before()
       .add_value(0.01)
       .do_some_actions_after())
```
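
To illustrate what "process each ECG in a separate thread" means, here is a self-contained sketch built on the standard library. It is not the implementation of ```@inbatch_parallel```; `per_item_in_threads` and `ToyBatch` are hypothetical names, and the toy `get_pos` takes a simpler argument list than the library method. The decorator calls the wrapped method once per index, each call in a worker thread:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


def per_item_in_threads(method):
    """Toy decorator: call `method` once per batch index, each call in a worker thread."""
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        with ThreadPoolExecutor() as pool:
            futures = [pool.submit(method, self, index, *args, **kwargs)
                       for index in self.indices]
            for future in futures:
                future.result()  # re-raise any exception from a worker
        return self  # returning the batch lets calls be chained
    return wrapper


class ToyBatch:
    def __init__(self, indices, signal):
        self.indices = indices
        self.signal = signal  # one plain list of samples per record

    def get_pos(self, index):
        # Position of the record with the given index inside the batch.
        return self.indices.index(index)

    @per_item_in_threads
    def add_value(self, index, value):
        i = self.get_pos(index)
        self.signal[i] = [x + value for x in self.signal[i]]


batch = ToyBatch(["rec_a", "rec_b"], [[0.0, 1.0], [2.0]])
batch.add_value(0.5)
print(batch.signal)  # [[0.5, 1.5], [2.5]]
```

Each worker writes only to its own position in `signal`, so the threads do not interfere with one another.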

In the next notebook, [Notebook 3](https://github.com/analysiscenter/ecg/blob/unify_models/doc/ecg_tutorial_part_3.ipynb), we will work with models.