## Imports

In [None]:
from torchvision.datasets import DatasetFolder, ImageFolder
from torchvision.transforms import v2
from torch.utils.data import DataLoader, Subset, ConcatDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch

import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split

from collections import Counter
from pathlib import Path
import time
import subprocess
import shutil
import json

## Data Preprocessing

In [None]:
# Kaggle Loaded Dataset Path (readonly)
root_input = Path("/kaggle/input/imagenet100")

# Working Dataset Path to modify the default
root_working = Path("/kaggle/working/reduced_imagenet")

# mkdir() raises when the directory cannot be created, so the outcome is
# reported around the call itself (a later exists() check could never fail).
# parents=True keeps the cell usable when the parent directory is missing.
try:
    root_working.mkdir(parents=True, exist_ok=True)
except OSError:
    print("There seems to be an issue.")
    raise
else:
    print("The working directory was created successfully.")

In [None]:
# Location of the raw label file inside the read-only input dataset
labels_path = root_input / "Labels.json"

# Parsing the raw JSON labels and echoing them for inspection
with labels_path.open() as labels_file:
    labels = json.load(labels_file)
print(json.dumps(labels, indent=4))

In [None]:
# Keeping only the text before the first comma of every label value.
# The dictionary is updated in place, so `labels` remains the same object.
labels.update({class_id: names.split(",")[0] for class_id, names in labels.items()})

# Pretty-printed view of the shortened labels
mod_labels = json.dumps(labels, indent=4)
print("All Labels:\n", mod_labels)

# Persisting the shortened labels next to the working dataset
with (root_working / "labels.json").open("w") as file:
    json.dump(labels, file, indent=4)

In [None]:
print("The copying process has started")

# Source split prefix -> directory name used in the working dataset
split_dirs = {"train": "train", "val": "valid"}

# Iterating over the directories at root
for parent_node in root_input.iterdir():

    # Avoiding the Label File
    if not parent_node.is_dir():
        continue

    # Retrieving the Split Name: the text before the first dot of the directory name
    split_name = parent_node.stem.split(".")[0]

    # Skipping directories that belong to no known split. Without this guard
    # `dest_path` was either undefined (NameError on the first iteration) or
    # silently reused from the previously processed split.
    if split_name not in split_dirs:
        continue
    dest_path = root_working / split_dirs[split_name]

    # Creating the directory
    dest_path.mkdir(parents=True, exist_ok=True)

    # Iterating through the classes on sub-nodes
    for sub_node in parent_node.iterdir():

        # If Class ID is a known label => look up the class name and copy
        if sub_node.stem in labels:
            class_name = labels[sub_node.stem]
            completed_path = dest_path / class_name
            # dirs_exist_ok=True merges into an already existing class folder
            shutil.copytree(sub_node, completed_path, dirs_exist_ok=True)

print("Copy Process Completed")

## Data Handling

In [None]:
# Dataset Working Path: root directory handed to DataHandler as `root_dir`
dataset_path = Path("/kaggle/working/reduced_imagenet")


