# Project File
Given the magnitude of the RestNet CNN, it is unfeasible to run it on a local CPU. Hence, in order to run it on Google Collab, all the contents will be fused onto this file to run it there. Aside from the file headings, I will not put the rest of the markdowns here. Those can be found on the files themselves. 

## Dataloader

In [None]:
import torch 
from torch import nn

torch.__version__

In [None]:
import torchvision

torchvision.__version__

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

In [None]:
import os
import zipfile
from pathlib import Path
import subprocess

# Define the dataset and paths
dataset = 'puneet6060/intel-image-classification'

# Navigate one level up to place the data folder at the same level as the src folder
base_path = Path("..")  # Represents one level above the current directory
data_path = base_path / 'data'
zip_path = data_path / 'intel-image-classification.zip'

# Create data directory if it doesn't exist
data_path.mkdir(parents=True, exist_ok=True)

# Download the zip file if it doesn't already exist
if not zip_path.exists():
    print(f"Downloading {dataset} dataset...")
    subprocess.run(['kaggle', 'datasets', 'download', '-d', dataset, '-p', str(data_path)])
else:
    print(f"{zip_path} already exists. Skipping download.")

# Extract the dataset directly into the data folder
if not any(data_path.iterdir()):  # Check if data folder is empty
    print("Extracting dataset...")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(data_path)
else:
    print(f"{data_path} already exists and is not empty. Skipping extraction.")


In [None]:
image_path = data_path / "intel-image-classification"
image_path

In [None]:
train_dir = image_path / "seg_train" / "seg_train"
test_dir = image_path / "seg_test" / "seg_test"
pred_dir = image_path / "seg_pred"

train_dir, test_dir, pred_dir

In [None]:
import random
from PIL import Image

random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed(42)

image_path_list = list(image_path.glob("*/*/*/*.jpg")) # This line gets all the images by constantly traversing every single item in the given directory and the following ones as well
# print(image_path_list)
# print(f"The Found List Length is {len(image_path_list)}\n")

# Picking a random image
random_image_path = random.choice(image_path_list)
# The image class is the name of the directory where the image is stored
image_class = random_image_path.parent.stem
img = Image.open(random_image_path)

# Metadata
# print(f"Random image path: {random_image_path}")
# print(f"Image class: {image_class}")
# print(f"Image height: {img.height}")
# print(f"Image width: {img.width}")
img

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Turn the image into an array
img_as_array = np.asarray(img)
img_as_array 

In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [None]:
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),         # Resize all images to the same dimensions
    transforms.RandomHorizontalFlip(p=0.5), # Randomly flip images horizontally
    transforms.RandomRotation(10),         # Rotate images randomly within a range
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1), # Adjust image color properties
    transforms.TrivialAugmentWide(num_magnitude_bins=31), # Randomly select an additional data augmentation strategy
    transforms.ToTensor()
])

# You normally do not manipulate your test data in terms of data augmentation so we define a separate transform
test_transform = transforms.Compose([
    transforms.Resize(size=(64, 64)),
    transforms.ToTensor()
])

In [None]:
train_transform(img).shape

In [None]:
def plot_transformed_images(image_paths, transform, n=3, seed=42):
 
  if seed:
    random.seed(seed)
  random_image_paths = random.sample(image_paths, k=n)
  
  for image_path in random_image_paths:
    with Image.open(image_path) as f:
      fig, ax = plt.subplots(nrows=1, ncols=2)
      ax[0].imshow(f)
      ax[0].set_title(f"Original\nSize: {f.size}")
      ax[0].axis(False)

      # Transform and plot target image
      transformed_image = transform(f).permute(1, 2, 0) # note we will need to change shape for matplotlib (C, H, W) -> (H, W, C)
      ax[1].imshow(transformed_image)
      ax[1].set_title(f"Transformed\nShape: {transformed_image.shape}")
      ax[1].axis("off")

      fig.suptitle(f"Class: {image_path.parent.stem}", fontsize=16)

# plot_transformed_images(image_paths=image_path_list,
#                         transform=train_transform,
#                         n=3,
#                         seed=42)

In [None]:
print(f"The relevant paths: train_dir: {train_dir}\ntest_dir: {test_dir}\nimage_path: {image_path}")

In [None]:
from torchvision import datasets

train_data = datasets.ImageFolder(root=train_dir,
                                  transform=train_transform,
                                  target_transform=None)

test_data = datasets.ImageFolder(root=test_dir,
                                 transform=train_transform)

train_data, test_data

In [None]:
train_data.classes, test_data.classes, len(train_data), len(test_data)

