<a href="https://colab.research.google.com/github/abelowska/eegML/blob/main/Classes_04_perfectionism.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prediction of perfectionism CMDA subscale from error-related negativity component

Install additional libraries

In [None]:
!pip install MNE

Imports

In [None]:
import os
import re
import glob
import os
import sys
import ast
import os.path as op
from collections import defaultdict
from copy import deepcopy
import copy

import pickle
import mne
import scipy
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score

from sklearn.linear_model import LinearRegression

import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.base import TransformerMixin, BaseEstimator

import warnings

from sklearn import datasets, linear_model
import seaborn as sns
sns.set_theme(style="whitegrid", palette="deep")

from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn import set_config
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
from sklearn import tree
from sklearn.model_selection import cross_val_score

warnings.filterwarnings("ignore")

---
## Load data

Loading EEG and questionnaire data. By default create_df_data loads all info from given .csv file but one can specify it by passing a list of desired labels.

In [None]:
def create_df_data(
    dir_path,
    info_filename=None,
    info="all",
):
    """Loads data for all participants and create DataFrame with optional additional info from given .csv file.

    On default, loads a train set: chooses only 80% of participants
    and for each of them chooses 80% of epochs.
    It will choose them deterministically.

    If test_participants is set to True, it will load remaining 20% of participants.

    Parameters
    ----------
    test: bool
        whether load data for training or final testing.
        If true load participants data for testing.
    info_filename: String | None
        path to .csv file with additional data.
    info: array
        listed parameters from the info file to be loaded.
        if 'all', load all parameters

    Returns
    -------
    go_nogo_data_df : pandas.DataFrame

    """
    print(dir_path)
    header_files_glob = "responses_100_600/*.vhdr"
    print(header_files_glob)

    # extract header files
    header_files = glob.glob(header_files_glob)
    header_files = sorted(header_files)
    print(header_files)

    # create dataframe for results
    go_nogo_data_df = pd.DataFrame()

    for file in header_files:
        # load eeg data for given participant
        participant_epochs = load_epochs_from_file(file)

        # and compute participant's id from file_name
        participant_id = re.match(r".*_(\w+).*", file).group(1)

        error = participant_epochs["error_response"]._data
        correct = participant_epochs["correct_response"]._data

        # exclude those participants who have too few samples
        if len(error) < 5 or len(correct) < 5:
            # not enough data for this participant
            continue

        # construct dataframe for participant with: id|epoch_data|response_type|additional info...
        participant_df = create_df_from_epochs(
            participant_id, participant_epochs, info_filename, info
        )

        # add participant's data to results dataframe
        print(participant_id)
        go_nogo_data_df = go_nogo_data_df.append(participant_df, ignore_index=True)

    return go_nogo_data_df

In [None]:
def load_epochs_from_file(file, reject_bad_segments="auto", mask=None):
    """Load epochs from a header file.

    Args:
        file: path to a header file (.vhdr)
        reject_bad_segments: 'auto' means that bad segments are rejected automatically.

    Returns:
        mne Epochs

    """
    # Import the BrainVision data into an MNE Raw object
    raw = mne.io.read_raw_brainvision(file)

    # Construct annotation filename
    annot_file = file[:-4] + "vmrk"

    # Read in the event information as MNE annotations
    annotations = mne.read_annotations(annot_file)

    # Add the annotations to our raw object so we can use them with the data
    raw.set_annotations(annotations)

    # Map with response markers only
    event_dict = {
        "Stimulus/RE*ex*1_n*1_c_1*R*FB": 10004,
        "Stimulus/RE*ex*1_n*1_c_1*R*FG": 10005,
        "Stimulus/RE*ex*1_n*1_c_2*R": 10006,
        "Stimulus/RE*ex*1_n*2_c_1*R": 10007,
        "Stimulus/RE*ex*2_n*1_c_1*R": 10008,
        "Stimulus/RE*ex*2_n*2_c_1*R*FB": 10009,
        "Stimulus/RE*ex*2_n*2_c_1*R*FG": 10010,
        "Stimulus/RE*ex*2_n*2_c_2*R": 10011,
    }

    # Map for merged correct/error response markers
    merged_event_dict = {"correct_response": 0, "error_response": 1}

    # Reconstruct the original events from Raw object
    events, event_ids = mne.events_from_annotations(raw, event_id=event_dict)

    # Merge correct/error response events
    merged_events = mne.merge_events(
        events,
        [10004, 10005, 10009, 10010],
        merged_event_dict["correct_response"],
        replace_events=True,
    )
    merged_events = mne.merge_events(
        merged_events,
        [10006, 10007, 10008, 10011],
        merged_event_dict["error_response"],
        replace_events=True,
    )

    # epochs = []
    this_reject_by_annotation = True

    # Read epochs
    epochs = mne.Epochs(
        raw=raw,
        events=merged_events,
        event_id=merged_event_dict,
        tmin=tmin,
        tmax=tmax,
        baseline=None,
        reject_by_annotation=this_reject_by_annotation,
        preload=True,
        # verbose='CRITICAL',
    )
    
    return epochs

