<a href="https://colab.research.google.com/github/Dipak22/Case-Studies/blob/master/ResNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Import necessary classes and functions



In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset , DataLoader
from torchvision.datasets import ImageFolder
from tqdm.notebook import tqdm
import numpy as np
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings("ignore")

In [2]:
class ResidualBlock(nn.Module):
    def __init__(self, in_planes, planes, downsample=None, middle_conv_stride=1, residual=True):
        """
        This residual block will be reused multiple times to define our model. It consists of 3 convolutional layers,
        along with Batch Normalization and ReLU. If a downsample is needed, it will also accept a downsampling convolution
        that will ensure our identity is equal to the output before returning.

        in_planes: Expected Number of Input Planes
        planes: Number of Planes to Map to in the Intermediate before expansion
        downsample: Pass in a downsampling function to ensure Identity shape matches X
        middle_conv_stride: The first block in every set of N blocks has a stride of 2 on the second convolution
        residual: Turn the residual sum on or off
        """
        super(ResidualBlock, self).__init__()
        ### Set Convolutional Layers ###
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, stride=1)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=middle_conv_stride, padding=1)
        self.bn2 = nn.BatchNorm2d(planes)

        ### Output to planes * 4 as our expansion ###
        self.conv3 = nn.Conv2d(planes, planes*4, kernel_size=1, stride=1)
        self.bn3 = nn.BatchNorm2d(planes*4)
        self.relu = nn.ReLU()

        ### This Will Exist if a Downsample Is Needed ###
        self.downsample = downsample
        self.residual = residual

    def forward(self, x):
        identity = x # Store the identity function

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)

        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu(x)

        if self.residual:
            if self.downsample is not None: # If our identity function has less channels or larger size we remap it
                identity = self.downsample(identity)

            x  = x + identity

        return x


In [3]:
class ResNet(nn.Module):
    def __init__(self, layer_counts, num_channels=3, num_classes=2, residual=True):
        """
        ResNet Implementation (Inspired by PyTorch torchvision.models implementation)

        layer_counts: Number of blocks in each set of blocks passed as a list
        num_channels: Number of input channels to model
        num_classes: Number of outputs for classification
        residual: Turn on or off residual connections
        """
        super(ResNet, self).__init__()
        self.residual = residual # Store if we want residual connections
        self.inplanes = 64 # Starting number of planes to map to from input channels

        ### INITIAL SET OF CONVOLUTIONS ###
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(self.inplanes)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        ### DEFINE LAYERS ###
        self.layer1 = self._make_layers(layer_counts[0], planes=64, stride=1)
        self.layer2 = self._make_layers(layer_counts[1], planes=128, stride=2)
        self.layer3 = self._make_layers(layer_counts[2], planes=256, stride=2)
        self.layer4 = self._make_layers(layer_counts[3], planes=512, stride=2)

        ### AVERAGE POOLING AND MAP TO CLASSIFIER ###
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512*4, num_classes)

    def _make_layers(self, num_residual_blocks, planes, stride):
        downsample = None # Initialize downsampling as None
        layers = nn.ModuleList() # Create a Module list to store all our convolutions

        # If we have a stride of 2, or the number of planes dont match. This condition will ALWAYS BE MET only
        #on the first block of every set of blocks

        if stride != 1 or self.inplanes != planes*4:
            ### Map to the number of wanted planes with a stride of 2 to map identity to X
            downsample = nn.Sequential(nn.Conv2d(self.inplanes, planes*4, kernel_size=1, stride=stride),
                                       nn.BatchNorm2d(planes*4))

        ### Append this First Block with the Downsample Layer ###
        layers.append(ResidualBlock(in_planes=self.inplanes,
                                    planes=planes,
                                    downsample=downsample,
                                    middle_conv_stride=stride,
                                    residual=self.residual))

        ### Set our InPlanes to be expanded by 4 ###
        self.inplanes = planes * 4

        ### The remaining layers shouldnt have any issues so we can just append all of teh blocks on ###
        for _ in range(num_residual_blocks - 1):
            layers.append(
                ResidualBlock(
                    in_planes=self.inplanes,
                    planes = planes,
                    residual=self.residual
                )
            )

        return nn.Sequential(*layers)


    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        return x

### PreDefine Different ResNet Models ###
def ResNet50(residual=True):
    return ResNet([3,4,6,3], residual=residual)

def ResNet101(residual=True):
    return ResNet([3,4,23,3], residual=residual)

def ResNet152(residual=True):
    return ResNet([3,8,36,3], residual=residual)

