# Exercise 3.2

In [1]:
import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
import pickle
from collections import OrderedDict
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix, precision_score, recall_score, accuracy_score
from tqdm import tqdm


In [2]:
# Define the model
case = 'a'
num_classes = 10
num_epochs = 500

In [3]:
if case == 'a':
    inputs, n_hidden0, n_hidden1, out = 784*3, 64, 16, 10
    ckpt_pth = 'best_model_NN.pth'
    model = nn.Sequential(
    nn.Linear(inputs, n_hidden0, bias=True),
    nn.Tanh(),
    nn.Linear(n_hidden0, n_hidden1, bias=True),
    nn.Tanh(),
    nn.Linear(n_hidden1, out, bias=True),
    nn.Softmax()
    ).to('cuda')
elif case == 'b':
    ckpt_pth = 'best_model_CNN.pth'
    preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    model = torch.hub.load('pytorch/vision:v0.10.0', 'alexnet', pretrained=True)
    model.classifier[6] = nn.Linear(4096, num_classes)
    model = model.to('cuda')
else:
    raise ValueError('Case choice is invalid')

model.train()

Sequential(
  (0): Linear(in_features=2352, out_features=64, bias=True)
  (1): Tanh()
  (2): Linear(in_features=64, out_features=16, bias=True)
  (3): Tanh()
  (4): Linear(in_features=16, out_features=10, bias=True)
  (5): Softmax(dim=None)
)

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
# Load the data
with open('/content/drive/MyDrive/0_development_data.pkl', 'rb') as f:
    devel_imgs = pickle.load(f)
train_imgs = devel_imgs[0][::2]
val_imgs = devel_imgs[0][1::2]
with open('/content/drive/MyDrive/0_test_data.pkl', 'rb') as f:
    test_imgs = pickle.load(f)
test_imgs = test_imgs[0]

In [8]:
len(train_imgs), len(val_imgs), len(test_imgs)

(10499, 10498, 21003)

In [9]:
# Extract the labels
labels_train = [int(i[0].split('/')[-2]) for i in train_imgs]
labels_val = [int(i[0].split('/')[-2]) for i in val_imgs]
labels_test = [int(i[0].split('/')[-2]) for i in test_imgs]

In [10]:
# Define the data loader and training objects
class CustomDataset(Dataset):
    def __init__(self, image_list, labels, transform=None):
        self.image_list = image_list
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_list)

    def __getitem__(self, idx):
        if case == 'a':
            image = self.image_list[idx].astype(float)
            image /= 255.0 # This should be at dataset level
            image -= np.sum(np.sum(image,0),0) / (image.shape[0]*image.shape[1])
        elif case == 'b':
            img_tmp = self.image_list[idx]
            image = preprocess(Image.fromarray(img_tmp))
        label = self.labels[idx]
        return image, label

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
if case == 'a':
    optimizer = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
else:
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Create CustomDataset instance
train_list = [i[1] for i in train_imgs]
val_list = [i[1] for i in val_imgs]
test_list = [i[1] for i in test_imgs]
dataset_train = CustomDataset(train_list, labels_train, transform=None)
dataset_val = CustomDataset(val_list, labels_val, transform=None)
dataset_test = CustomDataset(test_list, labels_test, transform=None)

# Create DataLoader
batch_size = 32
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_val = DataLoader(dataset_val, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)


In [None]:
# Training loop
early_stopping_patience = 10

best_val_loss = 10000.0
for epoch in range(num_epochs):
    running_loss, running_val_loss = 0.0, 0.0
    model.train()
    for inputs_, labels_ in tqdm(dataloader_train):

        if case == 'a': inputs_ = torch.reshape(inputs_, (inputs_.shape[0], -1))
        inputs_, labels_ = inputs_.to(torch.float).to('cuda'), labels_.to('cuda')
        optimizer.zero_grad()
        outputs = model(inputs_)
        loss = criterion(outputs, labels_)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    model.eval()
    with torch.no_grad():
        for inputs_val, labels_val in tqdm(dataloader_val):
            if case == 'a': inputs_val = torch.reshape(inputs_val, (inputs_val.shape[0], -1))
            inputs_val, labels_val = inputs_val.to(torch.float).to('cuda'), labels_val.to('cuda')
            outputs_val = model(inputs_val)
            val_loss = criterion(outputs_val, labels_val)
            running_val_loss += val_loss.item()

    epoch_val_loss = running_val_loss/len(dataloader_val)
    if epoch_val_loss < best_val_loss:
        early_stopping_counter = 0
        best_val_loss = float(epoch_val_loss)
        torch.save(model.state_dict(), ckpt_pth)
    else:
        early_stopping_counter += 1
        if early_stopping_counter==early_stopping_patience:
            print('-------- Early Stopping ------------')
            print(f'Epoch {epoch+1}, Train loss: {running_loss/len(dataloader_train)}, Val loss: {running_val_loss/len(dataloader_val)}')
            break

    print(f'Epoch {epoch+1}, Train loss: {running_loss/len(dataloader_train)}, Val loss: {epoch_val_loss}')

