In [None]:
# /*==========================================================================================*\
# **                        _           _ _   _     _  _         _                            **
# **                       | |__  _   _/ | |_| |__ | || |  _ __ | |__                         **
# **                       | '_ \| | | | | __| '_ \| || |_| '_ \| '_ \                        **
# **                       | |_) | |_| | | |_| | | |__   _| | | | | | |                       **
# **                       |_.__/ \__,_|_|\__|_| |_|  |_| |_| |_|_| |_|                       **
# \*==========================================================================================*/


# -----------------------------------------------------------------------------------------------
# Author: Tien-Thanh Bui (@bu1th4nh)
# Title: homework.ipynb
# Date: 2023/11/01 18:10:37
# Description: Homework File for Prof. Wei Zhang | CS @ UCF
# -----------------------------------------------------------------------------------------------


import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import ydata_profiling as ydp
import seaborn as sns
import sklearn
import os, sys, logging
from tqdm import tqdm

# Register tqdm's pandas integration (progress_apply & co.).
tqdm.pandas()


# Drop whatever handlers the root logger already has so that re-running this cell
# does not stack duplicates, then route every record (INFO and above) to stdout.
# To additionally log to a file, add
#   logging.FileHandler(filename="data-processing-log_latest.log", encoding="utf-8-sig", mode="w")
# to the `handlers` list below.
logging.root.handlers = []
stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level    = logging.INFO,
    format   = '%(asctime)s [%(levelname)s] %(message)s',
    datefmt  = '%Y/%m/%d %H:%M:%S',
    handlers = [stdout_handler],
)

# Report once, up front, whether a CUDA device is visible to PyTorch.
import torch
logging.info(torch.cuda.is_available())

import shutil
folder = 'checkpoints/'

# Start the run with an empty checkpoint folder.
# Listing is separated from deleting so that the only errors tolerated here are the
# expected ones (folder missing / not listable) instead of silently swallowing everything.
try:
    checkpoint_entries = os.listdir(folder)
except FileNotFoundError:
    # The folder does not exist (yet): there is nothing to clean.
    checkpoint_entries = []
    logging.info(f'[TRAIN] Checkpoint folder {folder} not found, nothing to delete')
except OSError as e:
    # Still best-effort (the notebook keeps running), but the reason is no longer hidden.
    checkpoint_entries = []
    logging.warning(f'[TRAIN] Could not list checkpoint folder {folder}. Reason: {e}')

for filename in checkpoint_entries:
    file_path = os.path.join(folder, filename)
    try:
        if os.path.isfile(file_path) or os.path.islink(file_path):
            os.unlink(file_path)
        elif os.path.isdir(file_path):
            shutil.rmtree(file_path)
        logging.info(f'[TRAIN] Deleted checkpoint {file_path}')
    except Exception as e:
        # One entry that cannot be removed must not stop the cleanup of the others.
        logging.error(f'[TRAIN] Failed to delete checkpoint {file_path}. Reason: {e}')

## 1. Data Acquisition and Basic Analysis

### Acquisition

In [None]:
# One-time conversion: each CSV is cached as parquet because parquet loads faster.
if not os.path.exists('data/Data.parquet'):
    pd.read_csv('data/Data.csv').to_parquet('data/Data.parquet')
if not os.path.exists('data/Label.parquet'):
    pd.read_csv('data/Label.csv').to_parquet('data/Label.parquet')

# Feature table: rows containing any missing value are dropped right after loading.
data = pd.read_parquet('data/Data.parquet')
data = data.dropna()
logging.info("Data")
display(data.head())

# Label table, previewed exactly as stored.
label = pd.read_parquet('data/Label.parquet')
logging.info("Label")
display(label.head())

### Pre-process: Transposition

In [None]:
# Data - We transpose the data to make it easier to work with.
# Initially, each row is a feature, and each column is a sample.
# We want to make each row a sample, and each column a feature.
#
# Both steps are guarded by the index name so the cell can be re-run safely:
# without the guard a second run raises KeyError for `data` (the 'sample' column is gone)
# and silently re-indexes `label` on whatever column is now first.
if data.index.name != 'sample':
    data = data.set_index('sample').transpose()
    data.index.name = 'sample'
    data.columns.name = ''
# display(data.head())

# Label - We set the index to be the first column - due to the data description.
if label.index.name != 'sample':
    label = label.set_index(label.columns[0])
    label.index.name = 'sample'
# display(label.head())

### Very Basic Data Information

In [None]:
# After the transposition, rows are samples and columns are features.
logging.info(f"Number of samples: {data.shape[0]}")
logging.info(f"Number of features: {data.shape[1]}")

