In [1]:
# Python Libraries
import pandas as pd
import numpy as np
import os
import cv2
import matplotlib.pyplot as plt
import datetime
from pathlib import Path
import gc
import warnings
warnings.filterwarnings("ignore")
import sys
import time

# Sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# PyTorch Libraries
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch import optim
import torchvision.transforms as transforms
import torchvision

# Bar Progress and import images
from fastprogress import master_bar, progress_bar
from PIL import Image
gc.collect()

0

### Load the Images and Paths

In [2]:
# Set the paths to your data directories
path = 'multiclass_x-ray-images/'
out= 'models/'
train_dataset_path = os.path.join(path, 'set_train')
validation_dataset_path = os.path.join(path, 'set_test')
test_dataset_path = os.path.join(path, 'set_pred')  # Assuming you have a 'set_pred' folder

### Create the Dataset

In [3]:
class ChestXrayDataset(Dataset):
    def __init__(self, folder_dir, image_size):
        self.image_paths = []  # List of image paths
        self.image_labels = []  # List of image labels
        self.classes = []  # List of class labels

        # Define list of image transformations
        image_transformation = [
            transforms.Resize((image_size, image_size)),
            transforms.ToTensor()
        ]

        self.image_transformation = transforms.Compose(image_transformation)

        # Get all image paths, labels, and classes
        categories = os.listdir(folder_dir)
        for category_index, category in enumerate(categories):
            category_path = os.path.join(folder_dir, category)
            if os.path.isdir(category_path):
                images = os.listdir(category_path)
                for image in images:
                    image_path = os.path.join(category_path, image)
                    self.image_paths.append(image_path)
                    label = [0] * len(categories)
                    label[category_index] = 1
                    self.image_labels.append(label)
                self.classes.append(category)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index):
        # Read image
        image_path = self.image_paths[index]
        image_data = Image.open(image_path).convert("RGB")  # Convert image to RGB channels

        # Resize and convert image to torch tensor
        image_data = self.image_transformation(image_data)

        return image_data, torch.FloatTensor(self.image_labels[index])

In [4]:
def print_label_mappings(data_loader, loader_type):
    # Extract class labels from the dataset
    class_labels = data_loader.dataset.classes

    # Create a dictionary to map class indices to class labels
    labels = {index: label for index, label in enumerate(class_labels)}

    # Print the label mappings with loader type
    print(f"Label Mappings for classes present in the {loader_type} dataset\n")
    for key, value in labels.items():
        print(f"{key} : {value}")

## Define the Models
<li> 1. CustomNet
<li> 2. DenseNet121

### CustomNet Model

In [5]:
class CustomNet(nn.Module):
    def __init__(self, num_classes=8, is_trained=False):
        super().__init__()
        self.ConvLayer1 = nn.Sequential(
            nn.Conv2d(3, 8, 3),
            nn.Conv2d(8, 16, 3),
            nn.MaxPool2d(2),
            nn.ReLU()
        )
        self.ConvLayer2 = nn.Sequential(
            nn.Conv2d(16, 32, 5),
            nn.Conv2d(32, 32, 3),
            nn.MaxPool2d(4),
            nn.ReLU()
        )
        self.ConvLayer3 = nn.Sequential(
            nn.Conv2d(32, 64, 3),
            nn.Conv2d(64, 64, 5),
            nn.MaxPool2d(2),
            nn.ReLU()
        )
        self.ConvLayer4 = nn.Sequential(
            nn.Conv2d(64, 128, 5),
            nn.Conv2d(128, 128, 3),
            nn.MaxPool2d(2),
            nn.ReLU()
        )
        self.Lin1 = nn.Sequential(nn.Linear(512, num_classes), nn.Sigmoid())

    def forward(self, x):
        x = self.ConvLayer1(x)
        x = self.ConvLayer2(x)
        x = self.ConvLayer3(x)
        x = self.ConvLayer4(x)
        x = x.view(x.size(0), -1)
        x = self.Lin1(x)


        return x

## Pre Trained Models
### DenseNet121

In [6]:
"""
        Init model architecture

        Parameters
        ----------
        num_classes: int
            number of classes
        is_trained: bool
            whether using pretrained model from ImageNet or not
"""
####################################################################################
###   DenseNet121
####################################################################################
#
class DenseNet121(nn.Module):
    def __init__(self, num_classes=8, is_trained=True):

        super().__init__()
        self.net = torchvision.models.densenet121(pretrained=is_trained)
        # Get the input dimension of last layer
        kernel_count = self.net.classifier.in_features
        self.net.classifier = nn.Sequential(nn.Linear(kernel_count, num_classes), nn.Sigmoid())

    def forward(self, inputs):
        """
        Forward the netword with the inputs
        """
        return self.net(inputs)

### Setting Training Parameters

In [7]:
IMAGE_SIZE = 224                              # Image size (224x224)
BATCH_SIZE = 96
LEARNING_RATE = 0.001
LEARNING_RATE_SCHEDULE_FACTOR = 0.1           # Parameter used for reducing learning rate
LEARNING_RATE_SCHEDULE_PATIENCE = 5           # Parameter used for reducing learning rate
MAX_EPOCHS = 50   