In [12]:
# Make predictions on the train data
model.load_state_dict(torch.load(ckpt_pth, weights_only=True))
model.eval()

Sequential(
  (0): Linear(in_features=2352, out_features=64, bias=True)
  (1): Tanh()
  (2): Linear(in_features=64, out_features=16, bias=True)
  (3): Tanh()
  (4): Linear(in_features=16, out_features=10, bias=True)
  (5): Softmax(dim=None)
)

In [13]:
# Inference function
def get_predictions(input_batch, model):

    # move the input and model to GPU for speed if available
    if torch.cuda.is_available():
        input_batch = input_batch.to('cuda')
        model.to('cuda')

    with torch.no_grad():
        output = model(input_batch)

    probabilities = torch.nn.functional.softmax(output, dim=1)
    return probabilities

In [14]:
# Run inference
preds_list = []
batch_size = 64
with torch.no_grad():
    for inputs_test, _ in tqdm(dataloader_test):
        if case == 'a': inputs_test = torch.reshape(inputs_test, (inputs_test.shape[0], -1))
        inputs_test = inputs_test.to(torch.float).to('cuda')
        preds_list.append(get_predictions(inputs_test, model).cpu().numpy())
final_preds = np.argmax(np.reshape(np.vstack(preds_list), (-1,10)),1)

100%|██████████| 657/657 [00:01<00:00, 556.51it/s]


In [15]:
# Generate all interesting metrics
def multiclass_metrics(y_true, y_pred, labels):
    """
    Compute per-class accuracy, sensitivity (recall), specificity, and precision.

    y_true, y_pred : array-like of shape (n_samples,)
    labels         : list of class labels, e.g. [0,1,...,9]
    """
    # Compute the full confusion matrix once
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    # cm[i, j] is count of true class i predicted as class j

    # Prepare containers
    metrics = {
        "class": [],
        "accuracy": [],
        "sensitivity (recall)": [],
        "specificity": [],
        "precision": []
    }

    # Total samples
    total = cm.sum()

    for idx, cls in enumerate(labels):
        TP = cm[idx, idx]
        FN = cm[idx, :].sum() - TP
        FP = cm[:, idx].sum() - TP
        TN = total - TP - FP - FN

        # Per-class metrics
        acc = (TP + TN) / total
        sens = TP / (TP + FN) if (TP + FN) > 0 else 0.0
        spec = TN / (TN + FP) if (TN + FP) > 0 else 0.0
        prec = TP / (TP + FP) if (TP + FP) > 0 else 0.0

        metrics["class"].append(cls)
        metrics["accuracy"].append(acc)
        metrics["sensitivity (recall)"].append(sens)
        metrics["specificity"].append(spec)
        metrics["precision"].append(prec)

    return pd.DataFrame(metrics)


In [16]:
# Report the performance
report_df = multiclass_metrics(labels_test, final_preds, np.arange(10).tolist()).set_index('class')
report_df

Unnamed: 0_level_0,accuracy,sensitivity (recall),specificity,precision
class,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,0.900728,0.0,0.998997,0.0
1,0.997905,0.987617,0.999196,0.993557
2,0.994525,0.972714,0.996933,0.972249
3,0.994096,0.970129,0.996866,0.972811
4,0.996191,0.995088,0.996309,0.966603
5,0.901443,0.986301,0.893012,0.478039
6,0.997238,0.98695,0.998363,0.985046
7,0.99481,0.973194,0.997341,0.97719
8,0.99381,0.964075,0.996995,0.971726
9,0.994905,0.9766,0.996933,0.97242
