## Preprocessing PIE Dataset

#### Importing Libraries

In [None]:
import csv
from PIL import Image
import numpy as np
import os

#### Converting text files to CSV

In [None]:
def txt_to_csv(txt_file, csv_file, delimiter=' '):
    """Rewrite a delimited text file as a comma-separated CSV file.

    Every input line is stripped of surrounding whitespace, split on
    ``delimiter`` and written out as one CSV row.

    Args:
        txt_file: Path of the text file to read.
        csv_file: Path of the CSV file to write (opened in 'w' mode).
        delimiter: Separator between the fields of a text line.
    """
    with open(txt_file, 'r') as source, open(csv_file, 'w', newline='') as target:
        # Lazily turn each text line into its list of fields
        rows = (raw_line.strip().split(delimiter) for raw_line in source)
        csv.writer(target).writerows(rows)

# Convert the StTrainFile.txt text dump into train.csv
txt_to_csv(
    txt_file='/home/baa223/Badri/deeplearning/dm_project/dm_project/PIE_32x32/PIE_32x32/StTrainFile.txt',
    csv_file='/home/baa223/Badri/deeplearning/dm_project/dm_project/PIE_32x32/PIE_32x32/train.csv',
)


#### Converting CSV file to images

In [None]:
# Create a directory to save the images
output_dir = 'PIE_images'
os.makedirs(output_dir, exist_ok=True)

# Path to the CSV file
csv_file = '/home/baa223/Badri/deeplearning/dm_project/dm_project/PIE_32x32 (old)/PIE_32x32 (old)/test.csv'

# Each data row is read as 32*32 pixel values followed by the class label.
# newline='' is the mode the csv module recommends for files it parses.
with open(csv_file, 'r', newline='') as file:
    reader = csv.reader(file)
    # Skip the header row; the default keeps an empty file from raising StopIteration
    next(reader, None)
    for idx, row in enumerate(reader):
        if not row:
            # Blank lines carry no image; skip them instead of failing on row[-1]
            continue

        # Extract image pixel values and class label
        pixel_values = [float(value) for value in row[:-1]]
        label = int(row[-1])

        # Convert pixel values to 32x32 image
        # NOTE(review): the * 255 scaling assumes pixel values lie in [0, 1];
        # anything outside that range wraps around in the uint8 cast - confirm.
        image_array = np.array(pixel_values, dtype=np.float32).reshape(32, 32)
        image = Image.fromarray((image_array * 255).astype(np.uint8))

        # Rotate the image 90 degrees clockwise (Pillow counts positive
        # angles counter-clockwise, so -90 turns the image to the right)
        image = image.rotate(-90, expand=True)

        # Create a directory for the class if it doesn't exist
        class_dir = os.path.join(output_dir, f'class_{label}')
        os.makedirs(class_dir, exist_ok=True)

        # Save the image with appropriate filename
        image_filename = f'PIE_image_{idx}.png'
        image_path = os.path.join(class_dir, image_filename)
        image.save(image_path)

        print(f'Saved image {idx} with label {label} to {image_path}')

## Training the model

#### Importing Libraries

In [None]:
import torch
import torchvision
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.utils.data import DataLoader
from torchinfo import summary
from torch.utils.data import random_split
from torch import nn
from torchvision import transforms
import numpy as np
from tqdm.auto import tqdm
from torcheval.metrics.functional import multiclass_f1_score
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.preprocessing import label_binarize
from sklearn.metrics import confusion_matrix
import seaborn as sns

#### Hyperparameters Used

In [None]:
# Training configuration shared by the cells below.
#   'classes'      starts as None and is set by load_dataset
#   'dataset_name' selects the experiment: set it to 'mnist', 'cifar10' or
#                  'pie' and then rerun all the cells from the
#                  "Training the model" section
#   'model_name'   is derived from the dataset name right after the dict
param = {
    'batch_size': 64,
    'lr': 0.001,
    'epochs': 100,
    'weight_decay': 2e-5,
    'num_workers': 8,
    'classes': None,
    'dataset_name': 'cifar10',
    'model_name': '',
}

# Name the model after the architecture and the selected dataset
param['model_name'] = '_'.join(('resnet50', param['dataset_name']))

#### Loading the dataset

