In [None]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import tensorflow as tf
import heartpy as hp
import pandas as pd
import numpy as np 
import pywt as pw
import openpyxl
import pickle
import tsfel
import json

# Functions

In [None]:
num_of_participants = 23
num_of_clips = 18

#In Hz:
cutoff = [0.5, 45]
sample_rate = 256.0
order = 3
filtertype ='bandpass'
def data_filter (data):
    # Separate data by channel
    for participant in data.keys():
        #print("Starting " + participant)
        for clip_num in range(0, num_of_clips):
            # Filter both channels from baseline
            CH1 = [row[0] for row in data[participant]['ECG']['baseline'][clip_num]]
            CH1_filtered = hp.filter_signal(CH1, cutoff = cutoff, sample_rate = sample_rate, order = order, filtertype=filtertype)
            # CH1_filtered = hp.butter_lowpass_filter(data, cutoff=cutoff, sample_rate=sample_rate, order=order)

            CH2 = [row[1] for row in data[participant]['ECG']['baseline'][clip_num]]
            CH2_filtered = hp.filter_signal(CH2, cutoff = cutoff, sample_rate = sample_rate, order = order, filtertype=filtertype)
            # CH2_filtered = hp.butter_lowpass_filter(data, cutoff=cutoff, sample_rate=sample_rate, order=order)

            s_clip = "clip " + str(clip_num+1) 

            data[participant]['ECG'][s_clip] = {}
            data[participant]['ECG'][s_clip]['CH1'] = {}
            data[participant]['ECG'][s_clip]['CH1']['baseline'] = CH1_filtered.tolist()
            data[participant]['ECG'][s_clip]['CH2'] = {}
            data[participant]['ECG'][s_clip]['CH2']['baseline'] = CH2_filtered.tolist()

            
            # Filter both channels from stimuli
            CH1 = [row[0] for row in data[participant]['ECG']['stimuli'][clip_num]]
            CH1_filtered = hp.filter_signal(CH1, cutoff = cutoff, sample_rate = sample_rate, order = order, filtertype=filtertype)
            # CH1_filtered = hp.butter_lowpass_filter(data, cutoff=cutoff, sample_rate=sample_rate, order=order)

            CH2 = [row[1] for row in data[participant]['ECG']['stimuli'][clip_num]]
            CH2_filtered = hp.filter_signal(CH2, cutoff = cutoff, sample_rate = sample_rate, order = order, filtertype=filtertype)
            # CH2_filtered = hp.butter_lowpass_filter(data, cutoff=cutoff, sample_rate=sample_rate, order=order)

            data[participant]['ECG'][s_clip]['CH1']['stimuli'] = CH1_filtered.tolist()
            data[participant]['ECG'][s_clip]['CH2']['stimuli'] = CH2_filtered.tolist()

            data[participant]['ECG'][s_clip]['ScoreValence'] = data[participant]['ScoreValence'][clip_num]
            data[participant]['ECG'][s_clip]['ScoreArousal'] = data[participant]['ScoreArousal'][clip_num]
            data[participant]['ECG'][s_clip]['ScoreDominance'] = data[participant]['ScoreDominance'][clip_num]
            
            print("Done filtering raw for " + participant + " at clip#" + str(clip_num+1) , end="\r", flush=True)
        data[participant]['ECG'].pop('stimuli')
        data[participant]['ECG'].pop('baseline')

        data[participant].pop('ScoreValence')
        data[participant].pop('ScoreArousal')
        data[participant].pop('ScoreDominance')
    
    print("All done.... Highpass filter applied, cutoff: [" + str(cutoff[0]) + "," + str(cutoff[1]) + "]Hz")
    return data
            
            
### Function to display graph
def show_graph(participant, clip, dtype, range_min = 0, range_max = -1):
    plt.plot(filtered_data[participant]['ECG'][dtype][clip][range_min:range_max])
    plt.ylabel("mV")
    plt.xlabel("Sample #")
    graph_title = participant + "  clip#" + str(clip) + " " + dtype + " data range: " + str(range_min) + " => " + str(range_max)
    plt.title(graph_title)

### Function to split channels
def chan_split(filtered_data, participant, clip, dtype):
    ch1 = [row[0] for row in filtered_data[participant]['ECG'][dtype][clip]]
    ch2 = [row[1] for row in filtered_data[participant]['ECG'][dtype][clip]]
    
    return ch1, ch2


