In [None]:
import torch
from torch import nn
from pathlib import Path


In [None]:
print(torch.__version__)

In [None]:
# Use the GPU when torch reports CUDA as available, otherwise fall back to the CPU.
# The resulting string is what later cells pass to `.to(device)`.
device = "cuda" if torch.cuda.is_available() else "cpu"
device

In [None]:
!nvidia-smi

In [None]:
import os
def walk_through_dir(dir_path):
    """Print how many sub-directories and files each directory under ``dir_path`` holds.

    Args:
        dir_path: Root directory to traverse; handed to ``os.walk`` unchanged.

    One line is printed per visited directory (the root included).
    """
    for dirpath, dirnames, filenames in os.walk(dir_path):
        # Report the directory currently being visited, not the root that was passed in.
        print(f"There are {len(dirnames)} directories names and {len(filenames)} images in '{dirpath}'.")

In [None]:
# Root folder of the dataset; the train/ and val/ folders used later live below it.
# NOTE(review): this is an absolute, machine-specific Windows path — it has to be
# edited before the notebook can run on another machine.
image_path = Path(r'D:\Sem 3\DL\Classification\data')
walk_through_dir(image_path)

In [None]:
import random
from PIL import Image

# Collect every PNG file anywhere below the dataset root (recursive glob).
image_path_list = list(image_path.glob("**/*.png"))

# Pick one image at random; random.choice raises IndexError if no PNG was found.
random_image_path = random.choice(image_path_list)

# The name of the directory that contains the image is used as its class label.
image_class = random_image_path.parent.stem

# Open the image with PIL so its metadata can be inspected.
img = Image.open(random_image_path)

print(f"Random image path: {random_image_path}")
print(f"Image class: {image_class}")
print(f"Image height: {img.height}")
print(f"Image width: {img.width}")
# Last expression of the cell: the notebook renders the image itself.
img

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Turn the PIL image opened above into a NumPy array so matplotlib can show it
# and its shape can be reported in the title.
img_as_array = np.asarray(img)

# Plot the image with its class and array shape as the title; axes are hidden.
plt.figure(figsize=(5,5))
plt.imshow(img_as_array)
plt.title(f"Image class: {image_class} | Image shape: {img_as_array.shape}")
# The trailing semicolon suppresses the textual repr of the return value in the cell output.
plt.axis(False);

In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [None]:
# Demo preprocessing pipeline, applied in order to a PIL image:
data_transform = transforms.Compose([
    # 1. Resize to a fixed 128x128 so all images share one spatial size.
    transforms.Resize(size=(128, 128)),
    # 2. Mirror the image left-right with probability 0.5 (data augmentation).
    transforms.RandomHorizontalFlip(p=0.5),
    # 3. Convert to a tensor (channel-first; later cells permute it to [h, w, c] for plotting).
    transforms.ToTensor()
])

In [None]:
data_transform(img).shape, data_transform(img)

In [None]:
def plot_transformed_images(image_paths: list, transform, n=3, seed=None):
    """Plot randomly chosen images side by side with their transformed versions.

    Args:
        image_paths: Paths of the candidate images (``pathlib.Path`` objects; the
            parent directory name is shown as the class).
        transform: Callable applied to the opened PIL image. Its result must
            support ``.permute(1, 2, 0)``, i.e. be a channel-first tensor.
        n: Number of images to sample; ``random.sample`` raises ``ValueError``
            when it exceeds ``len(image_paths)``.
        seed: Optional seed for the ``random`` module. ``None`` leaves the RNG
            state untouched; every other value (including ``0``) is applied.
    """
    # Compare against None so that a seed of 0 is honoured instead of being ignored.
    if seed is not None:
        random.seed(seed)
    random_image_paths = random.sample(image_paths, k=n)
    for image_path in random_image_paths:
        with Image.open(image_path) as f:
            fig, ax = plt.subplots(nrows=1, ncols=2)
            ax[0].imshow(f)
            ax[0].set_title(f"original\nsize: {f.size}")

            # Transform and plot target image; matplotlib expects [h, w, c],
            # so the channel-first tensor is permuted before plotting.
            transformed_image = transform(f).permute(1, 2, 0)
            ax[1].imshow(transformed_image)
            ax[1].set_title(f"Transformed\nShape: {transformed_image.shape}")
            fig.suptitle(f"class: {image_path.parent.stem}", fontsize=16)
            
plot_transformed_images(image_paths=image_path_list,
                        transform=data_transform,
                        n=3,
                        seed=42)

