# Read Signal, Signal Processing


1. Install dependencies and packages

In [None]:
%%capture
%pip install mne numpy matplotlib pandas

2. Import dependencies and packages

In [None]:
from glob import glob #helps to read the file from the folder
import os 
import mne #used to analyze EEG dataset for python
import numpy as np
import pandas
import matplotlib.pyplot as plt

from google.colab import drive # connecting to google drive
drive.mount('/content/drive/')

Mounted at /content/drive/


3. Load the files from directory

In [None]:
# set the path to the directory containing the EDF files
dir_path = '/content/drive/MyDrive/lab/project/HS/Practice/EEG_classification/EEG_healthy_schizophrenia'

In [None]:
# get all file paths in the directory
all_file_path = glob(os.path.join(dir_path, '*.edf'))

In [None]:
# split into healthy and schizophrenia
healthy_file_path = [i for i in all_file_path if 'h' in os.path.basename(i).split('/')[0]]
schizo_file_path = [i for i in all_file_path if 's' in os.path.basename(i).split('/')[0]]

In [None]:
print(len(all_file_path)) # print the total number of files found
print(len(healthy_file_path)) # print the number of healthy files
print(len(schizo_file_path)) # print the number of schizophrenia files

28
14
14


In [None]:
all_file_path[0]

'/content/drive/MyDrive/lab/project/HS/Practice/EEG_classification/EEG_healthy_schizophrenia/h13.edf'

In [None]:
# all_file_path = glob('/content/drive/MyDrive/lab/project/HS/Practice/EEG_classification/EEG_healthy_schizophrenia')
#split into healthy and schizophrenia
#healthy_file_path = [ i for i in all_file_path if 'h' in i.split('/')[1]] #get all variables from all_file_path and store them in list, split if h is in filename
#schizo_file_path = [ i for i in all_file_path if 's' in i.split('/')[1]]
#print(len(healthy_file_path))
#print(len(schizo_file_path))
     #data shape check 10 columns ,1000 timestamp, 100Hz --> each data has t = 10 s , 10 epochs* 10 length of signal

2. Define a function which can read the data

In [None]:
def read_data(file_path):
    data=mne.io.read_raw_edf(file_path, preload=True)
    data.set_eeg_reference() #average of every channel is reference of each channels
    data.filter(l_freq = 0.5, h_freq=45)   #filtering between 0.5 Hz and 45 Hz, continuous data -> have to convert it into segments
    epochs=mne.make_fixed_length_epochs(data, duration = 5, overlap = 1)
    data = epochs.get_data()
    return data  #returns with data because in the read_data function the variable 'data' is converted into array using get_data() function

3. Reading a sample data and checking the dimensions of the sample

In [None]:
%%capture
sample_data = read_data(healthy_file_path[0])

In [None]:
sample_data.shape  # no. epochs, channels, length of the signal

(241, 19, 1250)

4. Reading all the files (%%capture does not allow to print the output)

In [None]:
%%capture  
healthy_epochs_array = [read_data(i) for i in healthy_file_path]
schizo_epochs_array = [read_data(i) for i in schizo_file_path]

In [None]:
healthy_epochs_array[0].shape

(241, 19, 1250)

In [None]:
schizo_epochs_array[0].shape

(211, 19, 1250)

5. Creating labels (each epoch is labelled as 0 or 1 whether it belongs to healthy or schizophren person)

In [None]:
healthy_epochs_array[0].shape, schizo_epochs_array[0].shape

((241, 19, 1250), (211, 19, 1250))

In [None]:
healthy_epochs_labels = [len(i)*[0] for i in healthy_epochs_array]   #231 *  [0]  (len(i) = no. of epochs for each file. each epoch is labelled as 0 if it belongs to healthy person)
schizo_epochs_labels = [len(i)*[1] for i in schizo_epochs_array]   # epoch is labelled as 1 for schizophren person
len(healthy_epochs_labels), len(schizo_epochs_labels)

(14, 14)

6. Combining the data and the labels 

In [None]:
data_list = healthy_epochs_array+schizo_epochs_array #Combining the files into a list 
label_list = healthy_epochs_labels + schizo_epochs_labels # Combining the labels into a list

7. Creating a group #split data based on the subjects, assign the group to each subject

In [None]:
group_list = [[i]*len(j) for i,j in enumerate(data_list)]  # i=0, j=first value of data_list  -> i = 1, j = data_list[1] 
# [i]*len(j) --> if there are 10 elements on the first place of data_list --> 0 x 10 --> [0000000000]
# if there are 20 elements on second place of data_list --> 1 x 20
len(group_list)

28

In [None]:
# group_list[4]  #all 4, [0] all  0 , [1] all 1 etc.

8. Convert the data from list to array

In [61]:
data_array = np.vstack(data_list)  # vstack()/ row_stack() - vertical stacking/row stacking
label_array = np.hstack(label_list) #hstack - horizontal stacking
group_array = np.hstack(group_list)
print(data_array.shape, label_array.shape, group_array.shape)  #epochs, no. channels, length of signal - (7201, 19, 1250) (7201,) (7201,)