### Training, Validation & Test Dataset

In [8]:
# Create datasets
train_dataset = ChestXrayDataset(train_dataset_path, IMAGE_SIZE)
validation_dataset = ChestXrayDataset(validation_dataset_path, IMAGE_SIZE)
test_dataset = ChestXrayDataset(test_dataset_path, IMAGE_SIZE)  # For prediction

### Training, Validation & Test Loaders

In [9]:
# Create data loaders
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)  # No need to shuffle for prediction

In [10]:
# show the categories for train_loader
print_label_mappings(train_dataloader, "training")

Label Mappings for classes present in the training dataset

0 : Atelectasis
1 : Effusion
2 : Infiltration
3 : Mass
4 : No Finding
5 : Nodule
6 : Pneumonia
7 : Pneumothorax


In [11]:
# show the categories for validation_loader
print_label_mappings(val_dataloader, "validation")

Label Mappings for classes present in the validation dataset

0 : Atelectasis
1 : Effusion
2 : Infiltration
3 : Mass
4 : No Finding
5 : Nodule
6 : Pneumonia
7 : Pneumothorax


In [12]:
LABELS = train_dataloader.dataset.classes
LABELS

['Atelectasis',
 'Effusion',
 'Infiltration',
 'Mass',
 'No Finding',
 'Nodule',
 'Pneumonia',
 'Pneumothorax']

In [13]:
del train_dataset
del validation_dataset
del test_dataset
gc.collect()

0

### Set Divice

In [14]:
device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda":
    torch.cuda.empty_cache()
device

'cpu'

### Evaluation Metrics

In [15]:
def multi_label_auroc(y_gt, y_pred):
    """ Calculate AUROC for each class

    Parameters
    ----------
    y_gt: torch.Tensor
        groundtruth
    y_pred: torch.Tensor
        prediction

    Returns
    -------
    list
        F1 of each class
    """
    auroc = []
    gt_np = y_gt.to("cpu").numpy()
    pred_np = y_pred.to("cpu").numpy()
    assert gt_np.shape == pred_np.shape, "y_gt and y_pred should have the same size"
    for i in range(gt_np.shape[1]):
        try:
            auroc.append(roc_auc_score(gt_np[:, i], pred_np[:, i]))
        except ValueError:
            pass
    return auroc

def multi_label_accuracy(y_gt, y_pred):
    """ Calculate AUROC for each class

    Parameters
    ----------
    y_gt: torch.Tensor
        groundtruth
    y_pred: torch.Tensor
        prediction

    Returns
    -------
    list
        F1 of each class
    """
    acc = []
    gt_np = y_gt.to("cpu").numpy()
    pred_np = y_pred.to("cpu").numpy()
    assert gt_np.shape == pred_np.shape, "y_gt and y_pred should have the same size"
    for i in range(gt_np.shape[1]):
        acc.append(accuracy_score(gt_np[:, i], np.where(pred_np[:, i]>=0.5,1,0)))
    return acc

def multi_label_f1(y_gt, y_pred):
    """ Calculate f1 for each class

    Parameters
    ----------
    y_gt: torch.Tensor
        groundtruth
    y_pred: torch.Tensor
        prediction

    Returns
    -------
    list
        F1 of each class
    """
    f1_out = []
    gt_np = y_gt.to("cpu").numpy()
    pred_np = y_pred.to("cpu").numpy()
    assert gt_np.shape == pred_np.shape, "y_gt and y_pred should have the same size"
    for i in range(gt_np.shape[1]):
        f1_out.append(f1_score(gt_np[:, i], np.where(pred_np[:, i]>=0.5,1,0)))
    return f1_out


def multi_label_precision_recall(y_gt, y_pred):
    """ Calculate precision for each class

    Parameters
    ----------
    y_gt: torch.Tensor
        groundtruth
    y_pred: torch.Tensor
        prediction

    Returns
    -------
    list
        precision of each class
    """
    precision_out = []
    recall_out = []
    gt_np = y_gt.to("cpu").numpy()
    pred_np = y_pred.to("cpu").numpy()
    assert gt_np.shape == pred_np.shape, "y_gt and y_pred should have the same size"
    for i in range(gt_np.shape[1]):
        p = precision_recall_fscore_support(gt_np[:, i], np.where(pred_np[:, i]>=0.5,1,0),average='binary')
        precision_out.append(p[0])
        recall_out.append(p[1])
    return precision_out,recall_out

### Training Function

