# Run through of model in cell mode


## Setup


### Importing PyTorch and setting up device-agnostic code


In [1]:
# Importing packages

# Type hinting
from typing import List, Tuple, Dict, Any

# Other
import random
import numpy as np

# Utils
from tqdm.notebook import tqdm
from torchinfo import summary
from timeit import default_timer as timer

# File handling
import os
import zipfile
import requests
from pathlib import Path

# Image visualization
from PIL import Image
import matplotlib.pyplot as plt

# Handling data
import pandas as pd
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader

# Modelling
import torch
from torch import nn

In [2]:
# Setting up device-agnostic code
device = "mps" if torch.mps.is_available() else "cpu"

### Functions


In [3]:
def walk_through_dir(dir_path: Path) -> None:
    """Walks through dir_path returning its contents"""
    for dirpath, dirnames, filenames in os.walk(dir_path):
        print(
            f"There are {len(dirnames)} directories and {len(filenames)} images in {dirpath}."
        )

In [4]:
def plot_transformed_images(
    image_paths: List[Path],
    transform: transforms.Compose,
    n_images: int = 3,
    seed: int | None = None,
) -> None:
    """
    Selects random images from a path of images and loads/transforms them.
    Then plots the original vs the transformed version
    """

    if seed:
        random.seed(seed)

    random_image_paths = random.sample(image_paths, k=n_images)

    for image_path in random_image_paths:
        with Image.open(image_path) as f:
            fig, ax = plt.subplots(nrows=1, ncols=2)
            ax[0].imshow(f)
            ax[0].set_title(f"Original\nSize: {f.size}")
            ax[0].axis(False)

            # Transform and plot target image
            transformed_image = transform(f).permute(1, 2, 0)

            ax[1].imshow(transformed_image)
            ax[1].set_title(f"Transformed\nSize: {transformed_image.shape}")
            ax[1].axis(False)

            fig.suptitle(f"Class {image_path.parent.stem}", fontsize=16)

In [5]:
def find_classes(directory: Path | str) -> Tuple[List[str], Dict[str, int]]:
    """
    Finds the class folder names in a target directory.
    """

    classes = sorted(entry.name for entry in os.scandir(directory) if entry.is_dir())

    if not classes:
        raise FileNotFoundError(
            f"Couldn\"t find any classes in {directory}... Please check the file structure"
        )

    class_to_idx = {class_name: i for i, class_name in enumerate(classes)}
    return classes, class_to_idx

In [6]:
def display_random_images(
    dataset: datasets.ImageFolder,
    classes: List[str],
    n: int = 10,
    display_shape: bool = True,
    seed: int | None = None,
) -> None:
    """Displays a random image from the dataset."""
    if n > 10:
        n = 10
        display_shape = False
        print(
            f"For display purposes, n shouldn\"t be larger than 10, setting it to t10 and removing shape display"
        )

    if seed:
        random.seed(seed)

    random_sample_idx = random.sample(range(len(dataset)), k=n)

    plt.figure(figsize=(16, 8))

    for i, target_sample in enumerate(random_sample_idx):
        target_image, target_label = (
            dataset[target_sample][0],
            dataset[target_sample][1],
        )

        target_image_adjust = target_image.permute(1, 2, 0)

        plt.subplot(1, n, i + 1)
        plt.imshow(target_image_adjust)
        plt.axis("off")
        if classes:
            title = f"Class: {classes[target_label]}"

            if display_shape:
                title = title + f"\nshape: {target_image_adjust.shape}"

            plt.title(title)

In [7]:
def train_step(
    model: torch.nn.Module,
    dataloader: torch.utils.data.DataLoader,
    loss_fn: torch.nn.Module,
    optimizer: torch.optim.Optimizer,
    device: str,
) -> Tuple[float, float]:
    """
    Trains a PyTorch model for a single epoch.

    Turns a target PyTorch model to training mode then runs through all of the required training steps
    (forward pass, loss calculation, optimizer step).

    Args:
        model (torch.nn.Module): A PyTorch model to be trained.
        dataloader (torch.utils.data.DataLoader): A DataLoader instance for the model to be trained on.
        loss_fn (torch.nn.Module): A PyTorch loss function to minimize.
        optimizer (torch.optim.Optimizer): A PyTorch optimizer to help minimize the loss function.
        device (str): A target device's name to compute on.

    Returns:
        `Tuple[float,float]`: Tuple of training loss and training accuracy metrics.
    """

    model.train()

    train_loss, train_acc = 0, 0

    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        y_pred = model(X)

        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
        train_acc += (y_pred_class == y).sum().item() / len(y_pred)

    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)

    return train_loss, train_acc