### Function to plot Heard Rate Signal Peak Detection
def peak_detection(participant, clip, dtype, ch = 1, range_min = 0, range_max = -1):
    
    p1_c1_ch1, p1_c1_ch2 =(chan_split(participant, clip, dtype))
    wd1 = hp.process(p1_c1_ch1[range_min:range_max], 256)
    wd2 = hp.process(p1_c1_ch1[range_min:range_max], 256)
    if ch == 1:
        hp.plotter(wd1[0], wd1[1])
    else:
        hp.plotter(wd2[0], wd2[1])
            

# Load & Process RAW Data, then create and save processed data to .pkl

In [None]:
raw_data = {}
load_raw_from = '../data_set_raw_ECG/raw_data_dict.pkl'
dump_to = '../data_set_raw_ECG/processed_data_dict.pkl'

with open(load_raw_from, 'rb') as f:
    raw_data = pickle.load(f)

filtered_data = data_filter(raw_data)

with open(dump_to, 'wb') as f:
    pickle.dump(filtered_data, f)
    print("Dumped proccesed data for " , str(num_of_participants), " participants and ", str(num_of_clips), " ", dump_to)

plt.plot(raw_data['participant_1']['ECG']['clip 1']['CH1']['stimuli'][0:1024])

del raw_data
del filtered_data

# Load Filtered Data

In [None]:
data = {}
load_processed_from = '../data_set_raw_ECG/processed_data_dict.pkl'
with open(load_processed_from, 'rb') as f:
    data = pickle.load(f)

In [None]:
data['participant_1']['ECG']['clip 1'].keys()
len(data['participant_1']['ECG']['clip 1']['CH1']['stimuli'])


# Extract Time Domain

In [None]:
windows_size = 512

def extract_time_domain(processed_data):
    features = {}
    cfg = tsfel.get_features_by_domain()
    for participant in processed_data:
        features[participant] = {}
        for clip in processed_data[participant]['ECG']:
            mod = 256 % data[participant]['ECG'][clip]['CH1']['stimuli']
            sample_size = len(data['participant_1']['ECG']['clip 1']['CH1']['stimuli']) 
            exclude = ((sample_size-(mod*256))/2)
            i = 0
            while i < sample_size :
                features[participant][clip] = {}
                features[participant][str(clip)]['CH1'] = tsfel.time_series_features_extractor(cfg, processed_data[participant]['ECG'][clip]['CH1']['stimuli'][i:i+windows_size])
                features[participant][str(clip)]['CH2'] = tsfel.time_series_features_extractor(cfg, processed_data[participant]['ECG'][clip]['CH2']['stimuli'][i:i+windows_size])
                
                features[participant][str(clip)]['ScoreValence'] = data[participant]['ScoreValence']
                features[participant][str(clip)]['ScoreArousal'] = data[participant]['ScoreArousal']
                features[participant][str(clip)]['ScoreDominance'] = data[participant]['ScoreDominance']
                
                i += windows_size
            
            print("Done with time features for participant ", participant, " clip ", clip)
    return features

import warnings
warnings.filterwarnings("ignore")

def extract_time_domain_as_pd_df(processed_data):
    combined_data = pd.DataFrame()
    cfg = tsfel.get_features_by_domain()
    for CH in ['CH1', 'CH2']:
        for participant in processed_data:
            for clip in processed_data[participant]['ECG']:
                data_size = len(processed_data[participant]['ECG'][clip][CH]['stimuli'])
                for_iterations = windows_size % data_size
                for i in range(0, data_size, windows_size):
                    print("Processing " + participant + " " + clip + " " + CH + " window:" + str(int(i/windows_size)) + "/" + str(int(data_size/windows_size)) + "          ", end="\r", flush=True)
                    features_df = pd.DataFrame()
                    features_df = tsfel.time_series_features_extractor(cfg, processed_data[participant]['ECG'][clip][CH]['stimuli'][i:i+windows_size], verbose=False)
                    features_df['ScoreValence'] = data[participant]['ECG'][clip]['ScoreValence']
                    features_df['ScoreArousal'] = data[participant]['ECG'][clip]['ScoreArousal']
                    features_df['ScoreDominance'] = data[participant]['ECG'][clip]['ScoreDominance']
                
                    combined_data = pd.concat([combined_data, features_df], ignore_index=True)
                
                    # print("Done with time features for participant ", participant, " clip ", clip)
    print()
    print("Done extracting time domain features")
    return combined_data

In [None]:
# Save to dict to .pkl
# time_domain_features = extract_time_domain(data)

# dump_to = '../features/time_domain_all_dict.pkl'
# with open(dump_to, 'wb') as f:
#     pickle.dump(time_domain_features, f)
# del time_domain_features

In [None]:
# Save pandas df to .pkl

time_domain_df = extract_time_domain_as_pd_df(data)

