In [19]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import polars as pl
import time
import warnings
import gc
from hurst import compute_Hc
from scipy.signal import hilbert
from scipy.signal import iirfilter, filtfilt
import pickle
from sklearn.preprocessing import StandardScaler
from keras.metrics import KLDivergence
# from scipy.stats import skew, kurtosis
# from sklearn.decomposition import PCA

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset


warnings.filterwarnings("ignore")

In [6]:
# Fabien's paths
directory = 'D:/Kaggle/2024/Harmful_brain_activity_classification/train_models/'

# PyTorch models

In [7]:
verif_data = pd.read_csv(directory + "verif_headers_order.csv")
verif_headers_order = verif_data.columns

class cScaler:
    def __init__(self):
        self.full_train = StandardScaler()

file = open(directory + "standard_scaler.pickle", 'rb')
scaler = pickle.load(file)
file.close()

In [8]:
testdata = pd.read_parquet(directory + "Combined_Features_wf_all.parquet")

Y_data = testdata.iloc[:,9:15].values
print(testdata.shape)

testdata = testdata.iloc[:,15:]
testdata = testdata.select_dtypes(include=[np.number])
testdata = testdata.drop("Total_votes", axis =1)
print(testdata.shape)

# strategy for missing values
testdata = testdata.replace(np.nan, 0)

Y_data = Y_data / np.sum(Y_data,axis=1,keepdims=True)
X_cols = testdata.select_dtypes(include=[np.number]).keys()
X_data = testdata.select_dtypes(include=[np.number])
X_data = scaler.full_train.transform(X_data) # standardization

(106800, 1430)
(106800, 1410)


In [9]:
dataT = torch.tensor(X_data).float()
labelsT = torch.tensor(Y_data).float()

In [11]:
def createBnormModel(Datain):

    class BnormModel(nn.Module):
        def __init__(self):
             super().__init__()

             ### input layer
             self.input = nn.Linear(Datain.shape[1],1500)
             self.bnin = nn.BatchNorm1d(1500)
             ### hidden layers
             self.fc1 = nn.Linear(1500,2000)
             self.bn1 = nn.BatchNorm1d(2000)
             self.fc2 = nn.Linear(2000,1500)
             self.bn2 = nn.BatchNorm1d(1500)
             self.fc3 = nn.Linear(1500,200)
             self.dropout = nn.Dropout(0.2)

             ### output layer
             self.output = nn.Linear(200,6)

        # forward pass
        def forward(self,x):
             x = F.relu( self.bnin(self.input(x)) )
             x = F.relu( self.bn1(self.fc1(x)) )
             # x = self.dropout(x)
             x = F.relu( self.bn2(self.fc2(x)) )
             # x = self.dropout(x)
             x = F.relu( self.fc3(x) )
             return torch.log_softmax( self.output(x),axis=1 )
            
    # create the model instance
    net = BnormModel()

    # loss function
    # lossfun = nn.NLLLoss()
    lossfun = nn.KLDivLoss(reduction='batchmean')

    # optimizer
    optimizer = torch.optim.Adam(net.parameters(),lr=1e-4, weight_decay=1e-3)

    return net, lossfun, optimizer

# Load the model
Norm_Net_trained, lossfun, optimizer = createBnormModel(X_data)
modelinfo = torch.load(directory + 'Trained_Norm_Net')
Norm_Net_trained.load_state_dict(modelinfo['model_state_dict'])
Norm_Net_trained.eval();

# Metrics

In [12]:
# assessment metrics
def log(x):
    x[x == 0] = 1e-15
    x[x == 1] = 1-1e-15
    return np.log(x)


def JensenShannonDiv(true_y, pred_y): # Jensen-Shannon Divergence https://towardsdatascience.com/how-to-understand-and-use-jensen-shannon-divergence-b10e11b03fd6
    # removing 0 to avoid divisions by 0.
    true_y[true_y == 0] = 1e-15
    pred_y[pred_y == 0] = 1e-15
    # sum to 1
    true_y = true_y.T / np.sum(true_y, axis = 1)
    true_y = true_y.T
    pred_y = pred_y.T / np.sum(pred_y, axis = 1)
    pred_y = pred_y.T
    JSD1 = pred_y*log(2* pred_y/ (pred_y+true_y))
    JSD2 = true_y*log(2* true_y/ (pred_y+true_y))
    JSD = 0.5*np.sum(JSD1, axis = 1) + 0.5*np.sum(JSD2, axis = 1)
    return np.nanmean(JSD)


# competition metric? 
def kl_divergence(solution, submission, epsilon = 1e-15, micro_average = False, sample_weights = None):
    if not isinstance(solution, pd.DataFrame): solution = pd.DataFrame(solution)
    if not isinstance(submission, pd.DataFrame): submission = pd.DataFrame(submission)   

    for col in solution.columns:

        if not pd.api.types.is_float_dtype(solution[col]):
            solution[col] = solution[col].astype(float)
        submission[col] = np.clip(submission[col], epsilon, 1 - epsilon)

        y_nonzero_indices = solution[col] != 0 
        solution[col] = solution[col].astype(float)
        solution.loc[y_nonzero_indices, col] = solution.loc[y_nonzero_indices, col] * \
                                                np.log(solution.loc[y_nonzero_indices, col] / submission.loc[y_nonzero_indices, col])
        
        solution.loc[~y_nonzero_indices, col] = 0

    if micro_average:
        return np.average(solution.sum(axis=1))#, weights=sample_weights)
    else:
        return np.average(solution.mean())

In [13]:
with torch.no_grad():
    pred = np.array(Norm_Net_trained(dataT))

pred = np.exp(pred)
print(pred)

In [22]:
print(f'Pytorch KLD loss: {lossfun(torch.tensor(pred).float(), labelsT)}')
print(f'Jensen Shannon Div: {JensenShannonDiv(np.array(labelsT), pred)}')
print(f'KL Divergence: {kl_divergence(np.array(labelsT), pred)}')
kld = KLDivergence()
print(f'Keras KLD loss: {kld(np.array(labelsT), pred)}')

Pytorch KLD loss: -1.1043344736099243
Jensen Shannon Div: 0.0320485420525074
KL Divergence: 0.02341656037011554
Keras KLD loss: 0.1404956579208374
