# Custom Datasets

Format of the input: **[batch_size, no_of_channels, height, width]**

---

In [None]:
# Check GPU information
# The leading "!" is the IPython/Jupyter shell escape: this line runs the
# `nvidia-smi` command in a shell and is not valid in a plain Python script.
!nvidia-smi

In [None]:
import torch
from torch import nn
from scripts.utils import plot_predictions, plot_train_test_loss, print_train_time, eval_model_classification
from pathlib import Path
import torchvision
from torchvision import datasets, transforms
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt


# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Seed value handed to the torch.manual_seed(...) calls in later cells.
RANDOM_SEED = 42

In [None]:
# Report the installed torch and torchvision versions, one per line
print(torch.__version__, torchvision.__version__, sep="\n")

## 1. Get data ready (turn into tensor)
Our dataset is a subset of the Food101 dataset. Food101 has 101 different classes of food and 1000 images per class (750 training, 250 testing). Our dataset starts with 3 classes of food and only 10% of the images (~75 training, 25 testing per class).

Why do this?
- When starting out ML projects, it's important to try things on a small scale and then increase the scale when necessary.
- The whole point is to speed up how fast you can experiment.

### 1.1 Get the data

In [None]:
import requests
import zipfile
from pathlib import Path

# Setup path to a data folder
data_path = Path("data/")
image_path = data_path / "pizza_steak_sushi"
zip_path = data_path / "pizza_steak_sushi.zip"

# If the image folder doesn't exist, download it and prepare it...
if image_path.is_dir():
  print(f"{image_path} directory already exists... skipping download")
else:
  print(f"{image_path} does not exist, creating one...")

  # Download pizza, steak and sushi data.
  # The request is made BEFORE anything is created on disk and HTTP errors are
  # raised: otherwise a failed download would leave an empty image folder
  # behind and every later run would skip the download.
  print("Downloading pizza, steak, sushi data...")
  request = requests.get(
      "https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip",
      timeout=30,
  )
  request.raise_for_status()

  # Creating image_path with parents=True also creates data_path for the zip file
  image_path.mkdir(parents=True, exist_ok=True)
  with open(zip_path, "wb") as f:
    f.write(request.content)

  # Unzip pizza, steak, sushi data
  with zipfile.ZipFile(zip_path, "r") as zip_ref:
    print("Unzipping pizza, steak and sushi data...")
    zip_ref.extractall(image_path)

In [None]:
# Check details about data

import os
def walk_through_dir(dir_path):
    """Print, for each directory under dir_path, how many sub-directories and files it contains.

    Every directory reached by os.walk (dir_path included) produces one
    printed line. Nothing is returned.
    """
    for current_dir, subdirs, files in os.walk(dir_path):
        n_dirs, n_files = len(subdirs), len(files)
        print(f"There are {n_dirs} directories and {n_files} images in '{current_dir}'.")

# Print the directory/file counts for everything under the dataset folder
walk_through_dir(image_path)

In [None]:
# Training and testing folders live directly under the dataset root
train_dir, test_dir = image_path / "train", image_path / "test"

In [None]:
# Visualize the data (randomly take some images)

import random
from PIL import Image

# Set seed (uncomment for a reproducible pick)
# random.seed(RANDOM_SEED)

# 1. Get all image paths (layout: <split>/<class_name>/<image>.jpg)
image_path_list = list(image_path.glob("*/*/*.jpg"))
if not image_path_list:
  # random.choice on an empty list raises a bare IndexError; fail with a clearer message
  raise FileNotFoundError(f"No .jpg images found under {image_path}... please check file structure.")

# 2. Pick a random image path
random_image_path = random.choice(image_path_list)

# 3. Get image class from path name (the image class is the name of the directory where the image is stored)
# `.name` (not `.stem`) keeps the full directory name even if it contains a dot,
# matching how ImageFolderCustom derives the class name.
image_class = random_image_path.parent.name

# 4. Open image
img = Image.open(random_image_path)

# 5. Print metadata
print(f"Random image path: {random_image_path}")
print(f"Image class: {image_class}")
print(f"Image height: {img.height}")
print(f"Image width: {img.width}")
img

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Convert the PIL image into a NumPy array
img_as_array = np.asarray(img)

