In [None]:
# Importing required libraries and frameworks
import pandas as pd
import numpy as np
import glob
import warnings
import re
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import GroupKFold,train_test_split
from sklearn.model_selection import GroupShuffleSplit
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.impute import KNNImputer
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import mean_squared_error
warnings.filterwarnings('ignore')

In [None]:
# Pull the participant number out of a recording's file name.
def participant_number_extraction(string):
    """Return the integer N of the first ``participant_N`` token in ``string``.

    Args:
        string: Text to scan, typically a file path such as
            ``..._participant_12_trial_3.csv``.

    Returns:
        The participant number as an ``int``, or ``None`` when ``string``
        contains no ``participant_<digits>`` token.
    """
    found = re.search(r'participant_(\d+)', string)
    return int(found.group(1)) if found else None

In [None]:
def get_first_non_null_row(df, col_name):
    """Return the index label of the first non-null entry of ``df[col_name]``.

    The lookup is ``idxmax`` over the boolean "is not null" mask, so when the
    column holds no non-null value at all the first index label is returned.
    """
    return df[col_name].notna().idxmax()

In [None]:
def feature_selection(df):
    """Down-sample a recording, starting at the first pupil-diameter reading.

    The starting row is the earliest row that holds a non-null value in
    either 'Pupil diameter left' or 'Pupil diameter right'; from there every
    third row is kept.

    Args:
        df: Recording containing both pupil-diameter columns.

    Returns:
        The selected rows of ``df`` with their original index labels. An
        empty frame yields an empty selection.

    Raises:
        KeyError: If either pupil-diameter column is missing.
    """
    pupil_cols = ('Pupil diameter left', 'Pupil diameter right')
    # Column lookup happens first so a missing column still raises KeyError.
    masks = [df[col].notna().to_numpy() for col in pupil_cols]
    if len(df) == 0:
        # argmax is undefined on an empty mask; there is nothing to select.
        return df.iloc[0:0]

    # Use row *positions*, not index labels: the start feeds `.iloc`, which is
    # positional. An index label (what `idxmax` returns) only coincides with
    # the position on a default RangeIndex and picks the wrong rows otherwise.
    # argmax over an all-False mask is 0, i.e. "start at the first row".
    start = min(int(mask.argmax()) for mask in masks)

    # Keep every 3rd row. Per the original author's note pupil diameter is
    # sampled at 40 Hz -- presumably a third of the recording's row rate
    # (TODO confirm against the eye-tracker export settings).
    return df.iloc[start::3]

In [None]:
def unique_val_cols(df):
    """Drop constant columns while always keeping the identifier columns.

    A column is removed when it has at most one distinct non-null value
    (``nunique() <= 1``), i.e. it carries no variance. 'Participant name' and
    'Recording timestamp' are never removed.

    Returns:
        A new frame without the constant columns; ``df`` is left untouched.
    """
    always_kept = ('Participant name', 'Recording timestamp')
    constant_cols = [
        name
        for name in df.columns
        if name not in always_kept and df[name].nunique() <= 1
    ]
    return df.drop(columns=constant_cols)

In [None]:
# Drop columns that do not affect the target and are therefore not used for model training.
def unwanted_columns_removal(df, unwanted_cols):
    """Return a copy of ``df`` without the columns listed in ``unwanted_cols``.

    Raises:
        KeyError: If any listed column is not present in ``df``.
    """
    return df.drop(columns=unwanted_cols)

In [None]:
# Cast object (string) columns to numeric dtypes wherever the whole column parses.
def numeric_conversion(df):
    """Convert decimal-comma strings to numbers, column by column.

    Every ',' inside string cells is first replaced by '.'; each column that
    is not already numeric is then passed through ``pd.to_numeric``. A column
    that cannot be parsed as a whole is left as it is (with commas replaced).

    Args:
        df: Frame to convert; it is not modified.

    Returns:
        A new frame with the parsable columns converted to numeric dtypes.
    """
    converted = df.replace(',', '.', regex=True)
    for col in converted.columns:
        if pd.api.types.is_numeric_dtype(converted[col]):
            continue
        # `pd.to_numeric(..., errors='ignore')` is deprecated (pandas 2.2).
        # Catching the parse failure reproduces its "return the input
        # unchanged" behaviour without relying on the deprecated option.
        try:
            converted[col] = pd.to_numeric(converted[col])
        except (ValueError, TypeError):
            pass
    return converted

