# <font style="color:blue">Combine them all: LeNet5 pipeline with Trainer</font>

Let's take a look at how we can build the training pipeline using the Trainer helper class and the other helper classes we've discussed before in this notebook.
Import all the necessary classes and functions:

In [1]:
# %matplotlib notebook
# %load_ext autoreload
# %autoreload 2

from operator import itemgetter

import os
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F  # tensor ops such as softmax
import torch.optim as optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.data import Dataset, DataLoader, random_split

from torchvision import datasets, transforms
from torchvision.transforms import functional as Fn  # image ops such as resize

from trainer import Trainer, hooks, configuration
from trainer.utils import setup_system, patch_configs
from trainer.metrics import AccuracyEstimator
from trainer.tensorboard_visualizer import TensorBoardVisualizer



## <font style="color:Green">1. Get Training and Validation Data Loader</font>


Define the data wrappers and transformations (the same way as before):

In [2]:
class KenyanFood13Dataset(Dataset):
    """
    Custom dataset for the KenyanFood13 images listed in train.csv.
    Each item is an (image, label) pair, where label is the integer index
    of the image's class. Splitting into training and validation subsets
    is done outside this class.
    """

    def __init__(self, data_root, image_shape=None, transform=None):
        """
        init method of the class.

        Parameters:

        data_root (string): path of the root directory. It must contain train.csv
                            and the images/images folder.

        image_shape (int or tuple or list): [optional] Default is None.
                                            If it is not None, the image is resized to the given shape.

        transform (method): method that takes a PIL image and transforms it.
        """

        # read the image id -> class name table
        label_csv_path = os.path.join(data_root, 'train.csv')
        self.data_df = pd.read_csv(label_csv_path, delimiter=' *, *', engine='python')
        self.classes = self.data_df.iloc[:, 1].unique()
        self.num_classes = len(self.classes)
        self.image_ids = self.data_df.iloc[:, 0]

        # integer label <-> class name lookups (labels follow the order of first appearance in train.csv)
        self.class_given_label = {label: image_class for label, image_class in enumerate(self.classes)}
        self.label_given_class = {image_class: label for label, image_class in enumerate(self.classes)}
        
        # set image_resize attribute
        if image_shape is not None:
            if isinstance(image_shape, int):
                self.image_shape = (image_shape, image_shape)
            
            elif isinstance(image_shape, tuple) or isinstance(image_shape, list):
                assert len(image_shape) == 1 or len(image_shape) == 2, 'Invalid image_shape tuple size'
                if len(image_shape) == 1:
                    self.image_shape = (image_shape[0], image_shape[0])
                else:
                    self.image_shape = image_shape
            else:
                raise NotImplementedError 
                
        else:
            self.image_shape = image_shape
            
        # set transform attribute
        self.transform = transform

        # initialize the data dictionary
        self.data_dict = {
            'image_path': [],
            'label': []
        }
        img_dir = os.path.join(data_root, 'images', 'images')

        # print("self.data_df", type(self.data_df))
        for data in self.data_df.iterrows():
            image_id = str(data[1]['id']) + '.jpg'
            image_path = os.path.join(img_dir, image_id)
            image_class = data[1]['class']
            label = self.label_given_class[image_class]
            self.data_dict['image_path'].append(image_path)
            self.data_dict['label'].append(label)

    
    def __len__(self):
        """
        return length of the dataset
        """
        return len(self.data_dict['label'])
    
    
    def __getitem__(self, idx):
        """
        For given index, return images with resize and preprocessing.
        """
        
        image = Image.open(self.data_dict['image_path'][idx]).convert("RGB")
        
        if self.image_shape is not None:
            # resize lives in torchvision's functional module (Fn), not in torch.nn.functional (F)
            image = Fn.resize(image, self.image_shape)
            
        if self.transform is not None:
            image = self.transform(image)
            
        target = self.data_dict['label'][idx]
        
        return image, target            
                
        
    def class_name(self, label):
        """
        class label to common name mapping
        """
        return self.class_given_label[label]

    def get_classes(self):
        return self.classes
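
The two lookup dictionaries built in `__init__` can be tried out in isolation. The sketch below is a minimal pure-Python illustration: the `class_column` values are made-up sample rows, not taken from `train.csv`, and `dict.fromkeys` stands in for pandas' `unique()` (both keep the order of first appearance).

```python
# Hypothetical class column, standing in for self.data_df.iloc[:, 1]
class_column = ["githeri", "ugali", "githeri", "pilau", "ugali"]

# Unique class names in order of first appearance
classes = list(dict.fromkeys(class_column))

# label -> class name and class name -> label
class_given_label = {label: name for label, name in enumerate(classes)}
label_given_class = {name: label for label, name in enumerate(classes)}

# Integer targets, as __getitem__ would return them
targets = [label_given_class[name] for name in class_column]

print(classes)
print(targets)
```

Because the integer labels depend on the row order of the CSV file, any hard-coded list of class names used at inference time has to follow the same order.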

In [3]:
from torch.utils.data import Dataset

class TransformedSubset(Dataset):
    def __init__(self, subset, transform):
        self.subset = subset
        self.transform = transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, idx):
        image, label = self.subset[idx]  # Get item from subset
        if self.transform:
            image = self.transform(image)  # Apply transformation
        return image, label

In [4]:
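def get_mean_std(dataset, batch_size=8, num_workers=4):
    """
    Estimate the per-channel mean and std of an image dataset.
    Averaging per-batch statistics is exact only when all batches have the
    same size; a smaller last batch makes the result a close approximation.
    """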
    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers
    )
    
    batch_mean = torch.zeros(3)
    batch_mean_sqrd = torch.zeros(3)

    for batch_data, _ in loader:
        batch_mean += batch_data.mean(dim=(0, 2, 3)) # E[batch_i] 
        batch_mean_sqrd += (batch_data ** 2).mean(dim=(0, 2, 3)) #  E[batch_i**2]
    
    # E[dataset] = E[E[batch_1], E[batch_2], ...]
    mean = batch_mean / len(loader)
    
    # var[X] = E[X**2] - E[X]**2
    
    # E[X**2] = E[E[batch_1**2], E[batch_2**2], ...]
    # E[X]**2 = E[E[batch_1], E[batch_2], ...] ** 2
    
    var = (batch_mean_sqrd / len(loader)) - (mean ** 2)
        
    std = var ** 0.5
    print('mean: {}, std: {}'.format(mean, std))
    
    return mean, std
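
The identity `var[X] = E[X**2] - E[X]**2` used above can be checked on a toy example. The following is a minimal pure-Python sketch, not the notebook's implementation: `streaming_mean_std` is a hypothetical helper and the batch values are invented. It keeps running sums weighted by batch size, which stays exact even when the last batch is smaller than the others.

```python
import math
import statistics


def streaming_mean_std(batches):
    """Population mean and std of all values in `batches`, using running sums."""
    count, total, total_sq = 0, 0.0, 0.0
    for batch in batches:
        count += len(batch)
        total += sum(batch)
        total_sq += sum(x * x for x in batch)
    mean = total / count
    var = total_sq / count - mean ** 2  # var[X] = E[X**2] - E[X]**2
    return mean, math.sqrt(max(var, 0.0))  # clamp tiny negative rounding errors


# Invented "pixel" values; note that the last batch is smaller
batches = [[0.1, 0.4, 0.7, 0.2], [0.9, 0.3, 0.5, 0.6], [0.8]]
flat = [x for batch in batches for x in batch]

mean, std = streaming_mean_std(batches)

# Unweighted average of the per-batch means, for comparison
mean_of_batch_means = sum(sum(b) / len(b) for b in batches) / len(batches)

print(mean, statistics.fmean(flat))
print(std, statistics.pstdev(flat))
print(mean_of_batch_means)
```

On this toy data the unweighted average of batch means differs from the true mean, because the single-element batch counts as much as the full ones. On a real dataset with many batches the effect is usually small.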

In [5]:
def get_data(batch_size, data_root='data', num_workers=1):
    compulsary_preprocess = transforms.Compose([
        # resize the shorter side to 256 and crop the central 224x224 patch
        transforms.Resize(256),
        transforms.CenterCrop(224),
        # convert to a float tensor with values re-scaled to [0, 1]
        transforms.ToTensor()
    ])

    dataset =  KenyanFood13Dataset(data_root, image_shape=None, transform=compulsary_preprocess)
    classes = dataset.get_classes()
    
    train_size = int(0.8 * len(dataset)) # 80% for training
    test_size = len(dataset) - train_size # 20% for validation

    train_dataset_compulsary_prepocess, test_dataset = random_split(dataset, [train_size, test_size])

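    # per-channel statistics are computed on the training split only
    train_mean, train_std = get_mean_std(train_dataset_compulsary_prepocess, batch_size=batch_size, num_workers=num_workers)

    # validation images get the same normalization as the training images
    test_dataset_normalized = TransformedSubset(
        test_dataset, transforms.Normalize(mean=train_mean, std=train_std)
    )

    # test dataloader
    test_loader = torch.utils.data.DataLoader(
        test_dataset_normalized,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers
    )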

    train_preprocess = transforms.Compose([
        transforms.ToPILImage(),
        # transforms.RandomRotation(20),
        transforms.ToTensor(),
        # transforms.RandomHorizontalFlip(),
        # transforms.RandomVerticalFlip(),
        # transforms.RandomCrop(28, padding=4),
        # transforms.PILToTensor(),
        # transforms.ConvertImageDtype(torch.float),
        # transforms.RandomPerspective(distortion_scale=0.6, p=1),
        # transforms.ColorJitter(brightness=.5, hue=.3),
        # transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        transforms.Normalize(mean=train_mean, std=train_std)
    ])

    # Apply transformation to the subset
    train_dataset_subset = TransformedSubset(train_dataset_compulsary_prepocess, train_preprocess)

    train_loader = torch.utils.data.DataLoader(
        train_dataset_subset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers
    )
    
    return train_loader, test_loader, train_mean, train_std, classes

In [6]:
def PlotLoader(loader):
    # Plot up to 15 images (a 3x5 grid) from the first batch
    plt.rcParams["figure.figsize"] = (15, 9)
    plt.figure()
    for images, labels in loader:
        for i in range(min(len(labels), 15)):
            plt.subplot(3, 5, i+1)
            img = Fn.to_pil_image(images[i])
            plt.imshow(img)
            plt.gca().set_title('Target: {0}'.format(labels[i]))
        plt.show()
        break

In [7]:
# train_loader, test_loader, train_mean, train_std, classes = get_data(batch_size=15, data_root='../../../../data/Week7_project2_classification/KenyanFood13Dataset', num_workers=1)
# print(classes)
# PlotLoader(train_loader)
# print("---------")
# PlotLoader(test_loader)

## <font style="color:Green">2. Define the Model</font>

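Define the model. The original single-channel LeNet5 is kept below, commented out, for reference; the version that follows it is adapted to 3-channel 224x224 inputs and the 13 KenyanFood13 classes: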

In [8]:
# class LeNet5(nn.Module):
#     def __init__(self):
#         super().__init__()

#         self._body = nn.Sequential(
#             nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(kernel_size=2),
#             nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(kernel_size=2),
#         )
#         self._head = nn.Sequential(
#             nn.Linear(in_features=16 * 5 * 5, out_features=120), nn.ReLU(inplace=True),
#             nn.Linear(in_features=120, out_features=84), nn.ReLU(inplace=True),
#             nn.Linear(in_features=84, out_features=10)
#         )

#     def forward(self, x):
#         x = self._body(x)
#         x = x.view(x.size()[0], -1)
#         x = self._head(x)
#         return x

In [9]:
class LeNet5(nn.Module):
    def __init__(self):
        super().__init__()

        # convolution layers
        self._body = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=7),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
            
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
        )

        
        # Fully connected layers
        self._head = nn.Sequential(
            nn.Linear(in_features=64*52*52, out_features=1024), 
            nn.ReLU(inplace=True),
            
            nn.Linear(in_features=1024, out_features=13)
            
        )
    
    def forward(self, x):
        
        # apply feature extractor
        x = self._body(x)
        # flatten the output of conv layers
        # resulting shape: (batch_size, number of features produced by the last conv block)
        x = x.view(x.size()[0], -1)
        # apply classification head
        x = self._head(x)
        
        
        return x
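
The value `in_features=64*52*52` in the first linear layer follows from the input size and the layer parameters. The sketch below retraces that arithmetic in plain Python, assuming 224x224 inputs (the size produced by `CenterCrop(224)`), no padding and stride 1 in the convolutions, and the `MaxPool2d` default of a stride equal to the kernel size. The helper names `conv_out` and `pool_out` are made up for this illustration.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Spatial output size of a convolution along one dimension."""
    return (size + 2 * padding - kernel) // stride + 1


def pool_out(size, kernel):
    """Spatial output size of max pooling with stride == kernel (rounds down)."""
    return (size - kernel) // kernel + 1


size = 224
size = conv_out(size, kernel=7)   # 224 -> 218
size = pool_out(size, kernel=2)   # 218 -> 109
size = conv_out(size, kernel=5)   # 109 -> 105
size = pool_out(size, kernel=2)   # 105 -> 52

out_channels = 64
flat_features = out_channels * size * size

print(size, flat_features)
```

If the crop size or a kernel size changes, the same calculation gives the new `in_features` value for the head.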

## <font style="color:Green">3. Start Experiment / Training</font>


Define the experiment with the given model and the given data. The idea is the same as before: we keep the things that are less likely to change inside the object and configure it with the things that are more likely to change.

You may wonder why we put the specific metric and optimizer into the experiment code instead of passing them as parameters. The experiment class is just a handy way to store all the parts of your experiment in one place. If you change the loss function, the optimizer, or the model, it is effectively another experiment, so it deserves a separate class.

The inner structure of the Trainer class is a bit more complicated than what we've discussed above, so that it can cope with the different kinds of tasks we will discuss in this course. We will elaborate on the Trainer's inner structure in the following lectures. For now, take a look at how compact and self-descriptive the code is:

In [10]:
class Experiment:
    def __init__(
        self,
        system_config: configuration.SystemConfig = configuration.SystemConfig(),
        dataset_config: configuration.DatasetConfig = configuration.DatasetConfig(),
        dataloader_config: configuration.DataloaderConfig = configuration.DataloaderConfig(),
        optimizer_config: configuration.OptimizerConfig = configuration.OptimizerConfig()
    ):
        self.loader_train, self.loader_test, self.train_mean, self.train_std, self.labels = get_data(
            batch_size=dataloader_config.batch_size,
            num_workers=dataloader_config.num_workers,
            data_root=dataset_config.root_dir
        )
        
        setup_system(system_config)

        self.model = LeNet5()
        self.loss_fn = nn.CrossEntropyLoss()
        self.metric_fn = AccuracyEstimator(topk=(1, ))
        self.optimizer = optim.SGD(
            self.model.parameters(),
            lr=optimizer_config.learning_rate,
            weight_decay=optimizer_config.weight_decay,
            momentum=optimizer_config.momentum
        )
        self.lr_scheduler = MultiStepLR(
            self.optimizer, milestones=optimizer_config.lr_step_milestones, gamma=optimizer_config.lr_gamma
        )
        self.visualizer = TensorBoardVisualizer()

    def run(self, trainer_config: configuration.TrainerConfig) -> tuple:

        device = torch.device(trainer_config.device)
        self.model = self.model.to(device)
        self.loss_fn = self.loss_fn.to(device)

        model_trainer = Trainer(
            model=self.model,
            loader_train=self.loader_train,
            loader_test=self.loader_test,
            loss_fn=self.loss_fn,
            metric_fn=self.metric_fn,
            optimizer=self.optimizer,
            lr_scheduler=self.lr_scheduler,
            device=device,
            data_getter=itemgetter(0),
            target_getter=itemgetter(1),
            stage_progress=trainer_config.progress_bar,
            get_key_metric=itemgetter("top1"),
            visualizer=self.visualizer,
            model_saving_frequency=trainer_config.model_saving_frequency,
            save_dir=trainer_config.model_dir
        )
        
        model_trainer.register_hook("end_epoch", hooks.end_epoch_hook_classification)
        self.metrics = model_trainer.fit(trainer_config.epoch_num)
        return self.metrics, self.train_mean, self.train_std, self.labels
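
The `configuration` module itself is not shown in this notebook. As a rough sketch of the "configure with the things that change" pattern, assume the config classes are plain dataclasses. The class name `OptimizerConfigSketch`, the helper `learning_rate_at`, and all default values below are hypothetical; only the field names are taken from the attributes that `Experiment` reads. The helper mimics a MultiStepLR-style schedule, where the learning rate is multiplied by `lr_gamma` each time a milestone epoch is reached.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass
class OptimizerConfigSketch:
    # Hypothetical defaults, for illustration only
    learning_rate: float = 0.001
    momentum: float = 0.9
    weight_decay: float = 0.0001
    lr_step_milestones: Tuple[int, ...] = (30, 40)
    lr_gamma: float = 0.1


def learning_rate_at(epoch: int, config: OptimizerConfigSketch) -> float:
    """Learning rate after `epoch` scheduler steps under a MultiStepLR-style schedule."""
    milestones_reached = sum(1 for m in config.lr_step_milestones if epoch >= m)
    return config.learning_rate * config.lr_gamma ** milestones_reached


base = OptimizerConfigSketch()

# A new experiment variant: override only what changes
fast = replace(base, learning_rate=0.01, lr_step_milestones=(2, 4))

for epoch in range(6):
    print(epoch, learning_rate_at(epoch, fast))
```

Overriding a couple of fields with `replace` leaves the base configuration untouched, which makes it easy to keep several experiment variants side by side.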


In [11]:
def main():
    """Run the experiment."""
    # patch configs depending on cuda availability
    dataloader_config, trainer_config = patch_configs(epoch_num_to_set=1)  # increase (e.g. to 5) for a longer run
    # dataset_config = configuration.DatasetConfig(root_dir="data")
    dataset_config = configuration.DatasetConfig(root_dir="../../../../data/Week7_project2_classification/KenyanFood13Dataset")
    experiment = Experiment(dataset_config=dataset_config, dataloader_config=dataloader_config)
    results, train_mean, train_std, labels = experiment.run(trainer_config)

    return results, train_mean, train_std, labels

In [12]:
# if __name__ == '__main__':
#     results, train_mean, train_std, labels = main()
#     print(train_mean, train_std, labels)

# <font style="color:blue">Predictions</font><a name="predictions"></a>

## <font style="color:blue">Make Predictions</font>

## <font style="color:blue">Get Predictions on a Batch</font>

In [13]:
class KenyanFood13DatasetTest(Dataset):
    """
    Custom dataset for the KenyanFood13 test images listed in test.csv.
    No labels are available for these images, so each item is an
    (image, image_id) pair.
    """

    def __init__(self, data_root, image_shape=None, transform=None):
        """
        init method of the class.

        Parameters:

        data_root (string): path of the root directory. It must contain test.csv
                            and the images/images folder.

        image_shape (int or tuple or list): [optional] Default is None.
                                            If it is not None, the image is resized to the given shape.

        transform (method): method that takes a PIL image and transforms it.
        """

        # read the list of test image ids
        label_csv_path = os.path.join(data_root, 'test.csv')
        # label_csv_path = os.path.join(data_root, 'test_trial1.csv')
        self.data_df = pd.read_csv(label_csv_path, delimiter=' *, *', engine='python')
        self.image_ids = self.data_df.iloc[:, 0]
        
        # set image_resize attribute
        if image_shape is not None:
            if isinstance(image_shape, int):
                self.image_shape = (image_shape, image_shape)
            
            elif isinstance(image_shape, tuple) or isinstance(image_shape, list):
                assert len(image_shape) == 1 or len(image_shape) == 2, 'Invalid image_shape tuple size'
                if len(image_shape) == 1:
                    self.image_shape = (image_shape[0], image_shape[0])
                else:
                    self.image_shape = image_shape
            else:
                raise NotImplementedError 
                
        else:
            self.image_shape = image_shape
            
        # set transform attribute
        self.transform = transform

        # initialize the data dictionary
        self.data_dict = {
            'image_path': [],
        }
        img_dir = os.path.join(data_root, 'images', 'images')

        for data in self.data_df.iterrows():
            image_id = str(data[1]['id']) + '.jpg'
            image_path = os.path.join(img_dir, image_id)
            self.data_dict['image_path'].append(image_path)
    
    def __len__(self):
        """
        return length of the dataset
        """
        return len(self.data_dict['image_path'])
    
    
    def __getitem__(self, idx):
        """
        For given index, return images with resize and preprocessing.
        """
        image = Image.open(self.data_dict['image_path'][idx]).convert("RGB")
        
        if self.image_shape is not None:
            image = Fn.resize(image, self.image_shape)
            
        if self.transform is not None:
            image = self.transform(image)
            
        return image, self.image_ids.iat[idx]
    
    # def get_image_id(self, idx):
    #     return self.image_ids.iat[idx]
    

In [14]:
data_root = '../../../../data/Week7_project2_classification/KenyanFood13Dataset'

# dataset =  KenyanFood13DatasetTest(data_root, image_shape=256)

# # print('Length of the dataset: {}'.format(len(dataset)))

# img, img_id = dataset[5]
# print(img.size)
# print('Image_id: {}'.format(img_id))
# plt.imshow(img)
# plt.show()

In [15]:
def load_model(model, model_dir, model_file_name):
    model_path = os.path.join(model_dir, model_file_name)

    # load the checkpoint onto the CPU so that it also works on machines without a GPU,
    # then restore the model parameters with load_state_dict
    checkpoint = torch.load(model_path, map_location="cpu")
    
    model.load_state_dict(checkpoint['model_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']

    return model, epoch, loss

In [16]:
def prediction(model, device, batch_input):
    # Run the batch on whichever device the model currently lives on.
    # The `device` argument is kept so that existing calls keep working.
    data = batch_input.to(next(model.parameters()).device)

    with torch.no_grad():
        output = model(data)

    # Score to probability using softmax
    prob = F.softmax(output, dim=1)

    # get the max probability and its index (the predicted class) in one call
    pred_prob, pred_index = prob.max(dim=1)

    return pred_index.cpu().numpy(), pred_prob.cpu().numpy()
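
To make the score-to-probability step concrete, here is a minimal pure-Python sketch of softmax followed by picking the most likely class. The `softmax` function and the `logits` values are written only for this illustration; the notebook itself relies on `F.softmax` and the tensor `max` method.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of raw scores."""
    largest = max(logits)
    exps = [math.exp(value - largest) for value in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Invented raw scores for a 4-class problem
logits = [1.2, -0.3, 3.1, 0.0]
probs = softmax(logits)

# Index and probability of the most likely class
pred_index = max(range(len(probs)), key=probs.__getitem__)
pred_prob = probs[pred_index]

print(pred_index, round(pred_prob, 4))
```

Softmax preserves the ordering of the scores, so the predicted index is the same as the index of the largest raw score. The softmax is only needed when the probability itself is of interest.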

### <font style="color:green">Compulsory Preprocessing Transforms</font>

In [17]:
def image_compulsary_transforms():
    preprocess = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor()
        ])
    
    return preprocess

### <font style="color:green">Common Image Transforms</font>

In [18]:
def image_common_transforms(mean=(0.4611, 0.4359, 0.3905), std=(0.2193, 0.2150, 0.2109)):
    preprocess = image_compulsary_transforms()
    
    common_transforms = transforms.Compose([
        preprocess,
        transforms.Normalize(mean, std)
    ])
    
    return common_transforms

In [19]:
def get_sample_prediction(model, data_root, train_mean, train_std, labels, output_root):
    """
    Predict a class for every image listed in test.csv and write the results
    to <output_root>/output.csv with the columns `id` and `class`.
    """
    test_dataset_trans = KenyanFood13DatasetTest(
        data_root, image_shape=None, transform=image_common_transforms(train_mean, train_std)
    )

    batch_size = 15

    # Inference is pinned to the CPU here; use "cuda" instead if a GPU is available
    device = "cpu"

    # It is important to do model.eval() before prediction
    model.eval()
    model.to(device)

    data_len = len(test_dataset_trans)
    print("data_len: ", data_len)

    classes = []
    image_ids = []
    for start in range(0, data_len, batch_size):
        # the last batch may be smaller than batch_size
        end = min(start + batch_size, data_len)

        trans_images = []
        for index in range(start, end):
            trans_image, image_id = test_dataset_trans[index]
            trans_images.append(trans_image)
            image_ids.append(image_id)

        trans_images = torch.stack(trans_images)
        classes_index, prob = prediction(model, device, batch_input=trans_images)

        # map predicted indices back to class names
        classes.extend([labels[class_index] for class_index in classes_index])

    df = pd.DataFrame({'id': image_ids, 'class': classes})

    # make sure the output folder exists before writing the CSV file
    os.makedirs(output_root, exist_ok=True)
    label_csv_path = os.path.join(output_root, 'output.csv')
    df.to_csv(label_csv_path, sep=",", index=False)
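
The manual batching loop in `get_sample_prediction` walks over the dataset in index ranges. The sketch below isolates that logic in a hypothetical helper, `batch_ranges`, and applies it to the dataset length printed further down (1638) with the batch size of 15 used above.

```python
def batch_ranges(data_len, batch_size):
    """Yield (start, end) index pairs that cover range(data_len) in batches."""
    for start in range(0, data_len, batch_size):
        yield start, min(start + batch_size, data_len)


ranges = list(batch_ranges(1638, 15))

print(len(ranges))   # number of batches
print(ranges[0])     # first batch
print(ranges[-1])    # last, smaller batch
```

Clamping `end` with `min` is what keeps the last batch from running past the end of the dataset.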

## <font style="color:blue">Load Model and Run Inference</font>

In [22]:
m = LeNet5()
model_dir = "./checkpoints"
model_file_name = "checkpoint1.pt"
model, epoch, loss = load_model(m, model_dir, model_file_name)
print(epoch, loss)

0 2.5089458285308464


In [23]:
train_mean=torch.tensor([0.5772715211, 0.4631873667, 0.3466044068])
train_std =torch.tensor([0.2699360847, 0.2737641633, 0.2830057442])
labels = ['githeri', 'ugali', 'kachumbari', 'matoke', 'sukumawiki', 'bhaji', 'mandazi',
 'kukuchoma', 'nyamachoma', 'pilau', 'chapati', 'masalachips', 'mukimo']
get_sample_prediction(model, data_root, train_mean, train_std, labels, "./submissions/")

# PlotLoader(test_loader)

data_len:  1638



So, in a few lines of code, we got a more robust system than we had before: richer visualizations, a more configurable training process, and a training pipeline that is separated from the model, so we can concentrate on the things that matter the most.

# <font style="color:blue">References</font>

You may wonder whether this is a common way of doing deep learning or whether we are overengineering here. We can assure you that this is a common way to do deep learning research in industry: most companies and research groups invest in building such DL training frameworks for their projects, and some of them are even published as open source. To name a few:
- https://github.com/NVlabs/SPADE
- https://github.com/pytorch/ignite
- https://github.com/PyTorchLightning/pytorch-lightning
- https://github.com/catalyst-team/catalyst
- https://github.com/open-mmlab/mmdetection
- https://github.com/fastai/fastai