# Draw the array on a 10x7 figure; the title reports the class and array shape
plt.figure(figsize=(10, 7))
plt.title(f"Image class: {image_class} | Image shape: {img_as_array.shape} -> [height, width, color_channels] (HWC)")
plt.imshow(img_as_array)
plt.axis(False)

### 1.2 Transform the data

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms


# Image pipeline: resize to 64x64 -> random horizontal flip (p=0.5) -> tensor
data_transform = transforms.Compose(
    [
        transforms.Resize(size=(64, 64)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ToTensor(),
    ]
)

# Compare the array shape from the previous cell with the transformed tensor's shape
print(f"Shape of image before transform: {img_as_array.shape}")
print(f"Shape of image after transform: {data_transform(img).shape}")

In [None]:
def plot_transformed_images(image_paths: list, transform, n=3, seed=None):
  """
  Plots n randomly selected images next to their transformed versions.

  Args:
    image_paths: list of image paths (pathlib.Path objects; the parent
      directory name is used as the class in the figure title).
    transform: callable applied to the opened PIL image. Its result must
      support `.permute(1, 2, 0)`, i.e. be a (C, H, W) tensor.
    n: number of images to sample. `random.sample` raises ValueError if n is
      larger than len(image_paths).
    seed: optional seed for the `random` module. Any value other than None
      (including 0) seeds the generator; None leaves it untouched.
  """
  # `is not None` rather than truthiness so that seed=0 is honoured
  if seed is not None:
    random.seed(seed)

  for sampled_path in random.sample(image_paths, k=n):
    with Image.open(sampled_path) as f:
      fig, ax = plt.subplots(nrows=1, ncols=2)

      # Left panel: the untouched image
      ax[0].imshow(f)
      ax[0].set_title(f"Original\nSize: {f.size}")
      ax[0].axis(False)

      # Right panel: the transformed image.
      # matplotlib expects (H, W, C), the transform yields (C, H, W)
      transformed_image = transform(f).permute(1, 2, 0)
      ax[1].imshow(transformed_image)
      ax[1].set_title(f"Transformed\nShape: {transformed_image.shape}")
      ax[1].axis("off")

      # `.name` keeps the full directory name (`.stem` would cut it at a dot)
      fig.suptitle(f"Class: {sampled_path.parent.name}", fontsize=16)

# Show three random images before/after data_transform (unseeded)
plot_transformed_images(image_path_list, data_transform, n=3, seed=None)

### 1.3 Convert data to pytorch dataset

#### 1.3.1 Convert to pytorch dataset using inbuilt `ImageFolder` within datasets

In [None]:
# Build the train/test datasets from the image folders with ImageFolder
from torchvision import datasets

# Training split: images pass through data_transform, labels are left as-is
train_data = datasets.ImageFolder(root=train_dir, transform=data_transform, target_transform=None)

# Testing split: same image transform
test_data = datasets.ImageFolder(root=test_dir, transform=data_transform)

train_data, test_data

In [None]:
# Class names as a list, and the class-name -> index mapping as a dict
class_names, class_dict = train_data.classes, train_data.class_to_idx
class_dict

In [None]:
# Index on the train_data Dataset to get a single image and label.
# Index ONCE and unpack: every train_data[0] access loads the file again and
# re-applies data_transform (including its random flip), so indexing twice
# does the work twice.
img, label = train_data[0]
print(f"Image tensor:\n {img}")
print(f"Image shape: {img.shape}")
print(f"Image datatype: {img.dtype}")
print(f"Image label: {label}")
print(f"Label datatype: {type(label)}")
print(f"Train data length: {len(train_data)}")
print(f"Test data length: {len(test_data)}")

#### 1.3.2 Convert to pytorch dataset with Custom `Dataset`

1. Want to be able to load images from file
2. Want to be able to get class names from the Dataset
3. Want to be able to get classes as dictionary from the Dataset

Pros:
- Can create a `Dataset` out of almost anything
- Not limited to PyTorch pre-built `Dataset` functions

Cons:
- Even though you could create `Dataset` out of almost anything, it doesn't mean it will work.
- Using a custom `Dataset` often results in us writing more code, which could be prone to errors or performance issues

In [None]:
import os
import pathlib
import torch

from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
from typing import Tuple, Dict, List


def find_classes(directory: str) -> Tuple[List[str], Dict[str, int]]:
    """Return the class names found in `directory` and their integer labels.

    Each immediate sub-directory of `directory` counts as one class.

    Returns:
        A tuple (classes, class_to_idx): the alphabetically sorted
        sub-directory names, and a dict mapping each name to its position in
        that sorted list.

    Raises:
        FileNotFoundError: if `directory` has no sub-directories.
    """
    # Collect the sub-directory names, then order them alphabetically
    found = [entry.name for entry in os.scandir(directory) if entry.is_dir()]
    found.sort()

    # No sub-directories means there is nothing to classify
    if len(found) == 0:
        raise FileNotFoundError(f"Couldn't find any classes in {directory}... please check file structure.")

    # Numeric label = position in the sorted list
    label_of = dict(zip(found, range(len(found))))
    return found, label_of

# Try it on the training folder (the cell displays the returned tuple)
find_classes(train_dir)

In [None]:
# 0. Write a custom dataset class
from torch.utils.data import Dataset

# 1. Subclass torch.utils.data.Dataset
class ImageFolderCustom(Dataset):
    """Dataset over a folder laid out as targ_dir/class_name/image.jpg.

    Attributes:
        paths: sorted list of the `*.jpg` paths found one level below targ_dir.
        transform: optional callable applied to each loaded PIL image.
        classes: sorted class (sub-directory) names, from find_classes.
        class_to_idx: dict mapping class name -> integer label, from find_classes.
    """

    # 2. Initialize our custom dataset
    def __init__(self, targ_dir: str, transform=None):
        # 3. Create class attributes
        # Get all of the image paths. glob() yields files in filesystem order,
        # which differs between machines; sorting makes dataset[i] refer to the
        # same file everywhere.
        self.paths = sorted(pathlib.Path(targ_dir).glob("*/*.jpg"))
        # Setup transform
        self.transform = transform
        # Create classes and class_to_idx attributes
        self.classes, self.class_to_idx = find_classes(targ_dir)

    # 4. Create a function to load images
    def load_image(self, index: int) -> Image.Image:
        """Opens the image stored at position `index` of self.paths and returns it."""
        image_path = self.paths[index]
        return Image.open(image_path)

    # 5. Overwrite __len__()
    def __len__(self) -> int:
        """Returns the total number of samples."""
        return len(self.paths)

    # 6. Overwrite __getitem__() method to return a particular sample
    def __getitem__(self, index: int) -> Tuple[torch.Tensor, int]:
        """Returns one sample of data, data and label (X, y).

        The image is passed through self.transform when one was given;
        otherwise the untransformed PIL image is returned.
        """
        img = self.load_image(index)
        class_name = self.paths[index].parent.name # expects path in format: data_folder/class_name/image.jpg
        class_idx = self.class_to_idx[class_name]

        # Explicit None check: "no transform" is signalled by None, not by falsiness
        if self.transform is not None:
            return self.transform(img), class_idx # return data, label (X, y)
        return img, class_idx # return untransformed image and label

In [None]:
# Test out ImageFolderCustom
import pandas as pd

# Pass the transform PIPELINE (data_transform), not the torchvision `transforms`
# module: a module is not callable, so indexing the dataset would raise a TypeError.
train_data_custom = ImageFolderCustom(targ_dir=train_dir, transform=data_transform)
test_data_custom = ImageFolderCustom(targ_dir=test_dir, transform=data_transform)

# Side-by-side comparison of the built-in ImageFolder datasets and the custom ones
compare = pd.DataFrame(
    {
        "Train Data": [len(train_data), train_data.classes, train_data.class_to_idx],
        "Train Data Custom": [len(train_data_custom), train_data_custom.classes, train_data_custom.class_to_idx],
        "Test Data": [len(test_data), test_data.classes, test_data.class_to_idx],
        "Test Data Custom": [len(test_data_custom), test_data_custom.classes, test_data_custom.class_to_idx]
    },
    index=["Length", "Classes", "Class to Index Dict"]
)
compare

### 1.4 Prepare DataLoader

In [None]:
from torch.utils.data import DataLoader
import os


BATCH_SIZE = 10

# Training loader: shuffled, one worker per CPU reported by os.cpu_count()
train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=os.cpu_count())

