In [1]:
# Libraries 
from pathlib import Path
from scipy import signal
from numpy import fft
import os

import numpy as np
import pandas as pd
from scipy.stats import skew, kurtosis
from scipy import signal
from scipy.signal import filtfilt
from scipy.fft import fft, fftfreq    
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split
import pywt   
import matplotlib.pyplot as plt

In [2]:
#Load data from folders
fs = 173.61

def load_data(folders):
    data = {}
    
    for folder in folders:
        folder_path = os.path.join('E:\EEG Signal Processing\EEG-Signal-Processing\Dataset', folder)
        files = os.listdir(folder_path)
        matrix = []
    
        for file in files:
            file_path = os.path.join(folder_path, file)
            matrix.append(np.loadtxt(file_path))
        
        data[folder] = np.array(matrix)
    
    return data

folders = ['F', 'N', 'O', 'S', 'Z']
data = load_data(folders)

F = data['F']
N = data['N']
O = data['O']
S = data['S']
Z = data['Z']

In [3]:
def Bandstop(sig):
    # Design stop filter
    fl = 49.9
    fh = 50.1
    order = 3
    ftype = 'bandstop'
    b, a = signal.butter(order, [fl, fh], ftype, fs=fs)
    Sig = signal.filtfilt(b, a, sig)
    return Sig

In [4]:
def BandExt(Sig, fs):

    # bandpass butterworth filter
    Band = np.array([[0.1, 4, 8, 12, 30],
                     [4, 8, 12, 30, 70]])
    sigf = []
    Delta = []
    Theta = []
    Beta = []
    Alpha = []
    Gamma = []
    TBands = []
    
    for k in range(5):
        fl = Band[0][k]
        fh = Band[1][k]
        order = 3
        wn = [fl, fh]
        ftype = 'bandpass'
        b, a = signal.butter(order, wn, ftype, fs=fs)
        # apply designed filter
        sig_f = signal.filtfilt(b, a, Sig)
        sigf.append(sig_f)
    Delta = sigf[0]
    Theta = sigf[1]
    Alpha = sigf[2]
    Beta = sigf[3]
    Gamma = sigf[4]
    
    return sigf

