In [None]:
import torch
from torch import nn
import torchvision
import requests
import zipfile
from pathlib import Path
import matplotlib.pyplot as plt
import os

# Report the library versions so the notebook output records the environment it ran in
print(f"PyTorch version: {torch.__version__}\ntorchvision version: {torchvision.__version__}")
# Run on the GPU when CUDA is available, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

# Loading in our Data using ImageFolder

In [None]:
import requests
import zipfile
from pathlib import Path

# Locations of the dataset archive and of the folder it is extracted into
data_path = Path("data/")
image_path = data_path / "casting_data"
archive_path = data_path / "casting_data.zip"


if image_path.is_dir():
    print(f"{image_path} directory exists.")
else:
    print(f"Did not find {image_path} directory, creating one...")
    # Check for the archive BEFORE creating the directory. Otherwise a missing zip
    # leaves an empty folder behind, and every later run would take the
    # "directory exists" branch and never extract anything.
    if not archive_path.is_file():
        raise FileNotFoundError(
            f"Expected dataset archive at {archive_path}; place the zip there and re-run this cell."
        )
    image_path.mkdir(parents=True, exist_ok=True)

    with zipfile.ZipFile(archive_path, "r") as zip_ref:
        print("Unzipping data...")
        zip_ref.extractall(image_path)

In [None]:
import os
def walk_through_dir(dir_path):
  """
  Print a one-line summary for dir_path and for every directory below it.

  Args:
    dir_path (str or pathlib.Path): root directory to traverse

  Returns:
    None. For each directory visited, one line is printed with:
      the number of immediate subdirectories
      the number of files (reported as "images")
      the path of the directory
  """
  for walk_entry in os.walk(dir_path):
    # os.walk yields (directory path, subdirectory names, file names) triples
    dirpath, dirnames, filenames = walk_entry
    print(f"There are {len(dirnames)} directories and {len(filenames)} images in '{dirpath}'.")

In [None]:
# Print directory/file counts for everything under the extracted dataset folder
walk_through_dir(image_path)

In [None]:
# Set up the training and testing directory paths (both are subfolders of image_path)
train_dir = image_path / "train"
test_dir = image_path / "test"

# Bare last expression so the notebook displays both paths
train_dir, test_dir

In [None]:
import random
from PIL import Image
# No fixed seed: a different image may be chosen every time this cell runs
random.seed()

# Every .jpeg exactly three levels below image_path
image_path_list = list(image_path.glob("*/*/*.jpeg"))
random_image_path = random.choice(image_path_list)

# The class label is taken from the name of the folder that holds the image
image_class = random_image_path.parent.stem
img = Image.open(random_image_path)

print(
    f"Random image path: {random_image_path}",
    f"Image class: {image_class}",
    f"Image height: {img.height}",
    f"Image width: {img.width}",
    sep="\n",
)
img

In [None]:
#visualization using matplotlib

import numpy as np
import matplotlib.pyplot as plt

# Convert the PIL image into a NumPy array so matplotlib can draw it and its shape can be reported
img_as_array = np.asarray(img)

# Draw the image with its class and array shape in the title; axes are hidden
plt.figure(figsize=(10, 7))
plt.imshow(img_as_array)
plt.title(f"Image class: {image_class} | Image shape: {img_as_array.shape} -> [height, width, color_channels]")
plt.axis(False);

# Transforming/augmenting the images

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.transforms import v2

# NOTE(review): newer torchvision releases appear to deprecate v2.ToTensor in favour of
# v2.ToImage() + v2.ToDtype(torch.float32, scale=True) - confirm against the installed
# version before switching.

# Plain pipeline: resize to 224x224 and convert to a tensor, no augmentation
data_transform = v2.Compose([
    v2.Resize(size=(224, 224)),
    v2.ToTensor(),
])