In [16]:
def epoch_training(epoch, model, train_dataloader, device, loss_criteria, optimizer, mb):
    """
    Epoch training

    Paramteters
    -----------
    epoch: int
      epoch number
    model: torch Module
      model to train
    train_dataloader: Dataset
      data loader for training
    device: str
      "cpu" or "cuda"
    loss_criteria: loss function
      loss function used for training
    optimizer: torch optimizer
      optimizer used for training
    mb: master bar of fastprogress
      progress to log

    Returns
    -------
    float
      training loss
    """
    # Switch model to training mode
    model.train()
    training_loss = 0 # Storing sum of training losses

    # For each batch
    for batch, (images, labels) in enumerate(progress_bar(train_dataloader, parent=mb)):

        # Move X, Y  to device (GPU)
        images = images.to(device)
        labels = labels.to(device)

        # Clear previous gradient
        optimizer.zero_grad()

        # Feed forward the model
        pred = model(images)
        #pred = torch.LongTensor(pred)
        loss = loss_criteria(pred, labels)
        #print("loss is ",loss)

        # Back propagation
        loss.backward()

        # Update parameters
        optimizer.step()

        # Update training loss after each batch
        training_loss += loss.item()

        #mb.child.comment = f'Training loss {training_loss/(batch+1)}'

    del images, labels, loss
    if torch.cuda.is_available(): torch.cuda.empty_cache()

    # return training loss
    return training_loss/len(train_dataloader)

### Evaluating Function

In [17]:
def evaluating(epoch, model, val_loader, device, loss_criteria, mb):
    """
    Validate model on validation dataset

    Parameters
    ----------
    epoch: int
        epoch number
    model: torch Module
        model used for validation
    val_loader: Dataset
        data loader of validation set
    device: str
        "cuda" or "cpu"
    loss_criteria: loss function
      loss function used for training
    mb: master bar of fastprogress
      progress to log

    Returns
    -------
    float
        loss on validation set
    float
        metric score on validation set
    """

    # Switch model to evaluation mode
    model.eval()

    val_loss = 0                                   # Total loss of model on validation set
    out_pred = torch.FloatTensor().to(device)      # Tensor stores prediction values
    out_gt = torch.FloatTensor().to(device)        # Tensor stores groundtruth values

    with torch.no_grad(): # Turn off gradient
        # For each batch
        for step, (images, labels) in enumerate(progress_bar(val_loader, parent=mb)):
            # Move images, labels to device (GPU)
            images = images.to(device)
            labels = labels.to(device)

            # Update groundtruth values
            out_gt = torch.cat((out_gt,  labels), 0)

            # Feed forward the model
            ps = model(images)
            loss = loss_criteria(ps, labels)

            # Update prediction values
            out_pred = torch.cat((out_pred, ps), 0)

            # Update validation loss after each batch
            val_loss += loss

    # Clear memory
    del images, labels, loss
    if torch.cuda.is_available(): torch.cuda.empty_cache()
    # return validation loss, and metric score
    val_loss_mean = val_loss/len(val_loader)
    auroc_mean = np.nanmean(np.array(multi_label_auroc(out_gt, out_pred)))
    acc_mean = np.nanmean(np.array(multi_label_accuracy(out_gt, out_pred)))
    f1_mean = np.nanmean(np.array(multi_label_f1(out_gt, out_pred)))

    return val_loss_mean,auroc_mean,acc_mean,f1_mean

### Define Optimizer

In [18]:
def get_opt(modeltxt,model):
    
    if modeltxt == "CustomNet":
        return optim.Adam(model.parameters())
    
    # DenseNet121
    return optim.Adam(model.parameters(), lr=LEARNING_RATE, betas=(0.9, 0.999), eps=1e-8, weight_decay=1e-5)

### Train Model

In [19]:
# training_losses = []
# validation_losses = []
# validation_score = []
# validation_acc = []
# validation_f1 = []

# def trainModel(modelname,loss_criteria,modeltxt):
#     model = modelname(num_classes=len(LABELS),is_trained=True).to(device)

#     optimizer = get_opt(modeltxt,model)
#     # Learning rate will be reduced automatically during training
#     lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor = LEARNING_RATE_SCHEDULE_FACTOR,
#                                                         patience = LEARNING_RATE_SCHEDULE_PATIENCE, mode = 'max', verbose=True)
#     best_score = 0
#     best_score_acc = 0
#     best_score_f1 = 0

#     model_path = out+modeltxt+".pth"
#     out_path = out+modeltxt+"_running.csv"
# #     training_losses = []
# #     validation_losses = []
# #     validation_score = []
# #     validation_acc = []
# #     validation_f1 = []


#     # Config progress bar
#     mb = master_bar(range(MAX_EPOCHS))
#     mb.names = ['Train loss', 'Val loss', 'AUROC', 'Accuracy', 'f1 score']
#     x = []

#     nonimproved_epoch = 0
#     start_time = time.time()
#     cnt = 1

#     # Training each epoch
#     for epoch in mb:
#         #break
#         mb.main_bar.comment = f'Best AUROC score: {best_score}'
#         x.append(epoch)

#         # Training
#         train_loss = epoch_training(epoch, model, train_dataloader, device, loss_criteria, optimizer, mb)
#         mb.write('Finish training epoch {} with loss {:.4f}'.format(epoch, train_loss))
#         training_losses.append(train_loss)

#         # Evaluating
#         val_loss, new_score, new_score_acc, new_score_f1 = evaluating(epoch, model, val_dataloader, device, loss_criteria, mb)