In [None]:
# Locations of the training and validation splits.
# NOTE(review): absolute, machine-specific paths that repeat the dataset root
# defined earlier — they must be kept in sync with it by hand.
train_dir = Path(r'D:\Sem 3\DL\Classification\data\train')
val_dir = Path(r'D:\Sem 3\DL\Classification\data\val')

In [None]:
from typing import Tuple, Dict, List

In [None]:
# train_data = datasets.ImageFolder(root=train_dir,
#                                   transform=data_transform,
#                                   target_transform=None)

# val_data = datasets.ImageFolder(root=val_dir,
#                                 transform=data_transform)

# train_data, val_data

In [None]:
# train_data.classes, train_data.class_to_idx

In [None]:
# Peek at the training directory: every entry name found there, in sorted order.
target_directory = train_dir
print(f"Target dir: {target_directory}")

class_names_found = sorted(entry.name for entry in os.scandir(target_directory))
class_names_found

In [None]:
# Make function to find classes in target directory
def find_classes(directory: str) -> Tuple[List[str], Dict[str, int]]:
    """Derive class names and their integer labels from a directory's sub-folders.

    Args:
        directory: Folder whose immediate sub-directories each represent one class.
            Plain files in it are ignored.

    Returns:
        ``(classes, class_to_idx)`` — the sub-directory names in sorted order and
        a dict mapping each name to its position in that list. Both are empty
        when ``directory`` contains no sub-directories; no error is raised.
    """
    subdir_names = [item.name for item in os.scandir(directory) if item.is_dir()]
    subdir_names.sort()

    # Labels are simply the positions in the sorted name list.
    index_by_name = dict(zip(subdir_names, range(len(subdir_names))))
    return subdir_names, index_by_name

In [None]:
import pathlib

In [None]:
from torch.utils.data import Dataset

class ChristmasImages(Dataset):
    """Folder-based image dataset over all PNG files below a target directory.

    A sample's label is looked up from the name of the directory that directly
    contains the image, using the mapping produced by ``find_classes``.
    """

    def __init__(self, targ_dir: str, transform=None) -> None:
        """Index the PNG files under ``targ_dir`` and build the class mapping.

        Args:
            targ_dir: Root directory; searched recursively for ``*.png`` files.
            transform: Optional callable applied to each loaded PIL image.
        """
        root = pathlib.Path(targ_dir)
        self.paths = list(root.glob("**/*.png"))
        self.transform = transform
        self.classes, self.class_to_idx = find_classes(targ_dir)

    def load_image(self, index: int) -> Image.Image:
        """Open and return the image stored at position ``index`` of ``self.paths``."""
        return Image.open(self.paths[index])

    def __len__(self) -> int:
        """Number of PNG files that were found."""
        return len(self.paths)

    def __getitem__(self, index: int) -> Tuple[torch.Tensor, int]:
        """Return ``(image, class_idx)`` for the sample at ``index``.

        The image is passed through ``self.transform`` when one was given;
        otherwise the PIL image is returned as loaded. Raises ``KeyError`` if the
        image's parent directory name is not a known class.
        """
        sample = self.load_image(index)
        label = self.class_to_idx[self.paths[index].parent.name]

        if self.transform:
            sample = self.transform(sample)
        return sample, label
    

In [None]:
def ensure_three_channels(image):
    """Return ``image`` with three channels where a sensible conversion exists.

    Args:
        image: Tensor whose first dimension is the channel axis
            (``image.size(0)`` is read as the channel count).

    Returns:
        * 1 channel (grayscale): the channel repeated three times (an expanded view).
        * 2 channels: the first channel repeated three times and the second dropped.
          NOTE(review): this assumes the layout is grayscale + alpha (e.g. PIL mode
          "LA") — confirm against the data.
        * 4 channels (RGBA): the first three channels; alpha is discarded.
        * Any other channel count: ``image`` unchanged.
    """
    channels = image.size(0)
    if channels in (1, 2):
        # Keep only the first channel and repeat it. For 2-channel input this
        # avoids stacking the second channel (presumably alpha) in as a colour channel.
        image = image[:1].expand(3, -1, -1)
    elif channels == 4:  # RGBA to RGB by discarding the alpha channel
        image = image[:3, :, :]
    return image