# Per-feature summary statistics, cached on disk to save time on later runs.
# NOTE(review): the cache is never refreshed; delete description.xlsx if the data changes.
if not os.path.exists('./description.xlsx'):
    data.describe().T.to_excel('./description.xlsx')

description = pd.read_excel('./description.xlsx', index_col=0).rename_axis('features')
# description.head(50)

In [None]:
# Print the label table's summary (pandas `DataFrame.info`), then log its row count.
label.info()
logging.info(f"Number of LABELED samples: {label.shape[0]}") 


## 2. EDA - Exploratory Data Analysis

### 2.1. Histogram

In [None]:
# With ~20k features we cannot plot them all, so k of them are drawn at random
# (without replacement) and inspected instead.
k = 1
random_feature_set = np.random.choice(data.columns, size=k, replace=False)
logging.info(f"Random feature set: {random_feature_set}")


# One figure per sampled feature: histogram on the left, boxplot on the right.
for feature in random_feature_set:
    fig, (hist_ax, box_ax) = plt.subplots(nrows=1, ncols=2, figsize=(20, 5))

    data.hist(column=feature, bins=100, ax=hist_ax)
    hist_ax.set_title(f'Histogram for feature {feature}')

    data.boxplot(column=feature, ax=box_ax)
    box_ax.set_title(f'Boxplot for feature {feature}')
    # fig.savefig(f'./plots/{feature}.pdf')  # uncomment to export the figure

In [None]:
# Class balance of the two label columns (ER and TN).
# The messages must be f-strings: without the `f` prefix the placeholders were logged
# literally instead of the actual counts.
logging.info(f"Number of sample has ER value = 1: {label[label.ER == 1].shape[0]}")
logging.info(f"Number of sample has ER value = 0: {label[label.ER == 0].shape[0]}")
logging.info(f"Number of sample has TN value = 1: {label[label.TN == 1].shape[0]}")
logging.info(f"Number of sample has TN value = 0: {label[label.TN == 0].shape[0]}")


### 2.2. Correlation

In [None]:
# A full correlation matrix over ~20k features would hold ~400M float64 entries,
# so the correlation is only computed on k randomly drawn features.
k = 20
random_feature_set = np.random.choice(data.columns, size=k, replace=False)

In [None]:
# Absolute pairwise correlation between the sampled features.
# NOTE: `random_feature_set` is reassigned by the next cell, so re-running this cell
# afterwards would use that smaller selection.
corr_sample = data.loc[:, random_feature_set].corr().abs()
# display(corr_sample.head())

In [None]:
# Distribution of the correlation coefficients: pick k columns of the sampled
# correlation matrix and look at how each one correlates with the others.
k = 1
random_feature_set = np.random.choice(corr_sample.columns, size=k, replace=False)
logging.info(f"Random feature set: {random_feature_set}")


for feature in random_feature_set:
    fig, (hist_ax, box_ax) = plt.subplots(nrows=1, ncols=2, figsize=(20, 3))

    # Left panel: histogram with a KDE overlay drawn on a secondary y axis.
    corr_sample.hist(column=feature, bins=100, ax=hist_ax)
    hist_ax.set_title(f'Histogram of correlation coefficient between {feature} and others')
    hist_ax.set_xlim(0, 1)
    corr_sample[feature].plot.kde(ax=hist_ax, secondary_y=True, bw_method='scott')

    # Right panel: boxplot of the same coefficients (absolute values, hence the 0..1 range).
    corr_sample.boxplot(column=feature, ax=box_ax)
    box_ax.set_title(f'Boxplot of correlation coefficient between {feature} and others')
    box_ax.set_ylim(0, 1)
    # fig.savefig(f'./plots/{feature}.pdf')  # uncomment to export the figure

### 2.3. Nullity Analysis

## 3. Data Preprocessing

### 3.1. Label Merge

In [None]:
# Attach the TN column of `label` to the feature table (matched on the sample index)
# and expose it under the generic name "label".
# NOTE(review): how="outer" also keeps samples that exist only in `label`; such rows
# would carry a label but all-NaN features - confirm this is intended.
labeled_dataset = (
    data
    .merge(label[["TN"]], left_index=True, right_index=True, how="outer")
    .rename(columns={"TN": "label"})
)

logging.info(f"[Reminder] Data size: {data.shape[0]}")
logging.info(f"Number of labeled data: {labeled_dataset[labeled_dataset.label.notnull()].shape[0]}")
logging.info(f"Number of unlabeled data: {labeled_dataset[labeled_dataset.label.isnull()].shape[0]}")

