<h1>Advanced Tutorial 2: Pipeline</h1>

<h2>Overview</h2>

In the beginner's tutorial on `Pipeline`, we learned how to build an asynchronous, optimized data pipeline that handles data loading and preprocessing efficiently. Now that you understand the basic operations of a `Pipeline`, this tutorial demonstrates some advanced concepts and how to leverage them to build efficient pipelines.

In this tutorial, we will discuss the following topics:

* How to iterate through the pipeline data
* Handling batch padding in Pipeline
    * Dropping the last batch
    * Padding the batch
* How to benchmark Pipeline performance

<h2>How to iterate through the pipeline data</h2>

First, let's see how to iterate through the batches produced by a pipeline.

We start by creating a sample `NumpyDataset` from a data dictionary and loading it into a `Pipeline`.

In [1]:
import numpy as np

# sample numpy arrays from which we will create a dataset
x_train, y_train = (np.random.sample((10, 2)), np.random.sample((10, 1)))
train_data = {"x": x_train, "y": y_train}

In [2]:
import fastestimator as fe
from fastestimator.dataset.numpy_dataset import NumpyDataset

# create NumpyDataset from the sample data
dataset_fe = NumpyDataset(train_data)

pipeline_fe = fe.Pipeline(train_data=dataset_fe, batch_size=3)

Let's get the loader object for the `Pipeline` we defined and iterate over it. With 10 samples and `batch_size=3`, the loader yields four batches, and the last one contains only a single sample.

In [3]:
loader_fe = pipeline_fe.get_loader(mode="train")

for batch in loader_fe:
    print(batch)

{'x': tensor([[0.8101, 0.5206],
        [0.7858, 0.4574],
        [0.9156, 0.9910]], dtype=torch.float64), 'y': tensor([[0.3074],
        [0.6351],
        [0.1768]], dtype=torch.float64)}
{'x': tensor([[0.6111, 0.4331],
        [0.5809, 0.8824],
        [0.8413, 0.4020]], dtype=torch.float64), 'y': tensor([[0.8993],
        [0.5270],
        [0.1394]], dtype=torch.float64)}
{'x': tensor([[0.9195, 0.8204],
        [0.4893, 0.2745],
        [0.3625, 0.3232]], dtype=torch.float64), 'y': tensor([[0.1833],
        [0.0894],
        [0.8347]], dtype=torch.float64)}
{'x': tensor([[0.7682, 0.3446]], dtype=torch.float64), 'y': tensor([[0.4175]], dtype=torch.float64)}
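To make the batching behaviour concrete, here is a minimal NumPy-only sketch of how 10 samples are grouped with `batch_size=3`. The helper `iterate_batches` is hypothetical and not part of FastEstimator; it ignores shuffling and the conversion to torch tensors, and only shows how consecutive samples are grouped while keeping `x` and `y` aligned.

```python
import numpy as np

def iterate_batches(data, batch_size):
    """Hypothetical helper: yield dicts of consecutive samples.

    Not FastEstimator code; no shuffling and no tensor conversion.
    """
    num_samples = len(next(iter(data.values())))
    for start in range(0, num_samples, batch_size):
        # Slice the same index range from every key so x and y stay aligned
        yield {key: value[start:start + batch_size] for key, value in data.items()}

x_train, y_train = np.random.sample((10, 2)), np.random.sample((10, 1))
train_data = {"x": x_train, "y": y_train}

batches = list(iterate_batches(train_data, batch_size=3))
for batch in batches:
    # Prints (3, 2) (3, 1) three times, then (1, 2) (1, 1)
    print(batch["x"].shape, batch["y"].shape)
```

The shapes match the loader output above: three full batches followed by one batch holding the leftover sample.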


<h2>Handling batch padding in Pipeline</h2>

<h3>Dropping the last batch</h3>

When we specify `batch_size` in the `Pipeline`, it combines `batch_size` consecutive samples into a batch, so the resulting shape is <br><b>(batch_size, *shape of input tensor)</b><br> For example, each `x` sample above has shape `(2,)`, so a full batch of `x` has shape `(3, 2)`. However, if `batch_size` does not divide the number of samples evenly, the last batch will be smaller than the others.<br>
To drop this incomplete last batch, set `drop_last` to `True`.

In [4]:
pipeline_fe = fe.Pipeline(train_data=dataset_fe, batch_size=3, drop_last=True)

With `drop_last=True`, iterating over the loader yields three batches of three samples each. The last batch, which would have contained only one sample, is dropped.

In [5]:
for elem in iter(pipeline_fe.get_loader(mode='train')):
    print(elem)

{'x': tensor([[0.7858, 0.4574],
        [0.6111, 0.4331],
        [0.8413, 0.4020]], dtype=torch.float64), 'y': tensor([[0.6351],
        [0.8993],
        [0.1394]], dtype=torch.float64)}
{'x': tensor([[0.9195, 0.8204],
        [0.8101, 0.5206],
        [0.7682, 0.3446]], dtype=torch.float64), 'y': tensor([[0.1833],
        [0.3074],
        [0.4175]], dtype=torch.float64)}
{'x': tensor([[0.5809, 0.8824],
        [0.9156, 0.9910],
        [0.4893, 0.2745]], dtype=torch.float64), 'y': tensor([[0.5270],
        [0.1768],
        [0.0894]], dtype=torch.float64)}
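The batch arithmetic behind `drop_last` can be sketched in a few lines of plain Python. The helpers `num_batches` and `num_dropped` are hypothetical illustrations, not FastEstimator functions. Since the sample order above differs from the first run, the training loader appears to shuffle, so which sample gets dropped may vary between epochs.

```python
import math

def num_batches(num_samples, batch_size, drop_last=False):
    """Hypothetical helper: number of batches a loader yields per epoch."""
    if drop_last:
        # Only complete batches are kept; the remainder is discarded
        return num_samples // batch_size
    # The final, possibly incomplete, batch is kept
    return math.ceil(num_samples / batch_size)

def num_dropped(num_samples, batch_size, drop_last=False):
    """Hypothetical helper: number of samples skipped each epoch."""
    return num_samples % batch_size if drop_last else 0

print(num_batches(10, 3))                  # 4 batches: sizes 3, 3, 3, 1
print(num_batches(10, 3, drop_last=True))  # 3 batches: sizes 3, 3, 3
print(num_dropped(10, 3, drop_last=True))  # 1 sample skipped
```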


<h3>Padding the batch</h3>

In the previous section, we saw that when the last batch has a different shape from the rest, we can simply drop it. There are also scenarios where the individual input tensors being batched have different dimensions. In natural language processing problems, for example, input strings can have different lengths. In such cases, the tensors are padded out to the length of the longest tensor in the batch.


We will create a NumPy array whose elements have different lengths and load it into the `Pipeline`.

In [6]:
# define numpy arrays with different shapes
elem1 = np.array([4, 5])
elem2 = np.array([1, 2, 6])
elem3 = np.array([3])

# create train dataset
x_train = np.array([elem1, elem2, elem3], dtype=object)  # ragged arrays require dtype=object in NumPy >= 1.24
y_train = np.random.sample((3, 1))
train_data = {"x": x_train, "y": y_train}
dataset_fe = NumpyDataset(train_data)

Set `pad_value` to the value that should be appended to the end of the shorter tensors. `pad_value` must be either an `int` or a `float`.

In [7]:
pipeline_fe = fe.Pipeline(train_data=dataset_fe, batch_size=3, pad_value=0)

Now let's iterate over the batched data:

In [8]:
for elem in iter(pipeline_fe.get_loader(mode='train')):
    print(elem)

{'x': tensor([[4, 5, 0],
        [3, 0, 0],
        [1, 2, 6]]), 'y': tensor([[0.2983],
        [0.7672],
        [0.5528]], dtype=torch.float64)}
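The padding step can be illustrated with a short NumPy sketch, assuming 1-D samples that are right-padded to the longest sample in the batch. `pad_batch_sketch` is a hypothetical helper written for this illustration, not FastEstimator's own implementation; it keeps the input order, whereas the loader output above is shuffled.

```python
import numpy as np

def pad_batch_sketch(samples, pad_value=0):
    """Hypothetical helper: right-pad 1-D arrays to the longest length in the batch."""
    max_len = max(len(sample) for sample in samples)
    # Pick a dtype that can hold both the samples and the pad value
    dtype = np.result_type(pad_value, *samples)
    # Start from a batch filled with pad_value, then copy each sample into its row
    batch = np.full((len(samples), max_len), pad_value, dtype=dtype)
    for row, sample in enumerate(samples):
        batch[row, :len(sample)] = sample
    return batch

batch = pad_batch_sketch([np.array([4, 5]), np.array([1, 2, 6]), np.array([3])], pad_value=0)
print(batch)
# [[4 5 0]
#  [1 2 6]
#  [3 0 0]]
```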


<h2>Benchmarking pipeline performance</h2>

Ideally, deep learning practitioners evaluate the cost and speed of both data processing and model training before deployment, which makes benchmarking these tasks important.<br>
`Pipeline.benchmark` provides this capability: it measures the processing speed of the preprocessing operations in a `Pipeline`.

We will take the MNIST dataset as an example, create a `Pipeline` for preprocessing the data, and then benchmark the processing speed of that `Pipeline`.

In [9]:
from fastestimator.dataset.data import mnist

mnist_train, mnist_eval = mnist.load_data()

We will create a `Pipeline` with a list of NumPy operators that expand the image dimensions, apply min-max scaling, and finally rotate the input images.

In [10]:
from fastestimator.op.numpyop.univariate import Minmax, ExpandDims
from fastestimator.op.numpyop.multivariate import Rotate

pipeline = fe.Pipeline(train_data=mnist_train,
                       eval_data=mnist_eval,
                       ops=[ExpandDims(inputs="x", outputs="x"),
                            Minmax(inputs="x", outputs="x_out"),
                            Rotate(image_in="x_out", image_out="x_out", limit=180)],
                       batch_size=2)
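For intuition about the first two operators, here is a rough NumPy approximation of what they do to a single image. The helpers `expand_dims_sketch` and `minmax_sketch` are hypothetical, and two details are assumptions rather than confirmed library behaviour: that `ExpandDims` adds a trailing channel axis by default, and that `Minmax` rescales linearly into [0, 1] with a small epsilon guard. `Rotate` is left out because it relies on an image augmentation backend.

```python
import numpy as np

def expand_dims_sketch(image):
    """Assumed ExpandDims behaviour: add a trailing channel axis, (28, 28) -> (28, 28, 1)."""
    return np.expand_dims(image, axis=-1)

def minmax_sketch(image, epsilon=1e-7):
    """Assumed Minmax behaviour: rescale values linearly into [0, 1]."""
    image = image.astype(np.float32)
    min_val, max_val = image.min(), image.max()
    # epsilon guards against division by zero for constant images
    return (image - min_val) / max(max_val - min_val, epsilon)

# A random 28x28 uint8 array standing in for one MNIST image
image = np.random.randint(0, 256, size=(28, 28), dtype=np.uint8)
x = expand_dims_sketch(image)
x_out = minmax_sketch(x)
print(x.shape, x_out.min(), x_out.max())
```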

Let's benchmark the processing speed in training mode.

In [11]:
pipeline.benchmark(mode="train")

FastEstimator: Step: 100, Epoch: 1, Steps/sec: 421.1976902895638
FastEstimator: Step: 200, Epoch: 1, Steps/sec: 590.3322445373035
FastEstimator: Step: 300, Epoch: 1, Steps/sec: 593.8247322852562
FastEstimator: Step: 400, Epoch: 1, Steps/sec: 594.6950855274174
FastEstimator: Step: 500, Epoch: 1, Steps/sec: 600.5430458527322
FastEstimator: Step: 600, Epoch: 1, Steps/sec: 600.5952042620336
FastEstimator: Step: 700, Epoch: 1, Steps/sec: 588.0463790296114
FastEstimator: Step: 800, Epoch: 1, Steps/sec: 587.0427478541964
FastEstimator: Step: 900, Epoch: 1, Steps/sec: 577.4463843779681
FastEstimator: Step: 1000, Epoch: 1, Steps/sec: 583.3355397999779
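The log reports throughput every 100 steps. The underlying measurement idea, timing how long each window of steps takes, can be sketched with the standard library. `benchmark_sketch` and `fake_batches` are hypothetical stand-ins, not FastEstimator's implementation. Because the first window also absorbs start-up overhead, it tends to read slower, which is one plausible reason the first figure above (about 421 steps/sec) is lower than the later ones (about 580 to 600).

```python
import time

def benchmark_sketch(batches, num_steps=1000, log_interval=100):
    """Hypothetical helper: report steps/sec for each window of log_interval steps."""
    rates = []
    start = time.perf_counter()
    for step, _ in enumerate(batches, start=1):
        if step % log_interval == 0:
            elapsed = time.perf_counter() - start
            rate = log_interval / elapsed
            rates.append(rate)
            print(f"Step: {step}, Steps/sec: {rate}")
            start = time.perf_counter()  # restart the timer for the next window
        if step >= num_steps:
            break
    return rates

def fake_batches():
    """Stand-in data source that does a little work per 'batch'."""
    while True:
        yield sum(i * i for i in range(1000))

rates = benchmark_sketch(fake_batches(), num_steps=500, log_interval=100)
```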
