In [8]:
import torch
import torch.nn as nn
import requests
import zipfile
from pathlib import Path
import random
import os
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import time

# Setup the device to GPU if available, else CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Set a fixed random seed for reproducibility
random.seed(42)

if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

# Set path to data folder
data_path = Path("data/")
image_path = data_path / "images"

# If the image folder doesn't exist, download it and prepare it
if not image_path.is_dir():
    print(f"Directory doesn't exist, creating one...")
    image_path.mkdir(parents=True, exist_ok=True)

    # Download images
    with open(data_path / "images.zip", "wb") as f:
        request = requests.get("https://github.com/mrdbourke/pytorch-deep-learning/raw/main/data/pizza_steak_sushi.zip")
        print("Downloading images")
        f.write(request.content)

    # Unzip images
    with zipfile.ZipFile(data_path / "images.zip", "r") as zip_ref:
        print("Unzipping images...")
        zip_ref.extractall(image_path)

# Function to walk through directories and count files
def walk_through_dir(dir_path):
    num_dirs = sum([len(dirs) for _, dirs, _ in os.walk(dir_path)])
    num_files = sum([len(files) for _, _, files in os.walk(dir_path)])
    print(f"There are {num_dirs} directories and {num_files} images in '{dir_path}'.")

walk_through_dir(image_path)

# Setup training and testing paths
train_dir = image_path / "train"
test_dir = image_path / "test"

# Transform images to datasets
data_transform = transforms.Compose([
   transforms.Resize(size=(64, 64)),  # resize the image to 64x64
   transforms.RandomHorizontalFlip(p=0.5),  # flip the images randomly on the horizontal
   transforms.ToTensor()  # Turn the image to a tensor
])

train_data = datasets.ImageFolder(root=train_dir, transform=data_transform)
test_data = datasets.ImageFolder(root=test_dir, transform=data_transform)

# Turn train and test Datasets into DataLoaders
BATCH_SIZE = 32
NUM_WORKERS = os.cpu_count()

train_dataloader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
test_dataloader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

# Model definition
class TinyVGG(nn.Module):
    """
    Model architecture copying TinyVGG from:
    https://poloclub.github.io/cnn-explainer/
    """
    def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_shape, out_channels=hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=hidden_units, out_channels=hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(hidden_units, hidden_units, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(0.5),
            nn.Linear(in_features=hidden_units*16*16, out_features=output_shape)
        )

    def forward(self, x: torch.Tensor):
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.classifier(x)
        return x

# Create train_step(), test_step(), train()
def train_step(model: nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: nn.Module,
               optimizer: torch.optim.Optimizer):
    """
    Function to perform one training step

    Args:
        model (nn.Module): The neural network model
        dataloader (torch.utils.data.DataLoader): Training data loader
        loss_fn (nn.Module): Loss function
        optimizer (torch.optim.Optimizer): Optimizer

    Returns:
        float: Training accuracy
    """
    model.train()
    train_acc = 0

    for batch, (X,y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        loss.backward()
        optimizer.step()

        y_pred_class = y_pred.argmax(dim=1)
        train_acc += (y_pred_class == y).float().mean().item()

    train_acc /= len(dataloader)
    return train_acc

def test_step(model: nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: nn.Module):
    """
    Function to perform one testing step

    Args:
        model (nn.Module): The neural network model
        dataloader (torch.utils.data.DataLoader): Testing data loader
        loss_fn (nn.Module): Loss function

    Returns:
        float: Testing accuracy
    """
    model.eval()
    test_acc = 0

    with torch.no_grad():
        for batch, (X,y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)
            test_pred_logits = model(X)
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += (test_pred_labels == y).float().mean().item()

    test_acc /= len(dataloader)
    return test_acc

def train(model: nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5):
    """
    Function to train the model

    Args:
        model (nn.Module): The neural network model
        train_dataloader (torch.utils.data.DataLoader): Training data loader
        test_dataloader (torch.utils.data.DataLoader): Testing data loader
        optimizer (torch.optim.Optimizer): Optimizer
        loss_fn (nn.Module, optional): Loss function. Defaults to nn.CrossEntropyLoss().
        epochs (int, optional): Number of epochs for training. Defaults to 5.

    Returns:
        dict: Dictionary containing training and testing accuracies
    """
    results = {
        "train_acc": [],
        "test_acc": []
    }

    for epoch in range(epochs):
        train_acc = train_step(model=model,
                               dataloader=train_dataloader, loss_fn=loss_fn,
                               optimizer=optimizer)
        test_acc = test_step(model=model,
                             dataloader=test_dataloader,
                             loss_fn=loss_fn)

        # Print out training and testing accuracies for each epoch
        print(
            f"Epoch: {epoch+1} | "
            f"Train Accuracy: {train_acc:.4f} | "
            f"Test Accuracy: {test_acc:.4f}"
        )

        # Update results dictionary
        results["train_acc"].append(train_acc)
        results["test_acc"].append(test_acc)

    return results

# Define data augmentation and transformation
train_transform = transforms.Compose([
    transforms.Resize((64,64)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor()
])

test_transform = transforms.Compose([
    transforms.Resize((64,64)),
    transforms.ToTensor()
])

# Create augmented training dataset and simple testing dataset
train_data_augmented = datasets.ImageFolder(train_dir, transform=train_transform)
test_data_simple = datasets.ImageFolder(test_dir, transform=test_transform)

# Create data loaders for augmented training data and simple testing data
train_dataloader_augmented = DataLoader(train_data_augmented,
                                        batch_size=BATCH_SIZE,
                                        num_workers=NUM_WORKERS,
                                        shuffle=True)
test_dataloader_simple = DataLoader(test_data_simple,
                                    batch_size=BATCH_SIZE,
                                    num_workers=NUM_WORKERS,
                                    shuffle=False)

# Initialize the model
model_1 = TinyVGG(
    input_shape=3,
    hidden_units=10,
    output_shape=len(train_data_augmented.classes)).to(device)

# Set number of epochs
NUM_EPOCHS = 10

# Setup loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model_1.parameters(), lr=0.001)  # Adjust learning rate

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.01)

# Start the timer
start_time = time.time()

# Train the model
model_1_results = train(model=model_1,
                        train_dataloader=train_dataloader_augmented,
                        test_dataloader=test_dataloader_simple,
                        optimizer=optimizer,
                        loss_fn=loss_fn,
                        epochs=NUM_EPOCHS)

# End the timer and print out the training time
end_time = time.time()
print(f"Total training time: {end_time - start_time:.3f} seconds")



There are 8 directories and 300 images in 'data/images'.
Epoch: 1 | Train Accuracy: 0.2930 | Test Accuracy: 0.2604
Epoch: 2 | Train Accuracy: 0.3086 | Test Accuracy: 0.5521
Epoch: 3 | Train Accuracy: 0.3008 | Test Accuracy: 0.3703
Epoch: 4 | Train Accuracy: 0.4219 | Test Accuracy: 0.2604
Epoch: 5 | Train Accuracy: 0.4297 | Test Accuracy: 0.2604
Epoch: 6 | Train Accuracy: 0.4375 | Test Accuracy: 0.2604
Epoch: 7 | Train Accuracy: 0.4453 | Test Accuracy: 0.3021
Epoch: 8 | Train Accuracy: 0.4336 | Test Accuracy: 0.2604
Epoch: 9 | Train Accuracy: 0.5273 | Test Accuracy: 0.3125
Epoch: 10 | Train Accuracy: 0.4219 | Test Accuracy: 0.4138
Total training time: 23.319 seconds