In [None]:
# Remove repeated samples, identified by the 'Recording timestamp' column.
def duplicate_removal(df):
    """Keep only the first row for each distinct 'Recording timestamp'.

    Returns:
        The de-duplicated rows with their original index labels.
    """
    is_repeat = df.duplicated(subset='Recording timestamp')
    return df[~is_repeat]

In [None]:
# Attach the target variable (empathy score) to every row via the participant number.
def empathy_score(df,df_question):
    """Add an 'Empathy Score' column to ``df`` from the questionnaire data.

    Args:
        df: Eye-tracking rows with a 'Participant name' column holding the
            participant number. The new column is written to this frame.
        df_question: Questionnaire with 'Participant nr' and
            'Total Score extended' columns.

    Returns:
        ``(df, empathy_scores)`` where ``empathy_scores`` maps participant
        number -> score. Rows whose participant is not in the questionnaire
        get a score of 0.
    """
    empathy_scores = {
        int(record['Participant nr']): record['Total Score extended']
        for _, record in df_question.iterrows()
    }

    def lookup(participant):
        # Unknown participants fall back to 0.
        return empathy_scores.get(participant, 0)

    df['Empathy Score'] = df['Participant name'].apply(lookup)
    return df, empathy_scores


In [None]:
# Fill missing values with the K-Nearest Neighbors (KNN) imputation technique.
def imputation(df):
    """Impute missing values in ``df`` with ``KNNImputer(n_neighbors=5)``.

    Args:
        df: Numeric frame in which NaN marks a missing value.

    Returns:
        A new frame of imputed values on a fresh RangeIndex. Columns that are
        missing in every row cannot be imputed and are absent from the result.
    """
    imputer = KNNImputer(n_neighbors=5)
    imputed = imputer.fit_transform(df)

    columns = df.columns
    if imputed.shape[1] != len(columns):
        # With its default settings KNNImputer drops features that are
        # entirely NaN at fit time, so the output has fewer columns than the
        # input. Labelling it with every original column raised a
        # shape-mismatch ValueError; label it with the surviving ones instead.
        columns = columns[df.notna().any(axis=0).to_numpy()]
    return pd.DataFrame(imputed, columns=columns)

In [None]:
# Collapse a recording into one summary row: the mode of each selected column
# plus the participant name and the mean and sum of 'Gaze event duration'.
def moddata(df):
    """Summarise ``df`` as a one-row DataFrame of per-column modes.

    Returns:
        A one-row DataFrame with the first mode of every selected column, the
        first 'Participant name' value, 'Avg Gaze event duration' (mean) and
        'Total Gaze event duration' (sum). Returns ``None``, after printing a
        message, when a selected column is missing from ``df``.
    """
    selected_columns = [
        'Gaze point X',
        'Gaze point Y',
        'Pupil diameter left',
        'Pupil diameter right',
        'Eye position left X (DACSmm)',
        'Eye position left Y (DACSmm)',
        'Eye position left Z (DACSmm)',
        'Eye position right X (DACSmm)',
        'Eye position right Y (DACSmm)',
        'Eye position right Z (DACSmm)',
        'Gaze event duration',
        'Empathy Score',
    ]

    try:
        # `.mode()` can return several rows (ties); the first row is used.
        first_modes = df[selected_columns].mode().iloc[0]
    except KeyError as e:
        missing_col = str(e).strip('\'[]\'')
        print(f"Column {missing_col} not found in the dataframe")
        return None

    gaze_duration = df['Gaze event duration']
    summary = pd.DataFrame([first_modes], columns=selected_columns)
    return summary.assign(**{
        'Participant name': df['Participant name'].iloc[0],
        'Avg Gaze event duration': gaze_duration.mean(),
        'Total Gaze event duration': gaze_duration.sum(),
    })