In [None]:
# Training pipeline: fixed 128x128 size, random left-right flip as the only
# active augmentation, conversion to a tensor, then coercion to three channels.
# (Rotation, colour jitter, Gaussian blur and ImageNet normalisation are
# intentionally not part of the pipeline at the moment.)
train_transforms = transforms.Compose([
    transforms.Resize(size=(128,128)),
    transforms.CenterCrop(128),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Lambda(ensure_three_channels),
])

# Validation pipeline: same steps as training, minus the random flip.
val_transforms = transforms.Compose([
    transforms.Resize(size=(128, 128)),
    transforms.CenterCrop(128),
    transforms.ToTensor(),
    transforms.Lambda(ensure_three_channels),
])

In [None]:
# Wrap each split in the custom dataset, pairing it with its own transform pipeline.
train_data = ChristmasImages(targ_dir=train_dir, transform=train_transforms)
val_data = ChristmasImages(targ_dir=val_dir, transform=val_transforms)

In [None]:
# Get class name as list
class_names = train_data.classes
class_names

In [None]:
train_data, val_data

In [None]:
len(train_data), len(val_data)

In [None]:
train_data.classes

In [None]:
train_data.class_to_idx

In [None]:
def display_random_images(dataset: torch.utils.data.Dataset,
                          classes: List[str] = None,
                          n: int = 10,
                          display_shape: bool = True,
                          seed: int = None):
    """Show ``n`` randomly chosen samples of ``dataset`` in a single row.

    Args:
        dataset: Indexable dataset whose items are ``(image, label)`` with the
            image being a channel-first tensor (it is permuted to [h, w, c]).
        classes: Optional list of class names indexed by label. When given, each
            image is titled with its class; when omitted, no title is drawn.
        n: Number of samples to draw. Above 10 the shape is left out of the
            titles and a notice is printed. ``random.sample`` raises
            ``ValueError`` when ``n`` exceeds ``len(dataset)``.
        display_shape: Whether to append the image shape to the title.
        seed: Optional seed for the ``random`` module; ``None`` leaves the RNG
            untouched, every other value (including ``0``) is applied.
    """
    # Adjust display if n is too high
    if n > 10:
        display_shape = False
        print(f"For display purposes, n shouldn't be larger than 10.")

    # Compare against None so that a seed of 0 is honoured as well.
    if seed is not None:
        random.seed(seed)

    # Get the random sample indexes
    random_sample_idx = random.sample(range(len(dataset)), k=n)

    plt.figure(figsize=(16,8))

    # Loop through random indexes and plot them with matplotlib
    for i, targ_sample in enumerate(random_sample_idx):
        # Index the dataset once: every access re-loads the image and re-runs any
        # random transform, so image and label must come from the same access.
        sample = dataset[targ_sample]
        targ_image, targ_label = sample[0], sample[1]

        # Adjust tensor dimensions for plotting
        targ_image_adjust = targ_image.permute(1, 2, 0)   # [c, h, w] -> [h, w, c]

        # plot adjusted sample
        plt.subplot(1, n, i+1)
        plt.imshow(targ_image_adjust)
        plt.axis(False)
        if classes:
            title = f"Class: {classes[targ_label]}"
            if display_shape:
                title = title + f"\nshape: {targ_image_adjust.shape}"
            # Only set a title when one was built; without class names there is none.
            plt.title(title)

In [None]:
display_random_images(dataset=train_data,
                      classes=class_names,
                      n=5,
                      seed=None)

In [None]:
# Mini-batch size shared by both loaders.
BATCH_SIZE = 16
# Training batches are shuffled (shuffle=True); the validation loader keeps a fixed order.
train_dataloader = DataLoader(dataset=train_data,
                              batch_size=BATCH_SIZE,
                              shuffle=True)
val_dataloader = DataLoader(dataset=val_data,
                            batch_size=BATCH_SIZE,
                            shuffle=False)
train_dataloader, val_dataloader

In [None]:
# Get one batch of images and labels from the training dataloader and check their shapes
img_custom, label_custom = next(iter(train_dataloader))
img_custom.shape, label_custom.shape

In [None]:
image_path

In [None]:
# Get all image paths
image_path_list = list(image_path.glob("**/*.png"))
image_path_list[:10]


In [None]:
plot_transformed_images(image_paths=image_path_list,
                        transform=train_transforms,
                        n=3,
                        seed=None)