In [None]:
# Only rows with a known label can be used for supervised training.
has_label = labeled_dataset.label.notnull()
usable_dataset = labeled_dataset[has_label]

# Split the usable rows into the feature matrix and the target column.
usable_data = usable_dataset.drop(columns=["label"])
usable_labl = usable_dataset["label"]

logging.info(f"Number of usable data: {usable_dataset.shape[0]}")

### 3.2. Train-Test Split

In [None]:
from sklearn.model_selection import train_test_split

# 80/20 train/test split on the raw numpy arrays; the fixed random_state makes the
# partition reproducible between runs.
(
    original_train_data,
    original_test_data,
    original_train_labl,
    original_test_labl,
) = train_test_split(
    usable_data.values,
    usable_labl.values,
    test_size=0.2,
    random_state=42,
)

## 4. Dimensionality Reduction

### 4.1. Autoencoder Implementation

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl



class AutoEncoder(pl.LightningModule):
    """
        Fully-connected auto-encoder.

        The caller only describes the encoder; the decoder is derived from it by mirroring
        the description and appending one layer that maps back to the input width.
        Every described layer is expanded to `Linear -> Dropout -> activation`.
    """

    # Maps the `activation` key of a layer description to the module class implementing it.
    _ACTIVATIONS = {
        "relu": nn.ReLU,
        "sigmoid": nn.Sigmoid,
        "tanh": nn.Tanh,
    }

    def __init__(self, layer_characteristics, input_size, learning_rate=1e-3, optimizer=torch.optim.Adam):
        """
            Build the encoder and decoder stacks.

            Parameters
            ----------
            - `layer_characteristics`: list
                Encoder description, one dictionary per layer with the keys:
                    - `layer_type`: "linear"
                    - `activation`: "relu", "sigmoid", "tanh"
                    - `dropout`: 0.0 - 1.0
                    - `output_size`: int
            - `input_size`: int
                Feature length of the samples fed to the model
            - `learning_rate`: float, default=1e-3
                Learning rate handed to the optimizer
            - `optimizer`: `torch.optim` optimizer class, default=torch.optim.Adam
                Called as `optimizer(parameters, lr=learning_rate)`

            Raises
            ------
            ValueError
                If a layer uses an unknown `layer_type` or `activation`
        """

        logging.info("===============================================================")
        logging.info("Initializing AutoEncoder")

        super().__init__()

        self.learning_rate = learning_rate
        self.optimizer = optimizer

        # Layers with an index below this value belong to the encoder, the rest to the decoder.
        encoder_size = len(layer_characteristics)

        # Last decoder layer: restores the original feature length and borrows type,
        # activation and dropout from the first encoder layer.
        first_layer = layer_characteristics[0]
        restoration_layer = dict(
            output_size=input_size,
            layer_type=first_layer["layer_type"],
            activation=first_layer["activation"],
            dropout=first_layer["dropout"],
        )

        # encoder + mirrored encoder (without repeating the bottleneck) + restoration layer
        mirrored_layers = layer_characteristics[::-1][1:]
        layer_characteristics = layer_characteristics + mirrored_layers + [restoration_layer]

        encoder_modules, decoder_modules = [], []
        previous_width = input_size
        for i, layer in enumerate(layer_characteristics):
            input_size  = previous_width
            layer_type  = layer["layer_type"]
            activation  = layer["activation"]
            output_size = layer["output_size"]
            dropout     = layer["dropout"]

            logging.info(f"Layer {i}: {input_size} -> {output_size} | {activation} | {layer_type} | {dropout}")

            destination = encoder_modules if i < encoder_size else decoder_modules
            destination.extend(self._make_block(layer_type, activation, dropout, input_size, output_size))
            previous_width = output_size

        self.encoder = nn.Sequential(*encoder_modules)
        self.decoder = nn.Sequential(*decoder_modules)

    @classmethod
    def _make_block(cls, layer_type, activation, dropout, input_size, output_size):
        """
            Expand one layer description into its modules.

            Returns
            -------
            list
                `[Linear, Dropout, activation]`, in the order they are applied

            Raises
            ------
            ValueError
                If `layer_type` is not "linear" or `activation` is unknown
        """

        if layer_type != "linear":
            raise ValueError("Invalid layer type")
        block = [nn.Linear(input_size, output_size), nn.Dropout(dropout)]

        if activation not in cls._ACTIVATIONS:
            raise ValueError("Invalid activation function")
        block.append(cls._ACTIVATIONS[activation]())
        return block

    def forward(self, x):
        """
            Full pass (encoder followed by decoder), i.e. the reconstruction of `x`.
            This is the pass used for training.

            Parameters
            ----------
            `x`: torch.Tensor
                Input tensor

            Returns
            -------
            torch.Tensor
                Reconstructed tensor
        """

        return self.decoder(self.encoder(x))

    def inference(self, x):
        """
            Encoder-only pass. This is the pass used for dimensionality reduction.

            Parameters
            ----------
            `x`: torch.Tensor
                Input tensor

            Returns
            -------
            torch.Tensor
                Encoded (reduced) tensor
        """

        return self.encoder(x)

    def training_step(self, batch, batch_idx):
        """
            Training step, called by the PyTorch Lightning trainer.

            Parameters
            ----------
            `batch`: tuple
                Tuple of input and target tensors
            `batch_idx`: int
                Batch index

            Returns
            -------
            torch.Tensor
                MSE between the reconstruction and the target, logged as `train_loss`
        """

        inputs, targets = batch
        loss = F.mse_loss(self(inputs), targets)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """
            Validation step, called by the PyTorch Lightning trainer.

            Parameters
            ----------
            `batch`: tuple
                Tuple of input and target tensors
            `batch_idx`: int
                Batch index

            Returns
            -------
            torch.Tensor
                MSE between the reconstruction and the target, logged as `val_loss`
        """

        inputs, targets = batch
        loss = F.mse_loss(self(inputs), targets)
        self.log('val_loss', loss)
        return loss

    def configure_optimizers(self):
        """Instantiate the configured optimizer class over all model parameters."""
        return self.optimizer(self.parameters(), lr=self.learning_rate)