# Test loader: same settings, but dataset order is kept
test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=os.cpu_count())

# Number of batches in each loader
len(train_dataloader), len(test_dataloader)

In [None]:
# Visualize one sample from train_dataloader
import numpy as np

# Pull the first batch out of the loader
train_features_batch, train_labels_batch = next(iter(train_dataloader))
print(f"train_features_batch: {train_features_batch.shape}")
print(f"train_labels_batch: {train_labels_batch.shape}")

# Seed torch, then draw one random position inside the batch
torch.manual_seed(RANDOM_SEED)
rand_idx = torch.randint(0, len(train_features_batch), size=[1]).item()
img = train_features_batch[rand_idx]
label = train_labels_batch[rand_idx]

# Permute the image axes (1, 2, 0) for matplotlib before plotting
img_as_array = np.asarray(img.permute(1, 2, 0))
plt.imshow(img_as_array)
plt.title(f"{label}: {class_names[label]}")
plt.axis(False)
print(f"Image size: {img.shape}")
print(f"Label size: {label.shape}")

### 1.5 Data Augmentation
- Data augmentation is the process of artificially adding diversity to your training data.
- In the case of image data, this may mean applying various image transformations to the training images.
- This practice hopefully results in a model that's more generalizable to unseen data.

In [None]:

# Let's look at TrivialAugment - https://pytorch.org/vision/stable/auto_examples/plot_transforms.html#trivialaugmentwide
from torchvision import transforms

# Training pipeline: resize to 224x224, apply TrivialAugmentWide, convert to tensor
train_transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.TrivialAugmentWide(num_magnitude_bins=31),
    transforms.ToTensor(),
])

# Test pipeline: resize and tensor conversion only
test_transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
])

# Plot three random images through the training pipeline
plot_transformed_images(image_path_list, train_transform, n=3, seed=None)

## 2. Build or pick a pretrained model for training

When starting to build a series of machine learning modelling experiments, it's best practice to start with a baseline model. A baseline model is a simple model you will try and improve upon with subsequent models/experiments. 

### 2.0 Build a model

#### 2.0.1 Creating transforms and loading data for Model 0

In [None]:
# Model 0 transform: resize to 64x64 and convert to tensor (no augmentation)
simple_transform = transforms.Compose([
    transforms.Resize(size=(64, 64)),
    transforms.ToTensor(),
])

# 1. Load and transform data
from torchvision import datasets

train_data_simple = datasets.ImageFolder(root=train_dir, transform=simple_transform)
test_data_simple = datasets.ImageFolder(root=test_dir, transform=simple_transform)

# 2. Turn the datasets into DataLoaders
import os
from torch.utils.data import DataLoader

# Setup batch size and number of workers
BATCH_SIZE = 32
NUM_WORKERS = os.cpu_count()

