<a href="https://colab.research.google.com/github/Sitta250/pytorch/blob/main/04_pytorch_custom_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torch import nn
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
device

In [None]:
# get data from Food101 dataset
# 1000img: 750 training and 250 testing
import requests
import zipfile
from pathlib import Path

# Paths for the dataset folder and the downloaded archive.
data_path = Path("data/")
image_path = data_path / "pizza_steack_sushi"
zip_path = data_path / "pizza_steak_sushi.zip"
data_url = "https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip"

# Download and extract only when the image folder is missing or empty, so
# re-running this cell does not hit the network again.
if image_path.is_dir() and any(image_path.iterdir()):
  print(f"{image_path} dir already existed. skipping download")
else:
  print(f"{image_path} does not exist, creating one...")
  image_path.mkdir(parents=True, exist_ok=True)

  # download
  print("Downloading pizza, steak, sushi data...")
  # Keep the response under its own name: assigning it to `requests` would
  # shadow the module and break the next run of this cell.
  response = requests.get(data_url)
  # Fail loudly rather than saving an HTTP error page as the zip file.
  response.raise_for_status()
  with open(zip_path, "wb") as f:
    f.write(response.content)

  # unzip data
  with zipfile.ZipFile(zip_path, "r") as zip_ref:
    print("unzipping pizza, steak, sushi data")
    zip_ref.extractall(image_path)

In [None]:
# data prepping
import os
def walk_through_dir(dir_path):
  """Print how many sub-directories and files each directory under dir_path holds.

  Args:
    dir_path: root directory to walk (anything os.walk accepts).
  """
  for dirpath, dirnames, filenames in os.walk(dir_path):
    # Report the directory currently being visited, not the root that was passed in.
    print(f"There are {len(dirnames)} directories and {len(filenames)} images in '{dirpath}'.")


In [None]:
walk_through_dir(image_path)

In [None]:
# setup train and testing path
train_dir = image_path/ "train"
test_dir = image_path/ "test"

train_dir, test_dir

In [None]:
# visualizing
import random
from PIL import Image


# Collect every .jpg that sits two folder levels below the data root.
image_path_list = list(image_path.glob("*/*/*.jpg"))

# Choose one file at random; its parent folder name is used as the class label.
random_image_path = random.choice(image_path_list)
image_class = random_image_path.parent.stem

# Open the chosen file with PIL.
img = Image.open(random_image_path)

# Report the pick and its dimensions, then echo the image as the cell output.
print(
    f"random img path: {random_image_path}\n"
    f"image class: {image_class}\n"
    f"image height: {img.height}\n"
    f"image width: {img.width}"
)
img

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Convert the PIL image into a NumPy array.
img_as_array = np.asarray(img)

# Build the title first, then draw the array with the axes hidden.
plot_title = f"image class: {image_class} | image shape: {img_as_array.shape} -> [height, width, color channel]"
plt.figure(figsize=(5, 3))
plt.imshow(img_as_array)
plt.title(plot_title)
plt.axis(False)

In [None]:
img_as_array

In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [None]:
# Image preprocessing pipeline; the steps run in the order listed.
_data_transform_steps = [
    transforms.Resize((64, 64)),           # resize to 64x64
    transforms.RandomHorizontalFlip(0.5),  # flip horizontally with probability 0.5
    transforms.ToTensor(),                 # convert the image to a tensor
]
data_transform = transforms.Compose(_data_transform_steps)

In [None]:
data_transform(img).dtype

In [None]:
data_transform(img).shape

In [None]:
def plot_transformed_img(image_paths, transform, n=3, seed=None):
  """Plot randomly chosen images next to their transformed versions.

  Args:
    image_paths: list of image paths. Each needs a `.parent.stem`, which is
      shown as the class in the figure title.
    transform: callable applied to the opened PIL image. Its result is
      permuted with (1, 2, 0), so it must return a channels-first tensor.
    n: number of images to plot; capped at len(image_paths).
    seed: optional seed for Python's `random` module. Any integer,
      including 0, is honoured; None leaves the random state untouched.
  """
  if seed is not None:
    random.seed(seed)
  # random.sample raises ValueError when k exceeds the population size.
  n = min(n, len(image_paths))
  random_image_paths = random.sample(image_paths, k=n)
  for img_path in random_image_paths:
    with Image.open(img_path) as f:
      fig, ax = plt.subplots(nrows = 1, ncols= 2)
      ax[0].imshow(f)
      ax[0].set_title(f"original\nsize:{f.size}")
      ax[0].axis(False)

      # The transform returns the colour channel first, but matplotlib wants
      # it last, hence the permute.
      transformed_image = transform(f).permute(1, 2, 0)
      ax[1].imshow(transformed_image)
      ax[1].set_title(f"Transfomred\nsize:{transformed_image.shape}")
      ax[1].axis("off")

      fig.suptitle(f"class: {img_path.parent.stem}", fontsize=16)