In [None]:
def create_df_from_epochs(
    id, 
    participant_epochs, 
    info_filename, 
    info
):
    """Create df for each participant. DF structure is like: {id: String ; epoch: epoch_data ; marker: 1.0|0.0}
    1.0 means correct and 0.0 means error response.

    Parameters
    ----------
    id: String
        participant's id extracted from filename
    participant_epochs: mne Epochs
        epoched eeg data
    info_filename: String
        path to .csv file with questionnaire data.
    info: array
        listed parameters from the info file to be loaded.
        if 'all', load all parameters

    Returns
    -------
    participant_df : pandas.DataFrame

    """

    # create dataframe for participant's questionnaire data
    info_df = pd.DataFrame()

    # extract questionnaire data from .csv file
    if info_filename is not None:
        if info == "all":
            this_info_df = pd.read_csv(info_filename)
        else:
            this_info_df = pd.read_csv(info_filename, usecols=["Demo_kod"] + info)
        info_df = (
            this_info_df.loc[this_info_df["Demo_kod"] == id]
            .reset_index()
            .drop("index", axis=1)
        )
        
    # create dataframe record with participant's data: ID, eeg data, questionnaire data    
    participant_df = pd.DataFrame(
        {
            "id": [id], 
            "epoch": [participant_epochs], 
        }).join(
            info_df
        )

    return participant_df

### Read data

In [None]:
# constants
tmin, tmax = -0.101562, 0.5937525  # Start and end of the segments
signal_frequency = 256
random_state = 42
test_size = 0.3

ERROR = 1
CORRECT = 0

In [None]:
# mount google drive in colab
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# display data in folder
!ls gdrive/MyDrive/perfectionism_data

'GNG_perfectionism (1).pkl'  'GNG_perfectionism (6).pkl'
'GNG_perfectionism (2).pkl'   GNG_perfectionism.pkl
'GNG_perfectionism (3).pkl'   picked
'GNG_perfectionism (4).pkl'   responses_100_600.zip
'GNG_perfectionism (5).pkl'   scales


In [None]:
# unzip and load eeg data
# !unzip gdrive/MyDrive/perfectionism_data/responses_100_600.zip

In [None]:
# paths to data
dir_path = "gdrive/MyDrive/perfectionism_data/"
questionnaire_filename = dir_path + "scales/all_scales.csv"

# define dataframe name in a way: TASK_questionnaire
df_name = "GNG_perfectionism"

# check whether pickled data exists
pickled_data_filename = dir_path + df_name + ".pkl"

if os.path.isfile(pickled_data_filename):
    print("Pickled file found. Loading pickled data...")
    epochs_df = pd.read_pickle(pickled_data_filename)
    print("Done")
    pass
else:
    print("Pickled file not found. Loading data...")
    epochs_df = create_df_data(
        dir_path = dir_path,
        info_filename=questionnaire_filename,
        info="all"
    )

    epochs_df.name = df_name
    
    # save loaded data into a pickle file
    epochs_df.to_pickle(dir_path + epochs_df.name + ".pkl")
    print("Done. Pickle file created")

Pickled file found. Loading pickled data...
Done


In [None]:
print(epochs_df.shape)
epochs_df.head()

(138, 11)