In [5]:
def GetFFT(Sig, fs):
    
    sig_fft = np.fft.fft(Sig)
    sig_fft = np.abs(sig_fft)          

    # Get the signal duration
    
    sig_len = len(Sig)
    fft_freq = np.fft.fftfreq(sig_len,1/fs)

    # Remove the first half

    sig_fft = sig_fft[:sig_len//2]
    fft_freq = fft_freq[:sig_len//2]


    return sig_fft, fft_freq


In [6]:
 Band = np.array([[0.1, 4, 8, 12, 30],
                  [4, 8, 12, 30, 70]])
def GetBfft(Sig, fs, Band):

    band = []
    bands = []
    all_bands = []
    sig, rf = GetFFT(Sig, fs)
    

    for k in range(Band.shape[1]):
        fll = Band[0][k]
        fhh = Band[1][k]
        indx = np.where((rf >= fll) & (rf<fhh))
        band = sig[indx]
        bands.append(band)
   

    return bands

In [7]:
def GetWavelet(Sig, Wavelet, Level):

    # Wavelet Decomposition with the Given Properties
    WaveDec = pywt.wavedec(Sig, Wavelet, level=Level)
    
    return WaveDec

In [8]:
def STFT(Sig, fs, window_size, overlap):
    sig_len = len(Sig)
    window = np.hamming(window_size)
    num_frames = int(np.ceil(sig_len / (window_size - overlap)))
    stft = np.zeros((window_size, num_frames), dtype=complex)
    
    for i in range(num_frames):
        start = i * (window_size - overlap)
        end = min(start + window_size, sig_len)
        frame = Sig[start:end]
        frame *= window[:end-start]
        stft[:end-start, i] = np.fft.fft(frame)
    
    stft = np.abs(stft)
    stft = stft[:window_size//2, :]
    
    freq = np.fft.fftfreq(window_size, 1/fs)[:window_size//2]
    
    return stft, freq


In [9]:
def myfeatureExtraction(Sig):

    Mean   = np.mean(Sig)              
    
    Var = np.var(Sig)
    
    Power= np.mean(np.square(Sig))
    
    std = np.std(Sig)
     # Check if standard deviation is zero
    if std == 0:
        Skew = 0.0
        kurt = 0.0
    else:
        # Compute skewness
        Skew = np.sum(((Sig - Mean) / std) ** 3) / len(Sig)

        # Compute kurtosis
        kurt =  np.sum(((Sig - Mean) / std) ** 4) / len(Sig) - 3

    Ent = np.sum(np.power(Sig, 2) * np.log(np.power(Sig, 2)+0.0001))
    
    return np.array([Mean ,Var, Power, kurt, Skew, Ent]).reshape(1, -1)       

In [10]:
F_df = pd.DataFrame(F)
N_df = pd.DataFrame(N)
O_df = pd.DataFrame(O)
S_df = pd.DataFrame(S)
Z_df = pd.DataFrame(Z)

Data = pd.concat([F_df, N_df, O_df, S_df, Z_df], axis=0, ignore_index=True).to_numpy()

# Create a dictionary to store the feature vectors for each subdata
features_dict = {'F': [], 'N': [], 'O': [], 'S': [], 'Z': []}

# Create a dictionary to store the extracted bands for each subdata
bands_dict = {'F': [], 'N': [], 'O': [], 'S': [], 'Z': []}

Totaldata = pd.DataFrame()
label = pd.DataFrame()

# Iterate over subdata
for subdata_name, subdata in zip(['F', 'N', 'O', 'S', 'Z'], [F, N, O, S, Z]):
    
    # Iterate over rows of subdata
    for Signal in subdata:
        Signal = Bandstop(Signal)
        
        # Signal Rhythmic Extraction
        rhythms  = BandExt(Signal, fs)
        
        feature_vector = np.array([])
        
        for rhythm in rhythms:
            
            # Time domain feature extraction
            feature_vector = np.append(feature_vector, myfeatureExtraction(rhythm))
            
            # FFT domain feature extraction
            fft, _ = GetFFT(rhythm, fs)
            feature_vector =  np.append(feature_vector, myfeatureExtraction(fft))
            
        # STFT domain features extraction and append to feature vector
        window_size = 256
        overlap = 128
        Band = np.array([[0.1, 4, 8, 12, 30], [4, 8, 12, 30, 70]])
        
        stft, freq = STFT(Signal, fs, window_size, overlap)
        
        for i in range(Band.shape[1]):
            low_freq = Band[0, i]
            high_freq = Band[1, i]
            mask = (freq >= low_freq) & (freq <= high_freq)
            band_stft = stft[mask, :]
        
            feature_vector = np.append(feature_vector, myfeatureExtraction(stft))
        
        # Wavelet domain features extraction and append to feature vector
        wave_dec = GetWavelet(Signal, 'db6', 5)

        for wave in wave_dec:
            feature_vector = np.append(feature_vector, myfeatureExtraction(wave))
            
        # Reshape the feature vector into a single column and concatenate to the features dataframe
        Totaldata = pd.concat([Totaldata, pd.DataFrame(feature_vector.reshape(1, -1))], axis=0, ignore_index=True)
        
        if subdata_name == 'Z' or subdata_name == 'O':       # Normal
            label = np.append(label, 0)
        
        elif subdata_name == 'N' or subdata_name == 'F':     # Epileptic
            label = np.append(label, 1)
            
        elif subdata_name == 'S':                            # Epilepsy-Seizure
            label = np.append(label, 2)


# sanity check
print(Totaldata.shape)
print(label.shape)

Totaldata = np.array(Totaldata)
label = np.array(label)

(500, 126)
(500,)


In [11]:
# datatrain, datatest, dtrain, dtest = train_test_split(Totaldata, label, test_size=0.3, shuffle=True)

# # K nearest Neighbors Parameter 
# K = 5

# # Feature selection
# indx_f = np.arange(datatrain.shape[1])
# sel = []
# max_numf = 30
# bestperfomance = []

# for iter in range(1, max_numf+1):
#     perfomance = np.zeros(len(indx_f))
#     c = 0
    
#     for indx in (indx_f):
#         c += 1
#         indx_cond = np.concatenate((sel, [indx])).astype(int)
        
#         # print(indx_cond)
        
#         datatrain_cond = datatrain[:, indx_cond]
#         datatest_cond = datatest[:, indx_cond]
        
#         # Train classifier
#         mdl = KNeighborsClassifier(n_neighbors=K, n_jobs=4, algorithm='auto', weights='distance')

#         mdl.fit(datatrain_cond, dtrain)
        
#         # Test trained classifier
#         output = mdl.predict(datatest_cond)
        
#         # Validation
#         Cmat = confusion_matrix(dtest, output)
#         accuracy = np.sum(np.diag(Cmat)) / np.sum(Cmat) * 100
        
#         perfomance[c-1] = accuracy
        
# #     print("select ", sel, " perform : ", perfomance)
        
#     ind = np.argmax(perfomance)
#     print(ind)
#     bestperfomance.append(perfomance[ind])
#     sel.append(indx_f[ind])
    
#     print("Best Feature ", np.argmax(perfomance), " performance", np.max(perfomance), " sel ", sel)

#     indx_f = np.delete(indx_f, ind)
#     #print(ind)
       

# sel = np.array(sel)
# bestperfomance = np.array(bestperfomance)

In [12]:
# print("Selected Features : ", sel)

# datatrain = datatrain[:, sel]
# datatest = datatest[:, sel]

# print("Train data size: ", datatrain.shape)
# print("Test data size: ", datatest.shape)

In [13]:
# # Train classifier
# K = 7
# mdl = KNeighborsClassifier(n_neighbors=K, n_jobs=4, algorithm='auto', weights='distance')

# mdl.fit(datatrain_cond, dtrain)

# # Test trained classifier
# output = mdl.predict(datatest_cond)

# # Validation
# Cmat = confusion_matrix(dtest, output)
# accuracy = np.sum(np.diag(Cmat)) / np.sum(Cmat) * 100

# print("Confusion Matrix = \n", Cmat)
# print("\n >>>> Final Model Accuracy = ", accuracy, "\n")

# for i in range(Cmat.shape[0]):
#     print(f"Class {i+1} Accuracy  = ", Cmat[i, i] / np.sum(Cmat[i, :]) * 100)

In [11]:
datatrain, datatest, dtrain, dtest = train_test_split(Totaldata, label, test_size=0.3, shuffle=True)

num_f = np.arange(datatrain.shape[1])
selected = []
max_f = 10

for i in range(max_f):
    perform = []
    
    for j in num_f:
        idx = selected + [j]
        
        mdl = KNeighborsClassifier(n_neighbors=6, n_jobs=-1, algorithm='auto')
        
        mdl.fit(datatrain[:, idx], dtrain)
        
        output = mdl.predict(datatest[:, idx])
        
        perform.append(accuracy_score(dtest, output))
    
    bestFeature = np.argmax(perform)
    selected.append(num_f[bestFeature])
    
    print(" best performance ", np.max(perform), "Best Features ", selected)
    
    num_f = np.delete(num_f, bestFeature)
        

 best performance  0.8266666666666667 Best Features  [43]
 best performance  0.94 Best Features  [43, 20]
 best performance  0.96 Best Features  [43, 20, 101]
 best performance  0.9666666666666667 Best Features  [43, 20, 101, 53]
 best performance  0.9666666666666667 Best Features  [43, 20, 101, 53, 0]
 best performance  0.9666666666666667 Best Features  [43, 20, 101, 53, 0, 1]
 best performance  0.9666666666666667 Best Features  [43, 20, 101, 53, 0, 1, 2]
 best performance  0.9666666666666667 Best Features  [43, 20, 101, 53, 0, 1, 2, 3]
 best performance  0.9666666666666667 Best Features  [43, 20, 101, 53, 0, 1, 2, 3, 4]
 best performance  0.9666666666666667 Best Features  [43, 20, 101, 53, 0, 1, 2, 3, 4, 6]


In [24]:
print("Selected Features : ", selected)

datatrain = datatrain[:, selected]
datatest = datatest[:, selected]

print("Train data size: ", datatrain.shape)
print("Test data size: ", datatest.shape)

Selected Features :  [43, 19, 101, 107, 0, 1, 2, 3, 4, 6]
Train data size:  (350, 10)
Test data size:  (150, 10)


In [27]:
# Train classifier
K = 7
mdl = KNeighborsClassifier(n_neighbors=K, n_jobs=-1, algorithm='auto', weights='distance')

mdl.fit(datatrain, dtrain)

# Test trained classifier
output = mdl.predict(datatest)

# Validation
Cmat = confusion_matrix(dtest, output)
accuracy = np.sum(np.diag(Cmat)) / np.sum(Cmat) * 100

print("Confusion Matrix = \n", Cmat)
print("\n >>>> Final Model Accuracy = ", accuracy, "\n")

for i in range(Cmat.shape[0]):
    print(f"Class {i+1} Accuracy  = ", Cmat[i, i] / np.sum(Cmat[i, :]) * 100)

Confusion Matrix = 
 [[59  1  0]
 [ 0 69  0]
 [ 0  2 19]]

 >>>> Final Model Accuracy =  98.0 

Class 1 Accuracy  =  98.33333333333333
Class 2 Accuracy  =  100.0
Class 3 Accuracy  =  90.47619047619048