In [8]:
def test_step(
    model: torch.nn.Module,
    dataloader: torch.utils.data.DataLoader,
    loss_fn: torch.nn.Module,
    device: str,
) -> Tuple[float, float]:
    """
    Tests a PyTorch model for a single epoch.

    Turns a target PyTorch model to eval mode then runs through forward pass on test dataset.

    Args:
        model (torch.nn.Module): A PyTorch model to be trained.
        dataloader (torch.utils.data.DataLoader): A DataLoader instance for the model to be tested on.
        loss_fn (torch.nn.Module): A PyTorch loss function to calculate loss on test data.
        device (str): A target device's name to compute on.

    Returns:
        `Tuple[float,float]`: Tuple of testing loss and testing accuracy metrics.
    """

    model.eval()

    test_loss, test_acc = 0, 0

    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)

            test_pred_logits = model(X)

            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += (test_pred_labels == y).sum().item() / len(test_pred_labels)

    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [9]:
def train(
    model: nn.Module,
    train_dataloader: DataLoader,
    test_dataloader: DataLoader,
    optimizer: torch.optim.Optimizer,
    loss_fn: nn.Module = nn.CrossEntropyLoss(),
    epochs: int = 5,
    device: str = "cpu",
) -> Dict[str, List[float]]:
    """
    Trains a PyTorch model for a single epoch.

    Turns a target PyTorch model to training mode then runs through all of the required training steps
    (forward pass, loss calculation, optimizer step).

    Args:
        model (torch.nn.Module): A PyTorch model to be trained.
        train_dataloader (torch.utils.data.DataLoader): A DataLoader instance for the model to be trained on.
        test_dataloader (torch.utils.data.DataLoader): A DataLoader instance for the model to be tested on.
        optimizer (torch.optim.Optimizer): A PyTorch optimizer to help minimize the loss function.
        loss_fn (torch.nn.Module): A PyTorch loss function to minimize.
        device (str): A target device's name to compute on.

    Returns:
        `Dict[str,List[float]]`: Dictionary of training and testing loss, as well as training and testing
        accuracy metrics. Each metric has a value in a list for each epoch.

    """

    results = {
        "train_loss": [],
        "test_loss": [],
        "train_acc": [],
        "test_acc": [],
    }

    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(
            model=model,
            dataloader=train_dataloader,
            loss_fn=loss_fn,
            optimizer=optimizer,
            device=device,
        )
        test_loss, test_acc = test_step(
            model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn,
            device=device,
        )

        print(
            f"""
======================================      
Epoch:                       |   {epoch + 1}   |  
======================================      
Train Loss:                  | {train_loss:.3f} |

Train Accuracy:              | {train_acc:.3f} |            
--------------------------------------
Test Loss:                   | {test_loss:.3f} |

Test Accuracy:               | {test_acc:.3f} |
              """
        )

        results["train_loss"].append(
            train_loss.item() if isinstance(train_loss, torch.Tensor) else train_loss
        )
        results["train_acc"].append(
            train_acc.item() if isinstance(train_acc, torch.Tensor) else train_acc
        )
        results["test_loss"].append(
            test_loss.item() if isinstance(test_loss, torch.Tensor) else test_loss
        )
        results["test_acc"].append(
            test_acc.item() if isinstance(test_acc, torch.Tensor) else test_acc
        )

    return results

In [10]:
def plot_loss_curves(results: Dict[str, List[float]]):
    """Plots training curves of a results dictionary.

    Args:
        results (dict): dictionary containing list of values,
    """

    loss = results["train_loss"]
    test_loss = results["test_loss"]

    accuracy = results["train_acc"]
    test_accuracy = results["test_acc"]

    epochs = range(len(results["train_loss"]))

    plt.figure(figsize=(15, 7))

    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, label="train_loss")
    plt.plot(epochs, test_loss, label="test_loss")
    plt.title("Loss")
    plt.xlabel("Epochs")
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, accuracy, label="train_accuracy")
    plt.plot(epochs, test_accuracy, label="test_accuracy")
    plt.title("Accuracy")
    plt.xlabel("Epochs")
    plt.legend()

In [11]:
def save_model(model: torch.nn.Module, target_dir: str | Path, model_name: str) -> None:
    target_dir_path = target_dir
    if not isinstance(target_dir_path, Path):
        target_dir_path = Path(target_dir)
    target_dir_path.mkdir(parents=True, exist_ok=True)

    assert model_name.endswith(".pth") or model_name.endswith(
        ".pt"
    ), "model_name should end with '.pt' or '.pth'."
    model_save_path = target_dir_path / model_name

    print(f"[INFO] Saving model to: {model_save_path}")
    torch.save(obj=model.state_dict(), f=model_save_path)

## 1. Downloading data