# Original training augmentation: random flips plus colour jitter
train_data_transform = v2.Compose([
    v2.Resize(size=(224, 224)),
    v2.RandomHorizontalFlip(p=0.5),
    v2.ColorJitter(
        brightness=(0.7, 1.3),
        contrast=(0.7, 1.3),
        saturation=(0.7, 1.3),
        hue=(-0.3, 0.3),
    ),
    v2.RandomVerticalFlip(p=0.5),
    v2.ToTensor(),
])

# Experimental variant: less frequent flips, occasional inversion and sharpening
train_data_transform2 = v2.Compose([
    v2.Resize(size=(224, 224)),
    v2.RandomHorizontalFlip(p=0.3),
    v2.ColorJitter(
        brightness=(0.7, 1.3),
        contrast=(0.7, 1.3),
        saturation=(0.7, 1.3),
        hue=(-0.3, 0.3),
    ),
    v2.RandomInvert(p=0.05),
    v2.RandomAdjustSharpness(3, p=0.05),
    v2.RandomVerticalFlip(p=0.3),
    v2.ToTensor(),
])

# Test-time pipeline: resize and convert only
test_data_transform = v2.Compose([
    v2.Resize(size=(224, 224)),
    v2.ToTensor(),
])

# Sanity check: output shape and dtype of each pipeline on the sample image
for pipeline in (data_transform, train_data_transform, test_data_transform):
    print(pipeline(img).shape)
    print(pipeline(img).dtype)


In [None]:
def plot_transformed_images(image_paths, transform, n = 5, seed = None):
    """Plots a series of random images from image_paths.

    Opens up to n image paths from image_paths, applies transform to each and
    plots the original and the transformed version side by side (one figure
    per image).

    Args:
        image_paths (list): List of target image paths.
        transform (PyTorch Transforms): Transforms to apply to images. The result
            is permuted with (1, 2, 0), so it must be a 3-dimensional tensor.
        n (int, optional): Number of images to plot. Defaults to 5. Capped at
            len(image_paths).
        seed (int, optional): Seed for choosing the images. Defaults to None,
            which gives a different selection on each call.
    """
    # A private generator picks the same images that random.seed(seed) followed by
    # random.sample would, but leaves the module-level random state untouched.
    rng = random.Random(seed)
    # Cap k so that asking for more images than exist plots them all instead of raising
    random_image_paths = rng.sample(image_paths, k=min(n, len(image_paths)))
    for path in random_image_paths:
        with Image.open(path) as f:
            fig, ax = plt.subplots(nrows = 1, ncols = 2)
            ax[0].imshow(f)
            ax[0].set_title(f"Original \nSize: {f.size}")
            ax[0].axis("off")
            # permute() reorders [C, H, W] (PyTorch default) to [H, W, C] (matplotlib)
            transformed_image = transform(f).permute(1, 2, 0)
            ax[1].imshow(transformed_image)
            ax[1].set_title(f"Transformed \nSize: {transformed_image.shape}")
            ax[1].axis("off")
            # The class name is the name of the folder containing the image
            fig.suptitle(f"Class: {path.parent.stem}", fontsize=16)


# Preview the training augmentation on three randomly chosen images
plot_transformed_images(image_path_list, 
                        transform=train_data_transform, 
                        n=3)

In [None]:
from torchvision import datasets

# Build one dataset per split; augmentation is applied to the training split only
train_data = datasets.ImageFolder(root=train_dir, transform=train_data_transform)
test_data = datasets.ImageFolder(root=test_dir, transform=test_data_transform)

# First (image, label) sample of the training set; both names are reused in the next cell
image, label = train_data[0]
# NOTE(review): this is not the last expression of the cell, so it is presumably never rendered
image, label

# Last expression: the notebook shows the dataset summary
train_data

In [None]:
print(image.shape) #dimensions are color channels, height and width
print(label)


print(train_data.classes)
# train_data.targets holds one label per training image; print per-class counts
# instead of the full list so the output stays readable.
print({name: train_data.targets.count(idx) for name, idx in train_data.class_to_idx.items()})

In [None]:
# Text summaries of both datasets, printed one after the other
print(train_data)
print(test_data)

In [None]:
# Class metadata taken from the training dataset
class_names = train_data.classes  # class names
class_dict = train_data.class_to_idx  # mapping from class name to its integer target value