Unnamed: 0,id,epoch,Demo_kod,17-Perfect CM-Concern over Mistakes (9 items mean),17-Perfect PS-Personal Standards (7 items mean),17-Perfect PE-Parental Expectations (5 items mean),17-Perfect PC=Parental Criticism (4 items mean),17-Perfect D=Doubts about Actions (4 items mean),17-Perfect O=Organization (6 items mean),17-Perfectionism full scale (mean),17-Perfectionism CMDA
0,AA0303,"<Epochs | 201 events (all good), -0.101562 - ...",AA0303,2.22,2.43,1.8,1.5,2.75,3.83,2.46,4.97
1,AB0601,"<Epochs | 221 events (all good), -0.101562 - ...",AB0601,1.78,2.14,1.8,1.75,2.0,3.33,2.14,3.78
2,AB0612,"<Epochs | 253 events (all good), -0.101562 - ...",AB0612,2.56,1.86,1.4,2.25,2.5,4.0,2.46,5.06
3,AC2011,"<Epochs | 173 events (all good), -0.101562 - ...",AC2011,1.67,4.57,1.2,1.75,3.75,5.0,3.0,5.42
4,AD1308,"<Epochs | 202 events (all good), -0.101562 - ...",AD1308,4.22,4.86,3.4,2.5,4.5,3.67,3.97,8.72


---
## Prepare data

#### 1. Check for Nans

In [None]:
epochs_df.isnull().sum()

id                                                    0
epoch                                                 0
Demo_kod                                              1
17-Perfect CM-Concern over Mistakes (9 items mean)    1
17-Perfect PS-Personal Standards (7 items mean)       1
17-Perfect PE-Parental Expectations (5 items mean)    1
17-Perfect PC=Parental Criticism (4 items mean)       1
17-Perfect D=Doubts about Actions (4 items mean)      1
17-Perfect O=Organization (6 items mean)              1
17-Perfectionism full scale (mean)                    1
17-Perfectionism CMDA                                 1
dtype: int64

In [None]:
# replace Nan values with the mean of given column
epochs_df.fillna(epochs_df.mean(), inplace=True)
epochs_df.isnull().sum()

id                                                    0
epoch                                                 0
Demo_kod                                              1
17-Perfect CM-Concern over Mistakes (9 items mean)    0
17-Perfect PS-Personal Standards (7 items mean)       0
17-Perfect PE-Parental Expectations (5 items mean)    0
17-Perfect PC=Parental Criticism (4 items mean)       0
17-Perfect D=Doubts about Actions (4 items mean)      0
17-Perfect O=Organization (6 items mean)              0
17-Perfectionism full scale (mean)                    0
17-Perfectionism CMDA                                 0
dtype: int64

In [None]:
# create deep copy for multiple operations on epochs_df
# work on COPY (!)
epochs_df_copy = pd.DataFrame(copy.deepcopy(epochs_df.to_dict()))

#### 2. Create X and y sets
Adopt following pipeline:
* Select one channel (e.g. Fz)
* Select time-window 0 to 100ms (usual time-window for ERN)
* Average signal in selected window.

In [None]:
y = epochs_df_copy["17-Perfectionism CMDA"].to_numpy().ravel()

In [None]:
# extract MNE epochs from DF to list
epochs_list = epochs_df_copy.epoch.to_list()

In [None]:
# transform MNE epochs into MNE evoked
evoked_list = [epoch["error_response"].average() for epoch in epochs_list] 

In [None]:
# pick one channel

# your code goes here

In [None]:
# crop time window

# your code goes here

In [None]:
# select data from MNE Evoked with get_data() method

# your code goes here

In [None]:
# average signal in the selected time-window

# your code goes here

#### 3. Train-test split

In [None]:
# train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, 
    y, 
    test_size=test_size, 
    random_state=random_state
)

---
## Create models with `Pipelines`

Test:
- Linear Regression;
- KNN;
- SVR with both linear and rbf kernels.
- For model evaluation use method `evaluate_model()`. Example of `evaluate_model()` usage is in *Classes_04_nonlinear_regression_pipelines.ipynb* notebook.

Do not forget scale your data befor training. Use `Pipeline` to chain `Scaler` and `Regressor`.




**You can play with the SVR and test various C and epsilon values :)**

Enjoy! 🔥

In [None]:
def evaluate_model(pipe, X_train, y_train, X_test, y_test):
  '''
  Takes Pipeline model, train and test data. 
  Fit model and evaluate it on test data.
  '''

  # fit model
  pipe.fit(X_train, y_train)

  # predict test data
  y_pred = pipe.predict(X_test)

  # predict train data
  y_train_pred = pipe.predict(X_train)

  # The coefficient of determination: 1 is perfect prediction
  print(f"Coefficient of determination: {r2_score(y_test, y_pred )}")

  return y_pred, y_train_pred

In [None]:
# your code goes here