In [None]:
def AutoEncoderReducer(layer_characteristics, train_data, test_data, val_data = None):
    """
        Train an `AutoEncoder` on `train_data` and encode every given split with it.

        Parameters
        ----------
        - `layer_characteristics`: list or None
            Encoder description passed to `AutoEncoder`; `None` selects a default
            two-layer encoder (500 tanh -> 200 relu, dropout 0.2)
        - `train_data`: 2-D array
            Samples used for training (input and reconstruction target alike)
        - `test_data`: 2-D array
            Samples that are only encoded
        - `val_data`: 2-D array, optional
            Extra split that is only encoded; it is not used for validation during training

        Returns
        -------
        tuple of numpy.ndarray
            `(train, test)` or, when `val_data` is given, `(train, test, val)` encodings
    """
    from torch.utils.data import TensorDataset, DataLoader

    if layer_characteristics is None:
        layer_characteristics = [
            dict(output_size=500, activation="tanh", layer_type="linear", dropout=0.2),
            dict(output_size=200, activation="relu", layer_type="linear", dropout=0.2),
        ]

    # Create model
    n_features = train_data.shape[1]
    model = AutoEncoder(layer_characteristics, n_features)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    def as_device_tensor(array):
        # float32 copy of `array`, placed on the selected device
        return torch.tensor(array, dtype=torch.float32).to(device)

    # Only the training split is used for fitting; each sample is its own target.
    dataset = TensorDataset(as_device_tensor(train_data), as_device_tensor(train_data))
    dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

    # Both callbacks watch the training loss: stop after 10 checks without improvement
    # and keep the single best checkpoint.
    early_stopping = pl.callbacks.EarlyStopping(monitor='train_loss', patience=10, mode='min')
    best_checkpoint = pl.callbacks.ModelCheckpoint(
        monitor='train_loss',
        mode='min',
        dirpath = "checkpoints/",
        filename='autoencoder',
        save_top_k=1,
        verbose=True,
        save_weights_only=False,
    )
    trainer = pl.Trainer(max_epochs=1000, callbacks=[early_stopping, best_checkpoint])

    # Train
    trainer.fit(model, dataloader)

    # Continue with the best checkpoint rather than the last training state.
    model = AutoEncoder.load_from_checkpoint(
        trainer.checkpoint_callback.best_model_path,
        layer_characteristics=layer_characteristics,
        input_size=n_features,
        strict=False
    )
    model.eval()

    splits = [train_data, test_data]
    if val_data is not None:
        splits.append(val_data)

    with torch.no_grad():
        model.to(device)
        # Inference: encoder only, results brought back to numpy on the CPU
        return tuple(
            model.inference(as_device_tensor(split)).detach().cpu().numpy()
            for split in splits
        )

### 4.2. Canonical DR Models

