In [None]:
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from operator import itemgetter
import json 
import os
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

In [None]:
!pip install pickle5
import pickle5 as pickle

In [None]:
# Unpickle the dataset from its hard-coded location into `data`.
# NOTE(review): pickle.load can run arbitrary code while deserialising, so this
# should only ever be pointed at a trusted file.
with open('/content/sample_data/hr(tuples).pkl', "rb") as pkl_file:
  data = pickle.load(pkl_file)

In [None]:
def getter(x, hr = True):
    """Select one component of a tuple cell, passing non-tuples through.

    Args:
        x: A cell value. Only ``tuple`` instances are unpacked.
        hr: When truthy, pick element 0 of the tuple (used for the ``hr``
            tables in this notebook); otherwise pick element 1 (used for the
            ``ppi`` tables).

    Returns:
        ``x[0]`` or ``x[1]`` for a tuple, and ``x`` itself for anything else.

    Raises:
        IndexError: If ``x`` is a tuple that is too short for the requested
            position.
    """
    if not isinstance(x, tuple):
        return x
    return x[0] if hr else x[1]

In [None]:
# Check what kind of object the pickle contained.
type(data)

In [None]:
# Render the loaded table as the cell output.
# NOTE(review): consider data.head() to keep the rendered output small.
data

In [None]:
# Dimensions of the loaded table.
data.shape

In [None]:
# Print the summary produced by DataFrame.info() for the loaded table.
data.info()

In [None]:
# Number of rows for each distinct value of the Stage column.
data.Stage.value_counts()

In [None]:
# Number of rows for each distinct value of the Target column.
data['Target'].value_counts()

In [None]:
# Show the missing-value count of every column without truncation.
# The row limit is lifted only inside this block: pd.set_option() would have
# changed the display of every later cell as well (for example, whole frames
# being rendered row by row).
with pd.option_context('display.max_rows', None):
    # Inside a `with` block the last expression is not auto-displayed, so it
    # is rendered explicitly with the notebook's display() helper.
    display(data.isna().sum())

In [None]:
#change timesteps round it, not important
time_stamps = list(data.columns[2:-13])
time_stamps = [i[:i.find('.')+3] for i in time_stamps]
dic_rename = dict(zip(data.columns[2:-13], time_stamps))
data.rename(columns = dic_rename, inplace = True)
data.columns

In [None]:
#spilting dataset into stages with hr and ppi
stage_dic_hr = {}
stage_dic_ppi = {}
for s in data.Stage.unique():
  stage_dic_hr[s] = data[(data['Stage'] == s) & (data['Target'] != 'BASELINE')].applymap(lambda x:getter(x))

stage_dic_hr['BASELINE_STIM'] = data[(data['Stage'] == 'STIMULUS') & (data['Target'] == 'BASELINE')].applymap(lambda x:getter(x))
stage_dic_hr['BASELINE_QEST'] = data[(data['Stage'] == 'QUESTIONNAIRES') & (data['Target'] == 'BASELINE')].applymap(lambda x:getter(x))

for s in data.Stage.unique():
  stage_dic_ppi[s] = data[(data['Stage'] == s) & (data['Target'] != 'BASELINE')].applymap(lambda x:getter(x, False))

stage_dic_ppi['BASELINE_STIM'] = data[(data['Stage'] == 'STIMULUS') & (data['Target'] == 'BASELINE')].applymap(lambda x:getter(x, False))
stage_dic_ppi['BASELINE_QEST'] = data[(data['Stage'] == 'QUESTIONNAIRES') & (data['Target'] == 'BASELINE')].applymap(lambda x:getter(x, False))

In [None]:
# Index labels of the rows for participant 44 with target ENTHUSIASM.
data[(data['Participant'] == 44) & (data['Target'] == 'ENTHUSIASM')].index

In [None]:
# Plot the first tuple component of the timestamp columns for the row at
# position 684.
# NOTE(review): 684 is hard-coded and positional (iloc) — presumably taken from
# the index lookup in the previous cell; confirm the index label and the row
# position coincide.
row_signal = data.iloc[684, 2:-13].map(getter)
sns.lineplot(data = row_signal.values)

In [None]:
# For each of the three stages, break the per-stage tables down further by
# Target value. Keys have the form '<target>_<stage>'.
stages = ['WASHOUT', 'STIMULUS', 'QUESTIONNAIRES']

emodf_hr, emodf_ppi = {}, {}
for per_stage, per_emotion in ((stage_dic_hr, emodf_hr), (stage_dic_ppi, emodf_ppi)):
  for s in stages:
    stage_frame = per_stage[s]
    for emo in stage_frame['Target'].unique():
      per_emotion[emo + '_' + s] = stage_frame[stage_frame['Target'] == emo]


