In [5]:
# General imports
import os
import sys
import pandas as pd
import numpy as np
import tensorflow as tf
import gc
import optuna
import uuid
 
# Sklearn imports
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    confusion_matrix, roc_auc_score, roc_curve
)
 
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import random
 
# Keras imports
from keras.utils import to_categorical
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
 
# Tensorflow Imports
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional, Input, BatchNormalization, Attention, LayerNormalization, Lambda, Multiply, Conv1D, MaxPooling1D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.regularizers import l2, l1
from tensorflow.keras import backend as K  # Importando o backend
 
 
# Import library with current code functions.
# sys.path entries must be directories (or zip archives); a path that points at
# the .py file itself is ignored by the import machinery, so register the
# folder that contains neural_network_functions.py instead.
_lib_dir = r"C:\Users\joao.miranda\Documents\POC\lib"
if _lib_dir not in sys.path:  # keep the cell idempotent when it is re-run
    sys.path.append(_lib_dir)
import neural_network_functions as neural_net_fun

ImportError: cannot import name 'to_categorical' from 'keras.utils' (c:\Users\joao.miranda\AppData\Local\miniconda3\envs\bio-signals-dataset_env\lib\site-packages\keras\utils\__init__.py)

In [None]:


# Load and clean the dataset
def load_and_clean_dataset(dataset_path):
    """Read the sequences CSV and discard the stray ``'Unnamed: 0'`` column.

    Args:
        dataset_path: Location of the CSV file, passed straight to
            ``pd.read_csv``.

    Returns:
        pandas.DataFrame: The loaded table; if it contained a column named
        ``'Unnamed: 0'`` that column has been removed.
    """
    stray_column = 'Unnamed: 0'
    frame = pd.read_csv(dataset_path)

    # Nothing to clean when the file has no such column.
    if stray_column not in frame.columns:
        return frame
    return frame.drop(columns=[stray_column])

# Filter emotions based on a query
def filter_emotions(SEQUENCES, emotions_query):
    """Keep only the rows of ``SEQUENCES`` that satisfy ``emotions_query``.

    Args:
        SEQUENCES: pandas.DataFrame to filter.
        emotions_query: Expression string understood by ``DataFrame.query``.

    Returns:
        pandas.DataFrame: An independent copy of the matching rows (original
        index labels are preserved).
    """
    # .query() hands back a frame that pandas still tracks as derived from
    # SEQUENCES; adding a column to it later emits SettingWithCopyWarning on
    # pandas < 3. Returning an explicit copy gives downstream steps a frame
    # they can safely extend.
    return SEQUENCES.query(emotions_query).copy()

# Encode labels into numerical values
def encode_labels(SEQUENCES_ENCODED):
    """Add an integer-coded ``'label_numerical'`` column derived from ``'label'``.

    The column is written onto the frame that was passed in, and that same
    frame is returned. The class-to-code mapping is printed as well.

    Args:
        SEQUENCES_ENCODED: pandas.DataFrame containing a ``'label'`` column.

    Returns:
        tuple: ``(SEQUENCES_ENCODED, label_mapping)`` where ``label_mapping``
        maps each class found by the encoder to its numeric code.
    """
    label_encoder = LabelEncoder()
    numeric_codes = label_encoder.fit_transform(SEQUENCES_ENCODED['label'])
    SEQUENCES_ENCODED['label_numerical'] = numeric_codes

    known_classes = label_encoder.classes_
    label_mapping = {
        class_name: code
        for class_name, code in zip(known_classes, label_encoder.transform(known_classes))
    }
    print("Label Mapping:", label_mapping)
    return SEQUENCES_ENCODED, label_mapping