#         validation_losses.append(val_loss)
#         validation_score.append(new_score)
#         validation_acc.append(new_score_acc)
#         validation_f1.append(new_score_f1)

#         gc.collect()
#         # Update learning rate
#         lr_scheduler.step(new_score)

#         # Update training chart
#         # mb.update_graph([[x, training_losses], [x, validation_losses], [x, validation_score] , [x, validation_acc] ,
#         #                  [x, validation_f1]],
#         #                 [0,epoch+1+round(epoch*0.3)], [0,1])

#         diff = np.round(time.time() - start_time)
#         pd.DataFrame([[epoch,modeltxt,best_score,new_score,diff]]).to_csv(out_path,index=False,mode='a',header=False)
#         # Save model
#         t2 = 4
#         if modeltxt == 'DenseNet121':
#             t2 = 6
#         if best_score < new_score:
#             #mb.write(f"Improve AUROC from {best_score} to {new_score}")
#             best_score = new_score
#             best_score_acc = new_score_acc
#             best_score_f1 = new_score_f1
#             nonimproved_epoch = 0
#             best_model = model
#             torch.save({"model": model.state_dict(),
#                         "optimizer": optimizer.state_dict(),
#                         "best_score": best_score,
#                         "epoch": epoch,
#                         "lr_scheduler": lr_scheduler.state_dict()}, model_path)
#         else:
#             nonimproved_epoch += 1
#         if nonimproved_epoch > 5:
#             break
#             print("Early stopping")
#         if time.time() - start_time > 3600*t2:
#             break
#             print("Out of time")
            
#     return best_score,best_score_acc,best_score_f1,best_model

In [19]:
def trainModel(modelname, loss_criteria, modeltxt):
    model = modelname(num_classes=len(LABELS), is_trained=True).to(device)

    optimizer = get_opt(modeltxt, model)
    lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=LEARNING_RATE_SCHEDULE_FACTOR,
                                                        patience=LEARNING_RATE_SCHEDULE_PATIENCE, mode='max', verbose=True)
    best_score = 0
    best_score_acc = 0
    best_score_f1 = 0

    model_path = out + modeltxt + ".pth"
    out_path = out + modeltxt + "_running.csv"
    
    # Use a dictionary to store values for each model
    model_results = {
        'CustomNet': {'training_losses': [], 'validation_losses': [], 'validation_score': [], 'validation_acc': [], 'validation_f1': []},
        'DenseNet121': {'training_losses': [], 'validation_losses': [], 'validation_score': [], 'validation_acc': [], 'validation_f1': []}
    }

    mb = master_bar(range(MAX_EPOCHS))
    mb.names = ['Train loss', 'Val loss', 'AUROC', 'Accuracy', 'f1 score']
    x = []

    nonimproved_epoch = 0
    start_time = time.time()
    cnt = 1

    for epoch in mb:
        mb.main_bar.comment = f'Best AUROC score: {best_score}'
        x.append(epoch)

        train_loss = epoch_training(epoch, model, train_dataloader, device, loss_criteria, optimizer, mb)
        mb.write('Finish training epoch {} with loss {:.4f}'.format(epoch, train_loss))
        model_results[modeltxt]['training_losses'].append(train_loss)

        val_loss, new_score, new_score_acc, new_score_f1 = evaluating(epoch, model, val_dataloader, device, loss_criteria, mb)

        model_results[modeltxt]['validation_losses'].append(val_loss)
        model_results[modeltxt]['validation_score'].append(new_score)
        model_results[modeltxt]['validation_acc'].append(new_score_acc)
        model_results[modeltxt]['validation_f1'].append(new_score_f1)

        gc.collect()
        lr_scheduler.step(new_score)

        diff = np.round(time.time() - start_time)
        pd.DataFrame([[epoch, modeltxt, best_score, new_score, diff]]).to_csv(out_path, index=False, mode='a', header=False)

        t2 = 4
        if modeltxt == 'DenseNet121':
            t2 = 6
        if best_score < new_score:
            best_score = new_score
            best_score_acc = new_score_acc
            best_score_f1 = new_score_f1
            nonimproved_epoch = 0
            best_model = model
            torch.save({"model": model.state_dict(),
                        "optimizer": optimizer.state_dict(),
                        "best_score": best_score,
                        "epoch": epoch,
                        "lr_scheduler": lr_scheduler.state_dict()}, model_path)
        else:
            nonimproved_epoch += 1
        if nonimproved_epoch > 5:
            break
            print("Early stopping")
        if time.time() - start_time > 3600 * t2:
            break
            print("Out of time")

    return best_score, best_score_acc, best_score_f1, best_model, model_results


### Set Models to Train

In [20]:
model_list = [CustomNet, DenseNet121]
mName_list = ['CustomNet', 'DenseNet121']

In [22]:
# eval_df_train = []
# model_results_dict = {}