In [None]:
def PCAReducer(n_components, train_data, test_data, val_data = None):
    """
        Fit a PCA on `train_data` only and project every given split with it.

        Returns `(train, test)` or, when `val_data` is given, `(train, test, val)`.
    """
    from sklearn.decomposition import PCA

    fitted_pca = PCA(n_components=n_components).fit(train_data)

    splits = [train_data, test_data]
    if val_data is not None:
        splits.append(val_data)

    return tuple(fitted_pca.transform(split) for split in splits)



## 5. Binary Classification

### 5.1. Canonical ML Classifiers

In [None]:
def SVMClassifier(train_data, train_labl):
    """Fit scikit-learn's `SVC` (probability estimates enabled, verbose) and return the result of `fit`."""
    from sklearn.svm import SVC as SklearnSVC

    model = SklearnSVC(probability=True, verbose=True)
    return model.fit(train_data, train_labl)

def RandomForestClassifier(train_data, train_labl):
    """Fit scikit-learn's `RandomForestClassifier` (verbose) and return the result of `fit`."""
    # Imported under an alias because this wrapper shares the estimator's name.
    from sklearn.ensemble import RandomForestClassifier as SklearnRandomForest

    model = SklearnRandomForest(verbose=True)
    return model.fit(train_data, train_labl)

def LogisticRegressionClassifier(train_data, train_labl):
    """Fit scikit-learn's `LogisticRegression` (max_iter=1000, verbose) and return the result of `fit`."""
    from sklearn.linear_model import LogisticRegression as SklearnLogisticRegression

    model = SklearnLogisticRegression(max_iter=1000, verbose=True)
    return model.fit(train_data, train_labl)

def AdaBoostClassifier(train_data, train_labl):
    """Fit scikit-learn's `AdaBoostClassifier` with its default settings and return the result of `fit`."""
    # Imported under an alias because this wrapper shares the estimator's name.
    from sklearn.ensemble import AdaBoostClassifier as SklearnAdaBoost

    model = SklearnAdaBoost()
    return model.fit(train_data, train_labl)

### 5.2. Neural Network Classifier

In [None]:
from typing import Any
import numpy as np
import pandas as pd
from pytorch_lightning.utilities.types import STEP_OUTPUT
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl



