# Data Preprocessing & Feature Engineering

In [None]:
import pandas as pd
import numpy as np

file_path = None # input file_path of Kaggle database-for-emotion-recognition-system dataset
# https://www.kaggle.com/datasets/sigfest/database-for-emotion-recognition-system-gameemo

gameemo_df = pd.read_csv(file_path)

# Each output channel is the plain average of a group of source electrode
# columns from the raw recording.
electrode_groups = {
    'TP9': ['T7', 'P7'],
    'TP10': ['T8', 'P8'],
    'AF7': ['AF3', 'F7', 'FC5', 'F3'],
    'AF8': ['AF4', 'F8', 'FC6', 'F4'],
}

processed_df = pd.DataFrame()
for out_channel, electrodes in electrode_groups.items():
    # Accumulate left to right so the arithmetic matches a written-out sum.
    running_sum = gameemo_df[electrodes[0]]
    for electrode in electrodes[1:]:
        running_sum = running_sum + gameemo_df[electrode]
    processed_df[out_channel] = running_sum / len(electrodes)

In [None]:
from scipy.stats import skew, kurtosis

# Build the baseline training data: one row of summary-statistic features per
# window of the processed signal, with the window length given by batch_size.
def generate_summary_stats_training_data(df, batch_size):
    """Summarise consecutive row windows of ``df`` into one feature row each.

    Parameters
    ----------
    df : pandas.DataFrame
        Un-windowed signal frame; must provide the columns 'TP9', 'TP10',
        'AF7' and 'AF8'.
    batch_size : int
        Window length, applied as a number of consecutive rows.
        NOTE(review): described by the author as a number of seconds; that
        only holds if there is one row per second -- confirm with the caller.
        The last window is shorter when the row count is not a multiple of
        batch_size.

    Returns
    -------
    pandas.DataFrame
        One row per window and 28 columns named '<stat>_<channel>', ordered
        mean, std, vari, max, min, skew, kurt and, within each statistic,
        tp9, tp10, af7, af8. std and variance use ddof=1; skew and kurtosis
        use the scipy.stats defaults. No covariance features are produced.
    """
    channels = ('TP9', 'TP10', 'AF7', 'AF8')

    # (column prefix, reducer) pairs; their order fixes the output column order.
    reducers = (
        ('mean', lambda series: series.mean()),
        ('std', lambda series: np.std(series, ddof=1)),
        ('vari', lambda series: np.var(series, ddof=1)),
        ('max', lambda series: series.max()),
        ('min', lambda series: series.min()),
        ('skew', lambda series: skew(np.array(series))),
        ('kurt', lambda series: kurtosis(np.array(series))),
    )

    # Pre-create every column so an empty input still yields all 28 columns.
    features = {}
    for prefix, _ in reducers:
        for channel in channels:
            features[prefix + '_' + channel.lower()] = []

    total_rows = len(df)
    for lo in range(0, df.shape[0], batch_size):
        hi = min(lo + batch_size, total_rows)  # clamp the trailing window
        window = df[lo:hi]
        for prefix, reduce_fn in reducers:
            for channel in channels:
                features[prefix + '_' + channel.lower()].append(reduce_fn(window[channel]))

    return pd.DataFrame(features)

# Feature Selection using 5-Fold Cross Validation

In [None]:
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.preprocessing import StandardScaler
from lightgbm import LGBMRegressor

def feature_selection(df, target='Boring'):
  """Pick the SelectKBest feature count that minimises 5-fold CV RMSE.

  For every k from 1 to the number of feature columns, a
  scaler -> SelectKBest(f_regression, k) -> LGBMRegressor pipeline is scored
  with 5-fold cross-validation on the training split. Scaling and feature
  selection are fitted inside each fold, so the validation fold never
  influences which features are chosen (selecting on the whole training
  split before cross-validating leaks label information into the score).

  Parameters
  ----------
  df : pandas.DataFrame
      Windowed training features plus the label columns 'Valence', 'Boring',
      'Arousal', 'Funny', 'Calm', 'Satisfaction' and 'Horrible'. Every other
      column is treated as a feature.
  target : str, default 'Boring'
      Label column to predict.

  Returns
  -------
  tuple
      ``(best_feature_set, best_rmse)``: the column indices (into the feature
      columns) selected for the best k when the selector is fitted on the
      whole training split, and the corresponding mean CV RMSE.
      ``(None, inf)`` if there are no feature columns.
  """
  # Local import so this function does not depend on edits to another cell.
  from sklearn.pipeline import make_pipeline

  label_cols = ['Valence', 'Boring', 'Arousal', 'Funny', 'Calm', 'Satisfaction', 'Horrible']
  data = df.drop(columns=label_cols)
  label = df[target]

  # Hold out 20% exactly as the other steps do; only the training part is used here.
  X_train, _, y_train, _ = train_test_split(data, label, test_size=0.2, random_state=42)

  # Derive the search range from the data instead of assuming 28 features.
  n_features = X_train.shape[1]

  best_feature_set = None
  best_rmse = float('inf')

  for k in range(1, n_features + 1):
    # Scaler and selector are re-fitted per fold by cross_val_score.
    candidate = make_pipeline(
        StandardScaler(),
        SelectKBest(score_func=f_regression, k=k),
        LGBMRegressor(),
    )
    cv_scores = cross_val_score(candidate, X_train, y_train, cv=5, scoring='neg_mean_squared_error')
    avg_rmse = np.mean(np.sqrt(-cv_scores))
    print("current r MSE: ", avg_rmse)
    print("number of features", k)

    if avg_rmse < best_rmse:
      best_rmse = avg_rmse
      # Report the features this k selects on the full training split.
      selector = SelectKBest(score_func=f_regression, k=k)
      selector.fit(StandardScaler().fit_transform(X_train), y_train)
      best_feature_set = selector.get_support(indices=True)

  print("best feature set: ", best_feature_set)
  print("best r MSE: ", best_rmse)
  return best_feature_set, best_rmse