In [None]:
# Number of classes and training epochs used for each supported dataset
_DATASET_SETTINGS = {
    'cifar10': {'classes': 10, 'epochs': 60},
    'mnist': {'classes': 10, 'epochs': 50},
    'pie': {'classes': 68, 'epochs': 100},
}

# Directory that holds the PIE 'train' and 'test' image folders
_PIE_ROOT = '/home/baa223/Badri/deeplearning/dm_project/dm_project/PIE_32x32'


def load_dataset(dataset_name, data_transform1, data_transform2):
    """Build the train, test and visualisation datasets for ``dataset_name``.

    Side effect: ``param['classes']`` and ``param['epochs']`` are overwritten
    with the values configured for the chosen dataset.

    Args:
        dataset_name: One of 'cifar10', 'mnist' or 'pie'.
        data_transform1: Sequence of transforms for the training dataset.
        data_transform2: Sequence of transforms for the test dataset.

    Returns:
        Tuple ``(train_dataset, test_dataset, visual_dataset)``. The visual
        dataset only applies ``ToTensor``; it wraps the test split for
        CIFAR10/MNIST and the train folder for PIE.

    Raises:
        ValueError: If ``dataset_name`` is not supported.
    """
    if dataset_name not in _DATASET_SETTINGS:
        raise ValueError("Dataset not supported")

    # Record the dataset-specific settings in the shared hyperparameters
    param.update(_DATASET_SETTINGS[dataset_name])

    # Work on copies so the caller's transform lists are left untouched
    train_steps = list(data_transform1)
    test_steps = list(data_transform2)
    if dataset_name in ('mnist', 'pie'):
        # Converting the MNIST / PIE dataset images from 1 to 3 channels
        train_steps.insert(0, transforms.Grayscale(num_output_channels=3))
        test_steps.insert(0, transforms.Grayscale(num_output_channels=3))

    train_transform = transforms.Compose(train_steps)
    test_transform = transforms.Compose(test_steps)
    visual_transform = transforms.ToTensor()

    if dataset_name == 'pie':
        # Load the PIE dataset from the local image folders
        image_folder = torchvision.datasets.ImageFolder
        train_dataset = image_folder(root=f'{_PIE_ROOT}/train',
                                     transform=train_transform)
        test_dataset = image_folder(root=f'{_PIE_ROOT}/test',
                                    transform=test_transform)
        visual_dataset = image_folder(root=f'{_PIE_ROOT}/train',
                                      transform=visual_transform)
    else:
        dataset_cls = (torchvision.datasets.CIFAR10 if dataset_name == 'cifar10'
                       else torchvision.datasets.MNIST)
        # The download=True dataset must be constructed first: building a
        # download=False dataset before the data exists raises on a fresh
        # machine. The download fetches both splits, so the later
        # download=False constructions find their files.
        train_dataset = dataset_cls(root='dm_project/', train=True,
                                    download=True, transform=train_transform)
        test_dataset = dataset_cls(root='dm_project/', train=False,
                                   download=False, transform=test_transform)
        visual_dataset = dataset_cls(root='dm_project/', train=False,
                                     download=False, transform=visual_transform)

    return train_dataset, test_dataset, visual_dataset

## Training Dynamics

In [None]:
# Use the imagenet weights as a starting point
auto_weights = torchvision.models.ResNet50_Weights.IMAGENET1K_V2

# Settings shared by the training and evaluation pipelines
_BILINEAR = transforms.InterpolationMode.BILINEAR
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: random crop/flip/TrivialAugment on the PIL image, then
# conversion to a normalised float tensor and occasional random erasing
transform1 = [
    transforms.RandomResizedCrop(32, interpolation=_BILINEAR, antialias=True),
    transforms.RandomHorizontalFlip(),
    transforms.TrivialAugmentWide(interpolation=_BILINEAR),
    transforms.PILToTensor(),
    transforms.ConvertImageDtype(torch.float),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
    transforms.RandomErasing(p=0.1),
]

# Evaluation pipeline: deterministic resize to 40x40 and 32x32 centre crop,
# followed by the same tensor conversion and normalisation
transform2 = [
    transforms.Resize((40, 40), interpolation=_BILINEAR, antialias=True),
    transforms.CenterCrop(32),
    transforms.PILToTensor(),
    transforms.ConvertImageDtype(torch.float),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
]