plot_transformed_img(image_paths=image_path_list,
                     transform=data_transform,
                     n=3,
                     seed=42)

In [None]:
# option1: loading using ImageFolder
from torchvision import datasets

# Build both splits the same way: same transform, no target transform.
train_data, test_data = (
    datasets.ImageFolder(root=split_dir, transform=data_transform)
    for split_dir in (train_dir, test_dir)
)
train_data, test_data

In [None]:
# get class name as list
class_names = train_data.classes
class_names

In [None]:
# get class names as dict
class_dict = train_data.class_to_idx
class_dict

In [None]:
len(train_data), len(test_data)

In [None]:
train_data.samples[0]

In [None]:
# Index the dataset once and unpack the (image, label) pair. Indexing twice
# would load the file and run the random transform a second time.
img, label = train_data[0]
print(f"image tensor:\n {img}")
print(f"image shape: {img.shape}")
print(f"image datatype: {img.dtype}")
print(f"image label: {label}")
print(f"data datatype: {type(label)}")

In [None]:
# Move the colour axis to the end so matplotlib can draw the tensor.
img_permute = torch.permute(img, (1, 2, 0))

# Show the shapes before and after the reorder.
print(
    f"original shape: {img.shape} -> [color_channels, height, width]\n"
    f"img permute: {img_permute.shape} -> [height, width, color]"
)

# Draw the image with its class name as the title.
plt.figure(figsize=(10, 7))
plt.imshow(img_permute)
plt.axis("off")
plt.title(class_names[label], fontsize=14)

In [None]:
class_names[label]

In [None]:
import os
os.cpu_count()

In [None]:
# Wrap the ImageFolder datasets in DataLoaders: shuffle training batches only.
from torch.utils.data import DataLoader

BATCH_SIZE = 32
train_dataloader = DataLoader(train_data, BATCH_SIZE, shuffle=True, num_workers=1)
test_dataloader = DataLoader(test_data, BATCH_SIZE, shuffle=False, num_workers=1)
train_dataloader, test_dataloader

In [None]:
len(train_dataloader), len(test_dataloader)

In [None]:
len(train_data), len(test_data)

In [None]:
img, label = next(iter(train_dataloader))

print(f"img shape: {img.shape}-> [batch_size, color, height, width]")
print(f"label shape: {label.shape}")

In [None]:
# option2: loading data with Custom Dataset

# pro: can create Dataset out of anything, not limited to prebuilt 'Dataset' function

# cons: doesn't guarantee that Dataset will always work, prone to errors or performance issues


import os
import pathlib
import torch

from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms
from typing import Tuple, Dict, List

In [None]:
# instance of torchvision.datasets.ImageFolder()
train_data.classes, train_data.class_to_idx

In [None]:
# create helper func to get class names
# get class name using os.scandir() and raise error if class name aren't found


# Directory whose entries are treated as class names.
target_directory = train_dir

# Gather the name of every entry in the directory, then sort alphabetically.
class_names_found = [entry.name for entry in os.scandir(target_directory)]
class_names_found.sort()
class_names_found

In [None]:
list(os.scandir(target_directory))

In [None]:
def find_classes(directory: str)->Tuple[List[str], Dict[str,int]]:
  """Return the sorted sub-directory names of `directory` and a name -> index map.

  Raises:
    FileNotFoundError: if `directory` contains no sub-directories.
  """
  # Only directories count as classes; plain files are ignored.
  with os.scandir(directory) as entries:
    found = [item.name for item in entries if item.is_dir()]
  found.sort()

  if len(found) == 0:
    raise FileNotFoundError(f"couldn't find any classes in {directory}... please check file structure")

  # The index of each class is its position in the sorted list.
  lookup = dict(zip(found, range(len(found))))
  return found, lookup

