<a href="https://colab.research.google.com/github/FedotovD/WESAD_ECG_SSL/blob/main/Case_study_ECG_affect_recognition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

This Python Notebook file will guide you through the process of affect recognition based on electrocardiographic data (ECG) using three approaches:


1.   Classical: utilizing expert knowledge based features and training a Support Vector Machines (SVM) model for three class classification problem.
2.   Fully-supervised deep learning based: utilizing raw signal to train a Convolutional Neural Network (CNN) from scratch, directly on target data.
3.   Self-supervised deep learning based: utilizing model pretrained on transformed dataset using transfer learning. 



# Dataset

We will use WESAD (Wearable Stress and Affect Detection) database, collected by Corporate Research Group of Robert Bosch GmbH. It is available for download [here](https://archive.ics.uci.edu/ml/datasets/WESAD+%28Wearable+Stress+and+Affect+Detection%29). For more detailed information about the dataset, please refer to the [original paper](https://www.eti.uni-siegen.de/ubicomp/papers/ubi_icmi2018.pdf) by Philip Schmidt, Attila Reiss, Robert Dürichen, Claus Marberger, Kristof Van Laerhoven.

## Preprocessing
For classical approach we have extracted the following features:

For deep-learning based approach we have downsampled data to 256Hz with accordance to [the recent paper](https://arxiv.org/pdf/2002.03898.pdf) and to facilitate ease of the model training.

Further preprocessing (e.g. train / dev / validation split, data normalization) is done in corresponding code cells below.

Let's start by importing necessery libraries.

In [None]:
import os
import gc

import pandas as pd
import numpy as np
import tensorflow as tf

from tensorflow import keras
from tensorflow.keras import layers

from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import LeaveOneGroupOut,LeavePGroupsOut

from matplotlib import pyplot as plt

Let's now download the data from a git repository.

In [None]:
# Remove the folder if already exists
if os.path.exists('WESAD_ECG_SSL'):
    ! rm -rf WESAD_ECG_SSL

# Clone the git repo
! git clone --quiet https://github.com/FedotovD/WESAD_ECG_SSL.git

# Classical approach

We start by calculating a baseline for this data. Let's read the expert knowledge based features and print several rows.

In [None]:
ecg_expert = pd.read_csv('WESAD_ECG_SSL/data/ECG_expert_features.csv', index_col=0)
ecg_expert.head()

We now split it into train and test, then normalize the data.

In [None]:
def class_to_number(labels):
    # This function converts classes from categories to numbers, e.g. "Base" to 0, "Fun" to 1, etc.

    classes = np.unique(labels)

    for c in range(len(classes)):
        labels[labels == classes[c]] = c
    
    return np.array(labels, dtype=int)

def one_hot_encoding(labels, num_classes):
    # This function performs one hot encoding for labels

    if type(labels[0]) == str:
        labels = class_to_number(labels)

    elif type(labels[0]) == float:
        labels = int(labels)

    Y = np.eye(num_classes)[labels]

    return Y

In [None]:
# Define train and test sets
train_participants = ['S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8', 'S9', 'S10', 'S11', 'S13', 'S14']
test_participants  = ['S15', 'S16', 'S17']

# Subsample indices for each set
train_idx = ecg_expert[ecg_expert.pid.isin(train_participants)].index
test_idx = ecg_expert[ecg_expert.pid.isin(test_participants)].index

# Split data
X_train = ecg_expert.loc[train_idx].iloc[:,2:].to_numpy()    # iloc[:,2:] drops first two columns (pid and lab)
y_train = ecg_expert.loc[train_idx].lab.to_numpy()
y_train = class_to_number(y_train)

X_test  = ecg_expert.loc[test_idx].iloc[:,2:].to_numpy()
y_test  = ecg_expert.loc[test_idx].lab.to_numpy()
y_test  = class_to_number(y_test)

# Normalize data
scaler = StandardScaler()
X_train_norm = scaler.fit_transform(X_train)
X_test_norm  = scaler.transform(X_test)

Now we are ready to train the classifier.

In [None]:
# Define classifier (SVM for classification) with default parameters
clf = SVC()

# Fit it to train data
clf.fit(X_train_norm, y_train)

# Make predictions
pred_classical = clf.predict(X_test_norm)

Let's check the results.

In [None]:
print('Accuracy\t{}'.format(np.round(accuracy_score(pred_classical, y_test),3)))
print('F1 score\t{}'.format(np.round(f1_score(pred_classical, y_test, average='macro'),3)))
print('UAR\t\t{}'.format(np.round(recall_score(pred_classical, y_test, average='macro'),3)))

Can we do better? Sure! Let's try deep learning based approach

# Fully supervised deep learning based approach

At this step we will use CNN to recognize affect. The architecture is proposed [here](https://https://arxiv.org/pdf/2002.03898.pdf). We will use TensorFlow and Keras to create, train and test our model.

We read the data at first. Note that for the data of each participant a separate file is assigned. Therefore, the procedure is slightly different here.

Also, in order to avoid overfitting during training, we need an additional development set. We will check the performance on it while training to decide when to stop.

In [None]:
def read_data(participants):
    # This function reads the data according to participants list taken as an argument

    X = []

    for filename in train_participants:
        temp = pd.read_csv('WESAD_ECG_SSL/data/{}.csv'.format(filename), index_col=0)
        X.append(temp)
    
    X = pd.concat(X, axis=0)
    y = X.lab.to_numpy()
    y = class_to_number(y)
    X = X.iloc[:,2:].to_numpy()
    
    return X, y

In [None]:
# Define train and test sets
train_participants = ['S2', 'S3', 'S4', 'S5', 'S6', 'S7', 'S8', 'S9', 'S10']
devel_participants = ['S11', 'S13', 'S14']
test_participants  = ['S15', 'S16', 'S17']


# Read files for train and test participants
X_train, y_train = read_data(train_participants)
X_devel, y_devel = read_data(devel_participants)
X_test, y_test = read_data(test_participants)


# Normalize data
scaler = StandardScaler()
X_train_norm = scaler.fit_transform(X_train)
X_devel_norm = scaler.transform(X_devel)
X_test_norm  = scaler.transform(X_test)


# Convert labels to one-hot
y_train_one_hot = one_hot_encoding(y_train, 3)
y_devel_one_hot = one_hot_encoding(y_devel, 3)
y_test_one_hot = one_hot_encoding(y_test, 3)

Now we have the data ready. Let's get to modeling!

In [None]:
def create_graph(input_shape=2560, num_classes=3):
    inputs = keras.Input(shape=(input_shape,1))

    # Conv1
    x = layers.Conv1D(filters=32, kernel_size=32, strides=1)(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(layers.LeakyReLU())(x)

    x = layers.Conv1D(filters=32, kernel_size=32, strides=1)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(layers.LeakyReLU())(x)

    x = layers.MaxPooling1D(pool_size=8, strides=2)(x)

    # Conv2
    x = layers.Conv1D(filters=64, kernel_size=16, strides=1)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(layers.LeakyReLU())(x)

    x = layers.Conv1D(filters=64, kernel_size=16, strides=1)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation(layers.LeakyReLU())(x)

    x = layers.GlobalMaxPooling1D()(x)
    x = layers.Flatten()(x)

    # Head
    x = layers.Dense(128)(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Activation(layers.LeakyReLU())(x)
    outputs = layers.Dense(num_classes, 'softmax')(x)
  
    return inputs, outputs

In [None]:
# Create a model
inputs, outputs = create_graph(input_shape=2560, num_classes=3)
model_fully_supervised = keras.Model(inputs, outputs)

# Define an optimizer and callbacks (to stop the training when the development loss does not decrease)
opt = keras.optimizers.Adam(lr=0.001)
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Compile and fit the model
model_fully_supervised.compile(optimizer=opt, loss='categorical_crossentropy')
history_fs = model_fully_supervised.fit(X_train_norm, y_train_one_hot, validation_data=(X_devel_norm, y_devel_one_hot), callbacks=[callback], epochs=100, batch_size=32, verbose=2)

Let's plot the graphs of train and validation loss.

In [None]:
plt.plot(history_fs.history['loss'])
plt.plot(history_fs.history['val_loss'])
plt.show()

Next, let's make a prediction for text set and calculate metric values

In [None]:
pred_fully_supervised = model_fully_supervised.predict(X_test_norm)

In [None]:
print('Accuracy\t{}'.format(np.round(accuracy_score(np.argmax(pred_fully_supervised, axis=1), y_test),3)))
print('F1 score\t{}'.format(np.round(f1_score(np.argmax(pred_fully_supervised, axis=1), y_test, average='macro'),3)))
print('UAR\t\t{}'.format(np.round(recall_score(np.argmax(pred_fully_supervised, axis=1), y_test, average='macro'),3)))

# Self supervised + transfer learning

Next, let's use self-supervised approach to learn representations from data (pretext model) and then apply transfer learning to use these representations for the target task (downstream model). 

First, we will need a way to create a proxy task for the model to train on. In our case it will be classification of transformation type applied to the signal (if any). 

We apply one of these transormations: horizontal flip, addition of noise, scaling by a certain factor.

In [None]:
# Three functions below are taken from source code of the autors of this paper:
# https://arxiv.org/pdf/2002.03898.pdf

def hor_filp(signal):
    """ 
    flipped horizontally 
    """
    hor_flipped = np.flip(signal)
    return hor_flipped

def add_noise_with_SNR(signal, noise_amount):
    """ 
    adding noise
    created using: https://stackoverflow.com/a/53688043/10700812 
    """
    
    target_snr_db = noise_amount #20
    x_watts = signal ** 2                       # Calculate signal power and convert to dB 
    sig_avg_watts = np.mean(x_watts)
    sig_avg_db = 10 * np.log10(sig_avg_watts)   # Calculate noise then convert to watts
    noise_avg_db = sig_avg_db - target_snr_db
    noise_avg_watts = 10 ** (noise_avg_db / 10)
    mean_noise = 0
    noise_volts = np.random.normal(mean_noise, np.sqrt(noise_avg_watts), len(x_watts))     # Generate an sample of white noise
    noised_signal = signal + noise_volts        # noise added signal

    return noised_signal 

def scaled(signal, factor):
    """"
    scale the signal
    """
    scaled_signal = signal * factor
    return scaled_signal

def transform_data(X):
    # This function performes defined above transformations and creates artificial labels.

    X_noised = np.zeros(X.shape)
    X_scaled = np.zeros(X.shape)
    X_flipped = np.zeros(X.shape)

    for i in range(X.shape[0]):
        X_noised[i,:] = add_noise_with_SNR(X[i,:], 20)
        X_scaled[i,:] = scaled(X[i,:], 1.1)
        X_flipped[i,:] = hor_filp(X[i,:])
    
    full_data = np.concatenate((X, X_noised, X_scaled, X_flipped))
    full_labels = np.concatenate((np.full(X.shape[0], 0),
                                  np.full(X.shape[0], 1),
                                  np.full(X.shape[0], 2),
                                  np.full(X.shape[0], 3)))
    
    return full_data, full_labels

We will directly use normalized data from previous task. 

In [None]:
# Transform data and get artificial labels
X_train_transformed, y_train_transformed = transform_data(X_train_norm)
X_devel_transformed, y_devel_transformed = transform_data(X_devel_norm)

# Perform one hot encoding for labels
y_train_transformed_one_hot = one_hot_encoding(y_train_transformed, 4)
y_devel_transformed_one_hot = one_hot_encoding(y_devel_transformed, 4)

Let's create a pretext model for transformation classification task and train it. In this case the number of classes is 4: [original, noised, scaled, flipped] signal. We use the same pipeline as before.

In [None]:
# Create a model
inputs, outputs = create_graph(input_shape=2560, num_classes=4)
model_pretext = keras.Model(inputs, outputs)

# Define an optimizer and callbacks (to stop the training when the development loss does not decrease)
opt = keras.optimizers.Adam(lr=0.001)
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Compile and fit the model
model_pretext.compile(optimizer=opt, loss='categorical_crossentropy')
history = model_pretext.fit(X_train_transformed, y_train_transformed_one_hot, validation_data=(X_devel_transformed, y_devel_transformed_one_hot),
                            callbacks=[callback], epochs=1, batch_size=32, verbose=1)

Let's plot the graphs of train and validation loss here as well.

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.show()

Now, we don't need to calculate a metric here, as it is a proxy task.

We will further use this model in the following way: on top of the last pooling (global average pooling) we will stack a hidden dense layer and then output layer with softmax activation. 

In [None]:
# Stack the layers
x = layers.Dense(128)(model_pretext.layers[-5].output)
x = layers.Dropout(0.5)(x)
x = layers.Activation(layers.LeakyReLU())(x)
outputs_downstream = layers.Dense(3, 'softmax')(x)

Now we are ready to construct our downstream model. Let's do this and train the model afterwards.

In [None]:
# Create a model
model_downstream = keras.Model(inputs, outputs_downstream)
for i in range(len(model_downstream.layers)-5):
    model_downstream.layers[i].trainable = False

# Define an optimizer and callbacks (to stop the training when the development loss does not decrease)
opt = keras.optimizers.Adam(lr=0.001)
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Compile and fit the model
model_downstream.compile(optimizer=opt, loss='categorical_crossentropy')
history = model_downstream.fit(X_train_norm, y_train_one_hot, validation_data=(X_devel_norm, y_devel_one_hot), callbacks=[callback], epochs=100, batch_size=32, verbose=2)

We plot loss graphs here as well.

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.show()

And now we are ready for the final prediction

In [None]:
pred_downstream = model_downstream.predict(X_test_norm)

In [None]:
print('Accuracy\t{}'.format(np.round(accuracy_score(np.argmax(pred_downstream, axis=1), y_test),3)))
print('F1 score\t{}'.format(np.round(f1_score(np.argmax(pred_downstream, axis=1), y_test, average='macro'),3)))
print('UAR\t\t{}'.format(np.round(recall_score(np.argmax(pred_downstream, axis=1), y_test, average='macro'),3)))

Let's compare the results of these two deep learning based approaches.

In [None]:
print('Self supervised UAR\t{}'.format(np.round(recall_score(np.argmax(pred_downstream, axis=1), y_test, average='macro'),3)))
print('Fully supervised UAR\t{}'.format(np.round(recall_score(np.argmax(pred_fully_supervised, axis=1), y_test, average='macro'),3)))

# Further task
Now, in order to get a clearer understanding if self-supervised learning helps to obtain better predictions, we need to perform cross-validation. As we have only 15 participants, a good idea may be to use LOGO (Leave-One-Group-Out) cross-validation. In this case each group will contain samples of one participant. This data will be held out for one experiment and serve as test set. We do this procedure 15 times (one time for each participant) and then average the performance or concatenate prediction vectors and calculating the metric afterwards. 

In [None]:
# Read saved results
self_supervised = pd.read_csv('WESAD_ECG_SSL/results_cv/self_supervised.csv', index_col=0)
fully_supervised = pd.read_csv('WESAD_ECG_SSL/results_cv/fully_supervised.csv', index_col=0)
classical = pd.read_csv('WESAD_ECG_SSL/results_cv/classical.csv', index_col=0)

# Extract mean over cross validation partitions
self_supervised_mean = np.mean(self_supervised, axis=0)
fully_supervised_mean = np.mean(fully_supervised, axis=0)
classical_mean = np.mean(classical, axis=0)

x = np.arange(3)

# Plot the figure
plt.figure(figsize=(6,8))
plt.bar(x-0.2, classical_mean, width=0.2, color=(0.835,0.047,0), label='Classical')
plt.bar(x, fully_supervised_mean, width=0.2, color=(0.055,0.471,0.773), label='Fully supervised')
plt.bar(x+0.2, self_supervised_mean, width=0.2, color=(0.404,0.705,0.098), label='Self supervised')

plt.ylim([0.4,1])
plt.xticks([0,1,2], ['Accuracy', 'F1 score', 'UAR'], size=12)

plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.05), ncol=3)
plt.show()