# for m in model_list:
#     mName = m().__class__.__name__
#     print("Processing Model ",mName)
#     globals()[f"best_score_{mName}"],globals()[f"best_score_acc_{mName}"],globals()[f"best_score_f1_{mName}"],globals()[f"best_model_{mName}"], model_results = trainModel(modelname=m,loss_criteria=nn.BCELoss(),modeltxt=mName)
#     #
#     eval_df_train.append([mName,globals()[f"best_score_{mName}"],globals()[f"best_score_acc_{mName}"],globals()[f"best_score_f1_{mName}"]])

### Train Models in a Loop

In [None]:
eval_df_train = []
model_results_dict = {}

for m in model_list:
    mName = m().__class__.__name__
    print("Processing Model ", mName)
    globals()[f"best_score_{mName}"],globals()[f"best_score_acc_{mName}"],globals()[f"best_score_f1_{mName}"],globals()[f"best_model_{mName}"], model_results = trainModel(modelname=m, loss_criteria=nn.BCELoss(), modeltxt=mName)

    # Store results in model_results_dict
    model_results_dict[mName] = model_results

    eval_df_train.append([mName,globals()[f"best_score_{mName}"],globals()[f"best_score_acc_{mName}"],globals()[f"best_score_f1_{mName}"]])

Processing Model  CustomNet


### Evaluation Results

In [None]:
eval_df_train=pd.DataFrame(eval_df_train)
eval_df_train.columns = ['Model Name','AUROC', 'Accuracy', 'f1 Score']
eval_df_train.to_csv(out+"eval_df_train.csv",index=False)
eval_df_train

In [None]:
# Access results for each model from model_results_dict
for mName, model_result in model_results_dict.items():
    print(f"Results for {mName}:")
    print(f"Training Losses: {model_result[mName]['training_losses']}")
    print(f"Validation Losses: {model_result[mName]['validation_losses']}")
    print(f"Validation Scores: {model_result[mName]['validation_score']}")
    print(f"Validation Accuracies: {model_result[mName]['validation_acc']}")
    print(f"Validation F1 Scores: {model_result[mName]['validation_f1']}")
    print("\n")

In [None]:
# Create a figure and axis for the plots
fig, axs = plt.subplots(len(model_list), 2, figsize=(16, 8 * len(model_list)))
fig.suptitle('Training and Validation Metrics Over Epochs for Each Model', y=1)

for i, m in enumerate(model_list):
    mName = m().__class__.__name__
    
    # Access results for the current model from model_results_dict
    model_result = model_results_dict[mName]

    # Access the epoch values from the model_results_dict
    x = model_result[mName]['training_losses']

    # Create a DataFrame for the metrics
    df_metrics = pd.DataFrame({
        'Epoch': range(len(x)),
        'Training Loss': model_result[mName]['training_losses'],
        'Validation Loss': model_result[mName]['validation_losses'],
        'AUROC': model_result[mName]['validation_score'],
        'Accuracy': model_result[mName]['validation_acc'],
        'F1 Score': model_result[mName]['validation_f1']
    })

    # Plot training loss
    axs[i, 0].plot(df_metrics['Epoch'], df_metrics['Training Loss'], label='Training Loss')
    axs[i, 0].set_title(f'{mName} - Training Loss')
    axs[i, 0].set_xlabel('Epoch')
    axs[i, 0].set_ylabel('Training Loss')
    axs[i, 0].legend()

    # Plot validation metrics
    axs[i, 1].plot(df_metrics['Epoch'], df_metrics['Validation Loss'], label='Validation Loss')
    axs[i, 1].plot(df_metrics['Epoch'], df_metrics['AUROC'], label='AUROC')
    axs[i, 1].plot(df_metrics['Epoch'], df_metrics['Accuracy'], label='Accuracy')
    axs[i, 1].plot(df_metrics['Epoch'], df_metrics['F1 Score'], label='F1 Score')
    axs[i, 1].set_title(f'{mName} - Validation Metrics')
    axs[i, 1].set_xlabel('Epoch')
    axs[i, 1].set_ylabel('Metric Value')
    axs[i, 1].legend()

plt.tight_layout(rect=[0, 0, 1, 0.95])  # Adjust layout for the title
plt.show()

In [None]:
# Create a figure and axis for the plots
fig, axs = plt.subplots(len(model_list), 2, figsize=(16, 8 * len(model_list)))
fig.suptitle('Training and Validation Metrics Over Epochs for Each Model', y=1)