In [None]:
find_classes(target_directory)

In [None]:
# create custom Dataset
# subclass torch.utils.data.Dataset
'''
create several attributes:
- paths - paths for img
- transform
- classes - list of target class
- class_to_idx
'''

class ImageFolderCustom(Dataset):
  """Dataset of .jpg images laid out as <targ_dir>/<class_name>/<image>.jpg.

  Attributes:
    paths: sorted list of pathlib.Path objects, one per image.
    transform: optional callable applied to each loaded image.
    classes: class names returned by find_classes(targ_dir).
    class_to_idx: mapping from class name to integer label.
  """
  def __init__(self,
               targ_dir:str,
               transform=None):
    # Sort the paths so that index -> image does not depend on the order in
    # which the filesystem happens to list the files.
    self.paths = sorted(pathlib.Path(targ_dir).glob("*/*.jpg"))
    # setup transform
    self.transform = transform
    # create classes and class_to_idx attributes
    self.classes, self.class_to_idx = find_classes(targ_dir)

  def load_image(self, index: int) -> Image.Image:
    "Open the image at `index` and return it as an RGB PIL image."
    image_path = self.paths[index]
    # Image.open is lazy and keeps the file open. Converting inside the `with`
    # block reads the pixels and releases the handle, and gives every sample
    # three channels.
    with Image.open(image_path) as img:
      return img.convert("RGB")

  # overwrite __len__()
  def __len__(self)->int:
    "Return the total number of samples."
    return len(self.paths)

  # overwrite __getitem__() method to return a particular sample
  def __getitem__(self, index: int) -> Tuple[torch.Tensor,int]:
    "Return one sample as (image, class index); the image is transformed when a transform is set."
    img = self.load_image(index)
    # The class is the name of the folder that holds the image.
    class_name = self.paths[index].parent.name
    class_idx = self.class_to_idx[class_name]

    # transform when necessary
    if self.transform is not None:
      return self.transform(img), class_idx
    return img, class_idx


In [None]:
# create transform
from torchvision import transforms

# Training pipeline: resize, random horizontal flip, then tensor conversion.
train_transforms = transforms.Compose(
    [transforms.Resize((64, 64)), transforms.RandomHorizontalFlip(0.5), transforms.ToTensor()]
)

# Test pipeline: resize and tensor conversion only.
test_transforms = transforms.Compose(
    [transforms.Resize((64, 64)), transforms.ToTensor()]
)

In [None]:
train_data_custom = ImageFolderCustom(targ_dir=train_dir,
                                      transform = train_transforms)
test_data_custom = ImageFolderCustom(targ_dir=test_dir,
                                     transform=test_transforms)

In [None]:
len(train_data), len(train_data_custom)

In [None]:
len(test_data), len(test_data_custom)

In [None]:
train_data_custom.classes

In [None]:
train_data_custom.class_to_idx

In [None]:
print(train_data_custom.classes==train_data.classes)
print(test_data_custom.classes==test_data.classes)

In [None]:
import math

In [None]:
# display random image
def display_random_images(dataset: torch.utils.data.Dataset,
                          classes:List[str]=None,
                          n: int=10,
                          display_shape:bool=True,
                          seed: int = None):
  """Plot up to 10 randomly chosen samples of `dataset` in a two-row grid.

  Args:
    dataset: indexable dataset whose items start with (image, label). The
      image is permuted with (1, 2, 0) before plotting, so it must be a
      channels-first tensor.
    classes: optional class names indexed by label; used in the titles.
    n: number of samples to show; capped at 10 and at len(dataset).
    display_shape: whether to add the image shape to each title.
    seed: optional seed for Python's `random` module. Any integer,
      including 0, is honoured.
  """
  if n>10:
    n=10
    display_shape=False
    print("for display purposes, n shouldn't be larger than 10, setting to 10 and removing shape display")

  # random.sample raises ValueError when k exceeds the population size.
  n = min(n, len(dataset))
  if n < 1:
    # Nothing to draw, and a zero-column grid is not a valid figure.
    return

  if seed is not None:
    random.seed(seed)

  random_samples_idx = random.sample(range(len(dataset)), k=n)

  rows=2
  cols=math.ceil(n/rows)
  plt.figure(figsize=(cols*3, rows*3))

  for i, targ_sample in enumerate(random_samples_idx):
    # Fetch the sample once: indexing twice would load it (and apply any
    # random transform) twice.
    sample = dataset[targ_sample]
    targ_image, targ_label = sample[0], sample[1]
    targ_image_adjust = targ_image.permute(1,2,0)
    plt.subplot(rows, cols, i+1)
    plt.imshow(targ_image_adjust)
    plt.axis("off")

    # Build the title from whichever parts apply; without class names the
    # title used to be undefined here.
    title_parts = []
    if classes:
      title_parts.append(f"classes:{classes[targ_label]}")
    if display_shape:
      title_parts.append(f"shape: {targ_image_adjust.shape}")
    if title_parts:
      plt.title("\n".join(title_parts))