In [None]:
# Build the train and test datasets plus the ToTensor-only dataset used for visualisation
train_dataset, test_dataset, visual_dataset = load_dataset(
    param['dataset_name'], transform1, transform2
)

# Hold out 10% of the training samples as the validation dataset
# NOTE(review): the validation subset is split off the dataset built with
# transform1, so validation samples also pass through the training augmentations.
num_train_samples = len(train_dataset)
val_size = int(0.1 * num_train_samples)
train_size = num_train_samples - val_size

train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

In [None]:
# Loader settings common to the three splits
loader_args = {
    'batch_size': param['batch_size'],
    'num_workers': param['num_workers'],
}

# Only the training loader shuffles its samples
train_dl = DataLoader(train_dataset, shuffle=True, **loader_args)
val_dl = DataLoader(val_dataset, shuffle=False, **loader_args)
test_dl = DataLoader(test_dataset, shuffle=False, **loader_args)

In [None]:
# Pull the first batch from the train and test loaders as a sanity check
first_train_batch = next(iter(train_dl))
first_test_batch = next(iter(test_dl))
train_images, train_labels = first_train_batch
test_images, test_labels = first_test_batch

# Report the image and label tensor shapes of both batches
print(f"Train Images Shape: {train_images.shape}, Train Labels Shape: {train_labels.shape}")
print(f"Test Images Shape: {test_images.shape}, Test Labels Shape: {test_labels.shape}")

#### Visualizing the dataset

In [None]:
def show_images(dataset, rows=3, cols=3):
    """Show a ``rows`` x ``cols`` grid of randomly chosen dataset images.

    Args:
        dataset: Indexable dataset whose items are ``(image, label)`` pairs.
            The image is permuted with ``(1, 2, 0)`` before plotting, so it
            is presumably a channel-first tensor.
        rows: Number of grid rows (default 3).
        cols: Number of grid columns (default 3).
    """
    plt.figure(figsize=(5, 5))
    for position in range(rows * cols):
        plt.subplot(rows, cols, position + 1)
        # Samples are drawn independently, so the same image may repeat
        image, label = dataset[np.random.randint(0, len(dataset))]
        plt.imshow(image.permute(1, 2, 0))
        plt.title(f"Class: {label}")
        plt.axis('off')
    plt.show()


# Displaying 9 random images from the visualisation dataset
show_images(visual_dataset)

## Training the model

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Load the model with the imagenet weights
model1 = torchvision.models.resnet50(weights=auto_weights)
# model1 = torchvision.models.resnet50()

# All layers are fine-tuned. To train only the new last layer instead,
# freeze the existing parameters here, before `fc` is replaced:
# for params in model1.parameters():
#     params.requires_grad = False

# Change the last layer to output the number of classes in the dataset
model1.fc = nn.Linear(in_features=model1.fc.in_features, out_features=param['classes'], bias=True)
model1.to(device)

# Trace the summary with the shape the dataloaders actually produce
# (3-channel 32x32 crops in batches of param['batch_size']) so the reported
# layer output sizes match training
summary(model1, input_size=(param['batch_size'], 3, 32, 32),
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"])


In [None]:
# Cross-entropy objective, optimised with Adam using the configured
# learning rate and weight decay
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model1.parameters(),
    lr=param['lr'],
    weight_decay=param['weight_decay'],
)

# One list per logged quantity (loss, accuracy and f1 score for the
# train and validation splits)
metrics = {
    key: []
    for key in ("train_loss", "train_acc", "val_loss", "val_acc", "train_f1", "val_f1")
}

In [None]:
import os

max_val_acc = 0
train_acc, train_loss = 0, 0

# torch.save does not create missing directories, so make sure the
# checkpoint folder exists before the first save
os.makedirs("dm_project", exist_ok=True)