In [None]:
class Network(nn.Module):
    """Convolutional image classifier built from four conv stages and a linear head.

    Every convolution is 3x3 with stride 1 and padding 1 (spatial size is kept)
    and is followed by batch normalisation and an in-place ReLU. Height and width
    are halved four times by 2x2 max-pooling (three times in ``conv_block_3``,
    once in ``conv_block_4``). The classifier expects ``hidden_units * 8 * 8``
    features, which matches 128x128 inputs (128 / 2**4 = 8).

    Args:
        input_shape: Number of channels of the input images.
        hidden_units: Number of channels produced by every convolution.
        output_shape: Number of output logits (classes).
    """

    def __init__(self, input_shape: int, hidden_units: int, output_shape: int) -> None:
        super().__init__()

        def conv_unit(in_channels: int) -> list:
            # Conv(3x3, same padding) -> BatchNorm -> ReLU, always emitting `hidden_units` channels.
            return [
                nn.Conv2d(in_channels=in_channels,
                          out_channels=hidden_units,
                          kernel_size=3,
                          stride=1,
                          padding=1),
                nn.BatchNorm2d(hidden_units),
                nn.ReLU(inplace=True),
            ]

        def halve() -> nn.MaxPool2d:
            # 2x2 max-pooling with stride 2: halves height and width.
            return nn.MaxPool2d(kernel_size=2, stride=2)

        # Stage 1: two conv units at full resolution, then channel-wise dropout.
        self.conv_block_1 = nn.Sequential(
            *conv_unit(input_shape),
            *conv_unit(hidden_units),
            nn.Dropout2d(p=0.5),
        )

        # Stage 2: one more conv unit at full resolution.
        self.conv_block_2 = nn.Sequential(*conv_unit(hidden_units))

        # Stage 3: pool / conv / pool / conv / pool, then dropout (resolution divided by 8).
        self.conv_block_3 = nn.Sequential(
            halve(),
            *conv_unit(hidden_units),
            halve(),
            *conv_unit(hidden_units),
            halve(),
            nn.Dropout2d(p=0.5),
        )

        # Stage 4: conv unit followed by the final pooling step (resolution divided by 16 overall).
        self.conv_block_4 = nn.Sequential(
            *conv_unit(hidden_units),
            halve(),
        )

        # Head: flatten the feature map and project it to one logit per class.
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=hidden_units*8*8,
                      out_features=output_shape),
        )

    def forward(self, x: torch.Tensor):
        """Run ``x`` through the four conv stages and the classifier; returns raw logits."""
        stages = (
            self.conv_block_1,
            self.conv_block_2,
            self.conv_block_3,
            self.conv_block_4,
            self.classifier,
        )
        for stage in stages:
            x = stage(x)
        return x
    
# Seed before construction so the initial weights are reproducible.
torch.manual_seed(42)
# Build the model without forcing it onto a GPU: an unconditional .cuda() raises on
# machines without CUDA, while `device` already falls back to "cpu" in that case.
model_0 = Network(input_shape=3,
                  hidden_units=20,
                  output_shape=len(train_data.classes))

# Move the parameters to the selected device (in place); as the cell's last
# expression this also displays the model architecture.
model_0.to(device)
        

In [None]:
image_batch, label_batch = next(iter(train_dataloader))
image_batch.shape, label_batch.shape

In [None]:
# 2. Get a single image from the batch and unsqueeze the image so its shape fits the model
#    (unsqueeze(dim=0) adds a leading batch dimension of size 1).
image_single, label_single = image_batch[0].unsqueeze(dim=0), label_batch[0]
print(f"Single image shape: {image_single.shape}\n")

# 3. Perform a forward pass on a single image
#    (eval mode, inside inference_mode, with the image moved to the model's device).
model_0.eval()
with torch.inference_mode():
    pred = model_0(image_single.to(device))
    
# 4. Print out what's happening and convert model logits -> pred probs -> pred label
#    (softmax over dim=1, the class dimension; argmax picks the most probable class).
print(f"Output logits:\n{pred}\n")
print(f"Output prediction probabilities:\n{torch.softmax(pred, dim=1)}\n")
print(f"Output prediction label:\n{torch.argmax(torch.softmax(pred, dim=1), dim=1)}\n")
print(f"Actual label:\n{label_single}")

In [None]:
# Try forward pass
# model_0(image_batch)

In [None]:
from torchinfo import summary
summary(model_0, input_size=[1,3,128,128])

In [None]:
# for batch, (X, y) in enumerate(train_dataloader):
#     print(f"Batch {batch}: X.shape = {X.shape}")
#     # Check if all X in batch have 3 channels
#     assert all(x.shape[0] == 3 for x in X), f"Found X with channels not equal to 3 in batch {batch}"