In [None]:
# display from ImageFolder
display_random_images(train_data,
                      n=5,
                      classes=class_names,
                      seed=42)

In [None]:
# display from custom dataset
display_random_images(train_data_custom,
                      n=20,
                      classes=class_names,
                      seed=42)

In [None]:
# Wrap the custom datasets in DataLoaders; batches are built in the main process (num_workers=0).
BATCH_SIZE=32
train_dataloader_custom = DataLoader(train_data_custom, BATCH_SIZE, shuffle=True, num_workers=0)
test_dataloader_custom = DataLoader(test_data_custom, BATCH_SIZE, shuffle=False, num_workers=0)

train_dataloader_custom, test_dataloader_custom

In [None]:
img_custom, label_custom = next(iter(train_dataloader_custom))
img_custom.shape, label_custom.shape

In [None]:
# data augmentation

# trivial augment
from torchvision import transforms

# Training pipeline: resize to 224x224, apply TrivialAugmentWide, convert to tensor.
train_transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.TrivialAugmentWide(num_magnitude_bins=31),
        transforms.ToTensor(),
    ]
)

# Test pipeline: same resize and conversion, no augmentation.
test_transform = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)

In [None]:
image_path_list = list(image_path.glob("*/*/*.jpg"))
image_path_list[:10]

In [None]:
plot_transformed_img(
    image_paths = image_path_list,
    transform=train_transform,
    n=3,
    seed=None
)

In [None]:
# model 0: TinyVGG without data augmentation
# conv -> relu -> conv -> relu -> maxpool

# Resize and convert only; no augmentation for the baseline model.
simple_transform = transforms.Compose([
    transforms.Resize(size=(64, 64)),
    transforms.ToTensor()
])

train_data_simple = datasets.ImageFolder(root=train_dir,
                                  transform=simple_transform)
test_data_simple = datasets.ImageFolder(root=test_dir,
                                 transform=simple_transform)
# Show the datasets created in this cell, not the earlier train_data/test_data.
print(f"train:{train_data_simple} \n test:{test_data_simple}")

In [None]:
# dataset to dataloader
BATCH_SIZE=32
# os.cpu_count() returns None when the count cannot be determined, and None
# is not a valid num_workers. Fall back to 0, which loads data in the main process.
NUM_WORKERS = os.cpu_count() or 0
train_dataloader_simple = DataLoader(dataset=train_data_simple,
                                     batch_size=BATCH_SIZE,
                                     num_workers=NUM_WORKERS,
                                     shuffle=True)
test_dataloader_simple = DataLoader(dataset=test_data_simple,
                                    batch_size=BATCH_SIZE,
                                    num_workers=NUM_WORKERS,
                                    shuffle=False)
print(f"train: {train_dataloader_simple} \n test:{test_dataloader_simple}")

