In [1]:
import os
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, models
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.datasets import OxfordIIITPet
from torchsummary import summary
import copy
import numpy as np
import re

## Load Dataset

In [93]:
class NeuralNetwork():
    """Fine-tuning harness around a pretrained backbone that exposes an ``fc`` head.

    The backbone is deep-copied, its ``fc`` layer is replaced by a fresh
    ``nn.Linear`` with ``output`` units, and only the last layers are left
    trainable. Every trainable parameter is registered with the Adam
    optimizer, including parameters that are unfrozen later during training.

    Relies on a module-level ``device`` (a ``torch.device``) being defined
    before an instance is created.
    """

    # Hyper-parameters used for every parameter group handed to Adam.
    LEARNING_RATE = 1e-3
    WEIGHT_DECAY = 1e-4

    def __init__(self, basemodel, output, train_loader, val_loader, test_loader, unfreeze=0, parameters=None):
        """Build the model, optimizer and loss.

        Args:
            basemodel: module with an ``fc`` attribute exposing ``in_features``;
                it is copied, never modified.
            output: number of output units of the new ``fc`` layer.
            train_loader, val_loader, test_loader: iterables yielding
                ``(images, labels)`` batches.
            unfreeze: number of layers to train in addition to ``fc``
                (see ``unfreeze_layers``).
            parameters: optional dict of training options. Recognised keys:
                ``'gradual_unfreezing'`` (truthy enables it) and
                ``'unfreezing_rate'`` (layers unfrozen per epoch, default 1).
        """
        # Private copy so repeated experiments never share weights.
        self.model = copy.deepcopy(basemodel)
        # Replace the classification head.
        self.model.fc = nn.Linear(self.model.fc.in_features, output)
        # No optimizer yet: unfreeze_layers only syncs one that already exists.
        self.optimizer = None
        self.unfreeze_layers(unfreeze)
        self.model = self.model.to(device)

        # Register *every* trainable parameter. Passing only fc.parameters()
        # would leave the unfrozen backbone layers without any update.
        self.optimizer = optim.Adam([{
            "params": self._trainable_parameters(),
            "lr": self.LEARNING_RATE,
            "weight_decay": self.WEIGHT_DECAY,
        }])
        self.criterion = nn.CrossEntropyLoss()

        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader

        self.parameters = parameters if parameters is not None else {}

    def _trainable_parameters(self):
        """Return the model parameters that currently require gradients."""
        return [p for p in self.model.parameters() if p.requires_grad]

    def _sync_optimizer(self):
        """Add trainable parameters the optimizer does not know yet as a new group."""
        known = {id(p) for group in self.optimizer.param_groups for p in group["params"]}
        fresh = [p for p in self._trainable_parameters() if id(p) not in known]
        if fresh:
            self.optimizer.add_param_group({
                "params": fresh,
                "lr": self.LEARNING_RATE,
                "weight_decay": self.WEIGHT_DECAY,
            })

    def unfreeze_layers(self, L):
        """Freeze everything, then unfreeze ``fc`` plus the last ``L`` layers.

        Parameters are walked from the end of ``named_parameters()``. One
        "layer" is counted per parameter whose name does not end in ``bias``;
        bias tensors met while the budget is not exhausted are unfrozen too.
        Negative ``L`` is treated as 0 so the head always stays trainable.
        """
        L = max(int(L), 0)

        # freezing all layers
        for p in self.model.parameters():
            p.requires_grad = False

        layer = 0
        # unfreeze last L layers and fully connected layer
        for name, p in reversed(list(self.model.named_parameters())):
            if layer >= L + 1:
                break
            if not name.endswith("bias"):
                layer += 1
            p.requires_grad = True

        # When called after construction, newly unfrozen parameters must be
        # handed to the optimizer or they would never be stepped.
        if getattr(self, "optimizer", None) is not None:
            self._sync_optimizer()

    def check_trainable_layers(self):
        """Print, for every named parameter, whether it requires gradients."""
        for name, param in self.model.named_parameters():
            if param.requires_grad:
                print(f"{name} requires grad")
            else:
                print(f"{name} does NOT require grad")

    def run_epoch(self, loader, train=True):
        """Run one pass over ``loader``.

        Args:
            loader: iterable of ``(images, labels)`` batches.
            train: when truthy the model is put in train mode and the
                optimizer is stepped; otherwise the model is in eval mode and
                no gradients are tracked.

        Returns:
            ``(mean loss per sample, accuracy in percent)``.
        """
        train = bool(train)
        self.model.train(train)
        # Batches go wherever the model lives; no notebook global needed here.
        target_device = next(self.model.parameters()).device
        total_loss, correct, total = 0.0, 0, 0
        # Skip autograd bookkeeping entirely during evaluation.
        with torch.set_grad_enabled(train):
            for imgs, labs in loader:
                imgs, labs = imgs.to(target_device), labs.to(target_device)
                if train:
                    self.optimizer.zero_grad()
                logits = self.model(imgs)
                loss = self.criterion(logits, labs)
                if train:
                    loss.backward()
                    self.optimizer.step()
                # criterion returns a batch mean; re-weight by batch size.
                total_loss += loss.item() * imgs.size(0)
                preds = logits.argmax(dim=1)
                correct += (preds == labs).sum().item()
                total += imgs.size(0)
        return total_loss/total, correct/total*100

    def test_model(self):
        """Evaluate on the test loader, print and return the accuracy in percent."""
        test_loss, test_acc = self.run_epoch(self.test_loader, train=False)
        print(f"Test Accuracy: {test_acc:.2f}%")
        return test_acc

    def train(self, epochs, path="model.pth"):
        """Train for ``epochs`` epochs and return the test accuracy.

        The state dict with the best validation accuracy is written to
        ``path`` and restored before the final test evaluation, so the
        reported number belongs to the selected checkpoint rather than to
        whatever weights the last epoch produced.
        """
        best_val_acc = 0.0
        checkpoint_written = False
        # Options come from this instance, not from a same-named global.
        gradual = bool(self.parameters.get('gradual_unfreezing', False))
        uf_rate = self.parameters.get('unfreezing_rate', 1)

        for epoch in range(1, epochs+1):
            # gradually unfreeze layers for strategy 2 according to rate parameter
            if gradual:
                # floor allows rates < 1; at least one layer stays unfrozen
                L = max(int(np.floor(uf_rate*(1+epoch))), 1)
                self.unfreeze_layers(L)

            tr_loss, tr_acc = self.run_epoch(self.train_loader, train=True)
            val_loss, val_acc = self.run_epoch(self.val_loader, train=False)
            print(f"Epoch {epoch:2d} | Train: {tr_acc:.2f}% | Val: {val_acc:.2f}%")
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(self.model.state_dict(), path)
                checkpoint_written = True

        if checkpoint_written:
            target_device = next(self.model.parameters()).device
            self.model.load_state_dict(torch.load(path, map_location=target_device))

        test_acc = self.test_model()
        return test_acc



