<a href="https://colab.research.google.com/github/harditrivedi16/Image-Classification--258/blob/main/ResNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Training the ResNet model

## Importing and Installing all the libraries

In [29]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from collections import namedtuple
import torchvision
from torchvision import datasets, models, transforms
import numpy as np
import os
from torch.utils.data import random_split



## Adding the code cells from the sample code given in the [Github link](https://github.com/lkk688/MultiModalClassifier/blob/main/TorchClassifier/myTorchModels/CustomResNet.py)

In [20]:
class ResNet(nn.Module):
    def __init__(self, config, output_dim):
        super().__init__()

        block, n_blocks, channels = config
        self.in_channels = channels[0]

        assert len(n_blocks) == len(channels) == 4

        #bias = False: The authors of the ResNet paper argue that the bias terms are unnecessary as every convolutional layer in a ResNet is followed by
        # a batch normalization layer which has a  β  (beta) term that does the same thing as the bias term in the convolutional layer

        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size = 7, stride = 2, padding = 3, bias = False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace = True)
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)

        self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
        self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride = 2)
        self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride = 2)
        self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride = 2)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, output_dim)

    def get_resnet_layer(self, block, n_blocks, channels, stride = 1):

        layers = []

        if self.in_channels != block.expansion * channels:
            downsample = True
        else:
            downsample = False

        layers.append(block(self.in_channels, channels, stride, downsample))

        for i in range(1, n_blocks):
            layers.append(block(block.expansion * channels, channels))

        self.in_channels = block.expansion * channels

        return nn.Sequential(*layers)

    def forward(self, x):

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        h = x.view(x.shape[0], -1)
        x = self.fc(h)

        return x, h


In [21]:
class BasicBlock(nn.Module):
    #The BasicBlock is made of two 3x3 convolutional layers
    expansion = 1

    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3,
                               stride = stride, padding = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3,
                               stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.relu = nn.ReLU(inplace = True)

        if downsample: #When downsampling, we add a convolutional layer with a 1x1 filter, and no padding, to the residual path.
            conv = nn.Conv2d(in_channels, out_channels, kernel_size = 1,
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None

        self.downsample = downsample

    def forward(self, x):

        i = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)

        if self.downsample is not None:
            i = self.downsample(i)

        x += i
        x = self.relu(x)

        return x

In [22]:
class Bottleneck(nn.Module):

    expansion = 4 #The Bottleneck block has an expansion of four
    #the number of channels in the image output a block isn't out_channels, but expansion * out_channels

    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 1,
                               stride = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3,
                               stride = stride, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(out_channels, self.expansion * out_channels, kernel_size = 1,
                               stride = 1, bias = False)
        self.bn3 = nn.BatchNorm2d(self.expansion * out_channels)

        self.relu = nn.ReLU(inplace = True)

        if downsample:
            conv = nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size = 1,
                             stride = stride, bias = False)
            bn = nn.BatchNorm2d(self.expansion * out_channels)
            downsample = nn.Sequential(conv, bn)
        else:
            downsample = None

        self.downsample = downsample

    def forward(self, x):

        i = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.conv3(x)
        x = self.bn3(x)

        if self.downsample is not None:
            i = self.downsample(i)

        x += i
        x = self.relu(x)

        return x

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

def setupCustomResNet(numclasses, modelname):
    ResNetConfig = namedtuple('ResNetConfig', ['block', 'n_blocks', 'channels'])
    resnet18_config = ResNetConfig(block = BasicBlock,
                                n_blocks = [2,2,2,2],
                                channels = [64, 128, 256, 512])

    resnet34_config = ResNetConfig(block = BasicBlock,
                                n_blocks = [3,4,6,3],
                                channels = [64, 128, 256, 512])

    resnet50_config = ResNetConfig(block = Bottleneck,
                               n_blocks = [3, 4, 6, 3],
                               channels = [64, 128, 256, 512])

    resnet101_config = ResNetConfig(block = Bottleneck,
                                    n_blocks = [3, 4, 23, 3],
                                    channels = [64, 128, 256, 512])

    resnet152_config = ResNetConfig(block = Bottleneck,
                                    n_blocks = [3, 8, 36, 3],
                                    channels = [64, 128, 256, 512])

    if modelname == 'resnet50':
        #load the pre-trained ResNet model.
        pretrained_model = models.resnet50(pretrained = True)
        print(pretrained_model)

        #create a new linear layer with the required dimensions
        IN_FEATURES = pretrained_model.fc.in_features
        OUTPUT_DIM = numclasses #len(test_data.classes)

        fc = nn.Linear(IN_FEATURES, OUTPUT_DIM)
        pretrained_model.fc = fc #replace the pre-trained model's linear layer with our own, randomly initialized linear layer.

        #initialize our ResNet50 model from the configuration
        model = ResNet(resnet50_config, OUTPUT_DIM)
        model.load_state_dict(pretrained_model.state_dict())
        print(f'The model has {count_parameters(model):,} trainable parameters')

        return model

