# Training and Testing Models

## Libraries

In [6]:
import os
import json
import time
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import load_model
from tensorflow import keras
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay
from tqdm import tqdm
import matplotlib.pyplot as plt
from Helpers.data import Data
from Helpers.models import Models

## Preprocessing

### Frame Skip Sampling & Normalization

In [None]:
# Build one row per sampled sub-sequence of frames and collect them in `df`.
path = ""  # release in process
N = 50
n = 15
axes = ['X', 'Y']
body_parts = ["pose", "hand_r", "hand_l"]
keypoints_len = [15, 21, 21]

# Column layout: two metadata columns followed by one column per
# (frame, body part, keypoint, axis) combination, in that nesting order.
cols = ["class", "subject"]
cols.extend(
    f"{body_part}_K{k_id}_{axis}_F{frame}"
    for frame in range(1, n + 1)
    for body_part, n_keypoints in zip(body_parts, keypoints_len)
    for k_id in range(n_keypoints)
    for axis in axes
)

rows = []
# os.listdir() returns entries in arbitrary order, so every listing is sorted
# to make the generated dataset reproducible across machines and file systems.
for glosa in sorted(os.listdir(path)):
    glosa_path = os.path.join(path, glosa)
    for subject in sorted(os.listdir(glosa_path)):
        json_path = os.path.join(glosa_path, subject, "json")
        # NOTE(review): assumes the frame file names sort chronologically
        # (e.g. zero-padded frame numbers) -- TODO confirm.
        frames = sorted(os.listdir(json_path))

        # NOTE(review): `ppc` is not imported in the visible part of this
        # notebook; it must be brought into scope before running this cell.
        N_samples, _unused = ppc.getNSamples(N=N, n=n, l=len(frames))
        N_samples = ppc.getNSamplesArray(N_samples=N_samples)
        for indices in N_samples:
            rows.append(
                ppc.make_a_row(label=glosa, subject=subject, json_path=json_path,
                               frames=frames, indices=indices, normalize=True)
            )
df = pd.DataFrame(rows, columns=cols)

### Handling of Null Values

In [None]:
def fill_missing_with_previous_frame(df, body_parts, keypoints_len, axes, n):
    """Forward-fill missing keypoint values across frames, row by row.

    For every (body part, keypoint, axis) triple, the columns named
    ``{body_part}_K{k_id}_{axis}_F{frame}`` for frames 1..n are treated as one
    ordered sequence. Inside each row a missing value takes the value of the
    closest earlier frame of that same sequence. Values missing before the
    first observed frame are left as they are.

    Args:
        df: DataFrame holding the frame columns; it is modified in place.
        body_parts: Body-part prefixes used in the column names.
        keypoints_len: Number of keypoints of each body part, paired
            position by position with ``body_parts``.
        axes: Axis suffixes used in the column names.
        n: Number of frames; frames are numbered from 1 to ``n``.

    Returns:
        The same ``df`` object that was passed in.
    """
    frame_ids = range(1, n + 1)

    # One list of column names per (body part, keypoint, axis) sequence,
    # ordered by frame number.
    column_groups = [
        [f"{part}_K{kp}_{ax}_F{frame_id}" for frame_id in frame_ids]
        for part, count in zip(body_parts, keypoints_len)
        for kp in range(count)
        for ax in axes
    ]

    for group in column_groups:
        # Filling along axis=1 propagates values left to right through the
        # frame columns, i.e. from earlier frames to later ones.
        df[group] = df[group].ffill(axis=1)

    return df

In [None]:
# Fill gaps with the value of the previous frame, drop what is still
# incomplete and persist the result as the preprocessed dataset.
df = fill_missing_with_previous_frame(
    df=df,
    body_parts=["pose", "hand_r", "hand_l"],
    keypoints_len=[15, 21, 21],
    axes=['X', 'Y'],
    n=15
)
df = df.dropna()  # Drop the instances that could not be filled
# DataFrame.to_csv does not create missing parent directories.
os.makedirs("./Data", exist_ok=True)
df.to_csv("./Data/Dataset.csv", index=False)

## Subject-Independent Validation

### Download Preprocessed Dataset

In [None]:
#!pip install -q gdown # gdown must be installed to download the dataset (size: 2.19 GB)
import gdown

file_id = "1knZzpGblTER4O2KVjXT1ei0uooWGQSTO"
# Make sure the target directory exists before the download writes into it.
os.makedirs("./Data", exist_ok=True)
gdown.download(id=file_id, output="./Data/Dataset.csv", quiet=False)

### Run Experiments

In [None]:
# Load the preprocessed dataset from the CSV path that the preprocessing and
# download cells write to.
df = pd.read_csv("./Data/Dataset.csv")

In [None]:
# Leave-one-subject-out experiment: for every held-out subject, train each
# architecture on the remaining subjects and evaluate three variants of it
# (model after the last epoch, best-loss checkpoint, best-accuracy checkpoint).

# Training hyper-parameters. They are also embedded in the names of the
# history / model files handled below, so they are defined once here.
EPOCHS = 25
BATCH_SIZE = 32


