# MNIST classification example

This example shows how to build deep learning pipelines in PyPipeline.

We create a training pipeline and an inference pipeline for MNIST classification, using a
ResNet classifier and PyTorch Lightning. PyTorch Lightning is an abstraction layer on top of
plain PyTorch that removes much of the boilerplate code of neural network training.

In the inference pipeline, we scale up the ResNet classifier cell for higher throughput.

## Model code

First, install the prerequisites:

In [None]:
!pip install "torch>=1.6" "torchvision>=0.7"
!pip install "pytorch-lightning>=1.3"

In [None]:
from typing import Tuple
import torch
from torch import Tensor
from torch.nn import CrossEntropyLoss
from torchmetrics.functional import accuracy
import torchvision.models.resnet as resnet
import pytorch_lightning as pl

Then we define the PyTorch Lightning module, which is completely independent of PyPipeline.
This code defines the neural network architecture, the loss function, and what the
training, validation and test steps look like.

In [None]:
class ResNetClassifierModule(pl.LightningModule):

    def __init__(self, num_classes: int = 10, num_input_channels: int = 1):
        super(ResNetClassifierModule, self).__init__()
        self.num_classes = num_classes
        self.num_input_channels = num_input_channels
        self.save_hyperparameters()     # saves our __init__ arguments for restoring

        # Configure the model
        self.model = resnet.resnet18(pretrained=False, num_classes=num_classes)
        self.model.conv1 = torch.nn.Conv2d(num_input_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.final_layer = torch.nn.Softmax(dim=1)

        # Configure loss function
        self.loss_fn = CrossEntropyLoss()

    def forward(self, x: Tensor) -> Tensor:
        x = self.model(x)
        x = self.final_layer(x)
        return x

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)

    def training_step(self, batch: Tuple[Tensor, Tensor], batch_idx: int):
        x, y = batch
        # CrossEntropyLoss applies log-softmax internally, so feed it the raw
        # logits of the ResNet instead of the softmax output of forward().
        logits = self.model(x)
        loss = self.loss_fn(logits, y)
        prediction = torch.argmax(logits, dim=1)
        acc = accuracy(prediction, y)
        self.log("train_loss", loss)
        self.log("train_acc", acc)
        return loss

    def evaluate(self, batch, stage=None):
        x, y = batch
        logits = self.model(x)      # raw logits, as expected by CrossEntropyLoss
        loss = self.loss_fn(logits, y)
        prediction = torch.argmax(logits, dim=1)
        acc = accuracy(prediction, y)
        self.log(f"{stage}_loss", loss, prog_bar=True)
        self.log(f"{stage}_acc", acc, prog_bar=True)

    def validation_step(self, batch: Tuple[Tensor, Tensor], batch_idx: int):
        self.evaluate(batch, "val")

    def test_step(self, batch: Tuple[Tensor, Tensor], batch_idx: int):
        self.evaluate(batch, "test")
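
A note on the loss function: PyTorch's ``CrossEntropyLoss`` applies a log-softmax to its input internally, so it expects raw, unnormalized scores (logits). The standard-library sketch below is only an illustration of that formula for a single sample, not PyTorch code; the helper names ``softmax`` and ``cross_entropy`` are made up for this example. It compares the loss of a confident, correct prediction computed on logits with the loss computed on already-softmaxed probabilities.

```python
import math
from typing import List


def softmax(scores: List[float]) -> List[float]:
    """Numerically stable softmax over one list of scores."""
    highest = max(scores)
    exps = [math.exp(s - highest) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def cross_entropy(scores: List[float], target: int) -> float:
    """Log-softmax followed by negative log-likelihood, for one sample."""
    highest = max(scores)
    log_sum_exp = highest + math.log(sum(math.exp(s - highest) for s in scores))
    return log_sum_exp - scores[target]


# A confident and correct prediction for class 0 out of 10 classes.
logits = [20.0] + [0.0] * 9

loss_on_logits = cross_entropy(logits, target=0)
loss_on_probabilities = cross_entropy(softmax(logits), target=0)

print(f"loss on logits:        {loss_on_logits:.6f}")
print(f"loss on probabilities: {loss_on_probabilities:.6f}")
```

Because probabilities are confined to the range [0, 1], the loss computed on them cannot approach zero even for a perfect prediction, which is why the loss is best fed with logits.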

Now that we've created the PyTorch Lightning module, we could train or execute it outside of PyPipeline as follows:

```
classifier = ResNetClassifierModule()

# Training:
trainer = pl.Trainer(max_epochs=epochs, gpus=num_gpus)
trainer.fit(classifier, train_dataloader, val_dataloader)

# Inference
label = classifier(image)
```

## Training pipeline

To use it in PyPipeline, we have to put it inside a pipeline. We'll build the pipeline as follows:

![Training pipeline](mnistclassification-training-pipeline.png)

The training pipeline consists of 3 cells:
- Two ``DataLoaderSourceCells``, which can be configured with a PyTorch ``DataLoader`` object to load samples
  and their corresponding labels. As PyTorch dataloaders load their data in batches,
  this cell has an option to remove the batch dimension when the dataloader batch size is set to 1.
- A ``ResNetClassifierTrainingsCell``, which trains the whole network in 1 ``pull()`` call.

Note that this pipeline uses **advanced control flow**: the pipeline needs to be pulled only once to fully train the model.
During that single pull, the training cell pulls its inputs many times, in different phases.

1. The ``train_image`` and ``train_label`` inputs are pulled until the train source raises a
   ``StopIteration`` when its dataloader is empty. This signals to the training cell that validation should begin.
2. The ``val_image`` and ``val_label`` inputs are pulled until the validation source raises a ``StopIteration``.
   At this point, one epoch has finished.
3. Before starting the next epoch, a reset signal is sent to both data source cells, such that their dataloaders
   can be reinitialized (and reshuffled).
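
The three phases above can be sketched in plain Python. This is a toy model of the control flow only, not PyPipeline code: ``ToySource``, ``drain`` and ``run_epochs`` are hypothetical names invented for this illustration, and the real cells exchange tensors rather than integers.

```python
from typing import Iterator, List


class ToySource:
    """Hypothetical stand-in for a source cell backed by a finite dataloader."""

    def __init__(self, items: List[int]) -> None:
        self._items = items
        self._iterator: Iterator[int] = iter(items)

    def pull(self) -> int:
        # Raises StopIteration once all items have been pulled.
        return next(self._iterator)

    def reset(self) -> None:
        # Re-create the iterator so the next epoch starts from the beginning.
        self._iterator = iter(self._items)


def drain(source: ToySource) -> List[int]:
    """Pull from the source until it signals exhaustion with StopIteration."""
    pulled: List[int] = []
    while True:
        try:
            pulled.append(source.pull())
        except StopIteration:
            return pulled


def run_epochs(train: ToySource, val: ToySource, epochs: int) -> List[str]:
    log: List[str] = []
    for epoch in range(epochs):
        log.append(f"epoch {epoch}: train on {drain(train)}")    # phase 1
        log.append(f"epoch {epoch}: validate on {drain(val)}")   # phase 2
        train.reset()                                            # phase 3
        val.reset()
    return log


log = run_epochs(ToySource([1, 2, 3]), ToySource([4, 5]), epochs=2)
for line in log:
    print(line)
```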

**PyPipeline provides tools to handle this advanced control flow in the ``pypipeline_lib`` package:**
- The ``pypipeline_lib.torch.DataLoaderSourceCell`` is a source cell that can be configured using a PyTorch dataloader.
- The ``pypipeline_lib.torch.CellInputDataset`` and ``pypipeline_lib.torch.CellInputDataLoader`` are PyTorch dataset
  and dataloader objects that can load their data out of a PyPipeline cell's inputs.

They handle the ``StopIteration``, reinitialization and reshuffling behind the scenes.
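
To give an intuition for what such an adapter has to do, here is a minimal sketch, assuming nothing about the actual ``pypipeline_lib`` implementation. It turns a pull-style callable into a Python iterator that ends cleanly on ``StopIteration``, and then regroups the single samples into batches. The names ``iterate_pulls`` and ``batched`` are hypothetical.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

Sample = Tuple[float, int]   # (toy "image", label)


def iterate_pulls(pull: Callable[[], Sample]) -> Iterator[Sample]:
    """Yield pulled items until the upstream raises StopIteration."""
    while True:
        try:
            item = pull()
        except StopIteration:
            return   # end this iterator normally
        yield item


def batched(items: Iterable[Sample], batch_size: int) -> Iterator[List[Sample]]:
    """Group single samples into lists of at most batch_size items."""
    batch: List[Sample] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:   # emit the final, possibly smaller batch
        yield batch


# Toy upstream that hands out five samples one by one.
upstream = iter([(0.1, 7), (0.5, 2), (0.9, 1), (0.3, 0), (0.7, 4)])
batches = list(batched(iterate_pulls(lambda: next(upstream)), batch_size=2))
print(batches)
```

This mirrors the setup used later in this example: the source cells emit samples one by one, and the training cell regroups them according to its ``batch_size`` parameter.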

In [None]:
from pathlib import Path
from typing import Optional
from pytorch_lightning.callbacks import ModelCheckpoint

from pypipeline.cell import ASingleCell, ACompositeCell, Pipeline, ScalableCell
from pypipeline.cellio import Input, Output, ConfigParameter, InputPort, OutputPort
from pypipeline.connection.connection import Connection
from pypipeline_lib.torch import CellInputDataLoader, CellInputDataset, DataLoaderSourceCell

In [None]:
class ResNetClassifierTrainingsCell(ASingleCell):

    def __init__(self, parent_cell: "Optional[ACompositeCell]", name: str):
        super(ResNetClassifierTrainingsCell, self).__init__(parent_cell, name)

        # Create inputs and outputs
        self.input_train_image: Input[Tensor] = Input(self, "train_image")
        self.input_train_label: Input[Tensor] = Input(self, "train_label")
        self.input_val_image: Input[Tensor] = Input(self, "val_image")
        self.input_val_label: Input[Tensor] = Input(self, "val_label")
        self.output_model_path: Output[Path] = Output(self, "output_model_path")

        # Create configuration parameters
        # The number of processes is the number of model replicas that are trained in parallel.
        # If use_gpu is on, an equal number of GPUs should be available.
        self.config_use_gpu: ConfigParameter[bool] = ConfigParameter(self, "use_gpu")
        self.config_num_processes: ConfigParameter[int] = ConfigParameter(self, "num_processes")
        self.config_batch_size: ConfigParameter[int] = ConfigParameter(self, "batch_size")
        self.config_epochs: ConfigParameter[int] = ConfigParameter(self, "epochs")

        # Only set after cell deployment
        self.classifier: Optional[ResNetClassifierModule] = None

    def _on_deploy(self) -> None:
        super(ResNetClassifierTrainingsCell, self)._on_deploy()
        self.classifier = ResNetClassifierModule(num_classes=10, num_input_channels=1)

    def _on_undeploy(self) -> None:
        super(ResNetClassifierTrainingsCell, self)._on_undeploy()
        self.classifier = None

    def _on_pull(self) -> None:
        assert self.classifier is not None

        # Request the values of our parameters
        batch_size = self.config_batch_size.get_value()
        use_gpu = self.config_use_gpu.get_value()
        num_processes = self.config_num_processes.get_value()
        epochs = self.config_epochs.get_value()

        # Wrap our image and label inputs in a torch.utils.data.Dataset object.
        # PyPipeline has a specific dataset class tailored for this: the CellInputDataset.
        train_dataset = CellInputDataset(self.input_train_image, self.input_train_label)
        train_dataloader = CellInputDataLoader(train_dataset, batch_size=batch_size)

        val_dataset = CellInputDataset(self.input_val_image, self.input_val_label)
        val_dataloader = CellInputDataLoader(val_dataset, batch_size=batch_size)

        # Start the training
        checkpoint_callback = ModelCheckpoint(verbose=False, monitor="val_loss")     # This is very configurable
        if use_gpu:
            trainer = pl.Trainer(max_epochs=epochs, gpus=num_processes,
                                 callbacks=[checkpoint_callback])
        else:
            trainer = pl.Trainer(max_epochs=epochs, num_processes=num_processes, distributed_backend="ddp_cpu",
                                 callbacks=[checkpoint_callback])

        trainer.fit(self.classifier, train_dataloader, val_dataloader)

        # Make the path to the best model checkpoint available as output of the cell.
        best_model_path = Path(checkpoint_callback.best_model_path)
        print(f"Best model path: {best_model_path}")
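        # load_from_checkpoint is a classmethod, so call it on the class rather than on the instance
        self.classifier = ResNetClassifierModule.load_from_checkpoint(str(best_model_path))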
        self.output_model_path.set_value(best_model_path)

    def supports_scaling(self) -> bool:
        # This cell doesn't support parallelizing with PyPipeline, as the training
        # process is stateful.
        return False


Now that we have defined the PyPipeline cell for training our classifier, we can create a training pipeline for it.
This pipeline also contains the source cells (and possibly other cells, e.g. for preprocessing)
that are needed to feed our classifier cell with data.


In [None]:
class ResNetClassifierTrainingPipeline(Pipeline):

    def __init__(self, parent_cell: "Optional[ACompositeCell]", name: str):
        super(ResNetClassifierTrainingPipeline, self).__init__(parent_cell, name)

        # Create the cells
        self.train_source: DataLoaderSourceCell[Tensor, Tensor] = \
            DataLoaderSourceCell(self, "train_source")
        self.val_source: DataLoaderSourceCell[Tensor, Tensor] = \
            DataLoaderSourceCell(self, "val_source")
        self.classifier = ResNetClassifierTrainingsCell(self, "classifier_training_cell")

        # Create the connections
        Connection(self.train_source.output_sample, self.classifier.input_train_image)
        Connection(self.train_source.output_label, self.classifier.input_train_label)

        Connection(self.val_source.output_sample, self.classifier.input_val_image)
        Connection(self.val_source.output_label, self.classifier.input_val_label)



We instantiate our training pipeline.
Some cells need configuration, like the classifier cell (``epochs``, ``batch_size``, ...) and
the source cells (which dataloaders to use, ...).



In [None]:
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import MNIST
from torchvision.transforms.functional import to_tensor

In [None]:
# Prepare Dataset and Dataloader to provide to our dataloader source cells.
mnist_train = MNIST("/home/johannes/data/mnist/", train=True, download=True, transform=to_tensor)
mnist_train, mnist_val = random_split(mnist_train, [55000, 5000])       # type: ignore
mnist_test = MNIST("/home/johannes/data/mnist/", train=False, download=True, transform=to_tensor)

# During training, we want our data to flow one by one through the pipeline, so we use batch size one
mnist_train_loader = DataLoader(mnist_train, batch_size=1, num_workers=4, shuffle=True)
mnist_val_loader = DataLoader(mnist_val, batch_size=1, num_workers=4, shuffle=True)
# But we'll do inference on batches of 4:
mnist_test_loader = DataLoader(mnist_test, batch_size=4, num_workers=4, shuffle=True)

# Training pipeline
# =================
# Creation
training_pipeline = ResNetClassifierTrainingPipeline(None, "mnist_training_pipeline")

# Configuration
training_pipeline.classifier.config_use_gpu.set_value(True)
training_pipeline.classifier.config_num_processes.set_value(1)
training_pipeline.classifier.config_batch_size.set_value(100)
training_pipeline.classifier.config_epochs.set_value(20)

training_pipeline.train_source.config_dataloader.set_value(mnist_train_loader)
training_pipeline.train_source.config_remove_batch_dimension.set_value(True)
training_pipeline.val_source.config_dataloader.set_value(mnist_val_loader)
training_pipeline.val_source.config_remove_batch_dimension.set_value(True)

# Train our model
training_pipeline.deploy()
training_pipeline.pull()
best_checkpoint_path = training_pipeline.classifier.output_model_path.get_value()
training_pipeline.undeploy()

## Inference pipeline

To use the trained classifier for inference in PyPipeline, we again put it inside a pipeline. We'll create the
following inference pipeline:

![Inference pipeline](mnistclassification-inference-pipeline.png)

This example shows batch-wise inference with batches of size 4. Note how the ``mnist_test_loader`` was
defined with batch size 4, and how the ``config_remove_batch_dimension`` configuration parameter of the
``DataLoaderSourceCell`` is set to False.
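
As a quick sanity check on the sizes involved, the sketch below does the arithmetic. The numbers come from the code in this example (a 55000/5000 train/validation split, training batch size 100, inference batch size 4) and from the size of the standard MNIST test set (10000 images). It assumes that every sample is consumed exactly once per epoch and that one pull of the inference source emits one dataloader batch; both are assumptions of this sketch, not documented PyPipeline behaviour.

```python
import math

# Sizes taken from this example
train_samples, val_samples = 55000, 5000
training_batch_size = 100
test_samples = 10000           # size of the standard MNIST test set
inference_batch_size = 4

# Batches per epoch seen by the trainer
train_batches_per_epoch = math.ceil(train_samples / training_batch_size)
val_batches_per_epoch = math.ceil(val_samples / training_batch_size)

# Batches (and, under the stated assumption, pulls) needed for inference
inference_batches = math.ceil(test_samples / inference_batch_size)

print(train_batches_per_epoch, val_batches_per_epoch, inference_batches)
```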

In [None]:

class ResNetClassifierCell(ASingleCell):

    def __init__(self, parent_cell: "Optional[ACompositeCell]", name: str):
        super(ResNetClassifierCell, self).__init__(parent_cell, name)

        # Create inputs and outputs
        self.input_images: Input[Tensor] = Input(self, "images")
        self.output_labels: Output[Tensor] = Output(self, "labels")

        # Create configuration parameters
        self.config_use_gpu: ConfigParameter[bool] = ConfigParameter(self, "use_gpu")
        self.config_checkpoint: ConfigParameter[Path] = ConfigParameter(self, "checkpoint_to_load")

        # Only set after cell deployment
        self.classifier: Optional[ResNetClassifierModule] = None

    def _on_deploy(self) -> None:
        super(ResNetClassifierCell, self)._on_deploy()
        ckpt_path: Path = self.config_checkpoint.get_value()
        self.classifier = ResNetClassifierModule.load_from_checkpoint(str(ckpt_path))
        # Honor the use_gpu parameter: move the model to the GPU if requested
        if self.config_use_gpu.get_value():
            self.classifier.cuda()
        self.classifier.eval()

    def _on_undeploy(self) -> None:
        super(ResNetClassifierCell, self)._on_undeploy()
        self.classifier = None

    def _on_pull(self) -> None:
        assert self.classifier is not None

        # Pull the inputs and set the outputs
        images: Tensor = self.input_images.pull()         # Input: shape (batch, channels, x, y), torch.float32, range [0, 1]
        with torch.no_grad():                             # no gradients are needed for inference
            labels = self.classifier(images.to(self.classifier.device))  # shape (batch, classes), torch.float32, range [0, 1]
        labels = labels.cpu()
        self.output_labels.set_value(labels)

    def supports_scaling(self) -> bool:
        return True

Now we define an inference pipeline for our classifier cell.
In addition to the necessary source cell, we also add a ``ScalableCell``, which
allows our classifier cell to be scaled up as much as we need.
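
The idea behind scaling up can be illustrated with the standard library alone. The sketch below is only an analogy and says nothing about how ``ScalableCell`` is implemented: a stateless function (the hypothetical ``classify_batch``, which stands in for the classifier) is executed by two parallel workers, and the results match those of sequential execution.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def classify_batch(batch: List[int]) -> List[int]:
    """Hypothetical stateless stand-in for the classifier: one fake label per input."""
    return [value % 10 for value in batch]


batches = [[12, 7, 33, 40], [5, 18, 21, 9], [101, 64, 3, 77]]

# Two workers, comparable in spirit to scaling a cell up twice.
with ThreadPoolExecutor(max_workers=2) as pool:
    parallel_results = list(pool.map(classify_batch, batches))   # map() keeps input order

sequential_results = [classify_batch(batch) for batch in batches]
print(parallel_results == sequential_results)
```

This only works because the function keeps no state between calls, which is the same reason the inference cell can return ``True`` from ``supports_scaling()`` while the stateful training cell cannot.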


In [None]:

class ResNetClassifierScalableCell(ScalableCell):

    def __init__(self, parent_cell: "Optional[ACompositeCell]", name: str):
        super(ResNetClassifierScalableCell, self).__init__(parent_cell, name)

        # Create the inputs and outputs
        self.input_images: InputPort[Tensor] = InputPort(self, "images")
        self.output_labels: OutputPort[Tensor] = OutputPort(self, "labels")

        # Create the cells
        self.classifier = ResNetClassifierCell(self, "classifier_cell")

        # Create the connections
        Connection(self.input_images, self.classifier.input_images)
        Connection(self.classifier.output_labels, self.output_labels)


In [None]:
class ResNetClassifierInferencePipeline(Pipeline):

    def __init__(self, parent_cell: "Optional[ACompositeCell]", name: str):
        super(ResNetClassifierInferencePipeline, self).__init__(parent_cell, name)

        # Create the cells
        # TODO also make a sink cell to store the result?
        # In the inference pipeline, we want to do batch predictions, so don't remove the batch dim:
        self.source: DataLoaderSourceCell[Tensor, Tensor] = DataLoaderSourceCell(self, "source")
        self.classifier_scalable = ResNetClassifierScalableCell(self, "classifier_scalable")

        # Create the connections
        Connection(self.source.output_sample, self.classifier_scalable.input_images)


In [None]:
# Inference pipeline
# ==================
# Now that we have trained our classifier, we can use it in an inference pipeline that supports scaling up.
inference_pipeline = ResNetClassifierInferencePipeline(None, "mnist_inference_pipeline")

# Configuration
inference_pipeline.classifier_scalable.classifier.config_use_gpu.set_value(True)
inference_pipeline.classifier_scalable.classifier.config_checkpoint.set_value(best_checkpoint_path)

inference_pipeline.source.config_dataloader.set_value(mnist_test_loader)
inference_pipeline.source.config_remove_batch_dimension.set_value(False)

# Scale up our scalable cell to 2 parallel instances
inference_pipeline.classifier_scalable.scale_up(times=2)
inference_pipeline.deploy()

print("Start inference")
for i in range(inference_pipeline.get_nb_available_pulls()):
    inference_pipeline.pull()

    # We could add a sink cell that writes the labels to disk or visualizes them together with the input image,
    # but for now we just print them out:
    labels = inference_pipeline.classifier_scalable.output_labels.get_value()
    print(f"Classification probabilities: {labels}")

inference_pipeline.undeploy()
print("Finished")
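
The printed values are per-class probabilities, one row per image in the batch. To obtain the predicted digit, take the index of the largest probability in each row; on a tensor this is ``labels.argmax(dim=1)``. The sketch below shows the same operation on plain Python lists, with made-up probabilities and a hypothetical helper name ``predicted_digits``.

```python
from typing import List


def predicted_digits(probabilities: List[List[float]]) -> List[int]:
    """Return, for every row, the index of the highest probability."""
    return [max(range(len(row)), key=row.__getitem__) for row in probabilities]


# Made-up probabilities for a batch of two images (10 classes each).
batch = [
    [0.01, 0.02, 0.90, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01, 0.01],
    [0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.55, 0.05, 0.05],
]

digits = predicted_digits(batch)
print(digits)
```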