# Shuffled loader for training, ordered loader for testing
train_dataloader_simple = DataLoader(train_data_simple, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
test_dataloader_simple = DataLoader(test_data_simple, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

#### 2.0.2 Create TinyVGG model class

In [None]:
# CNN Explained: https://poloclub.github.io/cnn-explainer/
class TinyVGG(nn.Module):
    """
    Model architecture copying TinyVGG from CNN Explainer: https://poloclub.github.io/cnn-explainer/

    Two convolutional blocks (Conv2d -> ReLU -> Conv2d -> ReLU -> MaxPool2d)
    followed by a Flatten + Linear classifier.

    Args:
        input_shape: number of input channels.
        hidden_units: number of output channels of every convolution.
        output_shape: number of values produced by the final Linear layer.
        image_size: (height, width) of the images fed to the model. Defaults to
            (64, 64), which yields a 13 x 13 feature map before the classifier.

    Raises:
        ValueError: if image_size is too small to pass through both blocks.

    The model is built on the default device; callers move it with `.to(...)`.
    """
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int, image_size=(64, 64)):
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape, out_channels=hidden_units, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        # Spatial size reaching the classifier (13 x 13 for the default 64 x 64 input)
        feature_h, feature_w = (self._size_after_blocks(side) for side in image_size)
        if feature_h <= 0 or feature_w <= 0:
            raise ValueError(f"image_size {tuple(image_size)} is too small for TinyVGG's two conv blocks.")

        # No `.to(DEVICE)` here: moving only the classifier would leave the module
        # split across devices and tie the class to a notebook-level global.
        self.classifier = nn.Sequential(
            nn.Flatten(),
            # trick to check in_features - print X.shape in forward() right before this layer
            nn.Linear(in_features=hidden_units * feature_h * feature_w, out_features=output_shape)
        )

    @staticmethod
    def _size_after_blocks(side: int) -> int:
        """Returns the length of one spatial side after both conv blocks."""
        for _ in range(2):
            # two unpadded 3x3 convs remove 2 pixels each, then 2x2 max-pool halves (floor)
            side = (side - 4) // 2
        return side

    def forward(self, X):
        """Runs X through conv_block_1, conv_block_2 and the classifier; returns the classifier output."""
        return self.classifier(self.conv_block_2(self.conv_block_1(X)))

# Seed torch before building the model, then create a 10-hidden-unit TinyVGG
# with one output per class name
torch.manual_seed(RANDOM_SEED)
model_0 = TinyVGG(input_shape=3, hidden_units=10, output_shape=len(class_names))
# Move the model to DEVICE (as the last expression, the cell also displays the model)
model_0.to(DEVICE)

In [None]:
# Check if model is correctly built

# dummy_x = torch.rand([1, 3, 64, 64])
# model_0(dummy_x)

#### 2.0.3 Summary of model using torchinfo

In [None]:
from torchinfo import summary

# Summarise model_0 for an input of size [1, 3, 64, 64]
summary(model_0, input_size=[1, 3, 64, 64])

### 2.1 Pick loss function and optimizer
We will do this in the next section.

### 2.2 Build a training loop to train the model

In [None]:
from timeit import default_timer as timer
from scripts.utils import train, plot_loss_curves


# Seed the CPU and CUDA generators
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)

EPOCHS = 10

# Fresh TinyVGG instance (30 hidden units), then move it to the target device
model_0 = TinyVGG(input_shape=3, hidden_units=30, output_shape=len(train_data.classes))
model_0 = model_0.to(DEVICE)

# Cross-entropy loss and Adam optimizer over model_0's parameters
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model_0.parameters(), lr=0.001)

# Time the training run
start_timer = timer()
model_0_res = train(
    model=model_0,
    train_dataloader=train_dataloader_simple,
    test_dataloader=test_dataloader_simple,
    loss_fn=loss_fn,
    optimizer=optimizer,
    epochs=EPOCHS,
    device=DEVICE,
)
end_timer = timer()
train_time_0 = print_train_time(start_timer, end_timer, DEVICE)

# Show the values returned by train() and plot them
print(model_0_res)
plot_loss_curves(model_0_res)

## 3. Make prediction

## 4. Evaluate the model

In [None]:
from sklearn.metrics import accuracy_score

# Evaluate model_0 on the simple test loader, with accuracy_score as the accuracy function.
# NOTE: this rebinds model_0_res, replacing the value returned by train() in the training cell.
model_0_res = eval_model_classification(
    model=model_0,
    data_loader=test_dataloader_simple,
    loss_fn=loss_fn,
    accuracy_fn=accuracy_score
)
print(model_0_res)

## 5. Improve through experimentation

### 5.1 Build new model

#### 5.1.1 Creating transforms and loading data for Model 1

In [None]:
# Create training transform with TrivialAugment
from torchvision import transforms

# Training pipeline: resize to 64x64, TrivialAugmentWide, tensor conversion
train_transform_trivial = transforms.Compose([
    transforms.Resize(size=(64, 64)),
    transforms.TrivialAugmentWide(num_magnitude_bins=31),
    transforms.ToTensor(),
])

# Test pipeline: resize and tensor conversion only
test_transform_simple = transforms.Compose([
    transforms.Resize(size=(64, 64)),
    transforms.ToTensor(),
])

# Turn image folders into Datasets
from torchvision import datasets

train_data_augmented = datasets.ImageFolder(root=train_dir, transform=train_transform_trivial)
test_data_simple = datasets.ImageFolder(root=test_dir, transform=test_transform_simple)

# Turn our Datasets into DataLoaders
import os
from torch.utils.data import DataLoader

BATCH_SIZE = 32
NUM_WORKERS = os.cpu_count()

