# Imports

In [1]:
import os, sys, random, time, copy, glob

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

import lightning as L

from torch.utils.data import DataLoader, Dataset
from torchvision.models import resnet18, ResNet18_Weights
from torchvision import transforms

from sklearn.metrics import classification_report, accuracy_score
from sklearn.metrics import roc_auc_score, f1_score
from sklearn.model_selection import train_test_split

from torchinfo import summary

from PIL import Image

import warnings

warnings.filterwarnings("ignore")

In [2]:
os.chdir("applied-ml-assignment-5/src")

# Setting up the data

In [3]:
CHICKEN_IMAGES_PATH = "../data/chicken/data"
DUCK_IMAGES_PATH = "../data/duck/data"

In [22]:
def create_annotations_file(paths: list[str]):
    # training dataset
    train = pd.DataFrame(columns=["path", "class"])
    for path in paths:
        full_path = os.path.join(path, "train")
        image_names = [f"{path}/train/{_}" for _ in os.listdir(full_path)[:100]]
        label = path.split("/")[2]
        temp = pd.DataFrame({"path": image_names, "class": label})
        train = pd.concat([train, temp])

    # validation dataset
    val = pd.DataFrame(columns=["path", "class"])
    for path in paths:
        full_path = os.path.join(path, "val")
        image_names = [f"{path}/val/{_}" for _ in os.listdir(full_path)[:50]]
        label = path.split("/")[2]
        temp = pd.DataFrame({"path": image_names, "class": label})
        val = pd.concat([val, temp])

    # test dataset
    test = pd.DataFrame(columns=["path", "class"])
    for path in paths:
        full_path = os.path.join(path, "test")
        image_names = [f"{path}/test/{_}" for _ in os.listdir(full_path)[:100]]
        label = path.split("/")[2]
        temp = pd.DataFrame({"path": image_names, "class": label})
        test = pd.concat([test, temp])

    train = train.reset_index(drop=True)
    val = val.reset_index(drop=True)
    test = test.reset_index(drop=True)

    return train, val, test


train, val, test = create_annotations_file([CHICKEN_IMAGES_PATH, DUCK_IMAGES_PATH])

Next we will create the dataset

In [41]:
class ChickenOrDuck(Dataset):

    def __init__(
        self, annotations_file, transforms=transforms.Compose([transforms.ToTensor()])
    ):
        super().__init__()
        self.annotations_file = annotations_file
        self.transforms = transforms

    def __getitem__(self, index):
        image_path = self.annotations_file.loc[index, "path"]
        image_class = self.annotations_file.loc[index, "class"]
        image_label = 1 if image_class == "chicken" else 0

        image = Image.open(image_path).convert("RGB")
        image = image.resize((128, 128))
        image = self.transforms(image)

        return image, image_label

    def __len__(self):
        return len(self.annotations_file)

In [42]:
mean, std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]
image_transforms = {
    "train": transforms.Compose(
        [
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
            transforms.ToTensor(),
            transforms.Normalize(mean=mean, std=std),
        ]
    ),
    "val": transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize(mean=mean, std=std),
        ]
    ),
}

train_dataset = ChickenOrDuck(train, image_transforms["train"])
val_dataset = ChickenOrDuck(val, image_transforms["val"])
test_dataset = ChickenOrDuck(test, image_transforms["val"])

## Training Hyperparameters

In [43]:
BATCH_SIZE = 32
LR = 2e-3
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

In [44]:
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True)

In [45]:
model = resnet18(ResNet18_Weights.IMAGENET1K_V1)
num_in = model.fc.in_features
model.fc = nn.Linear(num_in, 2)
summary(model, (32, 3, 128, 128))

Layer (type:depth-idx)                   Output Shape              Param #
ResNet                                   [32, 2]                   --
├─Conv2d: 1-1                            [32, 64, 64, 64]          9,408
├─BatchNorm2d: 1-2                       [32, 64, 64, 64]          128
├─ReLU: 1-3                              [32, 64, 64, 64]          --
├─MaxPool2d: 1-4                         [32, 64, 32, 32]          --
├─Sequential: 1-5                        [32, 64, 32, 32]          --
│    └─BasicBlock: 2-1                   [32, 64, 32, 32]          --
│    │    └─Conv2d: 3-1                  [32, 64, 32, 32]          36,864
│    │    └─BatchNorm2d: 3-2             [32, 64, 32, 32]          128
│    │    └─ReLU: 3-3                    [32, 64, 32, 32]          --
│    │    └─Conv2d: 3-4                  [32, 64, 32, 32]          36,864
│    │    └─BatchNorm2d: 3-5             [32, 64, 32, 32]          128
│    │    └─ReLU: 3-6                    [32, 64, 32, 32]          --
│

In [46]:
class ImagenetTransferLearning(L.LightningModule):
    def __init__(self):
        super().__init__()

        backbone = resnet18(ResNet18_Weights.IMAGENET1K_V1)
        num_in = backbone.fc.in_features

        layers = list(backbone.children())[:-1]
        self.feature_extractor = nn.Sequential(*layers)
        self.feature_extractor.eval()

        num_target_classes = 2
        self.classifier = nn.Linear(num_in, num_target_classes)

    # def forward(self, x):
    #     with torch.no_grad():
    #         representations = self.feature_extractor(x).flatten(1)
    #     x = self.classifier(representations)
    #     return x

    def training_step(self, batch, batch_idx, datalaoder_idx=None):
        x, y = batch
        with torch.no_grad():
            representations = self.feature_extractor(x).flatten(1)
        x = self.classifier(representations)
        loss = F.cross_entropy(x, y)
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(
            params=filter(lambda p: p.requires_grad, self.classifier.parameters()),
            lr=LR,
        )
        return optimizer

In [47]:
model = ImagenetTransferLearning()
trainer = L.Trainer(
    max_epochs=5,
)
trainer.fit(model, train_dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name              | Type       | Params
-------------------------------------------------
0 | feature_extractor | Sequential | 11.2 M
1 | classifier        | Linear     | 1.0 K 
-------------------------------------------------
11.2 M    Trainable params
0         Non-trainable params
11.2 M    Total params
44.710    Total estimated model params size (MB)


Training: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=5` reached.