dump_to = '../features/time_domain_df.pkl'
print("Dumping to pickle file")
time_domain_df.to_pickle(dump_to)
print("Dumping to excel file")
time_domain_df.to_excel('../features/time_domain_df.xlsx')
print("Dumping to csv file")
time_domain_df.to_csv('../features/time_domain_df.csv')

del time_domain_df

# Train the Model using sklearn

In [None]:
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn.multioutput import MultiOutputClassifier
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

In [None]:
# Common to all:

# Load data
features_df = pd.read_pickle('../features/time_domain_df.pkl')
features_df.fillna(0, inplace=True)

# Split Data into X and Y
X = features_df.drop(['ScoreValence', 'ScoreArousal', 'ScoreDominance'], axis=1)
Y = features_df[['ScoreValence', 'ScoreArousal', 'ScoreDominance']]  # Ensure correct DataFrame is used

# Normalize data
scaler = StandardScaler()
X_normalized = scaler.fit_transform(X)

# Train-test split
X_train, X_test, Y_train, Y_test = train_test_split(X_normalized, Y, test_size=0.2, random_state=42)

### SVM with 'linear' kernel

In [None]:
# Train the SVM
svm_linear_model = SVC(kernel='linear')  # Correctly named SVM model
multi_target_svm = MultiOutputClassifier(svm_linear_model)
multi_target_svm.fit(X_train, Y_train)

# Predict and evaluate
Y_pred = multi_target_svm.predict(X_test)

# Evaluate each target
for i, target in enumerate(['Valence', 'Arousal', 'Dominance']):
    accuracy = accuracy_score(Y_test.iloc[:, i], Y_pred[:, i])
    print(f"Accuracy for {target}:", accuracy)

#### SVM with 'rbf' kernel

In [None]:
# Train the SVM
svm_rbf_model = SVC(kernel='rbf')  # Correctly named SVM model
multi_target_svm = MultiOutputClassifier(svm_rbf_model)
multi_target_svm.fit(X_train, Y_train)

# Predict and evaluate
Y_pred = multi_target_svm.predict(X_test)

# Evaluate each target
for i, target in enumerate(['Valence', 'Arousal', 'Dominance']):
    accuracy = accuracy_score(Y_test.iloc[:, i], Y_pred[:, i])
    print(f"Accuracy for {target}:", accuracy)

#### SVM with 'poly' kernel

In [None]:
# Train the SVM
svm_poly_model = SVC(kernel='poly')  # Correctly named SVM model
multi_target_svm = MultiOutputClassifier(svm_poly_model)
multi_target_svm.fit(X_train, Y_train)

# Predict and evaluate
Y_pred = multi_target_svm.predict(X_test)

# Evaluate each target
for i, target in enumerate(['Valence', 'Arousal', 'Dominance']):
    accuracy = accuracy_score(Y_test.iloc[:, i], Y_pred[:, i])
    print(f"Accuracy for {target}:", accuracy)

### SVM with PCA and 'rbf' kernel 

In [None]:
#define PCA model to use
pca = PCA(0.95) #PCA with 95% variance

X_train_pca = pca.fit_transform(X_train) #PCA on train data
X_test_pca = pca.transform(X_test)#PCA on test data

print(pca.explained_variance_ratio_)

# Train the SVM
svm_pca_rbf_model = SVC(kernel='rbf', C=100, gamma=1, verbose=True)
multi_target_svm = MultiOutputClassifier(svm_pca_rbf_model)
multi_target_svm.fit(X_train_pca, Y_train)

# Predict and evaluate
Y_pred = multi_target_svm.predict(X_test_pca)

# Evaluate each target
for i, target in enumerate(['Valence', 'Arousal', 'Dominance']):
    accuracy = accuracy_score(Y_test.iloc[:, i], Y_pred[:, i])
    print(f"Accuracy for {target}:", accuracy)

In [None]:
# Scaling
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_pca)
X_test_scaled = scaler.transform(X_test_pca)

# Multi-Output SVM Classifier
# Best parameters: {'estimator__C': 100, 'estimator__gamma': 1, 'estimator__kernel': 'rbf'}
svm_rbf_model = SVC(kernel='rbf', C=100, gamma=1)  # Base SVM model
multi_target_svm = MultiOutputClassifier(svm_rbf_model)  # Multi-output wrapper

# Training
multi_target_svm.fit(X_train_scaled, Y_train)

# Predictions
y_pred = multi_target_svm.predict(X_test_scaled)

# Evaluation
for i, target in enumerate(['Valence', 'Arousal', 'Dominance']):
    print(f"Classification Report for {target}:")
    print(classification_report(Y_test.iloc[:, i], y_pred[:, i]))