In [None]:
# For every '<target>_<stage>' table, compute per row 6290 minus the number of
# missing cells in the timestamp columns (positions 2 .. -13).
# NOTE(review): 6290 is presumably the number of timestamp columns, which would
# make this the count of recorded samples per row — confirm.
emo_lensignalhr = {
    emo: 6290 - frame.iloc[:, 2:-13].isna().sum(axis = 1)
    for emo, frame in emodf_hr.items()
}

emo_lensignalppi = {
    emo: 6290 - frame.iloc[:, 2:-13].isna().sum(axis = 1)
    for emo, frame in emodf_ppi.items()
}


In [None]:
# Give every distinct Target label an integer id, numbered in the order in
# which unique() returns the labels.
unique_targets = data['Target'].unique()
target_map = dict(zip(unique_targets, range(len(unique_targets))))

# Independently encode the Target column with scikit-learn's LabelEncoder.
# NOTE(review): `label_encoder` and `y` are not referenced again in the visible
# cells (the split uses target_map) — confirm whether they are still needed.
from sklearn.preprocessing import LabelEncoder
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(data['Target'])

In [None]:
# Modelling tables built from the STIMULUS rows only: one keeps element 0 of
# each tuple cell (hr), the other element 1 (ppi).
# The 2:-12 slice keeps one more trailing column than the 2:-13 slice used for
# the timestamp columns; split_dataset later reads a 'Target' column from it.
# NOTE(review): missing cells are not filled here — confirm the models are
# meant to receive them as-is.
stimulus_rows = data[data['Stage'] == 'STIMULUS']

dataset_hr = stimulus_rows.applymap(getter).iloc[:, 2:-12]
dataset_ppi = stimulus_rows.applymap(lambda cell: getter(cell, False)).iloc[:, 2:-12]

In [None]:
def split_dataset(df, target_map, train_size = 0.7, random_state = 123):
  """Encode the 'Target' column and make a stratified train/test split.

  Args:
    df: DataFrame with a 'Target' column; all other columns are used as
      features. The caller's frame is not modified.
    target_map: Mapping from Target label to the integer code that replaces
      it. Labels missing from the mapping are left unchanged.
    train_size: Fraction (or absolute number) of rows put in the training
      set, passed through to train_test_split.
    random_state: Seed passed to train_test_split so the split is repeatable.

  Returns:
    X_train, X_test, y_train, y_test, with the split stratified on the
    encoded target.
  """
  df = df.copy()

  # Assign the replaced column back rather than calling
  # df['Target'].replace(..., inplace=True): that form mutates a temporary
  # column object, which newer pandas warns about and, with Copy-on-Write,
  # leaves `df` unchanged. infer_objects() keeps the encoded column numeric
  # on pandas versions where replace() no longer downcasts object columns.
  df['Target'] = df['Target'].replace(target_map).infer_objects()
  y = df['Target']
  X = df.drop('Target', axis = 1)

  X_train, X_test, y_train, y_test = train_test_split(
      X, y, train_size = train_size, random_state = random_state, stratify = y
  )

  return X_train, X_test, y_train, y_test

In [None]:
# Split the hr table into train and test sets; labels are encoded with target_map.
X_train,  X_test, y_train, y_test = split_dataset(dataset_hr, target_map)

In [None]:
# Preview the first rows of the training features. An earlier cell removes the
# pandas row limit, so a bare `X_train` would render every row of the frame.
X_train.head()

In [None]:
# Number of test rows per encoded target value.
y_test.value_counts()

In [None]:
# Number of training rows per encoded target value.
y_train.value_counts()

In [None]:
# Fully connected baseline: three ReLU hidden layers (128, 128, 64 units)
# followed by an 11-way softmax.
# `shape` is passed as a tuple: a bare integer is only tolerated by older
# Keras releases and is rejected by newer ones.
inputs = tf.keras.Input(shape = (X_train.shape[1],))

h1 = tf.keras.layers.Dense(128, activation = 'relu')(inputs)
h2 = tf.keras.layers.Dense(128, activation = 'relu')(h1)
h3 = tf.keras.layers.Dense(64, activation = 'relu')(h2)

# NOTE(review): 11 output units are hard-coded — confirm this matches the
# number of distinct encoded targets.
outputs = tf.keras.layers.Dense(11, activation = 'softmax')(h3)

model_simple_dnn = tf.keras.Model(inputs = inputs, outputs = outputs)

# summary() prints the table itself and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
model_simple_dnn.summary()

In [None]:
# Train the dense baseline with Adam and sparse categorical cross-entropy,
# tracking accuracy.
model_simple_dnn.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])

# Stop once val_loss has gone 5 epochs without improving and roll back to the
# best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = 5, restore_best_weights = True)

# 20% of the training data is held out for validation.
history = model_simple_dnn.fit(
    X_train, y_train,
    validation_split = 0.2, batch_size = 32, epochs = 50,
    callbacks = [early_stopping]
)