# Perform Grid Search for Hyperparameter Optimization

In [None]:
from sklearn.model_selection import GridSearchCV
import lightgbm as lgb
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import mean_squared_error

def optimize_hyperparameters(df, target='Arousal', param_grid=None):
  """Grid-search LGBMRegressor hyperparameters with 5-fold CV and return the best.

  Parameters
  ----------
  df : pandas.DataFrame
      Windowed training features plus the label columns 'Valence', 'Boring',
      'Arousal', 'Funny', 'Calm', 'Satisfaction' and 'Horrible'. Every other
      column is treated as a feature.
  target : str, default 'Arousal'
      Label column to predict.
  param_grid : dict or None, default None
      Grid passed to GridSearchCV. When None, searches num_leaves in
      [20, 30, 40], learning_rate in [0.05, 0.1, 0.2] and n_estimators in
      [50, 100, 200].

  Returns
  -------
  dict
      ``best_params_`` of the fitted grid search (also printed), scored by
      negative mean squared error.
  """
  label_cols = ['Valence', 'Boring', 'Arousal', 'Funny', 'Calm', 'Satisfaction', 'Horrible']
  data = df.drop(columns=label_cols)
  print("feature columns: ", data.columns)
  label = df[target]

  # Same 80/20 split and seed as the other steps; only the training part is searched.
  X_train, _, y_train, _ = train_test_split(data, label, test_size=0.2, random_state=42)

  # Standardise using statistics of the training split only.
  X_train_scaled = StandardScaler().fit_transform(X_train)

  if param_grid is None:
    param_grid = {
        'num_leaves': [20, 30, 40],
        'learning_rate': [0.05, 0.1, 0.2],
        'n_estimators': [50, 100, 200]
    }

  grid_search = GridSearchCV(LGBMRegressor(), param_grid, cv=5, scoring='neg_mean_squared_error')
  grid_search.fit(X_train_scaled, y_train)

  best_params = grid_search.best_params_
  print("Best Hyperparameters:", best_params)
  # Return the result so callers can feed it into model training instead of copying it by hand.
  return best_params

# Run LightGBM Model Using Selected Features and Optimized Hyperparameters

In [None]:
from sklearn.multioutput import MultiOutputRegressor

def run_lgbm_regre_model(df):
    """Train a multi-output LightGBM regressor for Valence, Boring and Arousal.

    Parameters
    ----------
    df : pandas.DataFrame
        Windowed feature frame that also carries the label columns 'Valence',
        'Boring', 'Arousal', 'Funny', 'Calm', 'Satisfaction' and 'Horrible'.
        Every other column is used as a feature.

    Returns
    -------
    MultiOutputRegressor
        The regressor fitted on the standardised 80% training split. The
        predictions for the standardised 20% test split are printed.

    NOTE(review): the fitted StandardScaler is not returned, so callers have
    to reproduce the scaling themselves before predicting on new data.
    """
    target_cols = ['Valence', 'Boring', 'Arousal']

    features = df.drop(columns=['Valence', 'Boring', 'Arousal', 'Funny', 'Calm', 'Satisfaction', 'Horrible'])
    targets = df[target_cols]
    X_train, X_test, y_train, y_test = train_test_split(features, targets, test_size=0.2, random_state=42)

    # Scaling statistics come from the training split only and are reused for the test split.
    scaler = StandardScaler().fit(X_train)
    X_train_scaled, X_test_scaled = (scaler.transform(part) for part in (X_train, X_test))

    # Hyperparameters taken from the grid-search step.
    base_regressor = LGBMRegressor(learning_rate=0.1, n_estimators=200, num_leaves=30)
    model = MultiOutputRegressor(base_regressor)
    model.fit(X_train_scaled, y_train)

    test_predictions = model.predict(X_test_scaled)
    print(test_predictions)
    return model