for epoch in range(param['epochs']):
    # training
    model1.train()
    # set up a progress bar labelled with the current epoch
    train_dl_progress = tqdm(train_dl, total=len(train_dl),
                             desc=f"Epoch: {epoch+1}/{param['epochs']}",
                             bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]")

    train_loss, train_correct, train_seen = 0.0, 0, 0
    train_preds, train_targets = [], []
    # iterate over the training dataloader
    for images, labels in train_dl_progress:
        images, labels = images.to(device), labels.to(device)

        # get the model predictions
        y_pred_logits = model1(images)
        loss = loss_fn(y_pred_logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # accumulate a plain float: adding the loss tensor itself would keep
        # every batch's autograd history alive for the whole epoch
        train_loss += loss.item()

        # get the class with the highest probability
        y_pred = y_pred_logits.argmax(dim=1)
        train_correct += (y_pred == labels).sum().item()
        train_seen += labels.size(0)
        train_preds.append(y_pred)
        train_targets.append(labels)

    # loss is the mean over batches; accuracy and f1 are computed over all
    # samples of the epoch so a shorter last batch is not over-weighted
    train_loss /= len(train_dl)
    train_acc = train_correct / train_seen
    train_f1 = multiclass_f1_score(torch.cat(train_preds), torch.cat(train_targets),
                                   num_classes=param['classes'])

    # evaluation of validation dataset
    model1.eval()
    val_loss, val_correct, val_seen = 0.0, 0, 0
    val_preds, val_targets = [], []
    with torch.inference_mode():
        # iterate over the validation dataloader
        for val_images, val_labels in val_dl:
            val_images, val_labels = val_images.to(device), val_labels.to(device)

            # get the model predictions
            y_val_pred_logits = model1(val_images)
            # get the class with the highest probability
            y_val_pred = y_val_pred_logits.argmax(dim=1)

            # cumulate the loss and the number of correct predictions
            val_loss += loss_fn(y_val_pred_logits, val_labels).item()
            val_correct += (y_val_pred == val_labels).sum().item()
            val_seen += val_labels.size(0)
            val_preds.append(y_val_pred)
            val_targets.append(val_labels)

        # calculate the average loss, the accuracy and the f1 score
        val_loss /= len(val_dl)
        val_acc = val_correct / val_seen
        val_f1 = multiclass_f1_score(torch.cat(val_preds), torch.cat(val_targets),
                                     num_classes=param['classes'])

    # save the model with the highest validation accuracy
    if max_val_acc < val_acc:
        max_val_acc = val_acc
        print(f"model saved with val accuracy of {val_acc*100:.6f}")
        torch.save(model1.state_dict(), f"dm_project/{param['model_name']}.pt")

    # log the metrics
    metrics["train_loss"].append(train_loss)
    metrics["train_acc"].append(train_acc)
    metrics["val_loss"].append(val_loss)
    metrics["val_acc"].append(val_acc)
    metrics["train_f1"].append(train_f1.item())
    metrics["val_f1"].append(val_f1.item())

    print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc*100:.2f}%, '
          f'Validation Accuracy: {val_acc*100:.2f}%\n')


## Visualizing the Results

#### Accuracy and Loss graph of the training process

In [None]:
# Plot the histories stored inside the metrics dictionary: train/val loss
# in the left panel and train/val accuracy in the right panel
plt.figure(figsize=(10, 5))

# (y-axis label, panel title, ((metrics key, legend label), ...))
panels = (
    ("Loss", "Loss vs Epochs",
     (("train_loss", "Train Loss"), ("val_loss", "Val Loss"))),
    ("Accuracy", "Accuracy vs Epochs",
     (("train_acc", "Train Acc"), ("val_acc", "Val Acc"))),
)

for position, (y_label, panel_title, curves) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    for metric_key, legend_label in curves:
        plt.plot(metrics[metric_key], label=legend_label)

    plt.xlabel("Epochs")
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()

plt.suptitle(f"Performance of Resnet50 on {param['dataset_name']} dataset")
plt.show()

#### Performance of the model on testing dataset

In [None]:
# initialize the saved model with same architecture as the trained model
saved_model = torchvision.models.resnet50()
# Change the last layer to output the number of classes in the dataset
saved_model.fc = nn.Linear(in_features=saved_model.fc.in_features, out_features=param['classes'], bias=True)

# load the trained model weights; map_location lets a checkpoint that was
# saved on the GPU be restored on a CPU-only machine as well
saved_model.load_state_dict(torch.load(f"dm_project/{param['model_name']}.pt",
                                       map_location=device))
saved_model.to(device)

