## Introduction

In this notebook, we create a model and training loop to classify food images from the Food101 into different categories. We start by importing the packages we need below

In [1]:
# Import packages
import os
import torch
import random
import numpy as np
from torch import nn
from PIL import Image
from pathlib import Path

from torchvision import datasets
from torchvision import transforms
from matplotlib import pyplot as plt

from torch.utils.data import DataLoader

import torchinfo
from torchinfo import summary

from tqdm.auto import tqdm
from timeit import default_timer as timer 

import wandb

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# List files in the data
def show_classes(data_path):
    """
    Return class list
    """
    return os.listdir(data_path)



def display_random_image(data_path, class_name):
    """
    Display a few randomly
    picked images from training
    data
    """
    class_path = data_path / class_name
    img_paths = [class_path / f for f in os.listdir(class_path)]
    
    n_images = len(img_paths)
    image_id = np.random.randint(0, n_images)
    
    img_path = img_paths[image_id]
    img = Image.open(img_path)
    img.show()


def plot_transformed_images(image_paths, transform, n=3, seed=42):
    """Plots a series of random images from image_paths.

    Will open n image paths from image_paths, transform them
    with transform and plot them side by side.

    Args:
        image_paths (list): List of target image paths. 
        transform (PyTorch Transforms): Transforms to apply to images.
        n (int, optional): Number of images to plot. Defaults to 3.
        seed (int, optional): Random seed for the random generator. Defaults to 42.
    """
    random.seed(seed)
    random_image_paths = random.sample(image_paths, k=n)
    for image_path in random_image_paths:
        with Image.open(image_path) as f:
            fig, ax = plt.subplots(1, 2)
            ax[0].imshow(f) 
            ax[0].set_title(f"Original \nSize: {f.size}")
            ax[0].axis("off")

            # Transform and plot image
            # Note: permute() will change shape of image to suit matplotlib 
            # (PyTorch default is [C, H, W] but Matplotlib is [H, W, C])
            transformed_image = transform(f).permute(1, 2, 0) 
            ax[1].imshow(transformed_image) 
            ax[1].set_title(f"Transformed \nSize: {transformed_image.shape}")
            ax[1].axis("off")

            fig.suptitle(f"Class: {image_path.parent.stem}", fontsize=16)

def test_transform(train_data_path, class_name):
    """
    Test transforms
    """
    image_path_list = [] 

    for f in os.listdir(train_data_path / class_name):
        image_path_list.append(train_data_path / class_name / f)

    plot_transformed_images(image_path_list, 
                            transform=data_transform, n=3)


def test_forward_pass(model, train_dataloader):
    """
    Test forward pass
    with a single image
    """
    img_batch, label_batch = next(iter(train_dataloader))
    img_single, label_single = img_batch[0].unsqueeze(dim=0), label_batch[0]
    model.eval()
    with torch.inference_mode():
        pred = model(img_single.to(device))

    print(f"Output logits:\n{pred}\n")
    print(f"Output prediction probabilities:\n{torch.softmax(pred, dim=1)}\n")
    print(f"Output prediction label:\n{torch.argmax(torch.softmax(pred, dim=1), dim=1)}\n")
    print(f"Actual label:\n{label_single}")

In [6]:
class TinyVGG(nn.Module):
    """
    Model architecture copying TinyVGG from: 
    https://poloclub.github.io/cnn-explainer/
    """
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape, 
                      out_channels=hidden_units, 
                      kernel_size=3, # how big is the square that's going over the image?
                      stride=1, # default
                      padding=1), # options = "valid" (no padding) or "same" (output has same shape as input) or int for specific number 
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, 
                      out_channels=hidden_units,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,
                         stride=2) # default stride value is same as kernel_size
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            # Where did this in_features shape come from? 
            # It's because each layer of our network compresses and changes the shape of our inputs data.
            nn.Linear(in_features=hidden_units*16*16,
                      out_features=output_shape)
        )
    
    def forward(self, x: torch.Tensor):
        # leverage the benefits of operator fusion
        return self.classifier(self.conv_block_2(self.conv_block_1(x)))