for i, m in enumerate(model_list):
    mName = m().__class__.__name__
    
    # Access results for the current model from model_results_dict
    model_result = model_results_dict[mName]

    # Access the epoch values from the model_results_dict
    x = model_result[mName]['training_losses']

    # Create a DataFrame for the metrics
    df_metrics = pd.DataFrame({
        'Epoch': range(len(x)),
        'Training Loss': model_result[mName]['training_losses'],
        'Validation Loss': model_result[mName]['validation_losses'],
        'AUROC': model_result[mName]['validation_score'],
        'Accuracy': model_result[mName]['validation_acc'],
        'F1 Score': model_result[mName]['validation_f1']
    })

    # Plot training loss with validation loss
    axs[i, 0].plot(df_metrics['Epoch'], df_metrics['Training Loss'], label='Training Loss')
    axs[i, 0].plot(df_metrics['Epoch'], df_metrics['Validation Loss'], label='Validation Loss')  # Add this line
    axs[i, 0].set_title(f'{mName} - Training and Validation Loss')
    axs[i, 0].set_xlabel('Epoch')
    axs[i, 0].set_ylabel('Loss')
    axs[i, 0].legend()

    # Plot validation metrics
    axs[i, 1].plot(df_metrics['Epoch'], df_metrics['Validation Loss'], label='Validation Loss')
    axs[i, 1].plot(df_metrics['Epoch'], df_metrics['AUROC'], label='AUROC')
    axs[i, 1].plot(df_metrics['Epoch'], df_metrics['Accuracy'], label='Accuracy')
    axs[i, 1].plot(df_metrics['Epoch'], df_metrics['F1 Score'], label='F1 Score')
    axs[i, 1].set_title(f'{mName} - Validation Metrics')
    axs[i, 1].set_xlabel('Epoch')
    axs[i, 1].set_ylabel('Metric Value')
    axs[i, 1].legend()

plt.tight_layout(rect=[0, 0, 1, 0.95])  # Adjust layout for the title
plt.show()


### Best model Parameters

In [None]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

def count_parameters_all(model):
    return sum(p.numel() for p in model.parameters())

In [None]:
param_list = []
i = 0
for model in model_list:
    modelName = model().__class__.__name__
    num_params_all = count_parameters_all(model())
    num_params = count_parameters(model())
    param_list.append([modelName,num_params_all,num_params])

param_list=pd.DataFrame(param_list)
param_list.columns = ['Model Name','Total Parameters','Trainable Parameters']
param_list

In [None]:
# !pip install prettytable

In [None]:
from prettytable import PrettyTable

def count_parameters_pretty(model):
    print(model.__class__.__name__)
    table = PrettyTable(["Modules", "Parameters"])
    total_params = 0
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad:
            continue
        param = parameter.numel()
        table.add_row([name, param])
        total_params+=param
    print(table)
    print(f"Total Trainable Params: {total_params}")
    k = f"Total Trainable Params: {total_params}"
    return k

out1=count_parameters_pretty(best_model_CustomNet)

In [None]:
# del train_dataloader
# del val_dataloader
# gc.collect()

### Functions for Prediction, Confusion Matrix and Plots on Test Data Loader

In [None]:
def getTestPreds(best_model,test_dataloader,modeltxt):
    y_pred_t = torch.FloatTensor().to(device) # Tensor stores prediction values
    y_test_t = torch.FloatTensor().to(device) # Tensor stores groundtruth values

    y_pred_list = []
    y_test_list = []
    test_auroc = []
    test_acc = []
    test_f1 = []
    test_precision = []
    test_recall = []

    with torch.no_grad():
        best_model.eval()
        for X_batch, labels in test_dataloader:
            X_batch = X_batch.to(device)
            labels = labels.to(device)
            ps = best_model(X_batch)
            y_test_t = torch.cat((y_test_t,  labels), 0)
            y_pred_t = torch.cat((y_pred_t, ps), 0)
            
            test_acc.append(np.mean(multi_label_accuracy(y_test_t, y_pred_t)))
            test_f1.append(np.mean(multi_label_f1(y_test_t, y_pred_t)))
            test_auroc.append(np.mean(multi_label_auroc(y_test_t, y_pred_t)))

            p,r = multi_label_precision_recall(y_test_t, y_pred_t)
            test_precision.append(np.mean(p))
            test_recall.append(np.mean(r))

        test_auroc = np.nanmean(test_auroc)
        test_acc = np.nanmean(test_acc)
        test_f1 = np.nanmean(test_f1)
        test_precision = np.nanmean(test_precision)
        test_recall = np.nanmean(test_recall)


        print("AUROC : ",test_auroc)
        print("Accuracy : ",test_acc)
        print("f1 score : ",test_f1)
        print("precision score : ",test_precision)
        print("recall score : ",test_recall)

        eval_matrix = [modeltxt,test_auroc,test_acc,test_f1,test_precision,test_recall]

        return y_test_t,y_pred_t,eval_matrix

# Plot
def plot_conf(y_test,y_pred,modeltxt):
    f, axes = plt.subplots(2, 4, figsize=(14, 8))
    f.suptitle('Confustion Matrix For Model '+ modeltxt, fontsize=20, fontweight='bold')
    plt.rcParams.update({'font.size': 12,'font.weight': 'bold'})
    y_test = y_test.cpu()
    y_pred = np.where(y_pred.cpu()>0.5,1,0)
    axes = axes.ravel()
    for i in range(8):
        disp = ConfusionMatrixDisplay(confusion_matrix(y_test[:, i],
                                                       y_pred[:, i]))
        disp.plot(ax=axes[i], values_format='.10g')
        disp.ax_.set_title(f'{LABELS[i]}')
        disp.im_.colorbar.remove()

    plt.subplots_adjust(wspace=0.25, hspace=0.25)
    plt.show()