In [None]:
# Calculate accuracy
def accuracy_fn(y_true, y_pred):
    """Return the percentage (0-100) of positions where ``y_pred`` equals ``y_true``.

    Args:
        y_true: Tensor of ground-truth labels.
        y_pred: Tensor of predicted labels, compared element-wise with ``y_true``.

    Raises:
        ZeroDivisionError: If ``y_pred`` is empty.
    """
    n_matches = y_true.eq(y_pred).sum().item()
    return (n_matches / len(y_pred)) * 100

# def accuracy_fn(y_pred, y):
#     acc = (y_pred.argmax(dim=1) == y).float().mean()
#     return acc.item() * 100

In [None]:
# Setup loss function and optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model_0.parameters(),
                            lr=0.001)

In [None]:
def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               accurucy_fn,
               device: torch.device = device):
    """Train ``model`` for one pass over ``data_loader`` and print the mean loss and accuracy.

    Args:
        model: Network to train; it is moved to ``device`` and put in train mode.
        data_loader: Iterable of ``(X, y)`` batches that also supports ``len()``.
        loss_fn: Callable ``loss_fn(logits, y)`` returning a scalar tensor.
        optimizer: Optimizer that updates ``model``'s parameters.
        accurucy_fn: Metric callable invoked as ``accurucy_fn(y_true=..., y_pred=...)``
            with predicted labels; it must return a number. The parameter name
            keeps its existing spelling so keyword callers continue to work.
        device: Device the model and every batch are moved to.

    Raises:
        ZeroDivisionError: If ``data_loader`` yields no batches.
    """
    train_loss, train_acc = 0, 0
    model.to(device)
    model.train()
    for batch, (X, y) in enumerate(data_loader):

        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate loss
        loss = loss_fn(y_pred, y)
        # Accumulate a plain Python float so the running total does not stay
        # attached to each batch's autograd graph.
        train_loss += loss.item()
        # Use the metric that was passed in rather than a module-level function.
        train_acc += accurucy_fn(y_true=y,
                                 y_pred=y_pred.argmax(dim=1))

        # 3. optimizer zero grad
        optimizer.zero_grad()

        # 4. loss backward
        loss.backward()

        # 5. optimizer step
        optimizer.step()

    # Calculate loss and accuracy per epoch and print out what's happening
    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    print(f"Train loss: {train_loss:.5f} | Train accuracy: {train_acc:.2f}%")
    
    
def test_step(model: torch.nn.Module,
              data_loader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module,
              accuracy_fn,
              device: torch.device = device):
    """Evaluate ``model`` on ``data_loader`` and print the mean loss and accuracy.

    Args:
        model: Network to evaluate; it is moved to ``device`` and put in eval mode.
        data_loader: Iterable of ``(X, y)`` batches that also supports ``len()``.
        loss_fn: Callable ``loss_fn(logits, y)`` returning a scalar tensor.
        accuracy_fn: Metric callable invoked as ``accuracy_fn(y_true=..., y_pred=...)``
            with predicted labels; it must return a number.
        device: Device the model and every batch are moved to.

    Raises:
        ZeroDivisionError: If ``data_loader`` yields no batches.
    """
    test_loss, test_acc = 0, 0
    model.to(device)
    model.eval()

    # Turn on inference context manager (no gradient tracking during evaluation)
    with torch.inference_mode():
        for X, y in data_loader:
            # The model lives on `device`, so each batch has to be moved there too;
            # otherwise the forward pass fails with a device mismatch on GPU.
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred = model(X)

            # 2. Calculate loss and accuracy (accumulated as plain Python numbers)
            test_loss += loss_fn(test_pred, y).item()
            test_acc += accuracy_fn(y_true=y,
                                    y_pred=test_pred.argmax(dim=1))

    # Adjust metrics and print out
    test_loss /= len(data_loader)
    test_acc /= len(data_loader)
    print(f"Test loss: {test_loss:.5f} | Test accuracy: {test_acc:.2f}%\n")

In [None]:
from tqdm.auto import tqdm

In [None]:
def print_train_time(start: float,
                     end: float,
                     device: torch.device = None):
    """Print and return the time elapsed between two timer readings.

    Args:
        start: Timer value taken before the measured work.
        end: Timer value taken after the measured work.
        device: Device label shown in the message; only used for printing.

    Returns:
        ``end - start``, in the units of the timer readings.
    """
    elapsed = end - start
    print(f"Train time on {device}: {elapsed: .3f} secs")
    return elapsed

