<a href="https://colab.research.google.com/github/Kalghoul/CNN-mobile/blob/main/CNN_Saad_github-v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Standard library
import os
import shutil
import random as python_random
from collections import Counter
from dataclasses import dataclass

# Third-party libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

from scipy import stats
from scipy.signal import butter, lfilter
from scipy.stats import mode

from sklearn.model_selection import train_test_split, GroupKFold, KFold, StratifiedGroupKFold
from sklearn.preprocessing import StandardScaler, label_binarize
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import (
    precision_recall_fscore_support,
    roc_auc_score,
    roc_curve,
    auc,
    f1_score,
    accuracy_score
)

# TensorFlow / Keras
from tensorflow import keras
from tensorflow.keras import regularizers
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.layers import (
    Input,
    Conv1D,
    Flatten,
    MaxPooling1D,
    Dropout,
    BatchNormalization,
    GlobalAveragePooling1D,
    AveragePooling1D,
    LSTM,
    concatenate,
    GRU,
    Bidirectional,
    Embedding,
    LayerNormalization,
    MultiHeadAttention,
    Dense,
    Activation,
    DepthwiseConv1D,
    Add,
    MultiHeadAttention
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.metrics import Recall
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, CSVLogger
from tensorflow.keras.backend import clear_session
import tensorflow.keras.backend as K

In [None]:
# Reset the global state Keras has accumulated in this kernel (e.g. from earlier
# runs of the notebook) before any model is built.
clear_session()

In [None]:
# Seed every random number generator the notebook relies on with one shared
# value: NumPy's global RNG, Python's `random` module, and TensorFlow.
SEED = 42
for seed_fn in (np.random.seed, python_random.seed, tf.random.set_seed):
    seed_fn(SEED)

In [None]:
# Sliding-window configuration for the BVP signal.
# NOTE(review): the original note attributes this sampling rate to the PPGE
# dataset, while the files loaded below are named WARM-VR — confirm it applies.
sf_BVP = 64           # BVP sampling frequency, in Hz
feat_sf64 = ['bvp']   # presumably the feature names sampled at 64 Hz — TODO confirm

segment_length_sec = 60   # duration of one segment, in seconds
slide_length_sec = 5      # slide length between segments, in seconds

# Convert both durations from seconds to sample counts using the sampling
# frequency of the device that captured the BVP data.
segment_length_samples, slide_length_samples = (
    duration_sec * sf_BVP
    for duration_sec in (segment_length_sec, slide_length_sec)
)

In [None]:
# Load the full segment dataframe from a pickle file; the path assumes Google
# Drive is mounted under /content/drive.
# NOTE(review): unpickling can execute arbitrary code — only load pickle files
# that come from a trusted source.
df_segments = pd.read_pickle('/content/drive/MyDrive/i2mtc-2026/Dataframes/WARM-VR_df_segments_Olfactory_1_relx_exper_28-participants_60s_5s.pkl')

In [None]:
# Reorder the segments so that subjects appear in a reproducible random order.
# Rows of one subject end up contiguous and keep their original relative order.
print("shape of original df: ", df_segments.shape)

# Distinct subject identifiers, in order of first appearance
unique_subject_ids = df_segments['SubjectID'].unique()
print("unique_subject_ids of original", unique_subject_ids)

# Shuffle the identifiers in place with a dedicated, fixed-seed generator
rng = np.random.RandomState(42)
rng.shuffle(unique_subject_ids)

# Gather each subject's rows following the shuffled order, then stitch the
# pieces together under a fresh 0..N-1 index
per_subject_frames = []
for sid in unique_subject_ids:
    subject_mask = df_segments['SubjectID'] == sid
    per_subject_frames.append(df_segments[subject_mask])
shuffled_df_segments_combined = pd.concat(per_subject_frames, ignore_index=True)

print("unique_subject_ids of new", shuffled_df_segments_combined['SubjectID'].unique())

# Print the new shape and first rows so the result can be compared with the input
print("shape of new df: ", shuffled_df_segments_combined.shape)
print(shuffled_df_segments_combined.head())

shape of original df:  (6527, 5)
unique_subject_ids of original [14 21  5 31 10  6 30 24 29 28  4 17 11 20 27 18 15 23  2  3  7 26 22 25
 12 16 13  8]
unique_subject_ids of new [28 16 29 26 14 11 23 22 17 20 18 21 10  6  5 15 25 31 13 12  2  8  7 24
  4 27  3 30]
shape of new df:  (6527, 5)
                                             Segment  ArousalLabel  \
0  [0.09251861177927405, 0.08855483433770503, 0.0...             0   
1  [-0.2037737519780108, 0.15841641174535917, 0.2...             0   
2  [-0.12558824194306173, -0.152938306289888, -0....             0   
3  [0.1715959717385762, 0.16465936121583036, 0.14...             0   
4  [-0.37778358166289117, -0.3560819001703007, -0...             0   

   ValenceLabel  RelaxationLabel  SubjectID  
0             1                1         28  
1             1                1         28  
2             1                1         28  
3             1                1         28  
4             1                1         28  


In [None]:
# Load the train/validation dataframe from its pickle file and report its shape.
# NOTE(review): unpickling can execute arbitrary code — only load trusted files.
train_val_df = pd.read_pickle('/content/drive/MyDrive/i2mtc-2026/Dataframes/WARM-VR_train_val_df_Olfactory_1_relx_exper_28-participants_60s_5s.pkl')
print("Train/Val shape:", train_val_df.shape)

# Load the test dataframe from its pickle file and report its shape.
test_df = pd.read_pickle('/content/drive/MyDrive/i2mtc-2026/Dataframes/WARM-VR_test_df_Olfactory_1_relx_exper_28-participants_60s_5s.pkl')
print("Test_df shape:", test_df.shape)

Train/Val shape: (5244, 5)
Test_df shape: (1283, 5)


In [None]:
def cnn_60s_model(n_feat):
    """Build an uncompiled 1-D CNN for two-class classification of single-channel windows.

    Architecture: three Conv1D stages, each followed by max pooling (the first
    two additionally by batch normalisation and 50% dropout), then Flatten,
    30% dropout and a 2-unit softmax output.

    Args:
        n_feat: Number of time steps per input window. The model input shape
            is ``(n_feat, 1)``.

    Returns:
        A ``Sequential`` model that has NOT been compiled; the caller must call
        ``model.compile(...)`` before training.
    """
    model = Sequential()

    # Declare the input explicitly rather than passing `input_shape` to the
    # first layer, which Keras 3 warns against for Sequential models.
    model.add(Input(shape=(n_feat, 1)))

    # --- Stage 1: 8 filters, kernel size 64, stride 4, ReLU ---
    # With padding='same' the output length is ceil(input_length / stride), so
    # this convolution already shortens the sequence by a factor of 4.
    model.add(Conv1D(filters=8, kernel_size=64, strides=4, padding='same',
                     activation='relu'))
    # Down-sample by a further factor of 4
    model.add(MaxPooling1D(pool_size=4))
    # Normalise activations to stabilise and speed up training
    model.add(BatchNormalization())
    # Randomly drop 50% of the activations during training to limit overfitting
    model.add(Dropout(0.5))

    # --- Stage 2: 16 filters, kernel size 32, stride 2, ReLU ---
    # padding='same' with stride 2 halves the sequence length.
    model.add(Conv1D(filters=16, kernel_size=32, strides=2, activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_size=4))
    model.add(BatchNormalization())
    model.add(Dropout(0.5))

    # --- Stage 3: 8 filters, kernel size 16, stride 1, ReLU ---
    # Stride 1 with padding='same' keeps the sequence length unchanged.
    model.add(Conv1D(filters=8, kernel_size=16, strides=1, activation='relu', padding='same'))
    model.add(MaxPooling1D(pool_size=4))

    # --- Classification head ---
    # Collapse the (steps, channels) feature map into a single feature vector
    model.add(Flatten())
    # Lighter 30% dropout right before the output layer
    model.add(Dropout(0.3))
    # Two softmax units: one probability per class, to be trained on one-hot targets.
    # (Single-output alternative: Dense(1, activation='sigmoid').)
    model.add(Dense(2, activation='softmax'))

    return model

In [None]:
#### Because the datasets are heavily imbalanced, we used weighted_categorical_crossentropy by calculating each class weight

#weights_array = np.array([class_weights[0], class_weights[1]])

def compute_class_weight_data(labels):
    """Compute 'balanced' per-class weights for a label vector.

    Weights come from scikit-learn's ``compute_class_weight`` with
    ``class_weight='balanced'`` (``n_samples / (n_classes * count_per_class)``).

    For non-negative integer labels the result is indexed by class id
    (``result[k]`` is the weight of class ``k``) and always has at least two
    entries. A class id that does not occur in ``labels`` gets a neutral weight
    of 1.0, so a split containing a single class no longer raises IndexError.
    For any other label type the weights are placed in sorted-label order.

    Args:
        labels: 1-D array-like of class labels.

    Returns:
        ``np.ndarray`` of float weights. For binary 0/1 labels with both
        classes present this is ``[weight_class_0, weight_class_1]``.

    Raises:
        ValueError: If ``labels`` is empty.
    """
    labels = np.asarray(labels).ravel()
    if labels.size == 0:
        raise ValueError("labels must contain at least one sample")

    # Sorted unique classes and their balanced weights (same order)
    classes = np.unique(labels)
    class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=labels)

    if np.issubdtype(classes.dtype, np.integer) and classes.min() >= 0:
        # Index by class id so a missing class cannot shift the other weights
        weights_array = np.ones(max(2, int(classes.max()) + 1), dtype=float)
        weights_array[classes] = class_weights
    else:
        # Non-integer labels: fall back to positional (sorted-label) order
        weights_array = np.ones(max(2, len(classes)), dtype=float)
        weights_array[:len(classes)] = class_weights

    print("Class Weights:", weights_array)
    return weights_array

def weighted_categorical_crossentropy(weights):
    """Create a class-weighted categorical cross-entropy loss function.

    Args:
        weights: Per-class weights; multiplied element-wise against the last
            axis of the targets/predictions.

    Returns:
        A function ``loss(y_true, y_pred)`` that returns the weighted
        cross-entropy summed over the last axis. The inner function is
        deliberately named ``loss``: the model-loading code in this notebook
        passes it to ``load_model`` under the ``custom_objects`` key ``'loss'``.
    """
    class_weight_var = K.variable(weights)

    def loss(y_true, y_pred):
        # Rescale the predictions so they sum to 1 along the class axis
        normalised = y_pred / K.sum(y_pred, axis=-1, keepdims=True)
        # Keep probabilities away from 0 and 1 so the logarithm stays finite
        clipped = K.clip(normalised, K.epsilon(), 1 - K.epsilon())
        # Weighted log-likelihood per class, then negated sum over classes
        weighted_log_likelihood = y_true * K.log(clipped) * class_weight_var
        return -K.sum(weighted_log_likelihood, -1)

    return loss

In [None]:
#this function is needed to extract the information from the dataframe

def extract_df(df, indices):
    """Pull the segment matrix and the three label columns out of a dataframe.

    Args:
        df: DataFrame with the columns 'Segment', 'ValenceLabel',
            'ArousalLabel' and 'RelaxationLabel'.
        indices: Positional row indices to keep (used with ``iloc``), or
            ``None`` to use every row.

    Returns:
        ``(segments, labels)`` where ``segments`` is the row-wise stack of the
        'Segment' entries and ``labels`` is a 2-D array with one row per sample
        and three columns in the order valence, arousal, relaxation.
    """
    # Resolve the row selection once; None means "all rows"
    selected = df if indices is None else df.iloc[indices]

    # Each 'Segment' entry becomes one row of the feature matrix
    segments = np.vstack(selected['Segment'].values)

    # One 1-D array per label column, stacked and transposed to (n_samples, 3)
    label_columns = [
        selected[column].values
        for column in ('ValenceLabel', 'ArousalLabel', 'RelaxationLabel')
    ]
    labels = np.vstack(label_columns).T

    return segments, labels

In [None]:
def create_directory_if_not_exists(directory_path):
    """Create ``directory_path`` (including missing parents) unless it already exists.

    Args:
        directory_path: Directory to create. An empty string or ``None`` (for
            example ``os.path.dirname`` of a bare filename) is treated as
            "nothing to create" and ignored.
    """
    # Nothing to do for an empty path or for a path that is already present
    if not directory_path or os.path.exists(directory_path):
        return
    # exist_ok=True closes the race where the directory is created by someone
    # else between the existence check above and this call.
    os.makedirs(directory_path, exist_ok=True)

def run_experiment_hyperparameter(
    df_segments,
    segment_length_samp,
    model_builder,                     # callable returning an UNCOMPILED Keras model
    save_path,
    epochs,
    num_classes=2,
    activation_type="softmax",
    loss_function="weighted_categorical",
    target_label="Valence",            # "Valence", "Arousal" or "Relaxation"
    batch_size=None,
    learning_rate=None,
):
    """Run 5-fold cross-validation with subject-level splitting and save per-fold metrics.

    Subjects (not segments) are partitioned by ``KFold``, so all segments of a
    subject fall on the same side of each split.

    NOTE(review): the held-out fold is also passed to ``fit`` as validation
    data and drives both checkpoint selection and early stopping, so the
    reported fold metrics are optimistically biased.

    NOTE(review): the checkpoint file name does not include the fold number;
    each fold deletes and overwrites the previous fold's checkpoint.

    Args:
        df_segments: DataFrame with a 'SubjectID' column plus the columns read
            by ``extract_df``.
        segment_length_samp: Value forwarded to ``model_builder``.
        model_builder: ``model_builder(segment_length_samp)`` must return an
            uncompiled Keras model; it is compiled here with Adam and accuracy.
        save_path: CSV file that receives one row of metrics per fold.
        epochs: Maximum number of training epochs per fold.
        num_classes: Number of classes used for one-hot encoding and for the
            per-class F1 computation.
        activation_type: "softmax" for one-hot targets; anything else is
            treated as a single-output sigmoid head with 0/1 targets.
        loss_function: "weighted_categorical" to use the class-weighted
            cross-entropy, otherwise a loss accepted by ``model.compile``.
        target_label: "Valence", "Arousal" or "Relaxation" (case-insensitive).
        batch_size: Training batch size; defaults to the notebook-level
            ``BATCH_SIZE`` when ``None``.
        learning_rate: Adam learning rate; defaults to the notebook-level
            ``LEARNING_RATE`` when ``None``.

    Returns:
        List with one metrics dict per fold.

    Raises:
        ValueError: If ``target_label`` is unknown, or if the weighted
            categorical loss is combined with a non-softmax head.
    """
    # Explicit arguments win; otherwise fall back to the notebook constants
    # (the original behaviour).
    batch_size = BATCH_SIZE if batch_size is None else batch_size
    learning_rate = LEARNING_RATE if learning_rate is None else learning_rate

    use_weighted_loss = loss_function == "weighted_categorical"
    if use_weighted_loss and activation_type != "softmax":
        # The weighted loss renormalises predictions over the last axis; with a
        # single sigmoid output that yields a constant loss and no learning.
        raise ValueError(
            "loss_function='weighted_categorical' requires activation_type='softmax'; "
            "use e.g. 'binary_crossentropy' with a sigmoid head"
        )

    # --- target selection (0: Valence, 1: Arousal, 2: Relaxation) ---
    t = str(target_label).strip().lower()
    label_map = {"valence": 0, "arousal": 1, "relaxation": 2}
    if t not in label_map:
        raise ValueError("target_label must be 'Valence', 'Arousal' or 'Relaxation'")
    col_idx = label_map[t]
    print(f"Running target: {t.title()} (column {col_idx})")

    # Fixed label set so per-class metrics always line up with class ids
    class_labels = list(range(num_classes))

    # Checkpoint location (shared by all folds of this target/config)
    checkpoint_dir = "/content/drive/MyDrive/i2mtc-2026/Dataframes/ModelCheckpointLogs/i2mtc_ML_models"
    os.makedirs(checkpoint_dir, exist_ok=True)
    filepath = f"{checkpoint_dir}/weights-best-{t}-{batch_size}-{learning_rate}.keras"

    results = []
    unique_subject_ids = df_segments['SubjectID'].unique()
    kf = KFold(n_splits=5, shuffle=True, random_state=42)

    for fold_idx, (train_idx, test_idx) in enumerate(kf.split(unique_subject_ids)):
        print(f"\n==== Fold {fold_idx + 1} ({t.title()}) ====")

        train_subjects = unique_subject_ids[train_idx]
        test_subjects  = unique_subject_ids[test_idx]

        train_df = df_segments[df_segments['SubjectID'].isin(train_subjects)]
        test_df  = df_segments[df_segments['SubjectID'].isin(test_subjects)]

        print(f"Running experiment with batch size {batch_size} and learning rate {learning_rate}")

        # Reset graph/session for cleanliness across folds
        tf.keras.backend.clear_session()

        # Extract data
        train_segments, train_labels = extract_df(train_df, None)
        test_segments,  test_labels  = extract_df(test_df, None)

        y_train = train_labels[:, col_idx].astype(int)
        y_test  = test_labels[:,  col_idx].astype(int)

        # Optional class weighting for the chosen target (computed on the training fold only)
        if use_weighted_loss:
            class_weights = compute_class_weight_data(y_train)
            loss_fn = weighted_categorical_crossentropy(class_weights)
            print("loss_function is weighted_categorical")
        else:
            loss_fn = loss_function

        print(f"Train segments shape: {train_segments.shape}")
        print(f"Test segments shape:  {test_segments.shape}")

        # Encode targets depending on head type
        if activation_type == "softmax":
            y_train_encoded = to_categorical(y_train, num_classes=num_classes)
            y_test_encoded  = to_categorical(y_test,  num_classes=num_classes)
        else:
            # sigmoid case: keep 0/1
            y_train_encoded = y_train
            y_test_encoded  = y_test

        print("y_train shape:", np.shape(y_train_encoded))
        print("y_test shape:",  np.shape(y_test_encoded))

        # ---- Build & compile model via the injected builder ----
        model = model_builder(segment_length_samp)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss=loss_fn,
            metrics=['accuracy']
        )

        # Remove the previous fold's (or run's) checkpoint before training
        if os.path.exists(filepath):
            os.remove(filepath)

        checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')
        early_stop = EarlyStopping(monitor='val_accuracy', patience=80, verbose=1)
        callbacks_list = [checkpoint, early_stop]

        # Train
        history = model.fit(
            train_segments, y_train_encoded,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(test_segments, y_test_encoded),
            callbacks=callbacks_list
        )

        # Reload the best checkpoint; the custom loss is registered under the
        # name of the inner function returned by weighted_categorical_crossentropy.
        custom_objects = (
            {'loss': weighted_categorical_crossentropy(class_weights)}
            if use_weighted_loss else None
        )
        best_model = tf.keras.models.load_model(filepath, custom_objects=custom_objects)

        # Evaluate
        val_loss, val_accuracy = best_model.evaluate(test_segments, y_test_encoded, verbose=0)
        pred_test_probs = best_model.predict(test_segments)

        # --- Binary predictions + AUC ---
        true_test_labels = y_test
        if activation_type == "softmax":
            pred_test_labels = np.argmax(pred_test_probs, axis=1)
            pos_probs = pred_test_probs[:, 1] if pred_test_probs.shape[1] == 2 else pred_test_probs.ravel()
        else:
            pos_probs = pred_test_probs.ravel()
            pred_test_labels = (pos_probs > 0.5).astype(int)

        # AUC is only defined when both classes occur in the held-out fold.
        # (Named fold_auc to avoid shadowing the imported sklearn `auc`.)
        fold_auc = roc_auc_score(true_test_labels, pos_probs) if len(np.unique(true_test_labels)) == 2 else None

        # Pass the label set explicitly: without it, a fold where only one class
        # shows up returns a length-1 array and that class's F1 would be
        # reported as class 0 regardless of which class it actually is.
        _, _, f1, support = precision_recall_fscore_support(
            true_test_labels, pred_test_labels,
            labels=class_labels, average=None, zero_division=0
        )
        # F1 is undefined for a class with no true samples in this fold
        f1 = np.where(support > 0, f1, np.nan)
        f1_c0 = f1[0] if len(f1) > 0 else np.nan
        f1_c1 = f1[1] if len(f1) > 1 else np.nan
        print(f"[{t.title()}] F1 per class: 0={f1_c0:.4f}, 1={f1_c1:.4f}")

        results.append({
            'target': t,
            'batch_size': batch_size,
            'learning_rate': learning_rate,
            # NaN (not None) as fallback so np.nanmean below can skip it
            'train_loss': history.history.get('loss', [np.nan])[-1],
            'train_accuracy': history.history.get('accuracy', [np.nan])[-1],
            'val_loss': val_loss,
            'val_accuracy': val_accuracy,
            'auc': fold_auc,
            'unweighted_F1_class_0': f1_c0,
            'unweighted_F1_class_1': f1_c1,
        })

        print(f"Completed fold with batch size {batch_size} and learning rate {learning_rate}")

    # ---- Aggregate & save ----
    if results:
        avg_train_loss = np.nanmean([r['train_loss'] for r in results])
        avg_train_accuracy = np.nanmean([r['train_accuracy'] for r in results])
        avg_val_loss = np.nanmean([r['val_loss'] for r in results])
        avg_val_accuracy = np.nanmean([r['val_accuracy'] for r in results])
        aucs = [r['auc'] for r in results if r['auc'] is not None]
        avg_auc = np.mean(aucs) if aucs else np.nan
        avg_unweighted_f1_class_0 = np.nanmean([r['unweighted_F1_class_0'] for r in results])
        avg_unweighted_f1_class_1 = np.nanmean([r['unweighted_F1_class_1'] for r in results])

        print(f"[{t.title()}] Overall Results:")
        print(f"Average Train Loss: {avg_train_loss:.4f}")
        print(f"Average Train Accuracy: {avg_train_accuracy:.4f}")
        print(f"Average Val Loss: {avg_val_loss:.4f}")
        print(f"Average Val Accuracy: {avg_val_accuracy:.4f}")
        print(f"Average AUC: {avg_auc:.4f}")
        print(f"Average Unweighted F1 Score Class 0: {avg_unweighted_f1_class_0:.4f}")
        print(f"Average Unweighted F1 Score Class 1: {avg_unweighted_f1_class_1:.4f}")

        df_metrics = pd.DataFrame(results)
        # A bare filename has no directory component; only create one if present
        save_dir = os.path.dirname(save_path)
        if save_dir:
            create_directory_if_not_exists(save_dir)
        df_metrics.to_csv(save_path, index=False)
        print(f"Metrics saved to {save_path}")
    else:
        print("No results to summarize. Exiting...")

    return results


In [None]:
# Hyper-parameters shared by all runs below; run_experiment_hyperparameter reads
# BATCH_SIZE and LEARNING_RATE from the notebook namespace.
LEARNING_RATE: float = 0.001
EPOCHS:        int = 350
BATCH_SIZE:    int = 512
# you can use binary_crossentropy as a loss_function if you don't want to use the weighted one.

# One (target label, metrics CSV path) pair per experiment, executed in this order.
experiment_plan = [
    ("Relaxation", '/content/drive/MyDrive/i2mtc-2026/Dataframes/Models_Results/WARM-VR/cnn_60s_model/train_val_df/5-folds-split/cnn_60s_model_metrics_softmax_results_60s_5s-0-001_Rel.csv'),
    ("Arousal", '/content/drive/MyDrive/i2mtc-2026/Dataframes/Models_Results/WARM-VR/cnn_60s_model/train_val_df/5-folds-split/cnn_60s_model_metrics_softmax_results_60s_5s-0-001_Aro.csv'),
    ("Valence", '/content/drive/MyDrive/i2mtc-2026/Dataframes/Models_Results/WARM-VR/cnn_60s_model/train_val_df/5-folds-split/cnn_60s_model_metrics_softmax_results_60s_5s-0-001_Val.csv'),
]

# `save_path` and `result` are rebound on every iteration, so after the loop
# they hold the values of the last (Valence) run.
for target_name, save_path in experiment_plan:
    print("For " + target_name)
    result = run_experiment_hyperparameter(
        shuffled_df_segments_combined,
        segment_length_samples,
        cnn_60s_model,
        save_path,
        EPOCHS,
        target_label=target_name,
        num_classes=2,
        activation_type="softmax",
        loss_function="weighted_categorical",
    )

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - accuracy: 0.8716 - loss: 0.2856 - val_accuracy: 0.7533 - val_loss: 1.3552
Epoch 132/350
[1m 7/10[0m [32m━━━━━━━━━━━━━━[0m[37m━━━━━━[0m [1m0s[0m 10ms/step - accuracy: 0.8750 - loss: 0.2856
Epoch 132: val_accuracy did not improve from 0.79094
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - accuracy: 0.8729 - loss: 0.2857 - val_accuracy: 0.7729 - val_loss: 1.2683
Epoch 133/350
[1m 7/10[0m [32m━━━━━━━━━━━━━━[0m[37m━━━━━━[0m [1m0s[0m 10ms/step - accuracy: 0.8729 - loss: 0.2888
Epoch 133: val_accuracy did not improve from 0.79094
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step - accuracy: 0.8727 - loss: 0.2881 - val_accuracy: 0.7626 - val_loss: 1.3037
Epoch 134/350
[1m 7/10[0m [32m━━━━━━━━━━━━━━[0m[37m━━━━━━[0m [1m0s[0m 10ms/step - accuracy: 0.8785 - loss: 0.2774
Epoch 134: va