# Seed torch before creating the loaders
torch.manual_seed(RANDOM_SEED)
train_dataloader_augmented = DataLoader(train_data_augmented, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

test_dataloader_simple = DataLoader(test_data_simple, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

#### 5.1.2 Create new model
We will use the same TinyVGG model class.

### 5.2 Pick loss function and optimizer
We will do this in the next section.

### 5.3 Build training loop

In [None]:
from timeit import default_timer as timer
from scripts.utils import train, plot_loss_curves


# Seed the CPU and CUDA generators
torch.manual_seed(RANDOM_SEED)
torch.cuda.manual_seed(RANDOM_SEED)

EPOCHS = 5

# New TinyVGG instance (10 hidden units) created directly on the target device
model_1 = TinyVGG(input_shape=3, hidden_units=10, output_shape=len(train_data_augmented.classes)).to(DEVICE)

# Cross-entropy loss and Adam optimizer over model_1's parameters
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model_1.parameters(), lr=0.001)

# Time the training run: augmented training loader, simple test loader
start_timer = timer()
model_1_res = train(
    model=model_1,
    train_dataloader=train_dataloader_augmented,
    test_dataloader=test_dataloader_simple,
    loss_fn=loss_fn,
    optimizer=optimizer,
    epochs=EPOCHS,
    device=DEVICE,
)
end_timer = timer()
train_time_1 = print_train_time(start_timer, end_timer, DEVICE)

# Show the values returned by train() and plot them
print(model_1_res)
plot_loss_curves(model_1_res)

### 5.4 Evaluate the new model

In [None]:
# For model_1
# Evaluate on test_dataloader_simple: it is the loader model_1 was tested on
# during training and the one model_0 was evaluated on, so the two results are
# comparable. (`test_dataloader` wraps test_data, whose transform includes a
# random horizontal flip.)
model_1_res = eval_model_classification(
    model=model_1,
    data_loader=test_dataloader_simple,
    loss_fn=loss_fn,
    accuracy_fn=accuracy_score
)
model_1_res

In [None]:
# Evaluate all models
import pandas as pd


# One row per model's evaluation result, plus a column with each training time
all_result = pd.DataFrame([model_0_res, model_1_res]).assign(
    training_time=[train_time_0, train_time_1]
)
all_result

In [None]:
# Visualize the result
# Horizontal bar chart of the "model_accuracy" column, indexed by "model_name"
all_result.set_index("model_name")["model_accuracy"].plot(kind="barh")
plt.xlabel("Accuracy")
plt.ylabel("Models")

In [None]:
# Visualize random predictions
from scripts.utils import make_predictions

# torch.manual_seed(RANDOM_SEED)  # uncomment for reproducible picks (indices come from torch.randint)

# Pass DEVICE explicitly, like the other make_predictions call in this notebook.
# NOTE(review): y_preds[i] is matched with test_data_simple[i]; test_dataloader_simple
# is built with shuffle=False, and this assumes make_predictions returns its
# predictions in loader order - confirm against scripts.utils.
y_preds = make_predictions(model_0, test_dataloader_simple, DEVICE)

rows, cols = 3, 3
fig = plt.figure(figsize=(9, 9))
for i in range(1, rows * cols + 1):
  # Draw the index from the dataset that is actually indexed below
  rand_idx = torch.randint(0, len(test_data_simple), size=[1]).item()
  X, y_truth = test_data_simple[rand_idx]
  y_truth = class_names[y_truth]
  y_pred = class_names[y_preds[rand_idx]]
  fig.add_subplot(rows, cols, i)
  # Permute the image axes (1, 2, 0) for matplotlib
  img_as_array = np.asarray(X.permute(1, 2, 0))
  plt.imshow(img_as_array)
  # Green title for a correct prediction, red for a wrong one
  plt.title(f"Truth: {y_truth} | Pred: {y_pred}", c="g" if y_pred == y_truth else "r")
  plt.axis(False)

In [None]:
from scripts.utils import make_predictions


# Make predictions with trained model
# (model_0 over test_dataloader_simple, with DEVICE passed as the third argument)
y_pred_tensor = make_predictions(model_0, test_dataloader_simple, DEVICE)
y_pred_tensor

In [None]:
from torchmetrics import ConfusionMatrix
from mlxtend.plotting import plot_confusion_matrix

# Setup a multiclass confusion matrix instance and compare predictions to targets
# NOTE(review): the target tensor is created on the CPU here and `.numpy()` is
# called on the result below; this presumably requires y_pred_tensor to be a CPU
# tensor as well - confirm against make_predictions.
confmat = ConfusionMatrix(task='multiclass', num_classes=len(class_names))
confmat_tensor = confmat(preds=y_pred_tensor,
                         target=torch.tensor(test_data_simple.targets))

# Plot the confusion matrix
fig, ax = plot_confusion_matrix(
    conf_mat=confmat_tensor.numpy(), # matplotlib likes working with numpy
    class_names=class_names,
    figsize=(10, 7)
)

## 6. Save and reload trained model

In [None]:
# Save the model

# Where the checkpoint goes: models/FoodSmall101_VGG.pt
model_name = "FoodSmall101_VGG.pt"
model_folder = Path("models")
model_path = model_folder / model_name

# Create the folder if it is missing
model_folder.mkdir(parents=True, exist_ok=True)

# Save only the state_dict of model_0
model_0.to(device=DEVICE)
torch.save(obj=model_0.state_dict(), f=model_path)

In [None]:
# Load the model

# Rebuild the architecture with the same hyperparameters as the saved model_0
# (hidden_units=30), then load the saved weights into it.
loaded_model = TinyVGG(input_shape=3, hidden_units=30, output_shape=len(class_names))
# map_location remaps the stored tensors onto the current DEVICE, so a
# checkpoint saved from a GPU can still be loaded on a CPU-only machine.
loaded_model.load_state_dict(torch.load(f=model_path, map_location=DEVICE))
loaded_model.to(device=DEVICE)
# loaded_model.state_dict()

In [None]:
# Evaluate loaded model
torch.manual_seed(RANDOM_SEED)

# Use test_dataloader_simple - the loader model_0 was evaluated on - so the
# loss can be compared with model_0_res. (`test_dataloader` wraps test_data,
# whose transform includes a random horizontal flip.)
loaded_model_0_results = eval_model_classification(
    model=loaded_model,
    data_loader=test_dataloader_simple,
    loss_fn=loss_fn,
    accuracy_fn=accuracy_score
)

loaded_model_0_results

In [None]:
# Check if model results are close to each other
# Compares the "model_loss" entries of the original and the reloaded model's
# evaluation results with an absolute tolerance of 1e-02.
torch.isclose(torch.tensor(model_0_res["model_loss"]),
              torch.tensor(loaded_model_0_results["model_loss"]),
              atol=1e-02)

## 7. Making a prediction on a custom image

In [None]:
# Download custom image
import requests

# Setup custom image path
custom_image_path = data_path / "04-pizza-dad.jpeg"

# Download the image if it doesn't already exist
if not custom_image_path.is_file():
  print(f"Downloading {custom_image_path}...")
  # When downloading from GitHub, need to use the "raw" file link.
  # Fetch and check the response BEFORE opening the file: otherwise a failed
  # download leaves an empty or invalid file that later runs treat as present.
  request = requests.get(
      "https://raw.githubusercontent.com/mrdbourke/pytorch-deep-learning/main/images/04-pizza-dad.jpeg",
      timeout=30,
  )
  request.raise_for_status()

  # Make sure the data folder exists even if the dataset cell was not run
  custom_image_path.parent.mkdir(parents=True, exist_ok=True)
  with open(custom_image_path, "wb") as f:
    f.write(request.content)
else:
  print(f"{custom_image_path} already exists, skipping download...")

In [None]:
# Create transform pipeline to resize image
from torchvision import transforms

# Single-step pipeline: resize the custom image to 64x64
custom_image_transform = transforms.Compose([transforms.Resize(size=(64, 64))])

In [None]:
from scripts.utils import pred_and_plot_single_image

# Pred on our custom image
# NOTE(review): custom_image_transform only resizes (no ToTensor), so
# pred_and_plot_single_image presumably loads the image as a tensor itself -
# confirm against scripts.utils.
pred_and_plot_single_image(
    model=model_0,
    image_path=custom_image_path,
    class_names=class_names,
    transform=custom_image_transform,
    device=DEVICE
)