# Common Test I. Multi-Class Classification

Task: Build a model for classifying the images into lenses using PyTorch or Keras. Pick the most appropriate approach and discuss your strategy.

## Step 1: Imports
We start by importing everything we will need to work with and visualize the data. I am using PyTorch to create my final solution.

In [4]:
import random
import os

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import v2
from tqdm.auto import tqdm

In [5]:
# Fix the seed of every random number generator used in this notebook
# (PyTorch, Python's `random`, NumPy) so that runs are repeatable.
SEED = 1
for _seed_rng in (torch.manual_seed, random.seed, np.random.seed):
    _seed_rng(SEED)

## Creating the dataset

The dataset contains 2 folders, train and val. Train contains 3 folders (no, sphere, and vort) with 10,000 images each, and val contains the same 3 folders with 2,500 images each.

I will start by creating a PyTorch Dataset to load all these images. As the files share the same names across the different class folders, `__getitem__` maps each flat dataset index to the class folder and file index to load the image from.

In [10]:
class StrongLensingDataset(Dataset):
    """Dataset of ``.npy`` images stored in one sub-folder per class.

    Files are read from ``<imgs>/<train|val>/<class>/<k>.npy`` where ``k`` is
    1-based inside every class folder. A flat dataset index is mapped to a
    class by integer division, so indices ``[0, per_class)`` belong to the
    first class, ``[per_class, 2 * per_class)`` to the second, and so on.

    NOTE(review): this assumes every class folder holds the same number of
    files, numbered consecutively from 1 -- confirm for new datasets.

    Args:
        imgs: Root directory that contains the ``train`` and ``val`` folders.
        train: Read from ``train`` when truthy, otherwise from ``val``.
        transform: Optional callable applied to the float32 image tensor.
    """

    # Class folder names; the position in the tuple is the integer label.
    CLASSES = ("no", "sphere", "vort")

    def __init__(self, imgs, train=True, transform=None):
        folder = "train" if train else "val"
        self.imgs = os.path.join(imgs, folder)
        # Walk the directory tree once and reuse the count for the printout.
        self.len = sum(len(files) for _, _, files in os.walk(self.imgs))
        print(self.len)
        # Number of images per class, derived from the data instead of being
        # hard-coded to 10000 (train) / 2500 (val).
        self.per_class = self.len // len(self.CLASSES)
        self.transform = transform
        self.train = train

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the flat index ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[0, per_class * n_classes)``.
        """
        if not 0 <= idx < self.per_class * len(self.CLASSES):
            raise IndexError(f"index {idx} is out of range for dataset of size {self.len}")

        # Half-open class ranges: idx == per_class is the FIRST image of the
        # second class (the previous `<=` checks filed it under the first).
        label, offset = divmod(idx, self.per_class)
        path = os.path.join(self.imgs, self.CLASSES[label], f"{offset + 1}.npy")
        img = torch.from_numpy(np.load(path)).to(torch.float32)

        # Compare against None explicitly: the truthiness of a transform
        # object is not a reliable "is it set" check.
        if self.transform is not None:
            img = self.transform(img)
        return img, label

Using a separate function to create the DataLoader, I have decided on 3 transformations: random rotations of up to 90 degrees, random horizontal flips, and random vertical flips. These act as a regularization technique and help prevent overfitting.

The images are also normalized using the dataset's mean and standard deviation so that the inputs have approximately zero mean and unit variance.

In [11]:
def get_dataloader(file_path, train=True, batch_size=16, num_workers=1):
    """Build a ``DataLoader`` over the train or validation split.

    Training images get random rotation/flip augmentation followed by
    normalization. Validation images get the same normalization but no
    augmentation, so both splits reach the model on the same input scale.

    Args:
        file_path: Dataset root passed on to ``StrongLensingDataset``.
        train: Select the training split (with augmentation) when truthy,
            otherwise the validation split.
        batch_size: Number of samples per batch.
        num_workers: Number of loader worker processes; ``0`` loads in the
            main process.

    Returns:
        The configured ``DataLoader``.
    """
    normalize = v2.Normalize(mean=[0.0617], std=[0.1135])  # Values chosen to ensure mean 0 var 1
    if train:
        transform = v2.Compose([
            v2.RandomRotation(90),
            v2.RandomHorizontalFlip(p=0.5),
            v2.RandomVerticalFlip(p=0.5),
            normalize,
        ])
    else:
        # No augmentation for validation, but the images must still be
        # normalized exactly like the training images.
        transform = v2.Compose([normalize])

    dataset = StrongLensingDataset(file_path, train=train, transform=transform)
    # Shuffling stays enabled for both splits, as before: the dataset is
    # ordered by class, so unshuffled batches would mostly hold one class.
    return DataLoader(
        dataset,
        batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
        # persistent_workers=True is rejected by DataLoader when there are
        # no worker processes.
        persistent_workers=num_workers > 0,
    )

## Model

For the model, I settled on a ResNet18 architecture with slight modifications: I replace the ReLU activations with SiLU, and I replace the initial 7x7 stride-2 convolution with a 3x3 stride-1 convolution. This gave slightly more accurate results for the most part.

In [12]:
class BasicBlock(nn.Module):
    """Two-convolution residual block with SiLU activations.

    The main path is conv3x3 -> BN -> SiLU followed by a second
    conv3x3 -> BN -> SiLU; ``stride`` is applied in the second convolution.
    The input is added back through a shortcut and the sum goes through a
    final SiLU.

    Args:
        in_planes: Number of input channels.
        planes: Number of channels produced by both convolutions.
        stride: Stride of the second convolution and of the shortcut.
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        first_stage = [
            nn.Conv2d(in_planes, planes, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(planes),
            nn.SiLU(),
        ]
        self.conv1 = nn.Sequential(*first_stage)

        second_stage = [
            nn.Conv2d(planes, planes, kernel_size=3, padding=1, stride=stride, bias=False),
            nn.BatchNorm2d(planes),
            nn.SiLU(),
        ]
        self.conv2 = nn.Sequential(*second_stage)

        # The input can only be added unchanged when neither the resolution
        # nor the channel count changes; otherwise project it with a 1x1 conv.
        shape_preserved = stride == 1 and in_planes == out_planes
        if shape_preserved:
            self.shortcut = nn.Identity()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        features = self.conv2(self.conv1(x))
        features += self.shortcut(x)
        return F.silu(features)

class ResNet(nn.Module):
    """ResNet-style classifier for single-channel images.

    The stem is one 3x3 stride-1 convolution with 64 output channels followed
    by batch norm. Four stages of ``block`` follow (the last three start with
    a stride-2 block), then global average pooling and a linear classifier.

    NOTE(review): no activation is applied between the stem batch norm and
    the first stage. That is kept as-is so the forward pass stays unchanged
    for existing weights -- confirm whether it is intentional.

    Args:
        block: Block class. It is called as ``block(in_planes, planes, stride)``
            and must expose an ``expansion`` attribute.
        layers: Number of blocks in each of the four stages.
        planes: Channel width of each of the four stages.
        classes: Number of output logits.
    """

    def __init__(self, block, layers, planes=(64, 128, 256, 512), classes=3):
        # The default for `planes` is a tuple so that it cannot be mutated
        # and shared between instances.
        super().__init__()
        self.in_planes = 64
        self.block = block
        self.layers = layers
        self.planes = list(planes)
        self.classes = classes

        self.initial_conv = nn.Conv2d(1, 64, kernel_size=3, padding=1, bias=False)
        self.initial_bn = nn.BatchNorm2d(64)

        self.layer1 = self._make_layer(block, layers[0], planes[0], stride=1)
        self.layer2 = self._make_layer(block, layers[1], planes[1], stride=2)
        self.layer3 = self._make_layer(block, layers[2], planes[2], stride=2)
        self.layer4 = self._make_layer(block, layers[3], planes[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(planes[3] * block.expansion, classes)

    def _make_layer(self, block, layer, plane, stride=1):
        """Stack ``layer`` blocks of width ``plane``; only the first one uses ``stride``."""
        strides = [stride] + (layer - 1) * [1]

        blocks = []
        for block_stride in strides:
            blocks.append(block(self.in_planes, plane, block_stride))
            # The next block consumes what this one produced.
            self.in_planes = plane * block.expansion
        return nn.Sequential(*blocks)

    def forward(self, x):
        out = self.initial_conv(x)
        out = self.initial_bn(out)

        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avgpool(out)
        # Flatten (N, C, 1, 1) to (N, C) for the linear classifier.
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

## Training

Start creating all training parameters and load everything up.

In [13]:
# Build the training and validation loaders from the same dataset root,
# using the same batch size for both.
train_loader = get_dataloader(file_path="dataset", train=True, batch_size=128)
test_loader = get_dataloader(file_path="dataset", train=False, batch_size=128)

30000
7500


In [22]:
# Instantiate the network together with its loss, optimizer and LR scheduler
model = ResNet(
    block=BasicBlock,
    layers=[2, 2, 2, 2],
    planes=[64, 128, 256, 512],
    classes=3,
)  # Resnet 18
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-4)
# Halve the learning rate when the monitored value stops improving for 5 epochs
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.5, patience=5)
epochs = 120

# Total number of scalar parameters in the model
print("Model size:", sum(map(torch.numel, model.parameters())))

Model size: 11169219


In [23]:
# Per-epoch training history (epoch index, loss, AUROC, accuracy),
# kept at module level so it can be plotted afterwards
t_epochs, t_loss, t_auc, t_accuracy = [], [], [], []

In [32]:
def _run_epoch(model, loader, loss_fn, auroc, device, desc, optimizer=None, scaler=None, use_amp=False):
    """Run one full pass over ``loader`` and return ``(loss, accuracy, auroc)``.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch (using ``scaler`` for the scaled backward pass). Without one
    the model is put in eval mode and no gradients are tracked. Loss and
    accuracy (in percent) are averaged over samples; the AUROC is computed
    once over all predictions of the pass.
    """
    training = optimizer is not None
    # Switch BatchNorm between batch statistics (train) and running
    # statistics (eval); validation must not update the running statistics.
    model.train(training)
    # Drop predictions accumulated by any previous pass.
    auroc.reset()

    loss_sum = 0.0
    correct = 0
    seen = 0
    with torch.set_grad_enabled(training):
        for image, label in tqdm(loader, desc=desc):
            image = image.to(device)
            label = label.to(device)

            if training:
                optimizer.zero_grad()
                with torch.autocast(device_type=device.type, dtype=torch.float16, enabled=use_amp):
                    output = model(image)
                    loss = loss_fn(output, label)
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                output = model(image)
                loss = loss_fn(output, label)

            # Metrics use detached float32 logits so they neither extend the
            # autograd graph nor run the softmax in half precision.
            logits = output.detach().float()
            batch_size = label.size(0)
            loss_sum += loss.item() * batch_size
            correct += (logits.argmax(dim=1) == label).sum().item()
            seen += batch_size
            auroc.update(F.softmax(logits, dim=1), label)

    if seen == 0:
        raise ValueError(f"{desc} loader yielded no samples")
    return loss_sum / seen, 100 * correct / seen, auroc.compute().item()


def train(model, optimizer, scheduler, loss_fn, train_loader, test_loader, epochs=100, device=torch.device("cuda")):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Side effects:
        * Appends the epoch index and the training loss / accuracy / AUROC to
          the module-level lists ``t_epochs``, ``t_loss``, ``t_accuracy`` and
          ``t_auc``.
        * Prints the per-epoch report and writes the same text to
          ``output.txt``.
        * Saves the weights of the unwrapped model to ``model.pt`` after the
          last epoch.

    Args:
        model: Network to train; it is wrapped in ``nn.DataParallel`` and
            moved to ``device``.
        optimizer: Optimizer over the model parameters.
        scheduler: Scheduler stepped once per epoch with the training loss.
        loss_fn: Loss called as ``loss_fn(logits, labels)``.
        train_loader: Loader yielding ``(image, label)`` training batches.
        test_loader: Loader yielding ``(image, label)`` validation batches.
        epochs: Number of epochs to run.
        device: Device to train on. Mixed precision is only enabled for CUDA
            devices.
    """
    device = torch.device(device)
    use_amp = device.type == "cuda"

    model = nn.DataParallel(model)
    model = model.to(device)
    # A single metric object is reused; _run_epoch resets it before each pass
    # so its stored predictions never grow beyond one epoch.
    auroc = torchmetrics.AUROC(task="multiclass", num_classes=3).to(device)
    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    # The context manager closes the log file even if training is interrupted.
    with open("output.txt", "w") as outputfile:
        for epoch in tqdm(range(epochs)):
            print("Learning Rate:", [group['lr'] for group in optimizer.param_groups])

            epoch_loss_train, epoch_accuracy_train, epoch_auroc_train = _run_epoch(
                model, train_loader, loss_fn, auroc, device, "Train",
                optimizer=optimizer, scaler=scaler, use_amp=use_amp,
            )
            scheduler.step(epoch_loss_train)

            t_epochs.append(epoch)
            t_loss.append(epoch_loss_train)
            t_accuracy.append(epoch_accuracy_train)
            t_auc.append(epoch_auroc_train)

            epoch_loss_val, epoch_accuracy_val, epoch_auroc_val = _run_epoch(
                model, test_loader, loss_fn, auroc, device, "Test",
            )

            # Build the report once so the console and the file always match.
            report = (
                f"Epoch: {epoch}\n"
                "Train:\n"
                f"\tLoss: {epoch_loss_train}, Accuracy: {epoch_accuracy_train}, AUROC: {epoch_auroc_train}\n"
                "Validation:\n"
                f"\tLoss: {epoch_loss_val}, Accuracy: {epoch_accuracy_val}, AUROC: {epoch_auroc_val}\n"
            )
            print(report, end="")
            outputfile.write(report)
            # Keep the log on disk current during long runs.
            outputfile.flush()

    torch.save(model.module.state_dict(), "model.pt")

In [33]:
# Launch training on the GPU with the objects created above
train(
    model=model,
    optimizer=optimizer,
    scheduler=scheduler,
    loss_fn=loss_fn,
    train_loader=train_loader,
    test_loader=test_loader,
    epochs=epochs,
    device=torch.device("cuda"),
)

0it [00:00, ?it/s]