class NeuralClassifierCore(pl.LightningModule):
    """
        Fully-connected binary classifier trained with binary cross-entropy (`nn.BCELoss`).

        Every described layer is expanded to `Linear -> Dropout -> activation`.
    """

    def __init__(self, layer_characteristics, input_size, learning_rate=1e-3, optimizer=torch.optim.Adam):
        """
            Initialize the Binary Neural Classifier model

            Parameters
            ----------
            - `layer_characteristics`: list
                List of dictionaries, each dictionary contains the following keys:
                    - `layer_type`: "linear"
                    - `activation`: "relu", "sigmoid", "tanh", "softmax"
                    - `dropout`: 0.0 - 1.0
                    - `output_size`: int
                Unless the last described layer already has `output_size` 1, a final linear layer
                with 1 output and sigmoid activation is added. The list passed in is NOT modified.
                An empty list yields just that final layer.
            - `input_size`: int
                Input size of the model. This is the feature length
            - `learning_rate`: float, default=1e-3
                Learning rate of the optimizer
            - `optimizer`: `torch.optim` optimizer class, default=torch.optim.Adam
                Called as `optimizer(parameters, lr=learning_rate)`

            Raises
            ------
            ValueError
                If a layer uses an unknown `layer_type` or `activation`
        """

        logging.info("===============================================================")
        logging.info("Initializing NeuralClassifierCore")

        super(NeuralClassifierCore, self).__init__()

        self.learning_rate = learning_rate
        self.optimizer = optimizer
        self.criterion = nn.BCELoss()

        # Work on a shallow copy: appending the output layer used to leak into the caller's
        # list, so a description shared between models was changed by the first model built.
        layer_characteristics = list(layer_characteristics)
        if(len(layer_characteristics) == 0 or layer_characteristics[-1]['output_size'] != 1):
            layer_characteristics.append({
                "output_size": 1,
                "layer_type": "linear",
                "activation": "sigmoid",
                "dropout": 0.0,
            })

        modules = []
        for i, layer in enumerate(layer_characteristics):
            # Each layer consumes the previous layer's output; the first one the raw features.
            input_size  = input_size if i == 0 else layer_characteristics[i-1]["output_size"]
            output_size = layer["output_size"]
            layer_type  = layer["layer_type"]
            activation  = layer["activation"]
            dropout     = layer["dropout"]

            logging.info(f"Layer {i}: {input_size} -> {output_size} | {activation} | {layer_type} | {dropout}")

            # Layer
            if(layer_type == "linear"):
                modules.append(nn.Linear(input_size, output_size))
                modules.append(nn.Dropout(dropout))
            else: raise ValueError("Invalid layer type")

            # Activation
            if(activation == "relu"): modules.append(nn.ReLU())
            elif(activation == "sigmoid"): modules.append(nn.Sigmoid())
            elif(activation == "tanh"): modules.append(nn.Tanh())
            # Explicit feature dimension: the implicit-dim form of Softmax is deprecated.
            elif(activation == "softmax"): modules.append(nn.Softmax(dim=-1))
            else: raise ValueError("Invalid activation function")

        self.net = nn.Sequential(*modules)

    def forward(self, x):
        """
            Forward pass of the model.

            Parameters
            ----------
            `x`: torch.Tensor
                Input tensor

            Returns
            -------
            torch.Tensor
                Network output with the trailing size-1 dimension removed
        """

        # squeeze(-1) instead of squeeze(): only the output dimension is dropped, so a batch
        # holding a single sample keeps its batch dimension and still matches a (1,) target.
        out = self.net(x).squeeze(-1)
        return out

    def _shared_step(self, batch, metric_name):
        """Compute the BCE loss for `batch` and log it under `metric_name`."""
        x, y = batch
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log(metric_name, loss)
        return loss

    def training_step(self, batch, batch_idx):
        """
            Training step of the model, used by the PyTorch Lightning trainer

            Parameters
            ----------
            `batch`: tuple
                Tuple of input and target tensors
            `batch_idx`: int
                Batch index

            Returns
            -------
            torch.Tensor
                Loss value of the batch, logged as `train_loss`
        """

        return self._shared_step(batch, 'train_loss')

    def validation_step(self, batch, batch_idx):
        """
            Validation step of the model, used by the PyTorch Lightning trainer

            Parameters
            ----------
            `batch`: tuple
                Tuple of input and target tensors
            `batch_idx`: int
                Batch index

            Returns
            -------
            torch.Tensor
                Loss value of the batch, logged as `val_loss`
        """

        return self._shared_step(batch, 'val_loss')

    def test_step(self, batch, batch_idx):
        """
            Testing step of the model, used by the PyTorch Lightning trainer

            Parameters
            ----------
            `batch`: tuple
                Tuple of input and target tensors
            `batch_idx`: int
                Batch index

            Returns
            -------
            torch.Tensor
                Loss value of the batch, logged as `test_loss`
        """

        return self._shared_step(batch, 'test_loss')

    def configure_optimizers(self):
        """Instantiate the configured optimizer class over all model parameters."""
        return self.optimizer(self.parameters(), lr=self.learning_rate)

In [None]:
def NeuralClassifier(train_data, train_label, val_data, val_label, test_data, test_label, layer_characteristics=None):
    """
        Train a `NeuralClassifierCore` and return the checkpoint with the lowest validation loss.

        Parameters
        ----------
        - `train_data`, `train_label`: array-like
            Features / binary targets used for fitting
        - `val_data`, `val_label`: array-like
            Features / binary targets monitored for early stopping and checkpoint selection
        - `test_data`, `test_label`: array-like
            Features / binary targets that are only evaluated after training
        - `layer_characteristics`: list, optional
            Layer description passed to `NeuralClassifierCore`; `None` selects a default
            three-layer network (min(200, n_features) relu -> 100 tanh -> 50 relu, dropout 0.2)

        Returns
        -------
        NeuralClassifierCore
            Best checkpoint reloaded from disk, switched to eval mode so it can be used
            for prediction directly
    """
    from torch.utils.data import TensorDataset, DataLoader

    logging.info(f"What is train_data: {train_data.shape}")

    layer_characteristics = layer_characteristics if layer_characteristics is not None else [
        {
            # Built-in min() keeps the layer width a plain Python int (np.min returns a NumPy scalar).
            "output_size": min(200, train_data.shape[1]),
            "activation": "relu",
            "layer_type": "linear",
            "dropout": 0.2,
        },
        {
            "output_size": 100,
            "activation": "tanh",
            "layer_type": "linear",
            "dropout": 0.2,
        },
        {
            "output_size": 50,
            "activation": "relu",
            "layer_type": "linear",
            "dropout": 0.2,
        }
    ]

    # Create model
    cls = NeuralClassifierCore(layer_characteristics, train_data.shape[1])

    def make_dataset(features, labels):
        # Tensors deliberately stay on the CPU: the loaders below use worker processes, and
        # serving CUDA tensors from DataLoader workers is not recommended (it can fail outright
        # with forked workers). The Trainer moves each batch to the model's device itself.
        return TensorDataset(
            torch.tensor(np.array(features), dtype=torch.float32),
            torch.tensor(np.array(labels), dtype=torch.float32),
        )

    # Create data loader
    train = DataLoader(make_dataset(train_data, train_label), batch_size=64, shuffle=True, persistent_workers=True, num_workers=3)
    val = DataLoader(make_dataset(val_data, val_label), batch_size=64, persistent_workers=True, num_workers=2)
    test = DataLoader(make_dataset(test_data, test_label), batch_size=64, persistent_workers=True, num_workers=2)

    # Create trainer: stop after 10 validation checks without improvement and keep
    # the single best checkpoint (both judged on the validation loss).
    trainer = pl.Trainer(
        max_epochs=500,
        callbacks=[
            pl.callbacks.EarlyStopping(monitor='val_loss', patience=10, mode='min'),
            pl.callbacks.ModelCheckpoint(
                monitor='val_loss',
                mode='min',
                dirpath = "checkpoints/",
                filename='classifier',
                save_top_k=1,
                verbose=True,
                save_weights_only=False,
            ),
            # pl.callbacks.RichProgressBar(),
        ]
    )

    # Train
    trainer.fit(cls, train_dataloaders=train, val_dataloaders=val)

    # Continue with the best checkpoint rather than the last training state.
    cls = NeuralClassifierCore.load_from_checkpoint(
        trainer.checkpoint_callback.best_model_path,
        layer_characteristics=layer_characteristics,
        input_size=train_data.shape[1],
        strict=False,
    )
    logging.info("Validation loss:")
    trainer.test(cls, val)
    logging.info("Test loss:")
    trainer.test(cls, test)

    # Inference: hand the model back with dropout disabled so predictions are deterministic.
    cls.eval()
    return cls
    