In [None]:
# evaluate() returns [loss, accuracy] for the metrics compiled above; keep the accuracy.
modeldnn_acc = model_simple_dnn.evaluate(X_test, y_test, verbose=0)[1]
# Scale first and let the format spec do the rounding: round(acc, 3)*100 can
# print float artefacts (round(0.57, 3)*100 gives 56.99999999999999).
print(f"Test Accuracy : {modeldnn_acc * 100:.1f} %")

In [None]:
# GRU classifier: each feature column is treated as one timestep of a
# single-channel sequence, the GRU outputs for all timesteps are flattened and
# fed to an 11-way softmax.
# `shape` is passed as a tuple: a bare integer is only tolerated by older
# Keras releases and is rejected by newer ones.
inputs = tf.keras.Input(shape = (X_train.shape[1],))

# Add the trailing channel axis, (features,) -> (features, 1), with a Keras
# Reshape layer. Calling tf.expand_dims directly on the symbolic input is not
# accepted by newer Keras releases.
expand_dim = tf.keras.layers.Reshape((X_train.shape[1], 1))(inputs)
# NOTE(review): 265 units — possibly intended to be 256; left unchanged.
gru = tf.keras.layers.GRU(265, return_sequences = True)(expand_dim)
flatten = tf.keras.layers.Flatten()(gru)

outputs = tf.keras.layers.Dense(11, activation = 'softmax')(flatten)

model_gru = tf.keras.Model(inputs = inputs, outputs = outputs)

# summary() prints the table itself and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
model_gru.summary()

In [None]:
# Train the GRU model with Adam and sparse categorical cross-entropy,
# tracking accuracy.
model_gru.compile(optimizer = 'adam', loss = 'sparse_categorical_crossentropy', metrics = ['accuracy'])

# Stop once val_loss has gone 5 epochs without improving and roll back to the
# best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = 5, restore_best_weights = True)

# 20% of the training data is held out for validation.
history = model_gru.fit(
    X_train, y_train,
    validation_split = 0.2, batch_size = 32, epochs = 50,
    callbacks = [early_stopping]
)

In [None]:
# evaluate() returns [loss, accuracy] for the metrics compiled above; keep the accuracy.
modelgru_acc = model_gru.evaluate(X_test, y_test, verbose=0)[1]
# Scale first and let the format spec do the rounding: round(acc, 3)*100 can
# print float artefacts (round(0.57, 3)*100 gives 56.99999999999999).
print(f"Test Accuracy : {modelgru_acc * 100:.1f} %")

In [None]:
# Stacked LSTM classifier: four 50-unit LSTM layers, each followed by 20%
# dropout, ending in an 11-way softmax. The first three LSTM layers return the
# full sequence so the next LSTM receives one vector per timestep.
# NOTE(review): input_shape is (features, 1) while X_train is a 2-D table —
# presumably Keras adds the trailing axis when fitting; confirm.
model_lstm = tf.keras.Sequential()

model_lstm.add(tf.keras.layers.LSTM(units=50,return_sequences=True,input_shape=(X_train.shape[1],1)))
model_lstm.add(tf.keras.layers.Dropout(0.2))
model_lstm.add(tf.keras.layers.LSTM(units=50,return_sequences=True))
model_lstm.add(tf.keras.layers.Dropout(0.2))
model_lstm.add(tf.keras.layers.LSTM(units=50,return_sequences=True))
model_lstm.add(tf.keras.layers.Dropout(0.2))
model_lstm.add(tf.keras.layers.LSTM(units=50))
model_lstm.add(tf.keras.layers.Dropout(0.2))
model_lstm.add(tf.keras.layers.Dense(units=11,activation='softmax'))


# `metrics` is given as a list, matching the other compile calls in this
# notebook; newer Keras releases reject a bare string here.
model_lstm.compile(optimizer='adam',loss='sparse_categorical_crossentropy',metrics=['accuracy'])
# summary() prints the table itself and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
model_lstm.summary()

In [None]:
# Stop once val_loss has gone 5 epochs without improving and roll back to the
# best weights seen.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = 5, restore_best_weights = True)

# Train the LSTM model; 20% of the training data is held out for validation.
history = model_lstm.fit(
    X_train, y_train,
    validation_split = 0.2, batch_size = 32, epochs = 50,
    callbacks = [early_stopping]
)

In [None]:
# evaluate() returns [loss, accuracy] for the metrics compiled above; keep the accuracy.
modellstm_acc = model_lstm.evaluate(X_test, y_test, verbose=0)[1]
# Scale first and let the format spec do the rounding: round(acc, 3)*100 can
# print float artefacts (round(0.57, 3)*100 gives 56.99999999999999).
print(f"Test Accuracy : {modellstm_acc * 100:.1f} %")