In [2]:
# Fetch the Oxford-IIIT Pet files into ./data (download=True). The returned
# dataset object is not kept; its repr is simply shown as the cell output.
OxfordIIITPet(root="data", download=True)

100%|██████████| 792M/792M [00:36<00:00, 21.6MB/s]
100%|██████████| 19.2M/19.2M [00:02<00:00, 9.29MB/s]


Dataset OxfordIIITPet
    Number of datapoints: 3680
    Root location: data

In [3]:
# Build image-name -> species-id from annotations/list.txt.
annnotations_dir = os.path.join("data", "oxford-iiit-pet", "annotations")
# The context manager closes the handle; a bare open(...).read() leaves it open.
with open(os.path.join(annnotations_dir, "list.txt")) as f:
    # Drop the first 6 lines (presumably the file's comment header).
    lines_for_files = f.read().splitlines()[6:]
# Column 0 is the image name and column 2 the species id. Each line is split
# once, and blank lines are skipped instead of raising IndexError.
species_map = {
    fields[0]: int(fields[2])
    for fields in map(str.split, lines_for_files)
    if fields
}

In [4]:
# Read the split files: the first whitespace-separated token of every
# non-blank line is an image name.
with open(os.path.join(annnotations_dir, "trainval.txt")) as f:
    trainval_names = [tokens[0] for tokens in map(str.split, f) if tokens]