In [None]:
def train_step(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader, 
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer):
    # Put model in train mode
    model.train()
    
    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0
    
    # Loop through data loader data batches
    for batch, (X, y) in enumerate(dataloader):
        # Send data to target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate  and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item() 

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metric across all batches
        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch 
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc


def test_step(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module):
    # Put model in eval mode
    model.eval() 
    
    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0
    
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)
    
            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()
            
            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))
            
    # Adjust metrics to get average loss and accuracy per batch 
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc


# 1. Take in various parameters required for training and test steps
def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          learning_rate: np.float64,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5):
    """
    Main training loop
    """
    optimizer = torch.optim.Adam(params=model_0.parameters(), 
                                 lr=learning_rate)
    
    config = {
        "learning_rate": learning_rate,
        "architecture": "TinyVGG",
        "dataset": "CIFAR-100",
        "epochs": epochs,
    }
    
    wandb.init(project="food-vision", config=config)
    
    # 3. Loop through training and testing steps for a number of epochs
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer)
        test_loss, test_acc = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn)
        
        # 4. Print out what's happening
        wandb.log({"train_acc": train_acc, 
                   "train_loss": train_loss, 
                   "test_loss": test_loss, 
                   "test_acc": test_acc})

    wandb.finish()

In [13]:
if __name__ == '__main__':


      # Set up device agnostic code
      device = "cuda" if torch.cuda.is_available() else "cpu"
      print(device)

      # Check version
      print(torch.__version__)
      
      train_data_path = Path('data') / 'train'

      test_data_path = Path('data') / 'test'

      class_name = 'pizza'

      # Set random seeds
      torch.manual_seed(42) 

      # Set random GPU seed
      torch.cuda.manual_seed(42)

      # Set number of epochs
      NUM_EPOCHS = 5

      learning_rate = 0.001


      data_transform = transforms.Compose([transforms.Resize(size=(64, 64)),
                                          transforms.RandomHorizontalFlip(p=0.5),
                                          transforms.ToTensor()])

      show_classes(train_data_path)

      display_random_image(train_data_path, class_name)


      train_data = datasets.ImageFolder(root=train_data_path, 
                                    transform=data_transform, 
                                    target_transform=None)


      test_data = datasets.ImageFolder(root=test_data_path, 
                                    transform=data_transform, 
                                    target_transform=None)


      train_dataloader = DataLoader(dataset=train_data,
                                    batch_size=1,
                                    num_workers=os.cpu_count(),
                                    shuffle=True)


      test_dataloader = DataLoader(dataset=test_data,
                              batch_size=1,
                              num_workers=os.cpu_count(),
                              shuffle=True)


      # Recreate an instance of TinyVGG
      model_0 = TinyVGG(input_shape=3, # number of color channels (3 for RGB) 
                        hidden_units=10, 
                        output_shape=len(train_data.classes)).to(device)


      # Setup loss function and optimizer
      loss_fn = nn.CrossEntropyLoss()

      start_time = timer()

      # Train model_0 
      train(model=model_0, 
            train_dataloader=train_dataloader,
            test_dataloader=test_dataloader,
            learning_rate=learning_rate,
            loss_fn=loss_fn, 
            epochs=NUM_EPOCHS)

      # End the timer and print out how long it took
      end_time = timer()

      print(f"Total training time: {end_time-start_time:.3f} seconds")

[34m[1mwandb[0m: Currently logged in as: [33makshatgoel92[0m. Use [1m`wandb login --relogin`[0m to force relogin


100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [07:08<00:00, 85.65s/it]


0,1
test_acc,▅▁▃▆█
test_loss,███▇▁
train_acc,▂▁▂▄█
train_loss,█▆▆▆▁

0,1
test_acc,0.50667
test_loss,1.00462
train_acc,0.50222
train_loss,1.04941


Total training time: 441.612 seconds