In [None]:
# Batching configuration shared by both loaders
BATCH_SIZE = 32
NUM_WORKERS = 0


print(f"Creating DataLoader's with batch size {BATCH_SIZE} and {NUM_WORKERS} workers.")

# NOTE(review): DataLoader shuffling appears to draw from torch's RNG rather than Python's
# `random` module, so these random.seed(42) calls probably do not control batch order - confirm.
random.seed(42)
# Training batches are reshuffled on every pass
train_dataloader = DataLoader(train_data, 
                            batch_size=BATCH_SIZE, 
                            shuffle=True, 
                            num_workers=NUM_WORKERS)
random.seed(42)
# Test batches keep the dataset order
test_dataloader = DataLoader(test_data, 
                            batch_size=BATCH_SIZE, 
                            shuffle=False, 
                            num_workers=NUM_WORKERS)

train_dataloader, test_dataloader

In [None]:
class CNN(nn.Module):
    """Five-stage convolutional binary classifier for 3-channel images.

    Each stage is Conv2d(kernel_size=3, stride=1) -> ReLU -> 2x2 max-pool. The
    flattened features then pass through two linear layers and a sigmoid, so
    forward() returns one value in (0, 1) per sample.

    The first linear layer expects a 64 x 7 x 7 feature map, which is what
    224 x 224 inputs produce after the five pooling steps.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels, padding) of each conv stage, in order.
        # NOTE(review): the first stage pads by 2 (224 -> 226 before pooling) while the
        # others pad by 1; the final map is still 7 x 7 - confirm this is intentional.
        conv_stages = (
            (3, 64, 2),
            (64, 64, 1),
            (64, 128, 1),
            (128, 128, 1),
            (128, 64, 1),
        )

        layers = []
        for in_ch, out_ch, pad in conv_stages:
            layers.append(nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, stride=1, padding=pad))
            layers.append(nn.ReLU())
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))

        # Classifier head.
        # NOTE(review): the two Linear layers are applied back to back with no activation
        # between them - confirm that is intended.
        layers.append(nn.Flatten())
        layers.append(nn.Linear(in_features=64*7*7, out_features=32*7*7))
        layers.append(nn.Linear(in_features=32*7*7, out_features=1))
        layers.append(nn.Sigmoid())

        # Kept as a single Sequential named block1 so parameter names stay block1.<index>.*
        self.block1 = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor):
        """Run a batch of images through the network and return the sigmoid outputs."""
        return self.block1(x)

In [None]:
# Model, loss and optimizer for the binary classifier (the model ends in a sigmoid, hence BCELoss)
model_0 = CNN().to(device)
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(params=model_0.parameters(), lr=0.0005)#, weight_decay = 1e-4)
# `import torcheval` binds the package under the same name; `from torcheval import torcheval`
# looks for a `torcheval` attribute inside the package and fails to import.
import torcheval
from torcheval.metrics.functional.classification import binary_recall
from torcheval.metrics.functional.classification import binary_precision
from torcheval.metrics.functional.classification import binary_f1_score
from torcheval.metrics.functional.classification import binary_auroc

In [None]:
def train_step_binary(model: torch.nn.Module, 
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module, 
               optimizer: torch.optim.Optimizer):
    """Train model for one pass over dataloader and report averaged metrics.

    Args:
        model: network whose output holds one probability per sample.
        dataloader: yields (inputs, labels) batches.
        loss_fn: loss called as loss_fn(predictions, labels.float()).
        optimizer: optimizer stepped once per batch.

    Returns:
        Tuple (loss, accuracy, recall, precision, f1). Each value is the mean of
        the per-batch values, so every batch carries equal weight regardless of
        its size.
    """
    model.train()
    train_loss, train_acc, train_recall, train_precision, train_f1 = 0, 0, 0, 0, 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        # reshape(-1) always yields a 1-D tensor. A bare squeeze() would also drop the
        # batch dimension of a single-sample batch, leaving a 0-dim tensor that breaks
        # the loss, len() and the metric helpers.
        train_pred = model(X).reshape(-1)
        loss = loss_fn(train_pred, y.float())  # the loss needs float labels
        train_loss += loss.item()  # total loss over all batches
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Threshold at 0.5; detached so the metrics are computed outside the autograd graph
        train_pred_binary = torch.round(train_pred.detach())
        train_acc += (train_pred_binary == y).sum().item()/len(train_pred_binary)
        train_recall += binary_recall(train_pred_binary, y).item()
        train_precision += binary_precision(train_pred_binary, y).item()
        train_f1 += binary_f1_score(train_pred_binary, y).item()
        # Running mean of the loss, reported after every batch except the first
        avg_train_loss = train_loss / (batch + 1)
        if batch > 0:
            print(f"Training Batch {batch}/{len(dataloader)} | Training Loss: {avg_train_loss}")
    num_batches = len(dataloader)
    train_loss = train_loss / num_batches
    train_acc = train_acc / num_batches
    train_recall = train_recall / num_batches
    train_precision = train_precision / num_batches
    train_f1 = train_f1 / num_batches
    return train_loss, train_acc, train_recall, train_precision, train_f1

In [None]:
def test_step_binary(model: torch.nn.Module, 
              dataloader: torch.utils.data.DataLoader, 
              loss_fn: torch.nn.Module):
    """Evaluate model on dataloader without updating its weights.

    Args:
        model: network whose output holds one probability per sample.
        dataloader: yields (inputs, labels) batches.
        loss_fn: loss called as loss_fn(predictions, float_labels).

    Returns:
        Tuple (loss, accuracy, recall, precision, f1). Each value is the mean of
        the per-batch values.
    """
    model.eval()
    test_loss, test_acc, test_recall, test_precision, test_f1 = 0, 0, 0, 0, 0
    with torch.inference_mode():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            # reshape(-1) keeps a 1-D tensor even for a single-sample batch, where a
            # bare squeeze() would produce a 0-dim tensor.
            test_pred = model(X).reshape(-1)
            loss = loss_fn(test_pred, y.to(torch.float32))  # the loss needs float labels
            test_loss += loss.item()
            test_pred_binary = torch.round(test_pred)  # threshold at 0.5
            test_acc += (test_pred_binary == y).sum().item()/len(test_pred_binary)
            # The metric helpers receive the labels as loaded (not the float copy),
            # the same way train_step_binary calls them.
            test_recall += binary_recall(test_pred_binary, y).item()
            test_precision += binary_precision(test_pred_binary, y).item()
            test_f1 += binary_f1_score(test_pred_binary, y).item()
    num_batches = len(dataloader)
    test_loss = test_loss / num_batches
    test_acc = test_acc / num_batches
    test_recall = test_recall / num_batches
    test_precision = test_precision / num_batches
    test_f1 = test_f1 / num_batches
    return test_loss, test_acc, test_recall, test_precision, test_f1

In [None]:
def train(model: torch.nn.Module, 
          train_dataloader: torch.utils.data.DataLoader, 
          test_dataloader: torch.utils.data.DataLoader, 
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module,
          epochs: int,
          history=None):
    """Run the train/test loop for a number of epochs and record the metrics.

    Args:
        model: network to train.
        train_dataloader: batches used by train_step_binary.
        test_dataloader: batches used by test_step_binary.
        optimizer: optimizer passed to train_step_binary.
        loss_fn: loss passed to both steps.
        epochs: number of epochs to run.
        history (dict, optional): dict of metric name -> list that the per-epoch
            values are appended to. When omitted, the module-level `results` dict
            is used, as before; it must already be defined when train() is called.

    Returns:
        The history dict, extended with one value per metric per epoch.
    """
    from tqdm.auto import tqdm

    if history is None:
        # Original behaviour: accumulate into the notebook-level dict
        history = results

    for epoch in tqdm(range(epochs)):
        print(f"\n|-|-|-|-|-|-|-|-|-|-|-|-|-| BEGINNING EPOCH {epoch + 1} |-|-|-|-|-|-|-|-|-|-|-|-|-|\n")

        train_loss, train_acc, train_recall, train_precision, train_f1 = train_step_binary(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer)

        test_loss, test_acc, test_recall, test_precision, test_f1 = test_step_binary(model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn)

        # Precision and recall are recorded below but left out of this summary
        print(
            f"|-|-|-|-|-|-|-|-|-|-|-|-|-| EPOCH {epoch+1} FINISHED |-|-|-|-|-|-|-|-|-|-|-|-|-|\n\n"
            f"train loss: {train_loss:.4f}   |   "
            f"train acc: {train_acc:.4f}   |   "
            f"train f1: {train_f1:.4f}\n"
            f"test loss: {test_loss:.4f}   |   "
            f"test acc: {test_acc:.4f}   |   "
            f"test f1: {test_f1:.4f}\n\n\n"
        )

        epoch_metrics = {
            "train_loss": train_loss,
            "train_acc": train_acc,
            "train_recall": train_recall,
            "train_precision": train_precision,
            "train_f1": train_f1,
            "test_loss": test_loss,
            "test_acc": test_acc,
            "test_recall": test_recall,
            "test_precision": test_precision,
            "test_f1": test_f1,
        }
        # setdefault lets an empty or partial dict be passed in without a KeyError
        for metric_name, value in epoch_metrics.items():
            history.setdefault(metric_name, []).append(value)

    return history

In [None]:


# Per-epoch metric history: one empty list for every "<split>_<metric>" combination,
# train metrics first, then test metrics.
results = {
    f"{split}_{metric}": []
    for split in ("train", "test")
    for metric in ("loss", "acc", "recall", "precision", "f1")
}

In [None]:
# Imported as `timer` because that is the name called below; the previous alias
# `Timer` left timer() undefined and raised NameError.
from timeit import default_timer as timer
# Seed torch (CPU and CUDA) before the model is created and training starts
torch.manual_seed(42) 
torch.cuda.manual_seed(42)

model_0 = CNN().to(device)
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(params=model_0.parameters(), lr=0.0005)
NUM_EPOCHS = 15

# The model is re-created above, so empty the shared history in place. Otherwise
# re-running this cell would append new epochs after those of a previous run.
for metric_values in results.values():
    metric_values.clear()

start_time = timer()

model_0_results = train(model=model_0, 
                        train_dataloader=train_dataloader,
                        test_dataloader=test_dataloader,
                        optimizer=optimizer,
                        loss_fn=loss_fn, 
                        epochs=NUM_EPOCHS)
end_time = timer()
print(f"Total training time: {end_time-start_time:.3f} seconds")

In [None]:
import pandas as pd
# Show the recorded metrics as a table (one column per metric, one row per recorded epoch)
pd.DataFrame(results)

In [None]:
from typing import Tuple, Dict, List
def plot_loss_curves(results: Dict[str, List[float]]):
    """Plot train/test loss and train/test accuracy per epoch in two side-by-side panels.

    Args:
        results: dict holding the lists 'train_loss', 'test_loss', 'train_acc'
            and 'test_acc', with one value per epoch. The x axis runs from 0 to
            len(results['train_loss']) - 1.
    """
    # Read all series up front so a missing key fails before any figure is created
    train_loss, test_loss = results['train_loss'], results['test_loss']
    train_acc, test_acc = results['train_acc'], results['test_acc']
    epochs = range(len(results['train_loss']))

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 7))
    panels = (
        (loss_ax, 'Loss', ((train_loss, 'train_loss'), (test_loss, 'test_loss'))),
        (acc_ax, 'Accuracy', ((train_acc, 'train_accuracy'), (test_acc, 'test_accuracy'))),
    )
    for ax, title, curves in panels:
        for values, label in curves:
            ax.plot(epochs, values, label=label)
        ax.set_title(title)
        ax.set_xlabel('Epochs')
        ax.legend()

In [None]:
# Plot the loss and accuracy curves recorded while training model_0
plot_loss_curves(model_0_results)