In [None]:
def drop_correlation(df):
    """Draw the Pearson correlation heatmap and drop highly correlated columns.

    For every pair of columns whose absolute correlation exceeds 0.7, the
    column that comes later in ``df`` is dropped.

    NOTE(review): every column is treated alike, so an identifier column such
    as 'Participant name' can be dropped here if it happens to correlate with
    an earlier column -- verify against callers that still need it.

    Returns:
        A new frame without the dropped columns.
    """
    cor_matrix = df.corr()
    plt.subplots(figsize = (42,42))
    plt.title('Pearson Corealation Matrix')
    sns.heatmap(cor_matrix,vmax = 0.13,annot=True)

    names = cor_matrix.columns
    # Only the lower triangle is scanned, so each pair is inspected once and
    # the later column of a correlated pair is the one selected.
    cor_col = {
        names[row]
        for row in range(len(names))
        if any(abs(cor_matrix.iloc[row, col]) > .7 for col in range(row))
    }
    print('Columns with Corelation are - ', cor_col)
    return df.drop(columns = cor_col)

In [None]:
def min_max_scaler(df):
    """Rescale every column of ``df`` with a default ``MinMaxScaler``.

    Returns:
        A new DataFrame with the same column labels on a fresh RangeIndex
        (the original index is not carried over).
    """
    scaled_values = MinMaxScaler().fit_transform(df)
    return pd.DataFrame(scaled_values, columns=df.columns)

In [None]:
def feature_seperation(df):
    """Split ``df`` into the feature frame and the 'Empathy Score' target.

    Returns:
        ``(x_df, y_df)``: every column except 'Empathy Score', and the
        'Empathy Score' column as a Series.
    """
    target = 'Empathy Score'
    y_df = df[target]
    return df.drop(columns=[target]), y_df

In [None]:
def cross_validation(x_df, y_df, var):
    """Run 10-fold grouped cross-validation with a random forest and plot the scores.

    Folds come from ``GroupKFold`` grouped on 'Participant name'; that column
    is used for grouping only and is excluded from the model inputs. For each
    fold the score ``1 - mse / var`` and the MSE are printed, then the scores
    are plotted per fold.

    Args:
        x_df: Features, including the 'Participant name' column.
        y_df: Target values, positionally aligned with ``x_df``.
        var: Variance used as the denominator of the reported score.
    """
    n_splits = 10
    groups = x_df['Participant name'].tolist()
    # Built once: the grouping column must not be fed to the model.
    features = x_df.drop(['Participant name'], axis=1)
    splitter = GroupKFold(n_splits=n_splits)
    model = RandomForestRegressor(n_estimators=10, random_state=13)

    scores = []
    for train_index, test_index in splitter.split(x_df, y_df, groups=groups):
        model.fit(features.iloc[train_index], y_df.iloc[train_index])
        y_pred = model.predict(features.iloc[test_index])
        mse = mean_squared_error(y_df.iloc[test_index], y_pred)
        r2 = 1 - (mse / var)
        scores.append(r2)
        print("R2 score - Cross: {:.3f}".format(r2))
        print("MSE - Cross: {:.3f}".format(mse))
        print("Var: {:.3f}".format(var))

    fold_numbers = range(1, n_splits + 1)
    fig, ax = plt.subplots()
    ax.plot(fold_numbers, scores, marker='*')
    ax.set_xlabel('Sample')
    ax.set_ylabel('R-Squared')
    ax.set_title('Cross-Validation Scores')
    plt.show()

# Visualizing the results from the cross-validation
# NOTE(review): in the visible notebook x_df, y_df and var are first assigned
# in the data-loading cells at the end, so on a fresh kernel (Restart & Run
# All) this call raises NameError. Run those cells first, or move this call
# below them.
cross_validation(x_df, y_df, var)

In [None]:
def train_test_split(x_df, y_df, test_size, random_state):
    """Split features and target into train/test parts, grouped by participant.

    A single ``GroupShuffleSplit`` is drawn with 'Participant name' as the
    group label; that column is removed from the returned feature frames.

    NOTE: this definition shadows ``sklearn.model_selection.train_test_split``
    imported at the top of the notebook.

    Args:
        x_df: Features, including the 'Participant name' column.
        y_df: Target values, positionally aligned with ``x_df``.
        test_size: Passed to ``GroupShuffleSplit(test_size=...)``.
        random_state: Passed to ``GroupShuffleSplit(random_state=...)``.

    Returns:
        ``(x_train, x_test, y_train, y_test)``.
    """
    groups = x_df['Participant name'].tolist()
    splitter = GroupShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
    train_idx, test_idx = next(splitter.split(x_df, y_df, groups=groups))

    features = x_df.drop('Participant name', axis=1)
    return (
        features.iloc[train_idx],
        features.iloc[test_idx],
        y_df.iloc[train_idx],
        y_df.iloc[test_idx],
    )