In [None]:
# Seed torch's RNG before the training run.
# NOTE(review): the previous inline note said "Don't use random seed while training",
# which contradicts this call — confirm which behaviour is intended.
torch.manual_seed(42)

# Measure time: take a timer reading right before training starts.
from timeit import default_timer as timer
train_time_model_0 = timer()

# Train and test model: each epoch runs one training pass followed by one
# validation pass; both functions print their own metrics.
epochs = 30
for epoch in tqdm(range(epochs)):
    print(f"Epoch: {epoch}\n----")
    # `accurucy_fn` is the (misspelled) keyword name train_step declares.
    train_step(model=model_0,
               data_loader=train_dataloader,
               loss_fn=loss_fn,
               optimizer=optimizer,
               accurucy_fn=accuracy_fn,
               device=device)
    
    test_step(model=model_0,
              data_loader=val_dataloader,
              loss_fn=loss_fn,
              accuracy_fn=accuracy_fn,
              device=device)
    
# Second timer reading; print_train_time prints and returns the difference.
train_time_end_model_0 = timer()
total_train_time_model_0 = print_train_time(start=train_time_model_0,
                                            end=train_time_end_model_0,
                                            device=device)

Make predictions

In [None]:
torch.manual_seed(42)


def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn,
               device: torch.device = None):
    """Evaluate ``model`` on ``data_loader`` and return its mean loss and accuracy.

    Args:
        model: Network to evaluate; it is put in eval mode.
        data_loader: Iterable of ``(X, y)`` batches that also supports ``len()``.
        loss_fn: Callable ``loss_fn(logits, y)`` returning a scalar tensor.
        accuracy_fn: Metric callable invoked as ``accuracy_fn(y_true=..., y_pred=...)``
            with predicted labels; it must return a number.
        device: Device to run on. When ``None`` the device of the model's first
            parameter is used (CPU for a parameter-free model).

    Returns:
        Dict with ``"model_name"`` (the model's class name), ``"model_loss"``
        (mean batch loss as a float) and ``"model_acc"`` (mean batch accuracy).

    Raises:
        ZeroDivisionError: If ``data_loader`` yields no batches.
    """
    if device is None:
        # Run wherever the model already lives.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")
    model.to(device)

    loss, acc = 0.0, 0.0
    model.eval()
    with torch.inference_mode():
        for X, y in data_loader:
            # Batches come from the loader on the CPU; move them to the model's
            # device so the forward pass does not fail with a device mismatch.
            X, y = X.to(device), y.to(device)
            y_pred = model(X)

            loss += loss_fn(y_pred, y).item()
            acc += accuracy_fn(y_true=y,
                               y_pred=y_pred.argmax(dim=1))

    num_batches = len(data_loader)
    loss /= num_batches
    acc /= num_batches

    return {"model_name": model.__class__.__name__,
            "model_loss": loss,
            "model_acc": acc}
    
model_0_results = eval_model(model=model_0,
                             data_loader=val_dataloader,
                             loss_fn=loss_fn,
                             accuracy_fn=accuracy_fn)
model_0_results

In [None]:
def make_predictions(model: torch.nn.Module,
                     data: list,
                     device: torch.device = None):
    """Return the class probabilities ``model`` predicts for each sample in ``data``.

    Args:
        model: Network to run; it is put in eval mode.
        data: Iterable of single, un-batched sample tensors. A batch dimension
            is added to each sample before the forward pass.
        device: Device to run on. When ``None`` the device of the model's first
            parameter is used (CPU for a parameter-free model).

    Returns:
        Tensor on the CPU with one row of softmax probabilities per sample
        (stacked along dim 0); an empty tensor when ``data`` has no samples.
    """
    if device is None:
        # Run wherever the model already lives.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    pred_probs = []
    model.eval()

    with torch.inference_mode():
        for sample in data:
            # Add the batch dimension and move the sample to the model's device.
            sample = torch.unsqueeze(sample, dim=0).to(device)

            pred_logits = model(sample)

            # Softmax over the logits of this sample (batch dimension removed),
            # not over the list that collects the results.
            pred_prob = torch.softmax(pred_logits.squeeze(dim=0), dim=0)
            # Collect on the CPU so the result can be used with matplotlib/NumPy.
            pred_probs.append(pred_prob.cpu())

    if not pred_probs:
        # torch.stack rejects an empty list; report "no predictions" explicitly.
        return torch.empty(0)
    return torch.stack(pred_probs)