In [None]:
img, label = train_data[0][0], train_data[0][1]
print(f"The img is: {img}\nThe shape is: {img.shape}\nThe label is: {label}")

In [None]:
img, label = train_data[0][0], train_data[0][1]
img.shape

In [None]:
import torch
from torch import nn

class ResidualBlock(nn.Module):
    """
    Residual block with the bottleneck architecture. Key integration of the RestNet50 architecture aimed at tackling the vanishing gradient problem.
    Essentially, this block provides the model with a helper path that skips some layers from the input to the output, allowing the residual to be learned more easily. 
    This situation arises when the gradients become increasingly smaller, causing the earlier layers during back propagation to receive exponentially smaller gradients, preventing the model from learning.
    
    This block contains three mini-layers: 1x1, 3x3, and 1x1 convolutions. We compress the data, then extract spatial features, and then compress again to its original state. 
    """

    def __init__(self, in_channels, mid_channels, out_channels, stride=1):
        super().__init__()

        self.conv_block_1 = nn.Conv2d(in_channels, mid_channels, kernel_size=1, stride=stride, bias=False)
        self.bn_block_1 = nn.BatchNorm2d(mid_channels)
        self.conv_block_2 = nn.Conv2d(mid_channels, mid_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn_block_2 = nn.BatchNorm2d(mid_channels)
        self.conv_block_3 = nn.Conv2d(mid_channels, out_channels, kernel_size=1, stride=1, bias=False)
        self.bn_block_3 = nn.BatchNorm2d(out_channels)

        # Shortcut connection
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels: # True if there are mismatched dimensions amongst the input and output channels
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False), # 1x1 convolution
                nn.BatchNorm2d(out_channels) # We normalize to match the input to the size of the output
            )

    def forward(self, x):
        shortcut = self.shortcut(x) # Apply the shortcut (identity or adjusted input)
        x = nn.ReLU()(self.bn_block_1(self.conv_block_1(x))) # 1st layer: 1x1 convolution + batch norm + ReLU
        x = nn.ReLU()(self.bn_block_2(self.conv_block_2(x))) # 2nd layer: 3x3 convolution + batch norm + ReLU
        x = self.bn_block_3(self.conv_block_3(x)) # 3rd layer: 1x1 convolution + batch norm
        x += shortcut # Add shortcut (residual connection)
        return nn.ReLU()(x) # Apply ReLU to the final output

In [None]:
from torch import nn
import torch

class RestNet(nn.Module):
    def __init__(self, input_shape, output_shape):
        super().__init__()

        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(input_shape, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        # This corresponds to the 4 residual stages after the initial convolution and pooling layers. Early stages focus on basic patterns while later stages focus on abstract representations
        self.conv_block_2 = self._make_stage(64, 64, 256, num_blocks=3, stride=1) # Extracts low-level features like edges and simple textures without spatial reduction.
        self.conv_block_3 = self._make_stage(256, 128, 512, num_blocks=4, stride=2) # Captures more complex features and reduces the spatial resolution
        self.conv_block_4 = self._make_stage(512, 256, 1024, num_blocks=6, stride=2) # Processes high-level features like object parts or shapes and further reduces spatial resolution
        self.conv_block_5 = self._make_stage(1024, 512, 2048, num_blocks=3, stride=2) # Extracts the most abstract and high-level features, preparing for the classification head

        # Classifier. It converts the high-level feature maps into class predictions
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Linear(2048, output_shape)
        )

    # Creates a residual stage by stacking residual blocks. 
    # The blocks in the stage work hierarchically to extract increasingly complex features while potentially reducing the spatial dimensions of the feature maps.
    def _make_stage(self, in_channels, mid_channels, out_channels, num_blocks, stride):

        strides = [stride] + [1] * (num_blocks - 1)
        layers = [] # to store the residual blocks

        for stride in strides:

            layers.append(ResidualBlock(in_channels, mid_channels, out_channels, stride))
            in_channels = out_channels
            
        return nn.Sequential(*layers)

    def forward(self, x):
        # [Batch Size, Color Channels, Height, Width]
        # print(f"The original shape is {x.shape}")
        x = self.conv_block_1(x)
        # print(f"The shape after conv_block_1 is {x.shape}")
        x = self.conv_block_2(x)
        # print(f"The shape after conv_block_2 is {x.shape}")
        x = self.conv_block_3(x)
        # print(f"The shape after conv_block_3 is {x.shape}")
        x = self.conv_block_4(x)
        # print(f"The shape after conv_block_4 is {x.shape}")
        x = self.conv_block_5(x)
        # print(f"The shape after conv_block_5 is {x.shape}")
        x = self.classifier(x)
        # print(f"The final shape is {x.shape}")
        return x