## Mounting the drive and getting the dataset folder's path

In [5]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [24]:
# Define the path to the dataset folder
data_path = '/content/drive/MyDrive/CMPE 258 Assignment 2/Data_with_class'

### Verifying that the number of images is more than 500

In [25]:
# Function to count the number of .jpg files in the folder
def count_images(folder_path):
    jpg_files = [f for f in os.listdir(folder_path) if f.endswith('.jpg')]
    return len(jpg_files)

In [26]:
# Count the number of images in the folder
num_images = count_images(data_path)
print("Number of images in the folder:", num_images)

Number of images in the folder: 0


## Data Preprocessing

In [27]:
# Load the dataset with ImageFolder
full_dataset = datasets.ImageFolder(data_path)

### Splitting the dataset into train and val model

In [30]:
# Calculate sizes for splitting the dataset into train and validation
train_size = int(0.8 * len(full_dataset))
valid_size = len(full_dataset) - train_size

# Split the dataset
train_dataset, valid_dataset = random_split(full_dataset, [train_size, valid_size])

### Applying the transform function

In [31]:
# 3. Define the transform function
train_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.ToTensor(),           # Convert images to PyTorch tensors
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize images
])

valid_transforms = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to 224x224
    transforms.ToTensor(),           # Convert images to PyTorch tensors
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize images
])

In [32]:
train_dataset.dataset.transform = train_transforms
valid_dataset.dataset.transform = valid_transforms

### Define the data loaders

In [33]:
batch_size = 32

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size)

## Model Set up

In [34]:
num_classes = 2
model_name = 'resnet50'
model = setupCustomResNet(num_classes, model_name)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 108MB/s]


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [35]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

## Training Loop

In [36]:
def train_model(model, train_loader, valid_loader, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()  # Set model to training mode
        running_loss = 0.0
        running_corrects = 0

        # Training Phase
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs, _ = model(inputs)  # Assume model returns output and some intermediate layer
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)

        print(f'Epoch {epoch+1}/{num_epochs} - Training Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}')

        # Validation Phase
        model.eval()  # Set model to evaluate mode
        running_loss = 0.0
        running_corrects = 0

        with torch.no_grad():
            for inputs, labels in valid_loader:
                outputs, _ = model(inputs)
                loss = criterion(outputs, labels)
                _, preds = torch.max(outputs, 1)

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(valid_loader.dataset)
            epoch_acc = running_corrects.double() / len(valid_loader.dataset)

        print(f'Epoch {epoch+1}/{num_epochs} - Validation Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}')



## Training

In [37]:
train_model(model, train_loader, valid_loader, criterion, optimizer, num_epochs=10)

Epoch 1/10 - Training Loss: 0.7404, Acc: 0.6278
Epoch 1/10 - Validation Loss: 4.8072, Acc: 0.4887
Epoch 2/10 - Training Loss: 0.5475, Acc: 0.7331
Epoch 2/10 - Validation Loss: 1.4217, Acc: 0.6692
Epoch 3/10 - Training Loss: 0.4896, Acc: 0.8008
Epoch 3/10 - Validation Loss: 0.7727, Acc: 0.5714
Epoch 4/10 - Training Loss: 0.3708, Acc: 0.8421
Epoch 4/10 - Validation Loss: 0.9196, Acc: 0.6767
Epoch 5/10 - Training Loss: 0.4619, Acc: 0.8008
Epoch 5/10 - Validation Loss: 0.5546, Acc: 0.6917
Epoch 6/10 - Training Loss: 0.3659, Acc: 0.8440
Epoch 6/10 - Validation Loss: 0.5651, Acc: 0.8271
Epoch 7/10 - Training Loss: 0.2838, Acc: 0.8797
Epoch 7/10 - Validation Loss: 0.5857, Acc: 0.8045
Epoch 8/10 - Training Loss: 0.2712, Acc: 0.8985
Epoch 8/10 - Validation Loss: 1.2341, Acc: 0.7293
Epoch 9/10 - Training Loss: 0.2473, Acc: 0.8947
Epoch 9/10 - Validation Loss: 1.0048, Acc: 0.7068
Epoch 10/10 - Training Loss: 0.1639, Acc: 0.9474
Epoch 10/10 - Validation Loss: 1.0977, Acc: 0.7669