# Grid Search (Note: This is tricky with multi-output classifiers)
# For simplicity, the following code is a template for a single-output grid search
param_grid = {
    'estimator__C': [0.1, 1, 10, 100],  # Note the 'estimator__' prefix
    'estimator__gamma': [1, 0.1, 0.01, 0.001],
    'estimator__kernel': ['rbf', 'linear', 'poly']
}
grid_search = GridSearchCV(MultiOutputClassifier(SVC()), param_grid, refit=True, verbose=2)
grid_search.fit(X_train_scaled, Y_train)

# Best parameters
print("Best parameters:", grid_search.best_params_)

# CHECK THIS ONE ^^^^^^

### Random Forest with PCA

In [None]:
# PCA model to use
pca = PCA(0.95)  # PCA with 95% variance

X_train_pca = pca.fit_transform(X_train)  # PCA on train data
X_test_pca = pca.transform(X_test)  # PCA on test data

print(pca.explained_variance_ratio_)

# Train the Random Forest
rf_model = RandomForestClassifier(verbose=True)  # Random Forest model
multi_target_rf = MultiOutputClassifier(rf_model)
multi_target_rf.fit(X_train_pca, Y_train)

# Predict and evaluate
Y_pred = multi_target_rf.predict(X_test_pca)

# Evaluate each target
for i, target in enumerate(['Valence', 'Arousal', 'Dominance']):
    accuracy = accuracy_score(Y_test.iloc[:, i], Y_pred[:, i])
    print(f"Accuracy for {target}:", accuracy)


# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In [None]:
# PC_values = np.arange(pca.n_components_) + 1
# plt.plot(PC_values, pca.explained_variance_ratio_, 'o-', linewidth=2, color='blue')
# plt.title('Scree Plot')
# plt.xlabel('Principal Component')
# plt.ylabel('Variance Explained')
# plt.show()

In [None]:
# percent_explained = np.cumsum(pca.explained_variance_ratio_)/np.sum(pca.explained_variance_ratio_)
# plt.plot(PC_values, percent_explained, 'o-', linewidth=2, color='blue')
# plt.hlines(0.95,1,15)
# plt.title('Scree Plot')
# plt.xlabel('Principal Component')
# plt.ylabel('Percent Explained')
# plt.grid()
# plt.show()

In [None]:
# matrix = features_df.corr()
# plt.figure(figsize=(15, 10))  # Adjust the figure size as needed
# sns.heatmap(matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1)
# plt.title('Correlation Matrix Heatmap')
# plt.show()

In [None]:
# from sklearn.model_selection import GridSearchCV
# from sklearn import svm
# from sklearn import metrics

# # defining parameter range
# param_grid = {'C': [0.1, 1, 10, 100, 1000], 
#               'gamma': [1, 0.1, 0.01, 0.001, 0.0001],
#               'kernel': ['rbf', 'poly']} 
  
# grid = GridSearchCV(multi_target_svm, param_grid, refit = True, verbose = 3) #Grid model definition
  
# grid.fit(X_train, Y_train) #fit the grid mode

In [None]:
# # Load data
# features_df = pd.read_pickle('../features/time_domain_df.pkl')
# features_df.fillna(0, inplace=True)

# # Split Data into X and Y
# X = features_df.drop(['ScoreValence', 'ScoreArousal', 'ScoreDominance'], axis=1)
# Y = features_df[['ScoreValence', 'ScoreArousal', 'ScoreDominance']]  # Ensure correct DataFrame is used

# # Normalize data
# scaler = StandardScaler()
# X_normalized = scaler.fit_transform(X)
# # 
# # Train-test split
# X_train, X_test, Y_train, Y_test = train_test_split(X_normalized, Y, test_size=0.2, random_state=42)

# # pca = PCA(0.95) #PCA with 95% variance

# # X_train = pca.fit_transform(X_train) #PCA on train data
# # X_test = pca.transform(X_test)#PCA on test data

# #define PCA model to use
# n_components=100
# pca = PCA(n_components=n_components)

# #fit PCA model to data
# pca_fit = pca.fit(features_df)

# print(pca.explained_variance_ratio_)
# # print("PCA size: ", str(len(pca)))

# # Train a separate SVM model for each target
# for i, target in enumerate(['ScoreValence', 'ScoreArousal', 'ScoreDominance']):
#     svm_model = SVC(kernel='rbf')  # Correctly named SVM model
#     svm_model.fit(X_train, Y_train[target])
#     Y_pred = svm_model.predict(X_test)
#     accuracy = accuracy_score(Y_test[target], Y_pred)
#     print(f"Accuracy for {target}:", accuracy)