(7201, 19, 1250) (7201,) (7201,)


# EEG feature extraction and Machine Learning classification

1. Extracting the features #feature shape for 1 feature: 7201,19 x 1 , for 2 features: 7201,19 x 2 etc.

In [None]:
np.mean(data_array, axis=-1).shape

(7201, 19)

In [62]:
from scipy import stats
def mean(x):  #calculate arithmetic mean around a specified axis
    return np.mean(x,axis=-1)

def std(x):  #standard deviation - computes the standard deviation - measure of the spread of the data around the mean
    return np.std(x,axis=-1)

def ptp(x): #peak-to-peak amplitude- computes the range of the input array along the specified axis
    return np.ptp(x,axis=-1)

def var(x):  #variance - computes the variance along the specified axis
    return np.var(x,axis=-1)

def minim(x): #minimum value along the specified axis
    return np.min(x,axis=-1)

def maxim(x): #maximum value 
    return np.max(x,axis=-1)

def argminim(x): #index of minimum value
    return np.argmin(x,axis=-1)

def argmaxim(x): #index of maximum value
    return np.argmax(x,axis=-1)

def rms(x): #root mean square of the input array x along its last axis
    return np.sqrt(np.mean(x**2,axis=-1))

def abs_diff_signal(x): # takes an x array as input,  and computes the absolute difference between the elements along x axis, then return with sum of these values
    return np.sum(np.abs(np.diff(x,axis=-1)), axis=-1)

def skewness(x): # measures the assymetry of probability distribution -  whether distribution is symmetric or skewed to one side, perfectly symmentric distribution's skewness is 0, skewed to right positive value, skewed to left negative value
    return stats.skew(x,axis=-1)

def kurtosis(x): #measures the peakedness of a probability distribution - describes how narrow or flat and broad the curve of a distribution is/ how much data is concentrated in the center of the distribution opposed to tails
    return stats.kurtosis(x,axis=-1)

#concatenating features for ML algorithms transforms features into a format that can be easily used as input to the algorithms. e.g. combining features into one feature vector
def concatenate_features(x): 
    return np.concatenate((mean(x),std(x),ptp(x),var(x),minim(x),maxim(x),argminim(x),argmaxim(x),rms(x),abs_diff_signal(x),skewness(x), kurtosis(x)), axis=-1)

2. Creating a feature list for ML model

In [63]:
features = []
for d in data_array:  
    features.append(concatenate_features(d))

In [64]:
features_array = np.array(features)
features_array.shape

(7201, 228)

In [None]:
228/19 # we have 12 features for classification

12.0

3.  Logistic Regression for classification

In [48]:
%pip install --upgrade scikit-learn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
%pip list

In [65]:
from sklearn.linear_model import LogisticRegression  #relationship between a set of input features and a binary output variable
from sklearn.pipeline import Pipeline # allows to chain multiple ML operations e.g. preprocessing, feature selection, modeling into single object that can be used for training and prediction
from sklearn.preprocessing import StandardScaler #standardizing features by scaling them to have 0 mean and unit variance
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import GroupKFold  #GroupFold - generates cross-validation iterator for grouped data, splits data into folds - so that same group does not appaer in both the training and testing data

In [None]:
# clf=LogisticRegression()  #initializes the logistic regression classifier object
# gkf=GroupKFold(9) #creates a cross-fold validation object with 5 splits, where each group appears only once in the test set accross all folds
# pipe=Pipeline([('scaler', StandardScaler()),('clf', clf)]) #creates a pipeline object, where first step scales the features, and second step is the logistic regression classifier
# param_grid={'clf__C': [0.1, 0.5, 0.7, 1, 3, 5, 7]} #parameter grid for hyperparameter tuning of logistic regression classifier
# gscv =GridSearchCV(pipe, param_grid, cv=gkf, n_jobs=12) #creates a gridsearch object
# gscv.fit(features_array, label_array, groups=group_array)   #performs hyperparameter tuning by fitting the GridSearch object to the features and labels data with the grouping array provided

In [78]:
clf=LogisticRegression(penalty='l1', solver='liblinear', max_iter=500)  #initializes the logistic regression classifier object with Lasso penalty
gkf=GroupKFold(n_splits=5) #creates a cross-fold validation object with 5 splits, where each group appears only once in the test set accross all folds
pipe=Pipeline([('scaler', StandardScaler()),('clf', clf)]) #creates a pipeline object, where first step scales the features, and second step is the logistic regression classifier
param_grid={'clf__C': [0.01, 0.1, 1, 10]} #parameter grid for hyperparameter tuning of logistic regression classifier with Lasso penalty
gscv =GridSearchCV(pipe, param_grid, cv=gkf, n_jobs=-1) #creates a gridsearch object
gscv.fit(features_array, label_array, groups=group_array) #fitting the model with training data



In [79]:
gscv.best_score_

0.6663357075970254