# Select features
def select_features(SEQUENCES_ENCODED, feature_start=4, feature_stop=26):
    """Pick the feature columns by position and report the longest sequence.

    Args:
        SEQUENCES_ENCODED: pandas.DataFrame with a ``'sample_id'`` column.
        feature_start: Position of the first feature column (inclusive).
            Defaults to 4, the value previously hard-coded.
        feature_stop: Position just past the last feature column (exclusive).
            Defaults to 26, the value previously hard-coded.

    Returns:
        tuple: ``(features, n_features, len_sample_max)`` - the selected
        column names, how many there are, and the largest number of rows
        sharing one ``sample_id``.
    """
    features = SEQUENCES_ENCODED.columns.to_list()[feature_start:feature_stop]
    print(f"Features being used: {features}")
    n_features = len(features)
    # The most frequent sample_id determines the longest sequence length.
    len_sample_max = SEQUENCES_ENCODED['sample_id'].value_counts().max()
    return features, n_features, len_sample_max

# Pad sequences
def pad_sequences(SEQUENCES_ENCODED, features, len_sample_max):
    """Turn the long-format table into per-sample, zero pre-padded sequences.

    Rows are grouped by ``'sample_id'``. Each group shorter than
    ``len_sample_max`` gets rows of zeros placed *before* its data so that it
    reaches ``len_sample_max`` rows; groups that are already that long (or
    longer) are passed through unchanged.

    Args:
        SEQUENCES_ENCODED: pandas.DataFrame with ``'sample_id'``,
            ``'label_numerical'`` and the columns listed in ``features``.
        features: List of column names to use as sequence features.
        len_sample_max: Target number of rows per sequence.

    Returns:
        tuple: ``(X, Y)`` where ``X`` stacks one padded feature matrix per
        sample and ``Y`` holds the ``'label_numerical'`` value of the first
        row of each sample, in ``groupby('sample_id')`` order.
    """
    n_features = len(features)
    sequences = []
    labels = []
    for _, group in SEQUENCES_ENCODED.groupby('sample_id'):
        values = group[features].to_numpy()
        n_missing = len_sample_max - len(values)
        if n_missing > 0:
            # Zeros go first so the real observations stay at the end.
            values = np.vstack([np.zeros((n_missing, n_features)), values])
        sequences.append(values)
        # The group already holds this sample's rows, so take the label from
        # it instead of re-filtering the whole frame once per sample. Reading
        # the column directly also keeps the label's integer dtype (a whole
        # row via iloc[0] can be upcast to float on all-numeric frames).
        labels.append(group['label_numerical'].iloc[0])

    X = np.array(sequences)
    Y = np.array(labels)
    return X, Y

# Normalize data (using external function)
def normalize_data(X_balanced):
    """Delegate normalization to ``neural_net_fun.normalize_data``.

    Args:
        X_balanced: Data to normalize; forwarded untouched to the helper.
            (Expected layout is defined by the external helper - TODO confirm.)

    Returns:
        Whatever ``neural_net_fun.normalize_data`` returns for ``X_balanced``.
    """
    normalized = neural_net_fun.normalize_data(X_balanced)
    return normalized

# Split data into train, validation, and test sets
def split_data(X, Y, train_size=0.7, val_size=0.15, test_size=0.15, random_state=42):
    """Stratified train / validation / test split.

    The test set is carved out first; the remainder is then divided into
    train and validation using ``val_size / (train_size + val_size)`` as the
    validation fraction. ``train_size`` is therefore only used relative to
    ``val_size``.

    Args:
        X: Feature array, indexable along its first axis.
        Y: Label array; used for stratification in both splits.
        train_size: Intended share of the data for training.
        val_size: Intended share of the data for validation.
        test_size: Share of the data held out for testing.
        random_state: Seed used for both splits. Defaults to 42, the value
            previously hard-coded, so existing calls are unaffected.

    Returns:
        tuple: ``(X_train, X_val, X_test, Y_train, Y_val, Y_test)`` with each
        ``Y_*`` reshaped to a column vector of shape ``(n, 1)``.
    """
    X_train_val, X_test, Y_train_val, Y_test = train_test_split(
        X, Y,
        test_size=test_size,
        stratify=Y,
        random_state=random_state,
        shuffle=True,
    )

    # Express the validation share relative to what is left after the test split.
    val_relative_size = val_size / (train_size + val_size)
    X_train, X_val, Y_train, Y_val = train_test_split(
        X_train_val, Y_train_val,
        test_size=val_relative_size,
        stratify=Y_train_val,
        random_state=random_state,
        shuffle=True,
    )

    Y_train, Y_val, Y_test = (
        labels.reshape(-1, 1) for labels in (Y_train, Y_val, Y_test)
    )

    return X_train, X_val, X_test, Y_train, Y_val, Y_test