In [None]:
class TinyVGG(nn.Module):
    """
    Model architecture copying TinyVGG from:
    https://poloclub.github.io/cnn-explainer/

    Two blocks of conv -> relu -> conv -> relu -> maxpool followed by a
    linear classifier.

    Args:
        input_shape: number of input channels (e.g. 3 for RGB).
        hidden_units: number of channels produced by every conv layer.
        output_shape: number of output classes (size of the logits vector).
        image_size: height and width of the square input images. Defaults to
            64, which reproduces the original hidden_units*16*16 classifier.

    Raises:
        ValueError: if image_size is too small to survive the two poolings.
    """
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int,
                 image_size: int = 64) -> None:
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape,
                      out_channels=hidden_units,
                      kernel_size=3, # how big is the square that's going over the image?
                      stride=1, # default
                      padding=1), # options = "valid" (no padding) or "same" (output has same shape as input) or int for specific number
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units,
                      out_channels=hidden_units,
                      kernel_size=3,
                      stride=1,
                      padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,
                         stride=2) # default stride value is same as kernel_size
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        # The 3x3 convs with stride 1 and padding 1 keep height and width.
        # Each of the two 2x2 max-pools halves them (rounding down), so the
        # final feature map is (image_size // 4) on each side.
        feature_map_size = image_size // 4
        if feature_map_size < 1:
            raise ValueError(f"image_size must be at least 4, got {image_size}")
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=hidden_units*feature_map_size*feature_map_size,
                      out_features=output_shape)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run both conv blocks, then the classifier, and return its output."""
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.classifier(x)
        return x

In [None]:
# Size the output layer from the dataset this model is trained on.
model_0 = TinyVGG(input_shape=3, # number of color channels (3 for RGB)
                  hidden_units=10,
                  output_shape=len(train_data_simple.classes)).to(device)
model_0

In [None]:
# try forward pass on single image to see the shape
image_batch, label_batch = next(iter(train_dataloader_simple))
image_batch.shape, label_batch.shape

In [None]:
# The DataLoader yields CPU tensors; move the batch to the model's device
# first, otherwise the forward pass fails when the model is on CUDA.
model_0(image_batch.to(device))

In [None]:
try:
  import torchinfo
except:
  !pip install torchinfo
  import torchinfo

from torchinfo import summary
summary(model_0, input_size=[1, 3, 64, 64])

In [None]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               device: torch.device = None):
    """Train `model` for one pass over `dataloader`.

    Args:
        model: model to train; it is put in train mode.
        dataloader: iterable of (X, y) batches that also supports len().
        loss_fn: callable taking (predictions, targets) and returning a scalar loss tensor.
        optimizer: optimizer that updates the model's parameters.
        device: device the batches are moved to. When None, the device of the
            model's first parameter is used (CPU if it has no parameters), so
            the function no longer relies on a global `device` variable.

    Returns:
        (train_loss, train_acc): loss and accuracy, each averaged over the batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    # Put model in train mode
    model.train()

    # Running totals of per-batch loss and per-batch accuracy
    train_loss, train_acc = 0, 0

    for X, y in dataloader:
        # Send data to target device
        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Softmax does not change which entry is largest, so argmax on the
        # raw outputs gives the same predicted class (as test_step does).
        y_pred_class = y_pred.argmax(dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module):
    # Put model in eval mode
    model.eval()

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0

    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [None]:
from tqdm.auto import tqdm

# 1. Take in various parameters required for training and test steps
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5):
    """Run train_step and test_step for `epochs` epochs and collect the metrics.

    Returns:
        dict with the keys "train_loss", "train_acc", "test_loss" and
        "test_acc", each mapping to a list with one value per epoch.
    """
    # One empty history list per tracked metric.
    results = {name: [] for name in ("train_loss", "train_acc", "test_loss", "test_acc")}

    for epoch in tqdm(range(epochs)):
        # One pass over the training data, then one over the test data.
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer)
        test_loss, test_acc = test_step(model=model,
                                        dataloader=test_dataloader,
                                        loss_fn=loss_fn)

        # Progress line for this epoch.
        summary_line = " | ".join([
            f"Epoch: {epoch+1}",
            f"train_loss: {train_loss:.4f}",
            f"train_acc: {train_acc:.4f}",
            f"test_loss: {test_loss:.4f}",
            f"test_acc: {test_acc:.4f}",
        ])
        print(summary_line)

        # Store plain Python numbers; tensors are unwrapped with .item().
        epoch_metrics = {
            "train_loss": train_loss,
            "train_acc": train_acc,
            "test_loss": test_loss,
            "test_acc": test_acc,
        }
        for name, value in epoch_metrics.items():
            results[name].append(value.item() if isinstance(value, torch.Tensor) else value)

    return results

In [None]:
# Set random seeds
torch.manual_seed(42)
torch.cuda.manual_seed(42)

# Set number of epochs
NUM_EPOCHS = 5

# Recreate an instance of TinyVGG. The output layer is sized from the dataset
# the model is trained on, the same way model_1 is sized from its own dataset.
model_0 = TinyVGG(input_shape=3, # number of color channels (3 for RGB)
                  hidden_units=10,
                  output_shape=len(train_data_simple.classes)).to(device)