def _build_models(models_builder, input_shape, n_classes):
    """Build the six classifiers compared in the experiment.

    Args:
        models_builder: ``Models`` instance used to create the networks.
        input_shape: Shape of one input sample, without the batch dimension.
        n_classes: Number of output classes.

    Returns:
        A list of ``(name, model)`` pairs, in the order they are trained.
    """
    resnet, inputs = models_builder.ResNet1D(input_shape)
    resnet = models_builder.AddClassificationLayer(
        base_model=resnet,
        inputs=inputs,
        n_classes=n_classes,
        dropout=[0.8],
        denses=[128, 128],
        sequential=False
    )

    simple_lstm = models_builder.SimpleRnnLSTM(input_shape, 111)
    simple_lstm.add(tf.keras.layers.Dropout(0.8))
    simple_lstm = models_builder.AddClassificationLayer(
        base_model=simple_lstm,
        n_classes=n_classes,
        dropout=[0.7],
        denses=[128],
        sequential=True
    )

    gerges_lstm = models_builder.GergesLSTM(input_shape)
    gerges_lstm = models_builder.AddClassificationLayer(
        base_model=gerges_lstm,
        n_classes=n_classes,
        denses=[64, 32]
    )

    bi_lstm = models_builder.BiLSTM(input_shape)
    bi_lstm = models_builder.AddClassificationLayer(
        base_model=bi_lstm,
        n_classes=n_classes,
        denses=[64, 32]
    )

    gru = models_builder.GRU(input_shape)
    gru = models_builder.AddClassificationLayer(
        base_model=gru,
        n_classes=n_classes,
        denses=[64, 32]
    )

    simple_rnn = models_builder.SimpleRNN(input_shape, units=128)
    simple_rnn = models_builder.AddClassificationLayer(
        base_model=simple_rnn,
        n_classes=n_classes,
        dropout=[0.3],
        denses=[128, 32],
        sequential=True
    )

    return [
        ('Resnet', resnet),
        ('LSTM', simple_lstm),
        ('LSTM Gerges', gerges_lstm),
        ('BiLSTM Gerges', bi_lstm),
        ('GRU Gerges', gru),
        ('RNN simple', simple_rnn),
    ]


def _evaluate_on_test(model, X_test, y_test_enc, y_true, labels, class_names,
                      reports_dir, tag):
    """Evaluate one model on the test split and write its per-class reports.

    Two CSV files are written into ``reports_dir``:
    ``Classification_Report_{tag}.csv`` and
    ``Sensitivity_Specificity_{tag}.csv``.

    Args:
        model: Compiled Keras model; ``evaluate`` must return (loss, accuracy).
        X_test: Test inputs.
        y_test_enc: One-hot encoded test labels.
        y_true: Integer test labels (argmax of ``y_test_enc``).
        labels: Integer ids of all classes, used to fix the matrix layout.
        class_names: Class names, aligned with ``labels``.
        reports_dir: Existing directory that receives the CSV reports.
        tag: Suffix identifying the model variant, used in file names and
            in the keys of the returned metrics.

    Returns:
        A ``(metrics, eval_time)`` tuple: ``metrics`` maps ``TestAcc_{tag}``,
        ``TestLoss_{tag}``, ``Sensitivity_{tag}``, ``Specificity_{tag}`` and
        ``Fscore_{tag}`` to their values; ``eval_time`` is the wall-clock
        duration in seconds of the ``model.evaluate`` call only.
    """
    started = time.time()
    test_loss, test_accuracy = model.evaluate(
        X_test, y_test_enc, batch_size=BATCH_SIZE, verbose=0
    )
    eval_time = time.time() - started

    y_pred = model.predict(X_test, batch_size=BATCH_SIZE, verbose=0).argmax(axis=1)

    # One-vs-rest counts per class, derived from the confusion matrix
    # (rows = true class, columns = predicted class).
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    tp = np.diag(cm)
    fp = cm.sum(axis=0) - tp
    fn = cm.sum(axis=1) - tp
    tn = cm.sum() - (tp + fp + fn)

    # A class without test samples yields 0/0 = NaN; that is intended (the
    # averages below skip NaN), so only the runtime warning is silenced.
    with np.errstate(divide="ignore", invalid="ignore"):
        sensitivity = tp / (tp + fn)
        specificity = tn / (tn + fp)

    report = classification_report(
        y_true, y_pred, labels=labels, target_names=class_names,
        zero_division=0, output_dict=True
    )

    # The row labels (class names and the aggregate rows) live in the index
    # after the transpose, so the index has to be written to the file.
    pd.DataFrame(report).transpose().to_csv(
        os.path.join(reports_dir, f"Classification_Report_{tag}.csv"),
        index=True,
        index_label="Label",
    )
    pd.DataFrame({
        "Class": class_names,
        "Sensitivity": sensitivity,
        "Specificity": specificity
    }).to_csv(
        os.path.join(reports_dir, f"Sensitivity_Specificity_{tag}.csv"),
        index=False,
    )

    metrics = {
        f'TestAcc_{tag}': test_accuracy,
        f'TestLoss_{tag}': test_loss,
        f'Sensitivity_{tag}': np.nanmean(sensitivity),
        f'Specificity_{tag}': np.nanmean(specificity),
        f'Fscore_{tag}': report['macro avg']['f1-score'],
    }
    return metrics, eval_time