In [None]:
# Main preprocessing function
def preprocess_data_current_dataset(dataset_path, emotions, emotions_augmentation_factors=None, train_size=0.7, val_size=0.15, test_size=0.15):
    """Run the full pipeline: load, filter, encode, augment, pad, normalize, split.

    Args:
        dataset_path: CSV file passed to ``load_and_clean_dataset``.
        emotions: Query string passed to ``filter_emotions``.
        emotions_augmentation_factors: Optional argument forwarded to
            ``perform_augmentation``; when falsy, augmentation is skipped.
        train_size: Forwarded to ``split_data``.
        val_size: Forwarded to ``split_data``.
        test_size: Forwarded to ``split_data``.

    Returns:
        dict: Keys ``'label_mapping'``, ``'timesteps'``, ``'n_classes'``,
        ``'n_features'``, ``'X_complete'``, ``'Y_complete'``, ``'X_train'``,
        ``'X_val'``, ``'X_test'``, ``'Y_train'``, ``'Y_val'``, ``'Y_test'``,
        ``'SEQUENCES_BEFORE'`` and ``'SEQUENCES_AFTER'``. When augmentation is
        skipped, ``'SEQUENCES_BEFORE'`` is a copy of the same data found in
        ``'SEQUENCES_AFTER'``.
    """
    SEQUENCES = load_and_clean_dataset(dataset_path)
    SEQUENCES = filter_emotions(SEQUENCES, emotions)

    SEQUENCES_ENCODED, label_mapping = encode_labels(SEQUENCES)
    features, n_features, len_sample_max = select_features(SEQUENCES_ENCODED)
    timesteps = len_sample_max
    n_classes = len(label_mapping)

    # Snapshot the pre-augmentation data unconditionally. It used to be bound
    # only inside the augmentation branch, so calling the function without
    # augmentation factors (the default) failed with UnboundLocalError when
    # the result dict was built.
    SEQUENCES_ENCODED_BEFORE = SEQUENCES_ENCODED.copy()

    # Perform augmentation
    if emotions_augmentation_factors:
        SEQUENCES_ENCODED = perform_augmentation(SEQUENCES_ENCODED, features, emotions_augmentation_factors)

    # Padding happens after augmentation
    # NOTE(review): len_sample_max was measured before augmentation - confirm
    # that perform_augmentation never yields sequences longer than that.
    X, Y = pad_sequences(SEQUENCES_ENCODED, features, len_sample_max)

    # NOTE(review): normalization (and augmentation) run before the
    # train/val/test split; if either derives information from the full
    # dataset this may leak into the held-out sets - confirm.
    X_complete = normalize_data(X)

    X_train, X_val, X_test, Y_train, Y_val, Y_test = split_data(X_complete, Y, train_size, val_size, test_size)

    return {
        'label_mapping': label_mapping,
        'timesteps': timesteps,
        'n_classes': n_classes,
        'n_features': n_features,
        'X_complete': X_complete,
        'Y_complete': Y,
        'X_train': X_train,
        'X_val': X_val,
        'X_test': X_test,
        'Y_train': Y_train,
        'Y_val': Y_val,
        'Y_test': Y_test,
        'SEQUENCES_BEFORE': SEQUENCES_ENCODED_BEFORE,  # Data before augmentation
        'SEQUENCES_AFTER': SEQUENCES_ENCODED           # Data after augmentation
    }