In [12]:
# Setting up path to data folder
data_path = Path("../data/")
image_path = data_path / "pizza_steak_sushi"

# Downloading data if it doesn't exist
if image_path.is_dir():
    print(f"{image_path} directory exists.")
else:
    print(f"Did not find {image_path} directory, creating one...")
    image_path.mkdir(parents=True, exist_ok=True)

with open(data_path / "pizza_steak_sushi.zip", "wb") as f:
    request = requests.get(
        "https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip"
    )
    print("Downloading pizza, steak, sushi data...")
    f.write(request.content)

# Unzipping downloaded file
with zipfile.ZipFile(data_path / "pizza_steak_sushi.zip", "r") as zip_ref:
    print("Unzipping pizza, steak, sushi data...")
    zip_ref.extractall(image_path)

# Removing zip file
os.remove(data_path / "pizza_steak_sushi.zip")

Did not find ../data/pizza_steak_sushi directory, creating one...
Downloading pizza, steak, sushi data...
Unzipping pizza, steak, sushi data...


In [13]:
# Setting up paths
base_dir = Path("../data/pizza_steak_sushi")

train_dir = base_dir / "train"
test_dir = base_dir / "test"

## 2. Create Datasets and DataLoaders


In [None]:
# Creating transforms
simple_transform = transforms.Compose(
    [transforms.Resize(size=(64, 64)), transforms.ToTensor()]
)

train_transform = transforms.Compose(
    [
        transforms.Resize((64, 64)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
)

In [15]:
# Loading data
train_data = datasets.ImageFolder(train_dir, transform=simple_transform)
test_data = datasets.ImageFolder(test_dir, transform=simple_transform)

In [16]:
# Setting batch size
BATCH_SIZE = 32

In [17]:
# Setting up Dataloaders
train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)

test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE)

In [18]:
# Getting sample datapoint
sample_data = next(iter(test_dataloader))

In [19]:
# Getting classes
class_to_idx = train_data.class_to_idx
class_to_idx

{'pizza': 0, 'steak': 1, 'sushi': 2}

## 2. Creating model


In [20]:
# Creating model class
class TinyVGG(nn.Module):
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
        super().__init__()

        # Convolutional blocks
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(
                in_channels=input_shape,
                out_channels=hidden_units,
                kernel_size=3,
                stride=1,
                padding=0,
            ),
            nn.ReLU(),
            nn.Conv2d(
                in_channels=hidden_units,
                out_channels=hidden_units,
                kernel_size=3,
                stride=1,
                padding=0,
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(
                in_channels=hidden_units,
                out_channels=hidden_units,
                kernel_size=3,
                stride=1,
                padding=0,
            ),
            nn.ReLU(),
            nn.Conv2d(
                in_channels=hidden_units,
                out_channels=hidden_units,
                kernel_size=3,
                stride=1,
                padding=0,
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=hidden_units * 13 * 13, out_features=output_shape),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.classifier(
            self.conv_block_2(self.conv_block_1(x))
        )  # operator fusion

## 3. Train and evaluate model


In [21]:
# Setting random seeds
torch.manual_seed(42)
torch.mps.manual_seed(42)

# Setting number of epochs
NUM_EPOCHS = 50

# Instantiating model
model = TinyVGG(
    input_shape=3,
    hidden_units=10,
    output_shape=len(class_to_idx),
).to(device)

# Setting up loss and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

# Setting timer
start_time = timer()

# Training base model
results = train(
    model=model,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    optimizer=optimizer,
    loss_fn=loss_fn,
    epochs=NUM_EPOCHS,
    device=device,
)

# Ending timer
end_time = timer()
print(
    f"""
======================================
Total training time: {end_time - start_time:.3f} seconds |
======================================
    """
)

# Saving the model
save_model(model=model, target_dir="../models", model_name="cell_mode_model.pth")

  0%|          | 0/50 [00:00<?, ?it/s]


Epoch:                       |   1   |  
Train Loss:                  | 1.106 |

Train Accuracy:              | 0.305 |            
--------------------------------------
Test Loss:                   | 1.098 |

Test Accuracy:               | 0.301 |
              

Epoch:                       |   2   |  
Train Loss:                  | 1.100 |

Train Accuracy:              | 0.328 |            
--------------------------------------
Test Loss:                   | 1.070 |

Test Accuracy:               | 0.542 |
              

Epoch:                       |   3   |  
Train Loss:                  | 1.087 |

Train Accuracy:              | 0.488 |            
--------------------------------------
Test Loss:                   | 1.081 |

Test Accuracy:               | 0.492 |
              

Epoch:                       |   4   |  
Train Loss:                  | 1.084 |

Train Accuracy:              | 0.402 |            
--------------------------------------
Test Loss:                   |