# Usage
# NOTE(review): x_df and y_df are first assigned in the data-loading cells at
# the end of the visible notebook; on a fresh kernel this cell raises NameError
# unless those cells have been run first.
test_size = 0.2  # passed to train_test_split as test_size
random_state = 45  # passed to train_test_split as random_state
x_train, x_test, y_train, y_test = train_test_split(x_df, y_df, test_size, random_state)


def model_training(x_train, x_test, y_train, y_test, var):
    """Fit a random forest, print its scores and plot its feature importances.

    NOTE(review): later cells define other functions that are also named
    ``model_training``; whichever definition ran last is the one in effect.

    Args:
        x_train, y_train: Training features and target.
        x_test, y_test: Held-out features and target.
        var: Variance used as the denominator of the reported score,
            ``1 - mse / var``.

    Returns:
        None. Results are printed and plotted.
    """
    forest = RandomForestRegressor(n_estimators=10, random_state=13)
    forest.fit(x_train, y_train)

    mse = mean_squared_error(y_test, forest.predict(x_test))
    r2 = 1 - (mse / var)
    print("R2 score: {:.3f}".format(r2))
    print("MSE: {:.3f}".format(mse))

    importances = forest.feature_importances_
    # Feature positions sorted from most to least important.
    ranking = np.argsort(importances)[::-1]
    n_features = x_train.shape[1]

    print("Features in order of priority:")
    for rank, position in enumerate(ranking, start=1):
        print("%d. feature %s (%f)" % (rank, x_train.columns[position], importances[position]))

    plt.figure(figsize=(10, 5))
    plt.title("Feature importances")
    plt.bar(range(n_features), importances[ranking], color="r", align="center")
    plt.xticks(range(n_features), x_train.columns[ranking], rotation='vertical')
    plt.xlim([-1, n_features])
    plt.show()

# Usage
# NOTE(review): x_train/x_test/y_train/y_test come from the split above and
# `var` from the data-loading cells at the end of the visible notebook; on a
# fresh kernel this call raises NameError unless those cells ran first.
model_training(x_train, x_test, y_train, y_test, var)