# test_auc is not used below; it is only kept so the name stays defined
test_loss, test_acc, test_auc = 0.0, 0, 0
logit_batches, label_batches = [], []

# set the model to evaluation mode
saved_model.eval()
with torch.inference_mode():
    # iterate over the test dataloader
    for test_images, test_labels in test_dl:
        test_images, test_labels = test_images.to(device), test_labels.to(device)

        # get the model predictions
        y_test_pred_logits = saved_model(test_images)

        # cumulate the loss and keep the logits / labels of every batch
        test_loss += loss_fn(y_test_pred_logits, test_labels).item()
        logit_batches.append(y_test_pred_logits.cpu())
        label_batches.append(test_labels.cpu())

    all_logits = torch.cat(logit_batches)
    all_labels = torch.cat(label_batches)
    y_test_pred = all_logits.argmax(dim=1)

    # loss is the mean over batches; accuracy and f1 score are computed over
    # the whole test set instead of only the last batch
    test_loss /= len(test_dl)
    test_acc = (y_test_pred == all_labels).sum().item() / len(all_labels)
    test_f1 = multiclass_f1_score(y_test_pred, all_labels, num_classes=param['classes'])

    # convert the ground truth to one hot encoding
    test_gt = label_binarize(all_labels.numpy(), classes=np.arange(param['classes']))
    # per-class scores (raw logits) as a float64 array of shape (samples, classes)
    test_pred = all_logits.numpy().astype(np.float64)

# calculate the AUC score and plot the ROC curve of each class
auc_scores = []
fpr, tpr, roc_auc = dict(), dict(), dict()
for i in range(param['classes']):
    fpr[i], tpr[i], _ = roc_curve(test_gt[:, i], test_pred[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
    auc_scores.append(roc_auc[i])
    plt.plot(fpr[i], tpr[i], label='Class %d ROC curve (area = %0.2f)' % (i, roc_auc[i]))

avg_auc = np.mean(auc_scores)


# print the test loss, accuracy, f1 score and mean AUC
print(f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.4f} | Test F1: {test_f1.item():.4f} | Mean Test AUC: {avg_auc:.4f}")

# finish the ROC figure: chance diagonal, axis limits and labels
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
# the legend is only drawn when there are at most 10 classes
if param['classes'] <= 10:
    plt.legend(loc="lower right")
plt.show()

#### Ground Truth vs Predicted Labels

In [None]:
def show_gt_pred(model, dataset, vis_ds):
    """Show 9 random dataset images titled with ground truth and prediction.

    Images will look a little odd when ``dataset`` is the training dataset,
    because its transform applies a lot of data augmentation (and
    normalisation) to the images that are displayed.

    Args:
        model: Classifier called on each image; it is switched to evaluation
            mode for the predictions and restored to its previous mode after.
        dataset: Indexable dataset of ``(image, label)`` pairs; the image is
            permuted with ``(1, 2, 0)`` for display, so it is presumably a
            channel-first tensor.
        vis_ds: Unused; kept so existing calls keep working.
    """
    was_training = model.training
    model.eval()
    try:
        plt.figure(figsize=(5, 5))
        for position in range(9):
            plt.subplot(3, 3, position + 1)
            index = np.random.randint(0, len(dataset))
            image, label = dataset[index]
            # no autograd bookkeeping is needed for a display-only prediction
            with torch.no_grad():
                predicted = model(image.unsqueeze(0).to(device)).argmax().item()
            plt.imshow(image.permute(1, 2, 0))
            plt.title(f"GT: {label}, Pred: {predicted}")
            plt.axis('off')
        plt.show()
    finally:
        # leave the model in the mode the caller had it in
        model.train(was_training)


# Displaying 9 random images from the (augmented) training dataset with
# ground truth and predicted labels
show_gt_pred(saved_model, train_dataset, visual_dataset)

#### Confusion Matrix

In [None]:
# Confusion matrix for the test dataset

# Recover class indices: argmax over the one-hot ground truth and over the
# per-class prediction scores
true_classes = test_gt.argmax(axis=1)  # type: ignore
predicted_classes = test_pred.argmax(axis=1)
conf_matrix = confusion_matrix(true_classes, predicted_classes)

# Draw the matrix as an annotated heatmap
plt.figure(figsize=(10, 10))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix')
plt.show()