with open(os.path.join(annnotations_dir, "test.txt")) as f:
    test_names = [tokens[0] for tokens in map(str.split, f) if tokens]

In [8]:
class BinaryPet(Dataset):
    """Two-class dataset over the pet images, labelled by species.

    Args:
        root: directory that contains ``oxford-iiit-pet/images``.
        names: sequence of image names without the ``.jpg`` extension.
        species_map: mapping from image name to species id (1 or 2).
        transform: optional callable applied to the loaded RGB image.
    """

    def __init__(self, root, names, species_map, transform=None):
        self.root, self.names = root, names
        self.species_map, self.transform = species_map, transform

    def __len__(self):
        """Number of images in this split."""
        return len(self.names)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``; label is 0 or 1."""
        stem = self.names[idx]
        path = os.path.join(self.root, "oxford-iiit-pet", "images", stem + ".jpg")
        picture = Image.open(path).convert("RGB")
        picture = self.transform(picture) if self.transform else picture
        # Shift the 1-based species id to 0-based: 1 (cat) -> 0, 2 (dog) -> 1.
        return picture, self.species_map[stem] - 1

In [5]:
# Side length, in pixels, of the square centre crop fed to the network.
IMG_SIZE = 224

In [6]:
# Deterministic preprocessing shared by every split: resize, centre-crop to
# IMG_SIZE, convert to a tensor, then normalise each channel (the values are
# the commonly used ImageNet statistics).
transformIMG = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(IMG_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [9]:
# Hold out 10% of the trainval names for validation.
full = BinaryPet("data", trainval_names, species_map, transform=transformIMG)
n_val = int(0.1 * len(full))
# A seeded generator gives the same partition on every run; unseeded, the
# validation accuracy is not comparable between kernel restarts.
train_ds, val_ds = random_split(
    full,
    [len(full) - n_val, n_val],
    generator=torch.Generator().manual_seed(42),
)

In [10]:
# Test set built from the names in test.txt, with the same preprocessing as training.
test_ds = BinaryPet("data", test_names, species_map, transform=transformIMG)

In [11]:
# One loader per split; only the training split is shuffled.
batch_size = 32
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=reshuffle, num_workers=4, pin_memory=True)
    for split, reshuffle in ((train_ds, True), (val_ds, False), (test_ds, False))
)



# Binary Classification

## Model Set Up

In [31]:
# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.resnet34(weights=models.ResNet34_Weights.DEFAULT)
# Swap in a 2-way head for the binary task.
model.fc = nn.Linear(model.fc.in_features, 2)
# Single pass over the parameters: only the new head ("fc.*") stays
# trainable, every other parameter is frozen.
for name, p in model.named_parameters():
    p.requires_grad = name.startswith("fc")
model = model.to(device)

In [25]:
# Loss applied to the raw logits and the integer class labels in run_epoch.
criterion = nn.CrossEntropyLoss()

In [32]:
# Only the replacement head's parameters are handed to Adam, so this
# optimizer never updates any other part of the network.
optimizer = optim.Adam([
    {"params": model.fc.parameters(),     "lr": 1e-3, "weight_decay": 1e-4},
])

## Training

In [33]:
def run_epoch(model, loader, train=True, criterion=criterion, optimizer=optimizer): # TODO: Include criterion and optimizer in a neater way
    """Run one pass of ``model`` over ``loader``.

    Args:
        model: network to train or evaluate.
        loader: iterable of ``(images, labels)`` batches.
        train: when truthy, put the model in train mode and step the
            optimizer; otherwise use eval mode and track no gradients.
        criterion: loss called as ``criterion(logits, labels)``.
        optimizer: optimizer stepped after each training batch.

    Returns:
        ``(mean loss per sample, accuracy in percent)``.

    Note:
        The defaults are bound to the ``criterion`` / ``optimizer`` objects
        that exist when this cell runs. Re-creating either afterwards has no
        effect here unless this cell is re-run or the object is passed in.
        Batches are moved to the module-level ``device``.
    """
    if train:
        model.train()
    else:
        model.eval()
    total_loss, correct, total = 0.0, 0, 0
    # Evaluation needs no autograd graph; disabling it saves memory and time.
    with torch.set_grad_enabled(bool(train)):
        for imgs, labs in loader:
            imgs, labs = imgs.to(device), labs.to(device)
            if train:
                optimizer.zero_grad()
            logits = model(imgs)
            loss = criterion(logits, labs)
            if train:
                loss.backward()
                optimizer.step()
            # criterion returns a batch mean; re-weight by batch size.
            total_loss += loss.item() * imgs.size(0)
            preds = logits.argmax(dim=1)
            correct += (preds == labs).sum().item()
            total += imgs.size(0)
    return total_loss/total, correct/total*100


In [None]:
# Train the binary head and keep the weights of the best validation epoch.
best_val_acc = 0.0
# range(1, 20) yields epochs 1..19, i.e. 19 passes over the training data.
for epoch in range(1, 20):
    tr_loss, tr_acc = run_epoch(model, train_loader, train=True)
    val_loss, val_acc = run_epoch(model, val_loader, train=False)
    print(f"Epoch {epoch:2d} | Train: {tr_acc:.2f}% | Val: {val_acc:.2f}%")
    # Strict '>' means a tie keeps the earlier checkpoint.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), "best_binary_resnet34.pth")

Epoch  1 | Train: 93.69% | Val: 96.74%
Epoch  2 | Train: 98.04% | Val: 98.37%
Epoch  3 | Train: 98.13% | Val: 98.10%
Epoch  4 | Train: 98.82% | Val: 97.01%
Epoch  5 | Train: 98.31% | Val: 97.83%
Epoch  6 | Train: 98.46% | Val: 99.18%
Epoch  7 | Train: 98.85% | Val: 98.37%
Epoch  8 | Train: 99.03% | Val: 97.83%
Epoch  9 | Train: 99.06% | Val: 98.37%
Epoch 10 | Train: 98.85% | Val: 98.64%
Epoch 11 | Train: 99.15% | Val: 99.18%
Epoch 12 | Train: 98.88% | Val: 97.55%
Epoch 13 | Train: 99.37% | Val: 98.37%
Epoch 14 | Train: 99.03% | Val: 98.91%
Epoch 15 | Train: 98.97% | Val: 99.46%
Epoch 16 | Train: 99.21% | Val: 98.91%
Epoch 17 | Train: 99.25% | Val: 98.64%
Epoch 18 | Train: 99.58% | Val: 98.64%
Epoch 19 | Train: 99.43% | Val: 98.91%


In [None]:
# Restore the best-validation checkpoint before evaluating on the test set.
# map_location places the tensors on the current device, so a checkpoint
# written on a GPU still loads on a CPU-only machine.
model.load_state_dict(torch.load("best_binary_resnet34.pth", map_location=device))
test_loss, test_acc = run_epoch(model, test_loader, train=False)
print(f"Test Accuracy: {test_acc:.2f}%")



Test Accuracy: 99.21%


# Multi Class Classification

TODO:
* Different learning rates / learning-rate schedulers for different layers
* Data augmentation
* Effect of fine-tuning the batch-norm parameters
* Training with an imbalanced dataset
* Testing and logging of all of the above
* Testing of gradual unfreezing (`gradual_unfreezing`)
* Testing of unfreezing L layers at a time
  
DONE:
* gradual_unfreezing
* unfreezing l layers at a time

## Load Dataset

In [77]:
# Build image-name -> breed-name from annotations/list.txt.
annnotations_dir = os.path.join("data", "oxford-iiit-pet", "annotations")
# The context manager closes the handle; a bare open(...).read() leaves it open.
with open(os.path.join(annnotations_dir, "list.txt")) as f:
    # Drop the first 6 lines (presumably the file's comment header).
    lines_for_files = f.read().splitlines()[6:]
# The breed is the image name up to the first "_<digits>" suffix. Each line
# is split once, and blank lines are skipped instead of raising IndexError.
breeds_map = {
    fields[0]: re.split(r'_\d+', fields[0])[0]
    for fields in map(str.split, lines_for_files)
    if fields
}

In [78]:
# Number the distinct breed names in alphabetical order, then invert that
# numbering to obtain the name -> id lookup.
id_to_breed = dict(enumerate(sorted(set(breeds_map.values()))))
breed_to_id = {breed: index for index, breed in id_to_breed.items()}

In [79]:
class MultiClassPet(Dataset):
    """Breed-classification dataset over the pet images.

    Args:
        root: directory that contains ``oxford-iiit-pet/images``.
        names: sequence of image names without the ``.jpg`` extension.
        breeds_map: mapping from image name to breed name.
        transform: optional callable applied to the loaded RGB image.
        class_to_id: optional mapping from breed name to integer label. When
            omitted, the module-level ``breed_to_id`` is looked up at access
            time, which is the behaviour this class always had.
    """

    def __init__(self, root, names, breeds_map, transform=None, class_to_id=None):
        self.root = root
        self.names = names
        self.breeds_map = breeds_map
        self.transform = transform
        self.class_to_id = class_to_id

    def __len__(self):
        """Number of images in this split."""
        return len(self.names)

    def _label_for(self, name):
        """Return the integer label for image ``name``."""
        # Prefer the explicit mapping; fall back to the notebook global so
        # existing call sites keep working unchanged.
        lookup = self.class_to_id if self.class_to_id is not None else breed_to_id
        return lookup[self.breeds_map[name]]

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``."""
        name = self.names[idx]
        img_path = os.path.join(
            self.root, "oxford-iiit-pet", "images", name + ".jpg"
        )
        img = Image.open(img_path).convert("RGB")
        if self.transform:
            img = self.transform(img)
        label = self._label_for(name)
        return img, label