# Setup loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model_0.parameters(), lr=0.001)

# Start the timer
from timeit import default_timer as timer
start_time = timer()

# Train model_0
model_0_results = train(model=model_0,
                        train_dataloader=train_dataloader_simple,
                        test_dataloader=test_dataloader_simple,
                        optimizer=optimizer,
                        loss_fn=loss_fn,
                        epochs=NUM_EPOCHS)

# End the timer and print out how long it took
end_time = timer()
print(f"Total training time: {end_time-start_time:.3f} seconds")

In [None]:
model_0_results

In [None]:
# loss curve
model_0_results.keys()

In [None]:
def plot_loss_curves(results: Dict[str, List[float]]):
    """Draw loss and accuracy curves side by side from a results dictionary.

    Args:
        results (dict): per-epoch metric lists stored under the keys
            "train_loss", "test_loss", "train_acc" and "test_acc".
    """
    # Read all four series up front so a missing key fails before any drawing.
    train_loss_values, test_loss_values = results['train_loss'], results['test_loss']
    train_acc_values, test_acc_values = results['train_acc'], results['test_acc']

    # One x position per recorded epoch.
    epoch_axis = range(len(results['train_loss']))

    plt.figure(figsize=(15, 7))

    # Each panel: a heading followed by its (values, legend label) curves.
    panels = [
        ('Loss', (train_loss_values, 'train_loss'), (test_loss_values, 'test_loss')),
        ('Accuracy', (train_acc_values, 'train_accuracy'), (test_acc_values, 'test_accuracy')),
    ]
    for position, (heading, *curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for values, curve_label in curves:
            plt.plot(epoch_axis, values, label=curve_label)
        plt.title(heading)
        plt.xlabel('Epochs')
        plt.legend()

In [None]:
plot_loss_curves(model_0_results)

In [None]:
# tiny vgg with data augmentation

# Training pipeline: resize to 64x64, apply TrivialAugmentWide, convert to tensor.
train_transform_trivial = transforms.Compose(
    [
        transforms.Resize((64, 64)),
        transforms.TrivialAugmentWide(num_magnitude_bins=31),
        transforms.ToTensor(),
    ]
)

# Test pipeline: resize and convert only (the variable name is kept as other cells use it).
test_transforms_ismple = transforms.Compose(
    [transforms.Resize((64, 64)), transforms.ToTensor()]
)

In [None]:
# create train and test dataset and dataloader with data augmentation
train_data_augmented = datasets.ImageFolder(root=train_dir,
                                            transform=train_transform_trivial,
                                            )
test_data_simple = datasets.ImageFolder(root=test_dir,
                                        transform=test_transforms_ismple)

In [None]:
# Wrap the augmented training set and the plain test set in DataLoaders.
import os
BATCH_SIZE = 32

# Seed torch before building the loaders.
torch.manual_seed(42)
train_datalaoder_augmented = DataLoader(train_data_augmented,
                                        BATCH_SIZE,
                                        shuffle=True,
                                        num_workers=NUM_WORKERS)
# shuffle is left at its default for the test loader.
test_dataloader_simple = DataLoader(test_data_simple,
                                    BATCH_SIZE,
                                    num_workers=NUM_WORKERS)

In [None]:
torch.manual_seed(42)
model_1=TinyVGG(
    input_shape=3,
    hidden_units=10,
    output_shape=len(train_data_augmented.classes)
).to(device)

model_1

In [None]:
# Train model_1 (TinyVGG on the augmented training data) and time the run.
# Seed torch first.
torch.manual_seed(42)

# Number of passes over the training data.
NUM_EPOCHS=5

# Loss function and an Adam optimizer bound to model_1's parameters.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model_1.parameters(), lr=0.001)

# Start the timer just before training begins.
from timeit import default_timer as timer
start_time = timer()

# Train on the augmented loader and evaluate on the plain test loader.
model_1_results = train(model=model_1,
                        train_dataloader = train_datalaoder_augmented,
                        test_dataloader=test_dataloader_simple,
                        optimizer=optimizer,
                        loss_fn=loss_fn,
                        epochs=NUM_EPOCHS)

# Stop the timer and report the elapsed time.
end_time = timer()
print(f"total training time for model 1: {end_time -start_time:.3f} seconds")

# 1:00:31:43