data_obj = Data(df=df)
subjects_array = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
for sub in tqdm(subjects_array, desc="Procesando sujetos", unit="sujeto"):
    train_data, test_data = data_obj.LeaveOneOutExp1(sub)
    if len(train_data) == 0:
        print(f"[ADVERTENCIA] Sujeto {sub}: train_data vacío. Se omite.")
        continue
    if len(test_data) == 0:
        print(f"[ADVERTENCIA] Sujeto {sub}: test_data vacío. Se omite.")
        continue

    X_train, y_train = data_obj.SplitXandY(train_data)
    X_test, y_test = data_obj.SplitXandY(test_data)

    # Hold out 10% of the training data for validation, stratified by class.
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train,
        test_size=0.1,
        random_state=42,
        stratify=y_train
    )

    (y_train_enc, y_val_enc, y_test_enc), le = data_obj.EncodeLabels([y_train, y_val, y_test])
    input_shape = (X_train.shape[1], X_train.shape[2])  # (frames, features)
    class_names = le.classes_
    labels = np.arange(len(class_names))
    n_classes = len(class_names)

    y_true = y_test_enc.argmax(axis=1)

    # Six new models are created for every subject; resetting the Keras
    # global state keeps it from accumulating across iterations.
    keras.backend.clear_session()

    models_builder = Models()
    modelos_con_nombres = _build_models(models_builder, input_shape, n_classes)

    resultados = []
    for nombre, model in tqdm(modelos_con_nombres, desc=f"Modelos para sujeto {sub}", unit="modelo", leave=False):
        dirpath = os.path.join(".", "Modelos", "Experimento", str(sub), nombre)
        os.makedirs(dirpath, exist_ok=True)

        start_time_train = time.time()
        models_builder.TrainModel(
            model=model,
            X_train=X_train,
            X_val=X_val,
            y_train=y_train_enc,
            y_val=y_val_enc,
            dirpath=dirpath,
            epochs=EPOCHS,
            batch_size=BATCH_SIZE,
        )
        training_time = time.time() - start_time_train

        # NOTE(review): presumably TrainModel writes the history CSV and the
        # best_model_*.h5 checkpoints into `dirpath` under these names --
        # verify against Helpers.models.
        history_path = os.path.join(
            dirpath, f"history_epochs({EPOCHS})_batch_size({BATCH_SIZE}).csv"
        )
        final_epoch = pd.read_csv(history_path).iloc[-1]

        reports_dir = os.path.join(dirpath, "Reports")
        os.makedirs(reports_dir, exist_ok=True)

        # Model as it is after the last training epoch.
        last_metrics, eval_time_last = _evaluate_on_test(
            model, X_test, y_test_enc, y_true, labels, class_names,
            reports_dir, "LastModel"
        )
        model.save(os.path.join(
            dirpath, f"last_model_epochs({EPOCHS})_batch_size({BATCH_SIZE}).h5"
        ))

        # Checkpoint stored as best_model_loss.h5.
        best_loss_model = keras.models.load_model(os.path.join(dirpath, "best_model_loss.h5"))
        best_loss_metrics, eval_time_best_loss = _evaluate_on_test(
            best_loss_model, X_test, y_test_enc, y_true, labels, class_names,
            reports_dir, "BestLossModel"
        )

        # Checkpoint stored as best_model_acc.h5.
        best_acc_model = keras.models.load_model(os.path.join(dirpath, "best_model_acc.h5"))
        best_acc_metrics, eval_time_best_acc = _evaluate_on_test(
            best_acc_model, X_test, y_test_enc, y_true, labels, class_names,
            reports_dir, "BestAccModel"
        )

        resultados.append({
            'sujeto': sub,
            'nombre': nombre,
            'TrainAcc': final_epoch['accuracy'],
            'TrainLoss': final_epoch['loss'],
            'ValAcc': final_epoch['val_accuracy'],
            'ValLoss': final_epoch['val_loss'],
            **last_metrics,
            **best_loss_metrics,
            **best_acc_metrics,
            'TrainingTime_sec': training_time,
            'EvalTime_LastModel_sec': eval_time_last,
            'EvalTime_BestLossModel_sec': eval_time_best_loss,
            'EvalTime_BestAccModel_sec': eval_time_best_acc
        })

    # One summary file per held-out subject, with one row per architecture.
    df_resultados = pd.DataFrame(resultados)
    output_csv = os.path.join(".", "Modelos", "Experimento", str(sub), "resultados.csv")
    df_resultados.to_csv(output_csv, index=False)
    print(f"Resultados guardados en: {output_csv}")
        