## 6. Model Training

In [37]:
# Dimensionality-reduction variants compared in the experiments.
reduction_methods = ["Not Reduced", "PCA", "Auto Encoder"]

# AUC table: one row per reduction method, one column added per classifier later on.
result_AUC = pd.DataFrame(index = reduction_methods).rename_axis("Reduction Method")


# Processed splits and trained models, filled in by the training cells below.
data_dict = {"Canon ML": {}, "Deep Learning": {}}
models_dict = {}


# Encoder description for the auto-encoder: three linear layers with ReLU and
# dropout 0.2, narrowing down to 200 features.
AE_characteristics = [
    {
        "output_size": width,
        "activation": "relu",
        "layer_type": "linear",
        "dropout": 0.2,
    }
    for width in (1000, 500, 200)
]

### 6.1. Canonical ML

In [38]:
# Display names of the classical classifiers trained in this section.
cls_methods = [
    "Random Forest",
    "Logistic Regression",
    "SVM",
    "AdaBoost",
]

In [None]:
# Display name -> training function (all defined in section 5.1).
classifier_trainers = {
    "SVM": SVMClassifier,
    "Random Forest": RandomForestClassifier,
    "Logistic Regression": LogisticRegressionClassifier,
    "AdaBoost": AdaBoostClassifier,
}

for reduction_method in reduction_methods:
    # Dimensionality Reduction (anything that is not PCA / Auto Encoder keeps the raw features)
    if reduction_method == "PCA":
        processed_train_data, processed_test_data = PCAReducer(200, original_train_data, original_test_data)
    elif reduction_method == "Auto Encoder":
        processed_train_data, processed_test_data = AutoEncoderReducer(AE_characteristics, original_train_data, original_test_data)
    else:
        processed_train_data, processed_test_data = original_train_data.copy(), original_test_data.copy()

    # Keep the processed splits: the AUC section evaluates on exactly these arrays.
    data_dict["Canon ML"][reduction_method] = {
        "train_data": processed_train_data,
        "test_data": processed_test_data,
    }

    # Classification: one model per (reduction method, classifier) pair
    for cls_method in cls_methods:
        logging.info(f"Processing {reduction_method} - {cls_method}...")
        logging.info(f"Feature size: {processed_train_data.shape[1]}, Sample size: {processed_train_data.shape[0]}")

        if cls_method not in classifier_trainers:
            raise ValueError("Invalid classification method")
        cls = classifier_trainers[cls_method](processed_train_data, original_train_labl)

        models_dict[reduction_method, cls_method] = cls

    logging.info("====================================================")

# Cinderella

### 6.2. Deep Learning

In [40]:
# Add the neural classifier to the list of methods.
# Guarded so that re-running this cell does not append "Deep Learning" a second time
# (a duplicate entry would add a duplicate column to the AUC figure).
if "Deep Learning" not in cls_methods:
    cls_methods = cls_methods + ["Deep Learning"]

