In [1]:
!pip install mne

Collecting mne
  Downloading mne-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading mne-1.8.0-py3-none-any.whl (7.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m28.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mne
Successfully installed mne-1.8.0


In [2]:
# In google collab we need to mount google drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [32]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import gridspec

# mne library to analyse EEG
import mne
from mne import Epochs, pick_types
from mne.channels import make_standard_montage
from mne.datasets import eegbci
from mne.decoding import CSP
from mne.io import concatenate_raws, read_raw_edf
from mne.decoding import Vectorizer
mne.set_log_level('error') # Avoid long log


from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import cross_val_score, train_test_split, GridSearchCV, StratifiedKFold, cross_val_predict
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support

from scipy import stats

# Models
from sklearn import svm
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression

In [4]:
# Create a list with each tipe of experimental run
openeye_runs = [1]
closedeye_runs = [2]
fists_runs = [3, 7, 11]
imaginefists_runs = [4, 8, 12]
fistsfeet_runs = [5, 9, 13]
imaginefistsfeet_run = [6, 10, 14]

# List with the ID of each participant
participants = [_ for _ in range(1,110)]


# Defining the EEG standard EEG bands. These are indicative situation in which we see appearing this type of waves on healty subjects.
delta_waves = {
    'freq_min': 0.5,
    'f_max': 4
} # normally occur during deep sleep

theta_waves = {
    'freq_min': 4,
    'freq_max': 8
} # transiently during sleep

alpha_waves = {
    'freq_min': 8,
    'freq_max': 13
} # relaxed but awake state, resting with the eyes closed

beta_waves = {
    'freq_min': 13,
    'freq_max': 30
} # attention to tasks or stimuli,logical thinking

gamma_waves = {
    'freq_min': 30,
    'freq_max': 70
} # large-scale brain network activity and cognitive phenomena such as working memory, attention



In [5]:
# Charging the data


# Get the path to the data
def file_path(participant, run):
    return f'files/S{participant:03}/S{participant:03}R{run:02}.edf'

# In google_collab
def file_path_collab(participant, run):
  return f'/content/drive/MyDrive/Colab Notebooks/Project-TFE/files/S{participant:03}/S{participant:03}R{run:02}.edf'

# Load the data
# Preload = True charges also the data, not just the headers
# raw = concatenate_raws([read_raw_edf(file_path(participant, run), preload = True) for run in fists_runs])

In [7]:
# We eliminate participants  88, 92, 100 because the experiments have been done using differents timings
participants = [i for i in range(1,110) if i not in [88, 92, 100]]

# Read the data
raws = [read_raw_edf(file_path_collab(participant, run)) for participant in range(1,10) for run in fists_runs]

In [8]:
# We choose the "Standard_1020" montage

montage = mne.channels.make_standard_montage("standard_1020")


# Here we change the names of the electrode to match the standard notation and set the choosen montage on the raw data charged.

# Dictionary with the structure old_name : correct_cases_name. To respect the upper and lower cases of the standard notation for the electrde's position.
replacement = {
    'Fc': 'FC',
    'Cp': 'CP',
    'Af': 'AF',
    'Ft': 'FT',
    'Tp': 'TP',
    'Po': 'PO'
}

# new_name is the dictionary to use to cange the name of the electrode's positions to respect the usual sandard notataions.
# First get rid of the excessive "."
new_names = {
    name : name.replace(".", "") for name in raws[0].info['ch_names']
}

# Change the lower and upper case of the electrode's names
for key in new_names.keys():
    for old_string, new_string in replacement.items():
        new_names[key] = new_names[key].replace(old_string, new_string)

# Choose the montage and set it for the uploaded data
# montage = 'standard_1020'
for raw in raws:
    raw.rename_channels(new_names)
    raw.set_montage(montage)



In [9]:
low_cut = 0.1 # We filter the low frequency to remove slow drift
high_cut = 30 # We filter the high frequency to eliminate noise, and because the motor signals appears mostly as alpha and beta waves

# copy the raw data and apply the filter
raws_filt = [raw.load_data().copy().filter(low_cut, high_cut) for raw in raws]

In [10]:
tmin =  -1.  # start of each epoch (in sec)
tmax =  4.1  # end of each epoch (in sec)
baseline = (-1, 0) # for the baseline correction we choose the interval that reflect the resting state before the event

# Making it easyer to read the events
event_mapping = {
    1: 'rest',
    2: 'left_fist',
    3: 'right_fist'
}
event_id = {v:k for k,v in event_mapping.items()}

epochs = [
    Epochs(raw_filt, mne.events_from_annotations(raw_filt)[0], event_id, tmin=tmin, tmax=tmax, baseline= baseline, preload= True)
    for raw_filt in raws_filt
]

In [11]:
# Get the data and labels
raws_epochs = [epoch.get_data() for epoch in epochs]
labels = [epoch.events[:,-1] for epoch in epochs]

In [12]:
# Check the dimensions of all the data, we want every epoch to have the same dimensions, and the labels to be
def lenght_check(raws_epochs, labels):
  if len(raws_epochs) != len(labels):
    raise ValueError(f"The lenght of the raws_epochs and labes are {len(raws_epochs)} and {len(labels)} respectively. They should be equal")
  else:
    print('The lenght of the raws_epochs and labes are equal')

def dimensios_check(raws_epochs):
  for i in range(len(raws_epochs)):
    if raws_epochs[i].shape[-2:] != raws_epochs[0].shape[-2:]:
      raise ValueError(f"There are epochs that do not have the same number of captors or time points.")
      break

lenght_check(raws_epochs, labels)
dimensios_check(raws_epochs)

The lenght of the raws_epochs and labes are equal


In [13]:
# We concatenate the epochs data and the labels to obtain only two numpy arrays
X = np.concatenate(raws_epochs, axis = 0)
y = np.concatenate(labels)

In [14]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

In [None]:
def applyCrossValidation(models, model_names, data, labels, kfold):
  results = []
  for i in range (len(models)):
    #print(model_names[i])
    cv_accuracy = cross_val_score(models[i], data, labels, cv=kfold)
    results.append(cv_accuracy)
  return results

In [15]:
clf_lda_pip = make_pipeline(Vectorizer(), StandardScaler(), LinearDiscriminantAnalysis(solver='svd'))


In [22]:
scores = cross_val_score(clf_lda_pip, X, y, cv=5)

In [23]:
scores

array([0.60784314, 0.68627451, 0.66013072, 0.60130719, 0.62745098])

In [26]:
svm_pip = make_pipeline(Vectorizer(), StandardScaler(), svm.SVC(random_state=42))
parameters = {'svc__kernel':['linear', 'rbf', 'sigmoid'], 'svc__C':[0.1, 1, 10]}
gs_svm = GridSearchCV(svm_pip, parameters, scoring='accuracy', cv=StratifiedKFold(n_splits=5), return_train_score=True)

In [36]:
gs_svm.fit(X_train, y_train)

In [37]:
gs_svm.best_params_

{'svc__C': 0.1, 'svc__kernel': 'linear'}

In [38]:
gs_svm.best_score_

0.6373831775700933

In [39]:
y_pred = gs_svm.predict(X_test)
y_pred

array([1, 1, 1, 3, 1, 3, 1, 3, 1, 1, 3, 1, 1, 3, 3, 3, 3, 1, 3, 1, 1, 2,
       1, 1, 1, 1, 1, 1, 1, 1, 3, 1, 3, 2, 3, 2, 1, 1, 3, 2, 2, 2, 2, 3,
       1, 2, 1, 1, 1, 1, 2, 3, 2, 3, 1, 3, 2, 1, 3, 2, 1, 1, 1, 1, 1, 3,
       3, 1, 1, 1, 1, 1, 1, 1, 1, 3, 1, 3, 2, 1, 2, 1, 1, 3, 3, 1, 1, 3,
       1, 3, 1, 3, 2, 1, 2, 1, 1, 1, 3, 1, 1, 1, 2, 2, 1, 1, 1, 3, 2, 3,
       2, 1, 1, 1, 1, 2, 1, 1, 3, 1, 1, 1, 1, 2, 1, 3, 2, 1, 3, 3, 1, 1,
       1, 2, 2, 2, 2, 1, 1, 1, 2, 2, 3, 2, 2, 1, 1, 3, 1, 1, 3, 1, 1, 1,
       1, 1, 3, 3, 1, 1, 1, 3, 3, 3, 1, 2, 2, 2, 1, 2, 1, 3, 1, 1, 3, 1,
       1, 2, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 2, 1, 2, 2, 1, 1, 2, 2,
       1, 3, 1, 3, 1, 1, 2, 3, 3, 1, 1, 1, 3, 2, 1, 1, 3, 1, 3, 2, 1, 1,
       1, 2, 2, 1, 3, 3, 1, 1, 1, 3])

In [52]:
sum(y_pred == y_test)/len(y_test)

0.6826086956521739

In [53]:
report_svm = classification_report(y_test, y_pred, target_names=['rest', 'left', 'right'])
print('SVM Clasification Report:\n {}'.format(report_svm))

SVM Clasification Report:
               precision    recall  f1-score   support

        rest       0.73      0.77      0.75       117
        left       0.64      0.60      0.62        57
       right       0.61      0.59      0.60        56

    accuracy                           0.68       230
   macro avg       0.66      0.65      0.66       230
weighted avg       0.68      0.68      0.68       230



In [46]:
lr_pip = make_pipeline(Vectorizer(), StandardScaler(), LogisticRegression(random_state=42))
lr_pip.named_steps

{'vectorizer': <mne.decoding.transformer.Vectorizer at 0x7ab0df75f340>,
 'standardscaler': StandardScaler(),
 'logisticregression': LogisticRegression(random_state=42)}

In [49]:
parameters = {'logisticregression__solver':['newton-cg', 'lbfgs', 'liblinear', 'sag', 'saga'], 'logisticregression__penalty': ['l2', 'l1', 'elasticnet']}
gs_lr = GridSearchCV(lr_pip, parameters, scoring='accuracy', cv=StratifiedKFold(n_splits=5), return_train_score=True)

In [50]:
gs_lr.fit(X_train, y_train)

40 fits failed out of a total of 75.
The score on these train-test partitions for these parameters will be set to nan.
If these failures are not expected, you can try to debug them by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
5 fits failed with the following error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/sklearn/model_selection/_validation.py", line 888, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "/usr/local/lib/python3.10/dist-packages/sklearn/base.py", line 1473, in wrapper
    return fit_method(estimator, *args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/sklearn/pipeline.py", line 473, in fit
    self._final_estimator.fit(Xt, y, **last_step_params["fit"])
  File "/usr/local/lib/python3.10/dist-packages/sklearn/base.py", line 1473, in wrapper
    return fit_method(estimator, *args, **k

In [51]:
print('The best parameters are: ',gs_lr.best_params_)
print('The best score is: ',gs_lr.best_score_)

The best parameters are:  {'logisticregression__penalty': 'l1', 'logisticregression__solver': 'liblinear'}
The best score is:  0.6971962616822429


In [54]:
y_pred_lr = gs_lr.predict(X_test)
print('percentage of correct classifications: ', sum(y_pred_lr == y_test)/len(y_test))

report_lr = classification_report(y_test, y_pred_lr, target_names=['rest', 'left', 'right'])
print('SVM Clasification Report:\n {}'.format(report_lr))


percentage of correct classifications:  0.7347826086956522
SVM Clasification Report:
               precision    recall  f1-score   support

        rest       0.81      0.78      0.79       117
        left       0.72      0.72      0.72        57
       right       0.62      0.66      0.64        56

    accuracy                           0.73       230
   macro avg       0.71      0.72      0.72       230
weighted avg       0.74      0.73      0.74       230



In [57]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
