**Motor imagery-based EEG signal processing**<br/>
Presented by: Reza Saadatyar  2022-2023 <br/>
E-mail: Reza.Saadatyar92@gmail.com  <br/>
[Link](https://github.com/RezaSaadatyar/Motor-imagery-based-EEG-signal-processing)


================================== Importing the required Libraries ===========================

In [3]:
import os
import sys
import numpy as np
import pandas as pd
from scipy import io, signal
import matplotlib.pyplot as plt
from sklearn import preprocessing

========================================= Functions ==================================

In [16]:
# =============================================== Preparing data =========================================================
def Prepare_data(data, channels=None):  
    # ?------------------------------------------- Check type data -------------------------------------------      
    if 'DataFrame' not in str(type(data)):      
        data = pd.DataFrame(data)
    # !--------------------------------------- Check dimensional data ----------------------------------------
    if data.ndim < 3:
        if data.shape[0] < data.shape[1]:
            data = data.T
    elif data.ndim > 3:
        print("Please configure the data as two-dimensional (Sample * Channels)")
        sys.exit()
    # *--------------------------------------------- Set channels --------------------------------------------
    if channels is not None:
        data.columns=channels 
    else:
        data.columns = np.arange(1,data.shape[1]+1)
       
    return data, list(data.columns )

# =================================================== Labels =============================================================
def labels(Labels):
    
    labels = preprocessing.LabelEncoder()       # Encode target labels with value between 0 and n_classes-1
    Labels = labels.fit_transform(Labels)
    
    return Labels

# ================================================= Plot data ============================================================
def plot_data(data, Fs=None, channels=None, first_point=0, last_point=100, normalize_data='', val_ylim='', size_fig=(7,5),title=''):
    # ?--------------------------- Check type, dimensional data & set channels -------------------------------
    if data.ndim < 3:
        if data.shape[0] < data.shape[1]:
            data = data.T
    elif data.ndim > 3:
        print("Please configure the data as two-dimensional (Sample * Channels)")
        sys.exit()
        
    if 'DataFrame' not in str(type(data)):     
        data = pd.DataFrame(data)
        
    if channels is not None:
        data.columns=channels 
    else:
        data.columns = np.arange(1,data.shape[1]+1)
    # *------------------------------------------ Normalize data ---------------------------------------------
    if normalize_data == 'on':
        data = (data-np.min(data.values, axis=0))/(np.max(data.values, axis=0)-np.min(data.values, axis=0))
    # !-------------------------------------------- Set figure -----------------------------------------------   
    _, axs = plt.subplots(nrows=1,sharey='row', figsize=size_fig)

    data = data.iloc[first_point:last_point,:]
    if Fs is not None and np.array(Fs) > 0:
        time = (np.linspace(start=first_point/Fs, stop=last_point/Fs, num=len(data))).flatten()
        axs.plot(time, data + val_ylim*np.arange(data.shape[1],0,-1))
        axs.set_xlabel('Time (sec)', fontsize=10)
    else:
        axs.plot(data + val_ylim*np.arange(data.shape[1],0,-1))
        axs.set_xlabel('sample', fontsize=10)
        
    axs.set_title(f"chnannels: {len(channels)}#; {title}", fontsize=10)
    axs.set_yticks((val_ylim*np.arange(data.shape[1],0,-1)))
    axs.set_yticklabels(data.columns)
    axs.tick_params(axis='x', labelsize=8)
    axs.tick_params(axis='y', labelsize=8)
    axs.set_ylabel('Channels', fontsize=10)
    axs.autoscale(enable=True, axis="x",tight=True)
    axs.tick_params(axis='y', color='k', labelcolor='k')
    axs.set_ylim([np.min(val_ylim*np.arange(data.shape[1],0,-1))+ np.min([np.min(data.iloc[:,data.shape[1]-1]), np.min(data.iloc[:,data.shape[1]-2])]), 
                  np.max(val_ylim*np.arange(data.shape[1],0,-1))+np.max([np.max(data.iloc[:,0]), np.max(data.iloc[:,1])])])
    
# =========================================== Channel locations ==========================================================     
def channel_locations(x_pos, y_pos, channels, color='#D6D6D6'):
    
    _, axs = plt.subplots(nrows=1, sharey='row', figsize=(2.7, 2.7), facecolor=color)
    circ = np.linspace(start=0, stop=2*np.pi, num=200).flatten()
    
    if channels is None:
        channels = np.arange(1,len(x_pos)+1)
    # ?------------------------------------------- Check type data -------------------------------
    if 'Series' not in str(type(x_pos)):      # Check type Th
        x_pos = pd.Series(x_pos)
    if 'Series' not in str(type(y_pos)):      # Check type Rd
        y_pos = pd.Series(y_pos)
    # *------------------------------------------ Radius < 2 -------------------------------------
    if x_pos.max() < 2 and y_pos.max() < 2:
        rx = (max(x_pos)+0.05)*np.sin(circ)
        ry = (max(x_pos)+0.05)*np.cos(circ)
        EarX = np.array([.497-.005, .510, .518, .5299, .5419, .54, .547, .532, .510, .489-.005])+max(rx)/2-0.02
        EarY = np.array([0.04+.0555, 0.04+.0775, 0.04+.0783, 0.04+.0746, 0.02+.0555, -.0055, -.0932, -.1313, -.1384, -.1199])+0.04
        
        axs.plot(rx, ry, 'k')
        axs.plot(x_pos, y_pos, '.')
        axs.plot(EarX, EarY, 'k')    # plot right ear
        axs.plot(-EarX, EarY, 'k')   # plot left ear
        axs.plot(np.array([0.08, 0.01, 0, -0.01, -0.08]),
            np.array([0.4954, 0.57, 0.575, 0.57, 0.4954])+max(rx)/2-0.02, 'k')  # plot nose
        for ind, val in enumerate(channels):
            axs.text(x_pos[ind], y_pos[ind]+0.05, val, fontsize=5.5,
                 horizontalalignment='center', verticalalignment='center')
    # *-------------------------------------- Convert degree to radian ---------------------------    
    elif x_pos.max() > 2 or y_pos.max() > 2:
        if x_pos.max() > 20:
            x_pos = (np.pi/180)*x_pos
        elif y_pos.max() > 20:
            y_pos = (np.pi/180)*y_pos
            
        sq = 0.5/max(min(1.0, max(y_pos)*1.02), 0.5)
        x = y_pos * np.cos(x_pos) * sq
        y = y_pos * np.sin(x_pos) * sq
        EarX = np.dot([.497-.005, .510, .518, .5299, .5419, .54, .547, .532, .510, .489-.005], 1.13)
        EarY = np.dot([0.04+.0555, 0.04+.0775, 0.04+.0783, 0.04+.0746,
                0.02+.0555, -.0055, -.0932, -.1313, -.1384, -.1199], 1.13) 
    # !------------------------------------------- Set figure -------------------------------------
    axs.set_axis_off()
    axs.set_title('Channel locations',  fontsize=8.5)
    axs.set_aspect('equal', adjustable='box')
    plt.tight_layout()
    plt.subplots_adjust(wspace=0.0, hspace=0.0)
    
# ================================================ Filtering =============================================================  
def filtering(data, f_low, f_high, order, fs, type_filter):
    f_low = f_low / (fs / 2)
    f_high = f_high / (fs / 2)
    if type_filter == "low":
        b, a = signal.butter(order, f_low, btype='low')
    elif type_filter == "high":
        b, a = signal.butter(order, f_high, btype='high')
    elif type_filter == "bandpass":
        b, a = signal.butter(order, [f_low, f_high], btype='bandpass')
    elif type_filter == "bandstop":
        b, a = signal.butter(order, [f_low, f_high], btype='bandstop')
        
    data_filter = pd.DataFrame(signal.filtfilt(b, a, data))
    data_filter.columns = data.columns
    return data_filter

# ================================================ CAR Filter =============================================================  
def car_filter(data):
    # !--------------------------------------- Check dimensional data ----------------------------------------
    if data.ndim < 3:
        if data.shape[0] < data.shape[1]:
            data = data.T
    # *---------------------------------- Calculating the Average Signal -------------------------------------
    data_car = np.zeros((data.shape))
    mean = np.mean(data, axis=1)                                                
    #  ---------------------------------- Subtracting the Average Signal -------------------------------------
    for ind in range(data.shape[1]):                     
        data_car[:, ind] = data.iloc[:, ind] - mean
    
    data_car = pd.DataFrame(data_car)
    data_car.columns = data.columns
    return data_car

# ============================================= Laplacian Filter ========================================================== 
def laplacian(data, x_pos, y_pos, type_laplacian='low', display_figure='on'):

    if data.ndim < 3:
        if data.shape[0] < data.shape[1]:
            data = data.T
    elif data.ndim > 3:
        print("Please configure the data as two-dimensional (Sample * Channels)")
        sys.exit()
        
    if 'DataFrame' not in str(type(data)):     
        data = pd.DataFrame(data)
    
    if x_pos.max() > 10:
        x_pos = (np.pi/180)*x_pos
    elif y_pos.max() > 10:
        y_pos = (np.pi/180)*y_pos
    
    xy = np.array([x_pos, y_pos])                           # Electrodes Position
    data_laplacian = np.zeros(np.shape(data))
    for i in range(data.shape[1]):
        dis = np.zeros(data.shape[1])
        for j in range(data.shape[1]):                       
            dis[j] = np.linalg.norm(xy[:, i] - xy[:, j])      # Euclidean distance
            
        ind = np.argsort(dis)                                 # Sort distance
        if type_laplacian == 'low' and data.shape[1] > 9: 
            ind = ind[1:9]
        elif type_laplacian == 'high' and data.shape[1] > 21:  
            ind = ind[9:21]
        else:
            print('\033[91m'+"Error ---> The data should have more than 9 dimensions for low Laplacian and more than 21 dimensions for high Laplacian"+'\033[91m')
            break
      
        neighbors_electrode = xy[:, ind]
        neighbors_dis = dis[ind]
        ind_select = ind
        dis_x_axis = np.zeros(len(ind))
        dis_y_axis = np.zeros(len(ind))
        # ------------------------------------ X and y axis -------------------------------------
        for j in range(len(ind)):                            # Euclidean distance for x & y axis
            dis_x_axis[j] = np.linalg.norm(xy[0, i] - neighbors_electrode[0, j])
            dis_y_axis[j] = np.linalg.norm(xy[1, i] - neighbors_electrode[1, j])
        ind_x = np.argsort(dis_x_axis)
        ind_y = np.argsort(dis_y_axis)
        ind_x = ind_x[0:2] 
        ind_y = ind_y[0:2] 
        # --------------------------------------------------------------------------------------- 
        ind = np.concatenate((ind_x, ind_y), axis=0)
        neighbors_select = neighbors_electrode[:, ind]
        W = (1 / neighbors_dis[ind]) / sum(1 / neighbors_dis[ind])
        ind = ind_select[ind]
        dis = np.zeros((np.shape(data)[0], 4))
        for j in range(4):
            dis[:, j] = data.iloc[:, ind[j]] * W[j]
        data_laplacian[:, i] = data.iloc[:, i] - np.sum(dis, axis=1) 
        # --------------------------------------- Plot -------------------------------------------
        if display_figure == 'on' and i == data_laplacian.shape[1]:
            _, axs = plt.subplots(nrows=1, sharey='row', figsize=(2.7, 2.7), facecolor='#cdd3ce')
            axs.plot(xy[0, :], xy[1, :], 'o', color='#0804f5', markerfacecolor='#0804f5', markersize=6)
            axs.plot(xy[0, i], xy[1, i], 'o', color='#035310', markerfacecolor='#035310', markersize=6)
            axs.plot(neighbors_select[0, :], neighbors_select[1, :], 'o', color='#ee06cf', markerfacecolor='#ee06cf', markersize=6)
            axs.set_axis_off()
            axs.set_title('Channel locations',  fontsize=8.5)
            axs.set_aspect('equal', adjustable='box')
            plt.tight_layout()
            plt.subplots_adjust(wspace=0.0, hspace=0.0)
    
    data_laplacian  = pd.DataFrame(data_laplacian)
    data_laplacian.columns = data.columns
    return data_laplacian 

*1. [Dataset](https://www.bbci.de/competition/iv/desc_1.html)*<br/>
*Parameters of data:*
  - *cnt:* the continuous EEG signals, size [time x channels]. The array is stored in datatype INT16. To convert it to uV values, use cnt= 0.1*double(cnt); in Matlab.
  - *mrk:* structure of target cue information with fields (the file of evaluation data does not contain this variable).
    - *pos:* vector of positions of the cue in the EEG signals given in unit sample, length #cues.
    - *y:* vector of target classes (-1 for class one or 1 for class two), length #cues.
  - *nfo:* structure providing additional information with fields
    - *fs:* sampling rate
    - *clab:* cell array of channel labels
    - *classes:* cell array of the names of the motor imagery classes
    - *xpos:* x-position of electrodes in a 2d-projection
    - *ypos:* y-position of electrodes in a 2d-projection

*Step 1: Importing pathway files*

In [5]:
folder_name = "Data" 

dir = os.path.dirname(os.path.abspath(os.getcwd()))    # Get the path to the current script's directory
data_folder = os.path.join(dir, folder_name)           # Construct the path to the data folder
files = os.listdir(data_folder)                        # Get Files within data folder
for ind, val in enumerate(files):
    print(f"ind: {ind} ---> data name: {val} ")

ind: 0 ---> data name: BCICIV_calib_ds1b_100Hz.mat 
ind: 1 ---> data name: BCICIV_calib_ds1c_100Hz.mat 
ind: 2 ---> data name: BCICIV_calib_ds1d_100Hz.mat 
ind: 3 ---> data name: BCICIV_calib_ds1e_100Hz.mat 
ind: 4 ---> data name: BCICIV_calib_ds1f_100Hz.mat 
ind: 5 ---> data name: BCICIV_calib_ds1g_100Hz.mat 


In [6]:
ind = 0
data_set = io.loadmat(os.path.join(data_folder, files[ind]))  # Construct the path to the data file within the data folder and then load data  
print(f"Variables = {list(data_set.keys())[3:]}")                

Variables = ['cnt', 'mrk', 'nfo']


*Step 2.1:  Importing & preparing the raw data*

In [9]:
data = np.double(data_set['cnt']) * 0.1

Mrk = data_set['mrk']                   # Time of trials
Time_Trial = np.concatenate(*Mrk['pos'].flatten())
Labels = np.concatenate(*Mrk['y'].flatten())

Nfo = data_set['nfo']
Fs = Nfo['fs']
channels = np.concatenate(np.array(Nfo['clab'].tolist()).flatten())
Name_class = Nfo['classes']
x_pos = np.concatenate(*Nfo['xpos'].flatten())
y_pos = np.concatenate(*Nfo['ypos'].flatten())

In [10]:
data, channels = Prepare_data(data, channels)
Labels =labels(Labels)                          # Encode target labels with value between 0 and n_classes-1

*Step 2.3: Plot data and channels*

In [None]:
plot_data(data, Fs, channels, first_point=0, last_point=600, normalize_data='off',val_ylim=150,
        size_fig=(7,9),title='Raw signal')
channel_locations(x_pos, y_pos, channels, color='#D6D6D6')

*Step 2.4: Seperating Trials*

*Step 3: Filtering*
 - *Band pass filtering to get beta and mu band information*

In [17]:
data_filter = filtering(data, f_low=8, f_high=30, order=3, fs=100, type_filter='bandpass') # btype:'low','high','bandpass','bandstop'

*Step 4: Spatial filters:*<br/>
[Article](https://www.sciencedirect.com/science/article/abs/pii/S0030402613012473)


*Step 4.1: Common Average Reference (CAR):*<br/>
CAR is a widely used technique in EEG signal processing to mitigate the effects of common noise across multiple electrode channels. It aims to improve the quality of EEG data by removing or reducing common sources of noise, such as the placement of electrodes, cable movement, or external electromagnetic interference or muscle activity. CAR $\rightarrow$ *Improving the signal-to-noise ratio of the EEG data.*<br/> $x_{i}(t)=x_{i}(t)-{1 \over C}\sum_{j=1}x_{j}(t); j=1,...,C$<br/>

*Process:*<br/>
 - *Step 1: Calculating the Average Signal* $\rightarrow$ Calculate the average signal across all electrodes at each time point. This average represents the common noise present in the recording.
 - *Step 2: Subtracting the Average Signal:*  $\rightarrow$ Subtract the calculated average signal from the individual electrode signals. This helps remove the common noise component.


In [None]:
data_car = car_filter(data_filter)
plot_data(data_car, Fs, channels, first_point=0, last_point=600, normalize_data='off',val_ylim=150, 
        size_fig=(7,9), title='CAR filter')

*Step 4.2: Laplacian:*<br/>
[Article](https://www.sciencedirect.com/science/article/abs/pii/S0030402613012473)<br/>
It  is a spatial filtering technique used in EEG signal processing. It aims to enhance spatial resolution by highlighting local changes in EEG signal activity while attenuating more widespread activity. This technique can help improve the detection of localized brain events and reduce the effects of distant sources and common noise. The Laplacian $x_{i}$ for electrode i is calculated as follows:<br/>
$x_{i}(t)=x_{i}(t) - \sum_{j\epsilon N_{i}}w_{ij}{x_{j}}; w_{ij}={1/d_{ij} \over \sum_{j\epsilon N_{i}} 1/d_{ij}}$<br/>
$x_{i}(t)$ is the potential of the electrode i compare to the reference electrode, $ω_{ij}$ is the constant weight, $d_{ij}$ is the Euclidean distance from electrode i to electrode j. $N_{i}$ is the set of neighborhood electrodes of center electrode i.

In [None]:
type_laplacian='low'
data_laplacian = laplacian(data_filter, x_pos, y_pos, type_laplacian, display_figure='on')
plot_data(data_laplacian, Fs, channels, first_point=0, last_point=600, normalize_data='off',val_ylim=150,
        size_fig=(7,9),title= f"Laplacian_{type_laplacian}")

*Step 4.3: Common Spatial Pattern (CSP) analysis <br/>
[Article](https://www.sciencedirect.com/science/article/abs/pii/S0030402613012473)<br/>
It's particularly employed for feature extraction and classification in scenarios where EEG signals are recorded from multiple electrodes and need to be processed for tasks such as motor imagery classification, emotion recognition, and more. The primary goal of CSP is to find spatial filters that maximize the difference in variance between two classes of EEG signals while minimizing the variance within each class. This helps to enhance the separability of the classes and extract relevant discriminative features for subsequent classification.