# Hidden layers of the neural classifier, in order: 75 relu -> 50 tanh -> 10 relu.
cls_characteristics = [
    {
        "output_size": 75,
        "activation": "relu",
        "layer_type": "linear",
        "dropout": 0.2,
    },
    {
        "output_size": 50,
        "activation": "tanh",
        "layer_type": "linear",
        "dropout": 0.2,
    },
    {
        "output_size": 10,
        "activation": "relu",
        "layer_type": "linear",
        "dropout": 0.2,
    },
]

In [None]:
# We further split the training data into training and validation data.
# The split does not depend on the reduction method (same inputs, fixed random_state), so it
# is computed once instead of once per iteration. test_size=1/8 of the 80% training portion
# puts 10% of the usable samples into the validation set.
dl_train_data, dl_val_data, dl_train_labl, dl_val_labl = train_test_split(original_train_data, original_train_labl, test_size=1/8, random_state=42)

for reduction_method in reduction_methods:
    # Dimensionality Reduction (fitted on the training part only; validation and test are just transformed)
    if(reduction_method == "PCA"): processed_train_data, processed_test_data, processed_val_data = PCAReducer(200, dl_train_data, original_test_data, dl_val_data)
    elif(reduction_method == "Auto Encoder"): processed_train_data, processed_test_data, processed_val_data = AutoEncoderReducer(AE_characteristics, dl_train_data, original_test_data, dl_val_data)
    else: processed_train_data, processed_test_data, processed_val_data = dl_train_data.copy(), original_test_data.copy(), dl_val_data.copy()

    # Keep the processed splits: the AUC section evaluates on exactly these arrays.
    data_dict["Deep Learning"][reduction_method] = dict(
        train_data = processed_train_data,
        test_data  = processed_test_data,
        val_data   = processed_val_data,
    )

    logging.info(f"Train set size: {processed_train_data.shape}")
    logging.info(f"Test set size: {processed_test_data.shape}")
    logging.info(f"Val set size: {processed_val_data.shape}")


    # Classification
    models_dict[reduction_method, "Deep Learning"] = NeuralClassifier(processed_train_data, dl_train_labl, processed_val_data, dl_val_labl, processed_test_data, original_test_labl, cls_characteristics)

    logging.info(f"====================================================")


## 7. AUC

In [None]:
from sklearn.metrics import roc_curve, auc

# One ROC panel per (reduction method, classifier) pair.
# squeeze=False keeps `ax` two-dimensional for every grid size (including 1x1), so the
# panel can always be addressed as ax[i, j].
fig, ax = plt.subplots(
    len(reduction_methods),
    len(cls_methods),
    figsize=(
        10 * len(cls_methods),
        10 * len(reduction_methods)
    ),
    squeeze=False,
)


for (i, reduction_method) in enumerate(reduction_methods):
    for (j, cls_method) in enumerate(cls_methods):

        cls = models_dict[reduction_method, cls_method]
        if(cls_method == "Deep Learning"):
            processed_test_data = data_dict["Deep Learning"][reduction_method]["test_data"]
            # eval() disables dropout so the predicted scores are deterministic; the input
            # is created on the model's device and no autograd graph is needed for scoring.
            cls.eval()
            with torch.no_grad():
                inputs = torch.tensor(processed_test_data, dtype=torch.float32, device=cls.device)
                predicted = cls(inputs).cpu().numpy()
        else:
            processed_test_data = data_dict["Canon ML"][reduction_method]["test_data"]
            # Column 1 of predict_proba: probability of the second class in `classes_`.
            predicted = cls.predict_proba(processed_test_data)[:, 1]


        fpr, tpr, _ = roc_curve(original_test_labl, predicted)
        auc_value = auc(fpr, tpr)

        result_AUC.loc[reduction_method, cls_method] = auc_value

        # Plot ROC curve
        axxx = ax[i, j]
        axxx.plot(list(fpr), list(tpr), label=f"AUC = {auc_value:.3f}")
        axxx.set_title(f"{reduction_method} | {cls_method} | {processed_test_data.shape[1]} features | AUC = {auc_value:.3f}")
        axxx.set_xlabel("False positive rate")
        axxx.set_ylabel("True positive rate")
        axxx.legend(loc="lower right")

In [43]:
# Write the AUC table to an Excel workbook (path relative to the current working directory).
result_AUC.to_excel("./AUC Result.xlsx")

In [None]:
# Show the AUC table: rows are reduction methods, columns are classifiers.
display(result_AUC)