class DataHandler:
    """Loads the image datasets from disk and wraps them for training.

    Attributes:
        root_dir: Directory that contains the ``train`` and ``valid`` folders.
        norm_means: Per-channel means passed to the Normalize transform.
        norm_stds: Per-channel standard deviations passed to the Normalize transform.
        apply_transforms: Transform pipeline applied to every loaded image.
        class_names: Class names of the most recently loaded set; empty until
            ``load_set`` has been called.
    """

    def __init__(self, root_dir: Path) -> None:
        self.root_dir = root_dir
        self.norm_means = [0.485, 0.456, 0.406]
        self.norm_stds = [0.229, 0.224, 0.225]

        # Defined up front so the attribute exists even before load_set() runs
        self.class_names: list[str] = []

        self.apply_transforms = v2.Compose([
            v2.Resize(size=256, interpolation=v2.InterpolationMode.BICUBIC),  # Maintaining the Aspect Ratio
            v2.CenterCrop(size=(224, 224)),  # Crop the Image to the Subject
            v2.ToImage(),  # Converts PIL Image to Tensor
            v2.ToDtype(torch.float32, scale=True),  # Converting the Dtype for Normalisation
            v2.Normalize(mean=self.norm_means, std=self.norm_stds),  # Applies Normalisation
        ])

    def load_set(self, set_name: str) -> DatasetFolder:
        """Loads the set stored under ``root_dir / set_name``.

        Args:
            set_name: Either ``"train"`` or ``"valid"``.

        Returns:
            The ImageFolder dataset with the transform pipeline attached.

        Raises:
            UnboundLocalError: If ``set_name`` is not a known set. The
                exception type is kept as-is for backward compatibility.
        """

        if set_name not in ("train", "valid"):
            raise UnboundLocalError("Invalid set name provided.")

        dataset = ImageFolder(root=self.root_dir / set_name, transform=self.apply_transforms)

        # Remembered so view_images() can translate label indices into names
        self.class_names = dataset.classes
        return dataset

    def move_samples_from_train(
        self, train_set: DatasetFolder,
        move_percent: float = 0.12,
        random_state: "int | None" = None
    ) -> tuple[Subset, Subset]:
        """Splits off a stratified share of the train set for validation.

        Args:
            train_set: Dataset exposing a ``targets`` sequence of class indices.
            move_percent: Share of the samples moved to the validation subset
                (forwarded as ``test_size`` to ``train_test_split``).
            random_state: Optional seed that makes the split reproducible;
                ``None`` keeps the previous, unseeded behaviour.

        Returns:
            ``(train_subset, valid_subset)``, both views onto ``train_set``.
        """

        targets = np.array(train_set.targets)
        indices = list(range(len(train_set)))

        # Stratified Sampling of the Train Set
        train_indices, valid_indices = train_test_split(
            indices, test_size=move_percent, stratify=targets, random_state=random_state
        )

        # Subsets
        train_set_pre_prep = Subset(train_set, train_indices)
        valid_set_pre_prep = Subset(train_set, valid_indices)

        return train_set_pre_prep, valid_set_pre_prep

    def prepare_dataset(
        self, dataset: DatasetFolder, batch_size: int = 64,
        shuffle: bool = True, num_workers: int = 4
    ) -> DataLoader:
        """Wraps the dataset in a DataLoader.

        Args:
            dataset: Dataset to iterate over.
            batch_size: Number of samples per batch.
            shuffle: Whether the loader shuffles the samples.
            num_workers: Number of loader worker processes.

        Returns:
            The configured DataLoader.
        """

        return DataLoader(
            dataset=dataset,
            batch_size=batch_size,
            shuffle=shuffle,
            num_workers=num_workers
        )

    def view_images(self, loaded_set: DataLoader, denormalize: bool = False) -> None:
        """Helper function that plots up to 20 images of the first batch.

        Args:
            loaded_set: Loader yielding ``(images, targets)`` batches.
            denormalize: When True, the normalisation with ``norm_means`` and
                ``norm_stds`` is reverted before plotting. The default keeps
                the previous behaviour of plotting the normalised tensors.
        """
        images, targets = next(iter(loaded_set))

        if denormalize:
            # assumes images are (batch, channel, height, width) with one
            # channel per normalisation entry -- TODO confirm for other data
            mean = torch.tensor(self.norm_means).view(1, -1, 1, 1)
            std = torch.tensor(self.norm_stds).view(1, -1, 1, 1)
            images = (images * std + mean).clamp(0.0, 1.0)

        # The grid has 20 cells, but a batch may hold fewer images
        n_show = min(20, len(images))

        plt.figure(figsize=(12, 10))
        for i in range(n_show):
            img = images[i].squeeze()
            label = int(targets[i])

            plt.subplot(5, 4, i + 1)
            plt.title(f"{self.class_names[label]}", fontdict={"size": 7})
            plt.imshow(img.permute(1, 2, 0))  # channels-first -> channels-last
            plt.axis("off")

        plt.tight_layout()
        plt.show()