## get data


In [4]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("karakaggle/kaggle-cat-vs-dog-dataset")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/kaggle-cat-vs-dog-dataset


In [5]:
PATH_TO_DATA = path +"/kagglecatsanddogs_3367a/PetImages/"

### DEFINE TRANSFORMATIONS ###
normalizer = transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]) ### IMAGENET MEAN/STD ###
train_transforms = transforms.Compose([
                                        transforms.Resize((224,224)),
                                        transforms.RandomHorizontalFlip(),
                                        transforms.ToTensor(),
                                        normalizer
                                      ])


dataset = ImageFolder(PATH_TO_DATA, transform=train_transforms)

train_samples, test_samples = int(0.9 * len(dataset)), len(dataset) - int(0.9 * len(dataset))
train_dataset, val_dataset = torch.utils.data.random_split(dataset, lengths=[train_samples, test_samples])

def train(model, device, epochs, optimizer, loss_fn, batch_size, trainloader, valloader):
    log_training = {"epoch": [],
                    "training_loss": [],
                    "training_acc": [],
                    "validation_loss": [],
                    "validation_acc": []}

    for epoch in range(1, epochs + 1):
        print(f"Starting Epoch {epoch}")
        training_losses, training_accuracies = [], []
        validation_losses, validation_accuracies = [], []

        model.train() # Turn On BatchNorm and Dropout
        for image, label in tqdm(trainloader):
            image, label = image.to(DEVICE), label.to(DEVICE)
            optimizer.zero_grad()
            out = model.forward(image)

            ### CALCULATE LOSS ##
            loss = loss_fn(out, label)
            training_losses.append(loss.item())

            ### CALCULATE ACCURACY ###
            predictions = torch.argmax(out, axis=1)
            accuracy = (predictions == label).sum() / len(predictions)
            training_accuracies.append(accuracy.item())

            loss.backward()
            optimizer.step()

        model.eval() # Turn Off Batchnorm
        for image, label in tqdm(valloader):
            image, label = image.to(DEVICE), label.to(DEVICE)
            with torch.no_grad():
                out = model.forward(image)

                ### CALCULATE LOSS ##
                loss = loss_fn(out, label)
                validation_losses.append(loss.item())

                ### CALCULATE ACCURACY ###
                predictions = torch.argmax(out, axis=1)
                accuracy = (predictions == label).sum() / len(predictions)
                validation_accuracies.append(accuracy.item())

        training_loss_mean, training_acc_mean = np.mean(training_losses), np.mean(training_accuracies)
        valid_loss_mean, valid_acc_mean = np.mean(validation_losses), np.mean(validation_accuracies)

        log_training["epoch"].append(epoch)
        log_training["training_loss"].append(training_loss_mean)
        log_training["training_acc"].append(training_acc_mean)
        log_training["validation_loss"].append(valid_loss_mean)
        log_training["validation_acc"].append(valid_acc_mean)

        print("Training Loss:", training_loss_mean)
        print("Training Acc:", training_acc_mean)
        print("Validation Loss:", valid_loss_mean)
        print("Validation Acc:", valid_acc_mean)

    return log_training, model

In [None]:
### SELECT DEVICE ###
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Training on Device {DEVICE}")

### MODEL TRAINING INPUTS ###
epochs = 10
loss_fn = nn.CrossEntropyLoss()
batch_size = 128

### BUILD DATALOADERS ###
trainloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
valloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

### ResNet101 With Residuals ###
model = ResNet101(residual=True) # Use ResNet50 if there is Memory Constraints
model = model.to(DEVICE)
optimizer = optim.Adam(params=model.parameters(), lr=0.0001)

print("Training With Residuals")
resnet_w_resid, w_model = train(model=model,
                                device=DEVICE,
                                epochs=epochs,
                                optimizer=optimizer,
                                loss_fn=loss_fn,
                                batch_size=batch_size,
                                trainloader=trainloader,
                                valloader=valloader)


### ResNet101 Without Residuals ###
model = ResNet101(residual=False) # Use ResNet50 if there is Memory Constraints
model = model.to(DEVICE)
optimizer = optim.Adam(params=model.parameters(), lr=0.0001)

print("Training Without Residuals")
resnet_wo_resid, wo_model = train(model=model,
                                  device=DEVICE,
                                  epochs=epochs,
                                  optimizer=optimizer,
                                  loss_fn=loss_fn,
                                  batch_size=batch_size,
                                  trainloader=trainloader,
                                  valloader=valloader)