# Get Values
def get_conf(y_test,y_pred,modeltxt):
    conf_vals = []
    y_test = y_test.cpu()
    y_pred = np.where(y_pred.cpu()>0.5,1,0)
    for i in range(8):
        c = confusion_matrix(y_test[:, i],y_pred[:, i])
        try:
            p = c[0][0]
        except IndexError:
            p = 0
        try:
            q = c[0][1]
        except IndexError:
            q = 0
        try:
            r = c[1][0]
        except IndexError:
            r = 0
        try:
            s = c[1][1]
        except IndexError:
            s = 0

        conf_vals.append([modeltxt,LABELS[i],p,q,r,s])

    return conf_vals

### Test Predictions

In [None]:
eval_matrix_all = []
for mName in mName_list:
    print("Predicting for Model ",mName)
    globals()[f"y_test_t_{mName}"],globals()[f"y_pred_t_{mName}"],eval_matrix = getTestPreds(globals()[f"best_model_{mName}"],test_dataloader,mName)
    eval_matrix_all.append(eval_matrix)
    print("----------------------------------------------")

### Print and Plot Evaluation Matrices
Print AUROC, Accuracy, F1 Score, Precision and Recall

In [None]:
df_mat_all = pd.DataFrame(eval_matrix_all)
df_mat_all.columns = ["Model Name","AUROC","Accuracy","F1 Score","Precision","Recall"]
df_mat_all

### Print Accuracy for Labels

In [None]:
label_list_all = []
for mName in mName_list:
    for i in range(8):
        acc = accuracy_score(globals()[f"y_test_t_{mName}"].to("cpu").numpy()[:, i], np.where(globals()[f"y_pred_t_{mName}"].to("cpu").numpy()[:, i]>=0.5,1,0))
        p = precision_recall_fscore_support(globals()[f"y_test_t_{mName}"].to("cpu").numpy()[:, i], np.where(globals()[f"y_pred_t_{mName}"].to("cpu").numpy()[:, i]>=0.5,1,0),average='binary')
        #print(LABELS[i]," ==> ","Acc : ",acc," Precision : ",p[0]," Recall : ",p[1])
        if i !=12:
            auroc = roc_auc_score(globals()[f"y_test_t_{mName}"].to("cpu").numpy()[:, i], globals()[f"y_pred_t_{mName}"].to("cpu").numpy()[:, i])
        else:
            auroc = np.nan

        #print(LABELS[i],p)
        label_list_all.append([mName,LABELS[i],auroc,acc,p[0],p[1]])
        
df_label_list_all = pd.DataFrame(label_list_all)
df_label_list_all.columns = ["Model Name","Label","AUROC","Accuracy","Precision","Recall"]
df_label_list_all.to_csv(out+"df_label_list_all.csv",index=False)
df_label_list_all

### AUROC of Labels accross Different Models

In [None]:
df_label_list_all.pivot(index='Label', columns='Model Name', values=["AUROC"])

### Accuracy of Labels accross Different Models

In [None]:
df_label_list_all.pivot(index='Label', columns='Model Name', values=["Accuracy"])

### Precision of Labels accross Different Models

In [None]:
df_label_list_all.pivot(index='Label', columns='Model Name', values=["Precision"])

In [None]:
df_label_list_all.pivot(index='Label', columns='Model Name', values=["Recall"])

### Print Confusion Matrix

In [None]:
conf_all = pd.DataFrame()
for mName in mName_list:
    print("Print Confusion Matrix for Model ",mName)
    conf_all = conf_all.append(pd.DataFrame(get_conf(globals()[f"y_test_t_{mName}"],globals()[f"y_pred_t_{mName}"],mName)))

In [None]:
conf_all.columns = ["Model Name","Label","True Negative","False Positive","False Negative","True Positive"]
conf_all.to_csv("ConfusionMatrix.csv",index=False)
conf_all_piv = conf_all.pivot(index='Label', columns='Model Name', values=["True Negative","False Positive","False Negative","True Positive"])
conf_all_piv.to_csv(out+"ConfusionMatrixPivot.csv",index=False)
conf_all_piv

### Plot Confusion Matrix

In [None]:
conf_all = pd.DataFrame()
for mName in mName_list:
    print("Plot Confusion Matrix for Model ",mName)
    plot_conf(globals()[f"y_test_t_{mName}"],globals()[f"y_pred_t_{mName}"],mName)

In [None]:
plt.plot(training_losses)
plt.ylabel("loss")
plt.xlabel("epoch")
plt.title("Training Loss Curve CustumNet")
plt.show()

In [None]:
LABELS

In [None]:
eval_matrix_all