In [None]:
# Train a shallow and a deeper feed-forward network and compare their performance.
def model_training(x_train, x_test, y_train, y_test, var):
    """Fit two Keras MLPs, print their scores and plot actual vs. predicted.

    The first network has hidden layers of 64 and 32 units (100 epochs, batch
    size 32); the second has 128, 64 and 32 units (150 epochs, batch size 64).
    Both use ReLU activations, Adam with learning rate 0.001 and MSE loss.

    NOTE(review): other cells define functions that are also named
    ``model_training``; whichever definition ran last is the one in effect.

    Args:
        x_train, y_train: Training features and target.
        x_test, y_test: Held-out features and target.
        var: Variance used as the denominator of the reported score,
            ``1 - mse / var``.

    Returns:
        None. Results are printed and plotted.
    """
    n_features = x_train.shape[1]

    def fit_and_predict(hidden_units, epochs, batch_size):
        """Build, compile and fit one network, then predict on ``x_test``."""
        first_units, *other_units = hidden_units
        net = Sequential()
        net.add(Dense(first_units, activation='relu', input_shape=(n_features,)))
        for units in other_units:
            net.add(Dense(units, activation='relu'))
        net.add(Dense(1))
        net.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
        net.fit(x_train, y_train, epochs=epochs, batch_size=batch_size, verbose=0)
        return net.predict(x_test)

    # Same order as before: the shallow network is trained first.
    y_pred_nn = fit_and_predict((64, 32), epochs=100, batch_size=32)
    y_pred_dnn = fit_and_predict((128, 64, 32), epochs=150, batch_size=64)

    mse_nn = mean_squared_error(y_test, y_pred_nn)
    mse_dnn = mean_squared_error(y_test, y_pred_dnn)
    r2_nn = 1 - (mse_nn / var)
    r2_dnn = 1 - (mse_dnn / var)

    print("Neural Network R2 score: {:.3f}".format(r2_nn))
    print("Neural Network MSE: {:.3f}".format(mse_nn))

    print("Deep Neural Network R2 score: {:.3f}".format(r2_dnn))
    print("Deep Neural Network MSE: {:.3f}".format(mse_dnn))

    # One scatter panel per network, side by side.
    panels = [
        (y_pred_nn, "Neural Network: Actual vs. Predicted Empathy Scores"),
        (y_pred_dnn, "Deep Neural Network: Actual vs. Predicted Empathy Scores"),
    ]
    plt.figure(figsize=(15, 5))
    for position, (predictions, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.scatter(y_test, predictions, alpha=0.5)
        plt.title(title)
        plt.xlabel("Actual Empathy Scores")
        plt.ylabel("Predicted Empathy Scores")

    plt.tight_layout()
    plt.show()

# Usage
# NOTE(review): relies on x_train/x_test/y_train/y_test and `var` already
# existing in the kernel; in the visible notebook `var` is first assigned in
# the data-loading cells at the end, so a fresh Restart & Run All fails here.
model_training(x_train, x_test, y_train, y_test, var)


In [None]:
from sklearn.linear_model import SGDRegressor, LinearRegression
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error
from sklearn.neural_network import MLPRegressor
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt

def model_training(x_train, x_test, y_train, y_test, var):
    """Fit SGD, linear, XGBoost and a small Keras network; print their scores.

    For each model the score ``1 - mse / var`` and the MSE on the test data
    are printed. The absolute SGD coefficients are then listed and plotted as
    feature importances.

    NOTE(review): other cells define functions that are also named
    ``model_training``; whichever definition ran last is the one in effect.

    Args:
        x_train, y_train: Training features and target.
        x_test, y_test: Held-out features and target.
        var: Variance used as the denominator of the reported score.

    Returns:
        None. Results are printed and plotted.
    """
    sgd_model = SGDRegressor(random_state=13)
    # (printed heading, estimator) in the order they are fitted and reported.
    estimators = [
        ("SGDRegressor:", sgd_model),
        ("\nLinear Regression:", LinearRegression()),
        ("\nXGBoost Regressor:", XGBRegressor(random_state=13)),
    ]
    for heading, estimator in estimators:
        estimator.fit(x_train, y_train)
        mse = mean_squared_error(y_test, estimator.predict(x_test))
        print(heading)
        print("R2 score: {:.3f}".format(1 - (mse / var)))
        print("MSE: {:.3f}".format(mse))

    # Neural Networks (Deep Neural Networks)
    n_features = x_train.shape[1]
    nn_model = keras.Sequential([
        keras.layers.Dense(64, activation='relu', input_shape=(n_features,)),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(1)
    ])
    nn_model.compile(optimizer='adam', loss='mean_squared_error')
    nn_model.fit(x_train, y_train, epochs=10, batch_size=32, verbose=0)
    # Flatten the (n, 1) prediction array to 1-D before scoring.
    mse_nn = mean_squared_error(y_test, nn_model.predict(x_test).flatten())
    print("\nNeural Networks (Deep Neural Networks):")
    print("R2 score: {:.3f}".format(1 - (mse_nn / var)))
    print("MSE: {:.3f}".format(mse_nn))

    # Feature importances (for SGDRegressor): absolute coefficient size.
    if hasattr(sgd_model, 'coef_'):
        importances = np.abs(sgd_model.coef_)
        ranking = np.argsort(importances)[::-1]
        print("\nFeatures in order of priority (SGDRegressor):")
        for rank in range(n_features):
            position = ranking[rank]
            print("%d. feature %s (%f)" % (rank + 1, x_train.columns[position], importances[position]))
        plt.figure(figsize=(10, 5))
        plt.title("Feature importances (SGDRegressor)")
        plt.bar(range(n_features), importances[ranking], color="r", align="center")
        plt.xticks(range(n_features), x_train.columns[ranking], rotation='vertical')
        plt.xlim([-1, n_features])
        plt.show()

# Usage
# NOTE(review): relies on x_train/x_test/y_train/y_test and `var` already
# existing in the kernel; in the visible notebook `var` is first assigned in
# the data-loading cells at the end, so a fresh Restart & Run All fails here.
model_training(x_train, x_test, y_train, y_test, var)


In [None]:
from sklearn.linear_model import SGDRegressor, LinearRegression
from xgboost import XGBRegressor
from sklearn.metrics import mean_squared_error
from sklearn.neural_network import MLPRegressor
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt

def model_training(x_train, x_test, y_train, y_test, var):
    """Fit SGD, linear, XGBoost and a small Keras network; print their scores.

    For each model the score ``1 - mse / var`` and the MSE on the test data
    are printed. The absolute SGD coefficients are then listed and plotted as
    feature importances.

    NOTE(review): this cell repeats the ``model_training`` definition (and its
    imports) of the preceding cell; the redefinition is redundant and, being
    the last one in the notebook, is the one in effect after Run All.

    Args:
        x_train, y_train: Training features and target.
        x_test, y_test: Held-out features and target.
        var: Variance used as the denominator of the reported score.

    Returns:
        None. Results are printed and plotted.
    """
    sgd_model = SGDRegressor(random_state=13)
    # (printed heading, estimator) in the order they are fitted and reported.
    estimators = [
        ("SGDRegressor:", sgd_model),
        ("\nLinear Regression:", LinearRegression()),
        ("\nXGBoost Regressor:", XGBRegressor(random_state=13)),
    ]
    for heading, estimator in estimators:
        estimator.fit(x_train, y_train)
        mse = mean_squared_error(y_test, estimator.predict(x_test))
        print(heading)
        print("R2 score: {:.3f}".format(1 - (mse / var)))
        print("MSE: {:.3f}".format(mse))

    # Neural Networks (Deep Neural Networks)
    n_features = x_train.shape[1]
    nn_model = keras.Sequential([
        keras.layers.Dense(64, activation='relu', input_shape=(n_features,)),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(1)
    ])
    nn_model.compile(optimizer='adam', loss='mean_squared_error')
    nn_model.fit(x_train, y_train, epochs=10, batch_size=32, verbose=0)
    # Flatten the (n, 1) prediction array to 1-D before scoring.
    mse_nn = mean_squared_error(y_test, nn_model.predict(x_test).flatten())
    print("\nNeural Networks (Deep Neural Networks):")
    print("R2 score: {:.3f}".format(1 - (mse_nn / var)))
    print("MSE: {:.3f}".format(mse_nn))

    # Feature importances (for SGDRegressor): absolute coefficient size.
    if hasattr(sgd_model, 'coef_'):
        importances = np.abs(sgd_model.coef_)
        ranking = np.argsort(importances)[::-1]
        print("\nFeatures in order of priority (SGDRegressor):")
        for rank in range(n_features):
            position = ranking[rank]
            print("%d. feature %s (%f)" % (rank + 1, x_train.columns[position], importances[position]))
        plt.figure(figsize=(10, 5))
        plt.title("Feature importances (SGDRegressor)")
        plt.bar(range(n_features), importances[ranking], color="r", align="center")
        plt.xticks(range(n_features), x_train.columns[ranking], rotation='vertical')
        plt.xlim([-1, n_features])
        plt.show()

# Usage
# NOTE(review): relies on x_train/x_test/y_train/y_test and `var` already
# existing in the kernel; in the visible notebook `var` is first assigned in
# the data-loading cells that follow, so a fresh Restart & Run All fails here.
model_training(x_train, x_test, y_train, y_test, var)


## READING THE DATA AND SUBJECTING IT TO THE FUNCTIONS FOR TRAINING

In [None]:
# Dataset III (letter-card images): reduce every trial recording to a single
# summary row, then scale, cross-validate and train on the combined table.
# NOTE(review): both paths are absolute and machine-specific; consider a
# configurable data directory.
path = r"C:\Users\SHURI\Desktop\Final Emapthy\EyeT\EyeT_group_dataset_III_image_name_letter_card_participant_**_trial_*.csv"
df_question =pd.read_csv(r"C:\Users\SHURI\Desktop\Final Emapthy\Questionnaire_datasetIB.csv", encoding = 'ISO-8859-1')
filename = glob.glob(path)

# Columns removed from every recording; identical for all files, so built once.
unwanted_cols = ['Eye movement type','Computer timestamp','Eye movement type index','Unnamed: 0','Sensor',
                 'Event','Event value','Validity left','Validity right','Presented Stimulus name',
                 'Presented Media width','Presented Media name','Presented Media height','Presented Media position X (DACSpx)',
                 'Presented Media position Y (DACSpx)','Original Media width','Original Media height','Mouse position X','Mouse position Y']

summary_rows = []
for file in filename:
    df_pre = pd.read_csv(file)
    # Fixed: the helper defined in this notebook is
    # participant_number_extraction; the previous call to the undefined
    # extract_participant_number raised NameError.
    participant_number = participant_number_extraction(file)
    df_pre['Participant name'] = participant_number
    selected_df = feature_selection(df_pre)
    selected_df = numeric_conversion(selected_df)
    selected_df = duplicate_removal(selected_df)
    selected_df = unwanted_columns_removal(selected_df, unwanted_cols)
    selected_df = unique_val_cols(selected_df)
    selected_df,empathy_scores = empathy_score(selected_df,df_question)
    selected_df = imputation(selected_df)
    selected_df = moddata(selected_df)
    # moddata returns None when a required column is missing; skip that file.
    if selected_df is not None:
        summary_rows.append(selected_df)

if not summary_rows:
    # Fail with a clear message instead of an opaque error from the scaler.
    raise FileNotFoundError(f"No usable recordings matched {path!r}")

# Concatenate once rather than growing the frame inside the loop (quadratic).
df = pd.concat(summary_rows)

df = min_max_scaler(df)
x_df, y_df = feature_seperation(df)
var = np.var(y_df)
x_df = drop_correlation(x_df)
cross_validation(x_df,y_df,var)
x_train, x_test, y_train, y_test = train_test_split(x_df, y_df, test_size=0.2, random_state=85)
# model_training only prints and plots (it returns None), so its result is
# not bound to a name.
model_training(x_train, x_test, y_train, y_test,var)


In [None]:
# Dataset II (grey/blue images): reduce every trial recording to a single
# summary row, then scale, cross-validate and train on the combined table.
# NOTE(review): both paths are absolute and machine-specific; consider a
# configurable data directory.
path = r"C:\Users\SHURI\Desktop\Final Emapthy\EyeT\EyeT_group_dataset_II_image_name_grey_blue_participant_**_trial_*.csv"
df_question = pd.read_csv(
    r"C:\Users\SHURI\Desktop\Final Emapthy\Questionnaire_datasetIB.csv",
    encoding='ISO-8859-1',
)
filename = glob.glob(path)

# Columns removed from every recording.
unwanted_cols = [
    'Eye movement type', 'Computer timestamp', 'Eye movement type index', 'Unnamed: 0', 'Sensor',
    'Event', 'Event value', 'Validity left', 'Validity right', 'Presented Stimulus name', 'Presented Media width',
    'Presented Media name', 'Presented Media height', 'Presented Media position X (DACSpx)',
    'Presented Media position Y (DACSpx)', 'Original Media width', 'Original Media height',
]

# Start from an empty frame so the combined result matches the incremental
# concat even when no file matches.
frames = [pd.DataFrame()]
for file in filename:
    df_pre = pd.read_csv(file)
    participant_number = participant_number_extraction(file)
    df_pre['Participant name'] = participant_number
    selected_df = duplicate_removal(numeric_conversion(feature_selection(df_pre)))
    selected_df = unique_val_cols(unwanted_columns_removal(selected_df, unwanted_cols))
    selected_df, empathy_scores = empathy_score(selected_df, df_question)
    selected_df = moddata(imputation(selected_df))
    frames.append(selected_df)
df = pd.concat(frames)

df = min_max_scaler(df)
x_df, y_df = feature_seperation(df)
variance = np.var(y_df)
var = variance
x_df = drop_correlation(x_df)
cross_validation(x_df, y_df, var)
x_train, x_test, y_train, y_test = train_test_split(x_df, y_df, test_size=0.2, random_state=45)
model_training(x_train, x_test, y_train, y_test, var)