# Testing: loads the train split, wraps it in a loader and plots one batch
data_handle = DataHandler(root_dir=dataset_path)
train_loaded = data_handle.load_set("train")
train_prep = data_handle.prepare_dataset(dataset=train_loaded)
# Class names of the set loaded above, kept at module level
class_names = data_handle.class_names
data_handle.view_images(loaded_set=train_prep)

**Important**
- The images look a little odd because normalisation is applied; with normalisation turned off they would display correctly. This only affects the visualisation.

In [None]:
# Class names as reported by the data handler for the loaded train set
print("Total No of Classes: ", len(class_names))
print("\nClass Names:\n", class_names)

## The Residual Block

In [None]:
class ResidualLearningBlock(torch.nn.Module):
    """Bottleneck residual block (1x1 -> 3x3 -> 1x1 convolutions) with a skip connection.

    The block always outputs ``no_of_filters * 4`` channels. When the input
    cannot be added to that output directly (different channel count or a
    stride above 1), the skip path uses a strided 1x1 convolution followed by
    batch normalisation; otherwise it is the identity.

    Args:
        in_channels: Number of channels of the incoming feature map.
        stride: Stride of the 3x3 convolution and of the skip projection.
        no_of_filters: Width of the bottleneck; the output has four times as
            many channels.
    """

    def __init__(self, in_channels: int, stride: int, no_of_filters: int) -> None:

        # Loading all the properties from the super class
        super().__init__()

        out_channels = no_of_filters * 4

        # Defining the layers for each block
        self.sequential_block = torch.nn.Sequential(
            torch.nn.Conv2d(in_channels=in_channels, out_channels=no_of_filters, kernel_size=1),  # 1x1 Convolution, Stride: 1
            torch.nn.BatchNorm2d(num_features=no_of_filters),
            torch.nn.ReLU(),

            torch.nn.Conv2d(in_channels=no_of_filters, out_channels=no_of_filters, padding=1, kernel_size=3, stride=stride),  # 3x3 Convolution, Stride: stride
            torch.nn.BatchNorm2d(num_features=no_of_filters),
            torch.nn.ReLU(),

            torch.nn.Conv2d(in_channels=no_of_filters, out_channels=out_channels, kernel_size=1),  # 1x1 Convolution, Stride: 1
            torch.nn.BatchNorm2d(num_features=out_channels)
        )

        # Skip Connection Layer: project only when the shapes would not match
        if stride > 1 or in_channels != out_channels:
            self.skip_block = torch.nn.Sequential(
                torch.nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=stride),
                torch.nn.BatchNorm2d(num_features=out_channels)
            )
        else:
            self.skip_block = torch.nn.Identity()

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Runs the bottleneck path and the skip path and merges them.

        Args:
            X: Input feature map with ``in_channels`` channels.

        Returns:
            ReLU of the element-wise sum of both paths.
        """

        # Block Bottleneck Propagation
        residual = self.sequential_block(X)

        # Skip Connection Propagation
        shortcut = self.skip_block(X)

        # The two paths are added element-wise (not concatenated); the
        # functional ReLU avoids building a new module on every call
        return torch.nn.functional.relu(residual + shortcut)


# Testing: builds a block with in_channels=256, stride=2, no_of_filters=512
# and prints its layer structure
first_residual_block = ResidualLearningBlock(256, 2, 512)
print(first_residual_block)

## The Resnet-50 Model

In [None]:
class Resnet50(torch.nn.Module):
    """Resnet-50 style network assembled from ResidualLearningBlocks.

    Layout: a 7x7 convolution and a max-pool, four stages of bottleneck
    blocks (3, 4, 6 and 3 blocks) each followed by dropout, a global average
    pool, dropout and a fully connected classifier.

    Args:
        input_channels: Number of channels of the input images.
        fc_size: Number of output logits of the classifier.
        dropout_rate: Drop probability of every dropout layer.
    """

    initial_fmaps = 64
    initial_kernel_size = 7
    initial_stride = 2
    initial_pool_size = 3

    def __init__(self, input_channels: int = 3, fc_size: int = 1000, dropout_rate: float = 0.2) -> None:

        # Loading all the properties from the super class
        super().__init__()

        # (bottleneck width, number of blocks) for each of the four stages
        self.channel_config = [(64, 3), (128, 4), (256, 6), (512, 3)]

        # Initial Convolution Block
        # NOTE(review): unlike the reference ResNet there is no BatchNorm/ReLU
        # between the 7x7 convolution and the max-pool. Left unchanged so the
        # layer indices (and therefore saved state_dict keys) stay stable.
        layers = [
            torch.nn.Conv2d(
                in_channels=input_channels,
                out_channels=self.initial_fmaps,
                kernel_size=self.initial_kernel_size,
                stride=self.initial_stride,
                padding=3
            ),
            torch.nn.MaxPool2d(
                kernel_size=self.initial_pool_size,
                stride=2,
                padding=1
            ),
        ]

        # Channels entering the first residual block = channels leaving the stem
        prev_out_channels = self.initial_fmaps

        # Constructing the remaining model architecture
        for layer_idx, (no_channels, no_of_blocks) in enumerate(self.channel_config):

            # Creating each Conv_nX sequence
            for block_no in range(no_of_blocks):

                # Only the first block of stages 2-4 downsamples
                stride = 2 if (block_no == 0 and layer_idx > 0) else 1

                layers.append(
                    ResidualLearningBlock(
                        in_channels=prev_out_channels,
                        stride=stride,
                        no_of_filters=no_channels
                    )
                )

                # Every block outputs four times its bottleneck width
                prev_out_channels = no_channels * 4

            # Adding the Dropout Layer after each stage
            layers.append(torch.nn.Dropout(p=dropout_rate))

        # Global average pool: yields a 1x1 map for any input resolution and
        # matches the former fixed 7x7 pool on a 7x7 feature map
        layers.append(torch.nn.AdaptiveAvgPool2d(output_size=1))

        self.model_conv_layers = torch.nn.Sequential(*layers)

        # Downstream Layers
        self.dropout_fc = torch.nn.Dropout(p=dropout_rate)
        self.fc = torch.nn.Linear(in_features=prev_out_channels, out_features=fc_size)

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """Implements the forward propagation of the complete Resnet Model.

        Args:
            X: Batch of images with ``input_channels`` channels.

        Returns:
            Logits with ``fc_size`` entries per sample.
        """

        avg_pool_scores = self.model_conv_layers(X)

        # Collapsing everything but the batch dimension
        avg_pool_flatten = torch.flatten(avg_pool_scores, start_dim=1)
        avg_pool_dropout = self.dropout_fc(avg_pool_flatten)
        logits = self.fc(avg_pool_dropout)

        return logits


# Testing: instantiates the model with its default arguments and prints
# the layer structure
first_resnet_50 = Resnet50()
print(first_resnet_50)

## The Optimizer

In [None]:
CHECKPOINT_PATH = "/kaggle/working/models"


class TrainingLoop:
    """Handles optimisation, validation, LR scheduling and checkpointing of a model.

    Args:
        learning_rate: Initial learning rate of the AdamW optimizer.
        model: Model to train; it is moved to the selected device.
        lr_patience: Patience of the ReduceLROnPlateau scheduler. It has to be
            lower than the early-stopping patience of ``train_model``,
            otherwise training stops before the learning rate is ever reduced.
    """

    def __init__(self, learning_rate: float, model: torch.nn.Module, lr_patience: int = 2) -> None:
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Batches are moved to self.device while training, so the model has to
        # live on the same device to avoid a device mismatch
        self.model = model.to(self.device)
        self.optim = torch.optim.AdamW(
            params=self.model.parameters(),
            lr=learning_rate
        )
        self.loss_fn = torch.nn.CrossEntropyLoss()

        # Learning rate scheduler
        self.lr_schedule = ReduceLROnPlateau(
            optimizer=self.optim, mode="min", factor=0.1, patience=lr_patience, min_lr=1e-6
        )

        # Creating the Checkpoint Storage Directory
        self.model_dir = Path(CHECKPOINT_PATH)
        self.model_dir.mkdir(parents=True, exist_ok=True)

    def _train_one_epoch(self, train_set: DataLoader) -> float:
        """Runs one optimisation pass and returns the mean per-batch loss."""

        # Explicit so the first epoch also runs in training mode
        self.model.train()

        total_loss = 0.0
        for X, y in train_set:

            # Moving the batches to the training device
            X, y = X.to(self.device), y.to(self.device)

            logits = self.model(X)
            train_loss = self.loss_fn(input=logits, target=y)
            total_loss += train_loss.item()

            # Backpropagation
            self.optim.zero_grad()
            train_loss.backward()
            self.optim.step()

        return total_loss / len(train_set)

    def _validate(self, valid_set: DataLoader) -> float:
        """Evaluates the model without gradients and returns the mean per-batch loss."""

        # Eval mode for the BN and dropout layers
        self.model.eval()

        total_loss = 0.0
        with torch.no_grad():
            for X, y in valid_set:
                X, y = X.to(self.device), y.to(self.device)
                logits = self.model(X)
                total_loss += self.loss_fn(input=logits, target=y).item()

        # Switching the model back to training mode
        self.model.train()

        return total_loss / len(valid_set)

    def train_model(
            self, epochs: int,
            train_set: DataLoader, valid_set: DataLoader,
            patience: int = 5
        ) -> tuple[list[float], list[float]]:
        """Trains the model for at most the given number of epochs.

        After every epoch the validation loss drives the LR scheduler, the
        best model so far is saved to ``model_dir / "resnet_50_imagenet.pth"``
        and training stops early after ``patience`` epochs without improvement.

        NOTE: ``state_dict()`` is saved as returned by ``self.model``; for a
        model wrapped in ``torch.nn.DataParallel`` the keys carry a
        ``module.`` prefix.

        Args:
            epochs: Maximum number of epochs.
            train_set: Loader of the training batches.
            valid_set: Loader of the validation batches.
            patience: Number of consecutive epochs without a new best
                validation loss after which training stops.

        Returns:
            ``(train_losses, valid_losses)`` with one mean loss per completed epoch.
        """

        # Cache Losses
        train_losses: list[float] = []
        valid_losses: list[float] = []

        # Training Checkpoint
        best_valid_loss = float("inf")
        patience_counter = 0

        # Training Loop
        print("The training process has started")
        for i in range(epochs):

            # ==== Training Step (only this part is timed) ====
            start = time.time()
            mean_loss_train = self._train_one_epoch(train_set)
            end = time.time()

            # ==== Validation Step ====
            mean_loss_valid = self._validate(valid_set)

            # ==== End of Epoch Metrics & Model Checkpointing ====
            time_epoch = end - start

            # Updating the LR Scheduler
            self.lr_schedule.step(mean_loss_valid)

            # Updating the Caches
            train_losses.append(mean_loss_train)
            valid_losses.append(mean_loss_valid)

            # Read from the optimizer so the value reflects the scheduler step above
            current_lrs = [group["lr"] for group in self.optim.param_groups]
            print(f"Epoch {i + 1}: Train Loss -> {mean_loss_train} | Valid Loss -> {mean_loss_valid} | Time Epoch -> {time_epoch} | Learning Rate -> {current_lrs}")

            # Updating the best validation loss so far
            if mean_loss_valid < best_valid_loss:
                best_valid_loss = mean_loss_valid

                # Saving the Model by weights
                torch.save(obj=self.model.state_dict(), f=self.model_dir / "resnet_50_imagenet.pth")
                print("New best model was saved")
                patience_counter = 0
            else:
                patience_counter += 1
                print(f"No improvement: {patience_counter} / {patience}")
                if patience_counter >= patience:
                    print("Early Stopping")
                    break

        return train_losses, valid_losses


# Testing: builds the training handler around the model created above and
# prints the resulting object
optim = TrainingLoop(learning_rate=1e-4, model=first_resnet_50)
print(optim)

## The Main Function

In [None]:
# Directory in which the learning-curve figure is stored
ASSET_PATH = Path("/kaggle/working/assets")

# Picking the GPU when one is visible, otherwise falling back to the CPU
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
gpu_units = torch.cuda.device_count()
print(f"Accelerator Available: {DEVICE}")
print(f"Units: {gpu_units}")

In [None]:
def main():
    """Runs the full pipeline: load the data, train Resnet-50 and plot the learning curve.

    Raises:
        ValueError: If the train and valid folders map class names to
            different label indices, which would corrupt the merged
            validation set.
    """
    print("Hello, Residual Network - 50")
    print("\n ------- \n")

    # Initialising the Data Handler
    data_handle = DataHandler(root_dir=dataset_path)

    # Loading the sets from disk
    train_set = data_handle.load_set("train")
    valid_set = data_handle.load_set("valid")

    # Samples of both folders end up in one validation set below, so their
    # label indices have to mean the same classes
    if train_set.class_to_idx != valid_set.class_to_idx:
        raise ValueError("Train and valid folders do not share the same class-to-index mapping.")

    # Initial sample sizes
    print("Initial Sample Sizes:")
    train_count = Counter(train_set.targets)
    valid_count = Counter(valid_set.targets)
    print(f"Train Set:\n{train_count}")
    print(f"\nValid Set:\n{valid_count}")
    print("\n ------- \n")

    # Moving Samples
    train_set_pre_prep, valid_set_pre_prep = data_handle.move_samples_from_train(train_set, move_percent=0.15)

    print("Moved Sample Sizes:")
    train_count = Counter(train_set.targets[i] for i in train_set_pre_prep.indices)
    valid_count = Counter(train_set.targets[i] for i in valid_set_pre_prep.indices)
    print(f"Train Set:\n{train_count}")
    print(f"\nValid Set:\n{valid_count}")
    print("\n ------- \n")

    # Preparing the sets; validation needs no shuffling
    train_prep = data_handle.prepare_dataset(train_set_pre_prep)
    valid_combined = ConcatDataset([valid_set, valid_set_pre_prep])
    valid_prep = data_handle.prepare_dataset(valid_combined, shuffle=False)

    # Loading the Model with one output per class of the dataset instead of
    # the default classifier size
    model_base = Resnet50(fc_size=len(train_set.classes)).to(device=DEVICE)
    resnet_50 = torch.nn.DataParallel(model_base)

    # Loading the Training Loop Handler
    trainer = TrainingLoop(learning_rate=5e-4, model=resnet_50)

    # Training the model
    train_losses, valid_losses = trainer.train_model(30, train_prep, valid_prep)

    # Plotting the losses
    plt.figure(figsize=(10, 8))
    plt.title("Learning Curve")
    plt.plot(range(1, len(train_losses) + 1), train_losses, c="b", ls="-", label="Train Loss")
    plt.plot(range(1, len(valid_losses) + 1), valid_losses, c="g", ls="-.", label="Valid Loss")
    plt.legend(loc="upper right")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")

    # Storing the Learning Curve (the directory is created on demand)
    ASSET_PATH.mkdir(parents=True, exist_ok=True)
    plt.savefig(ASSET_PATH / "learning_curve.png")

    # Rendering the plot
    plt.show()


# ==== Driver Code ====
# Runs the complete pipeline defined in main()
main()