In [80]:
# Hold out 10% of the trainval names for validation.
full = MultiClassPet("data", trainval_names, breeds_map, transform=transformIMG)
n_val = int(0.1 * len(full))
# A seeded generator gives the same partition on every run, so results of
# different unfreezing strategies are measured on the same validation set.
train_ds, val_ds = random_split(
    full,
    [len(full) - n_val, n_val],
    generator=torch.Generator().manual_seed(42),
)
test_ds = MultiClassPet("data", test_names, breeds_map, transform=transformIMG)

In [81]:
# Loaders for the breed datasets; only the training split is shuffled.
batch_size = 32
train_loader, val_loader, test_loader = [
    DataLoader(split, batch_size=batch_size, shuffle=is_train, num_workers=4, pin_memory=True)
    for split, is_train in zip((train_ds, val_ds, test_ds), (True, False, False))
]

## Model Set Up

In [83]:
# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Pretrained backbone handed to NeuralNetwork for each experiment below.
basemodel = models.resnet34(weights=models.ResNet34_Weights.DEFAULT)

In [95]:
# Strategy 1: train with l layers unfrozen simultaneously
# Sweeps unfreeze=1..9; each run builds a new 37-output NeuralNetwork from
# basemodel, trains it for 10 epochs and records the returned test accuracy.
accs = []
for L in range(1, 10):
  print(f"Start training model with {L} last layers trainable")
  modelmulti = NeuralNetwork(basemodel, 37, train_loader, val_loader, test_loader, unfreeze=L)  
  # train() is called without a path, so every run writes to its default checkpoint file.
  accs.append(modelmulti.train(10))

Start training model with 1 last layers trainable
Epoch  1 | Train: 61.32% | Val: 87.77%


KeyboardInterrupt: 

In [49]:
# Strategy 2: train with gradual unfreezing
accs = []

# Sweep unfreezing rates below, at and above one layer per epoch.
for rate in [0.3, 0.5, 0.8, 1, 2, 3]:
  print(f"Start training model with uf_rate {rate}")
  # Fresh dict per run: a single dict mutated in place would be shared by
  # every model created in this loop. The module-level name is kept as before.
  parameters = {'gradual_unfreezing': True, 'unfreezing_rate': rate}
  modelmulti = NeuralNetwork(basemodel, 37, train_loader, val_loader, test_loader, unfreeze=1, parameters=parameters)   # start with only one unfrozen layer
  accs.append(modelmulti.train(10))

Start training model with uf_rate 0.3
Epoch  1 | Train: 58.18% | Val: 85.05%
Epoch  2 | Train: 87.32% | Val: 90.49%


KeyboardInterrupt: 