In [None]:
model_0 = RestNet(input_shape=3, output_shape=1000).to(device)
model_0

In [None]:
model_0.state_dict()

In [None]:
img.unsqueeze(0).shape # [Batch Size, Color Channels, Width, Height]

In [None]:
model_0(img.unsqueeze(0))

In [None]:
try:
  import torchinfo
except:
  !pip install torchinfo
  import torchinfo

from torchinfo import summary
summary(model_0, input_size=[1, 3, 224, 224])

In [None]:
def print_train_time(start: float, end: float, device: torch.device = None):
    total_time = end - start
    print(f"Train time on {device}: {total_time:.3f} seconds")
    return total_time

In [None]:
def accuracy_fn(y_true, y_pred):
    """
    Calculates accuracy between truth labels and predictions.

    Args:
        y_true (torch.Tensor): Truth labels for predictions.
        y_pred (torch.Tensor): Predictions to be compared to predictions.
    """
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

In [None]:
def train_step (model: torch.nn.Module,
                data_loader: torch.utils.data.DataLoader,
                loss_fn: torch.nn.Module,
                optimizer: torch.optim.Optimizer,
                accuracy_fn,
                device: torch.device = device):
    
    train_loss, train_accuracy = 0, 0

    model.train()

    for batch, (X, y) in enumerate(data_loader):

        X, y = X.to(device), y.to(device)
        y_pred = model(X) # Forward pass, outputs raw logits
        loss = loss_fn(y_pred, y)
        train_loss += loss
        train_accuracy += accuracy_fn(y_true=y, y_pred=y_pred.argmax(dim=1)) # transforms from logits to labels
        optimizer.zero_grad() 
        loss.backward()
        optimizer.step()
        
    # We adjust to get these metrics per batch, and not the total
    train_loss /= len(data_loader)
    train_accuracy /= len(data_loader)
    print(f"Train loss: {train_loss:.5f} | Train acc: {train_accuracy:.2f}%]\n")

In [None]:
def test_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               accuracy_fn,
               device: torch.device = device):
    
    test_loss, test_accuracy = 0, 0

    model.eval()  # Turns off different settings in the model not needed for evaluation/testing (dropout/batch norm layers)

    with torch.inference_mode(): # Turns off gradient tracking and a couple more things behind the scenes
        for batch, (X, y) in enumerate(dataloader):

            X, y = X.to(device), y.to(device)
            test_pred_logits = model(X)
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()
            # test_pred_labels = test_pred_logits.argmax(dim=1)
            # test_accuracy += ((test_pred_labels == y).sum().item()/len(test_pred_labels))
            test_accuracy += accuracy_fn(y_true=y, y_pred=test_pred_logits.argmax(dim=1)) # transforms from logits to labels

    test_loss = test_loss / len(dataloader)
    test_accuracy = test_accuracy / len(dataloader)
    return test_loss, test_accuracy

In [None]:
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5,
          device=device):

  results = {"train_loss": [],
             "train_acc": [],
             "test_loss": [],
             "test_acc": []}

  for epoch in tqdm(range(epochs)):
    train_loss, train_acc = train_step(model=model,
                                       data_loader=train_dataloader,
                                       loss_fn=loss_fn,
                                       optimizer=optimizer,
                                       accuracy_fn=accuracy_fn,
                                       device=device)
    test_loss, test_acc = test_step(model=model,
                                    data_loader=test_dataloader,
                                    loss_fn=loss_fn,
                                    accuracy_fn=accuracy_fn,
                                    device=device)

    print(f"Epoch: {epoch} | Train loss: {train_loss:.4f} | Train acc: {train_acc:.4f} | Test loss: {test_loss:.4f} | Test acc: {test_acc:.4f}")

    results["train_loss"].append(train_loss)
    results["train_acc"].append(train_acc)
    results["test_loss"].append(test_loss)
    results["test_acc"].append(test_acc)

  return results

In [None]:
torch.manual_seed(42)
torch.cuda.manual_seed(42)

NUM_EPOCHS = 5

loss_fn = nn.CrossEntropyLoss() 
optimizer = torch.optim.Adam(model_0.parameters(), lr=0.001)

start_time = timer()

model_0_results = train(model_0,
                        train_dataloader=train_dataloader,
                        test_dataloader=test_dataloader,
                        optimizer=optimizer,
                        loss_fn=loss_fn,
                        epochs=NUM_EPOCHS,
                        device=device)

end_time = timer()
print(f"Total training time: {end_time-start_time:.3f} seconds")