In [None]:
for mName in mName_list:
    print("Predicting for Model", mName)
    y_test_t = globals()[f"y_test_t_{mName}"]
    y_pred_t = globals()[f"y_pred_t_{mName}"]

    # Código para trazar curvas ROC
    auc_roc_vals = []
    for i in range(len(LABELS)):
        try:
            gt = np.array(y_test_t.cpu()[:, i])
            pred = y_pred_t.cpu()[:, i]
            gt = gt.astype('int64')
            gt = gt.reshape(-1, 1)
            auc_roc = roc_auc_score(gt, pred)
            print(auc_roc)
            auc_roc_vals.append(auc_roc)
            fpr_rf, tpr_rf, _ = roc_curve(gt, pred)
            plt.figure(1, figsize=(10, 10))
            plt.plot([0, 1], [0, 1], 'k--')
            plt.plot(fpr_rf, tpr_rf,
                     label=LABELS[i] + " (" + str(round(auc_roc, 3)) + ")")
            plt.xlabel('False positive rate')
            plt.ylabel('True positive rate')
            plt.title(f'ROC curve for {mName}')
            plt.legend(loc='best')
        except ValueError:
            pass
    plt.show()

In [None]:
# eval_matrix_all = []
# for mName in mName_list:
#     print("Predicting for Model ")
#     y_test_t, y_pred_t, eval_matrix = getTestPreds(globals()[f"best_model_{mName}"], test_dataloader, mName)
#     eval_matrix_all.append(eval_matrix)

#     # Código para trazar curvas ROC
#     auc_roc_vals = []
#     for i in range(len(LABELS)):
#         try:
#             gt = np.array(y_test_t.cpu()[:, i])
#             pred = y_pred_t.cpu()[:, i]
#             gt = gt.astype('int64')
#             gt = gt.reshape(-1, 1)
#             auc_roc = roc_auc_score(gt, pred)
#             print(auc_roc)
#             auc_roc_vals.append(auc_roc)
#             fpr_rf, tpr_rf, _ = roc_curve(gt, pred)
#             plt.figure(1, figsize=(10, 10))
#             plt.plot([0, 1], [0, 1], 'k--')
#             plt.plot(fpr_rf, tpr_rf,
#                      label=LABELS[i] + " (" + str(round(auc_roc, 3)) + ")")
#             plt.xlabel('False positive rate')
#             plt.ylabel('True positive rate')
#             plt.title('ROC curve')
#             plt.legend(loc='best')
#         except ValueError:
#             pass
#     plt.show()

In [None]:
# # Asegúrate de que todas las listas tengan la misma longitud
# min_length = min(len(globals()[f"best_score_acc_CustomNet"]),
#                  len(globals()[f"best_score_f1_CustomNet"]))

# # Utiliza solo las primeras `min_length` épocas para evitar la discrepancia de dimensiones
# x = x[:min_length]

# # Crear subgráficos
# fig, ax = plt.subplots(nrows=3, ncols=1, figsize=(12, 10))

# # Gráfico de precisión de entrenamiento y validación
# ax[0].set_title('AUROC vs. Epochs')
# ax[0].plot(x, globals()[f"best_score_acc_CustomNet"][:min_length], 'o-', label='AUROC')
# ax[0].set_xlabel('Epochs')
# ax[0].set_ylabel('AUROC')
# ax[0].legend(loc='best')

# # Gráfico de precisión de entrenamiento y validación
# ax[1].set_title('Accuracy vs. Epochs')
# ax[1].plot(x, globals()[f"best_score_acc_CustomNet"][:min_length], 'o-', label='Accuracy')
# ax[1].set_xlabel('Epochs')
# ax[1].set_ylabel('Accuracy')
# ax[1].legend(loc='best')

# # Gráfico de pérdida de entrenamiento y validación
# ax[2].set_title('F1 Score vs. Epochs')
# ax[2].plot(x, globals()[f"best_score_f1_CustomNet"][:min_length], 'o-', label='F1 Score')
# ax[2].set_xlabel('Epochs')
# ax[2].set_ylabel('F1 Score')
# ax[2].legend(loc='best')

# # Ajustar el diseño y mostrar el gráfico
# plt.tight_layout()
# plt.show()


In [None]:
# Crear una lista de épocas
x = list(range(MAX_EPOCHS))

# Crear subgráficos
fig, ax = plt.subplots(nrows=3, ncols=1, figsize=(12, 10))

# Gráfico de precisión de entrenamiento y validación
ax[0].set_title('AUROC vs. Epochs')
ax[0].plot(x, globals()[f"best_score_acc_CustomNet"], 'o-', label='AUROC')
ax[0].set_xlabel('Epochs')
ax[0].set_ylabel('AUROC')
ax[0].legend(loc='best')

# Gráfico de precisión de entrenamiento y validación
ax[1].set_title('Accuracy vs. Epochs')
ax[1].plot(x, globals()[f"best_score_acc_CustomNet"], 'o-', label='Accuracy')
ax[1].set_xlabel('Epochs')
ax[1].set_ylabel('Accuracy')
ax[1].legend(loc='best')

# Gráfico de pérdida de entrenamiento y validación
ax[2].set_title('F1 Score vs. Epochs')
ax[2].plot(x, globals()[f"best_score_f1_CustomNet"], 'o-', label='F1 Score')
ax[2].set_xlabel('Epochs')
ax[2].set_ylabel('F1 Score')
ax[2].legend(loc='best')

# Ajustar el diseño y mostrar el gráfico
plt.tight_layout()
plt.show()
