In [32]:
import spkit as sp
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
%matplotlib inline  
import mne
from copy import deepcopy
from mne.preprocessing import compute_proj_ecg
from mne_connectivity import envelope_correlation
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
import random
from mne.preprocessing import ICA, corrmap, create_ecg_epochs, create_eog_epochs
import autoreject
from autoreject import AutoReject
from autoreject import get_rejection_threshold
from autoreject import Ransac
from mne.preprocessing import annotate_amplitude
from sklearn.preprocessing import PowerTransformer
from sklearn.preprocessing import MinMaxScaler
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
from scipy.signal import welch
from scipy.stats import entropy, skew, kurtosis
import networkx as nx
import time
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.multiclass import OneVsRestClassifier
from sklearn.metrics import accuracy_score
from sklearn.feature_selection import VarianceThreshold, SelectKBest, f_classif
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC

In [3]:
import logging
logging.getLogger('mne').setLevel(logging.WARNING)

In [4]:
class FPG_Dataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.filepaths, self.labels = self._load_filepaths_and_labels()

    def __len__(self):
        return len(self.filepaths)

    def __getitem__(self, index):
        filepath = self.filepaths[index]
        label = self.labels[index]

        eeg_data = self._load_eeg(filepath)

        if self.transform is not None:
            eeg_data = self.transform(eeg_data)

        return torch.tensor(eeg_data), torch.tensor(label)

    def _load_filepaths_and_labels(self):
        filepaths = []
        labels = []

        classes = sorted(os.listdir(self.root_dir))
        for class_index, class_name in enumerate(classes):
            class_dir = os.path.join(self.root_dir, class_name)
            if os.path.isdir(class_dir):
                filenames = os.listdir(class_dir)
                for filename in filenames:
                    filepaths.append(os.path.join(class_dir, filename))
                    labels.append(class_index)

        return filepaths, labels

    def _load_eeg(self, filepath):
        data = mne.read_epochs(filepath, preload=False).get_data(picks='eeg');
        normals = []
        scaler = StandardScaler()
        for idx in range(len(data)):
            normals.append(scaler.fit_transform(data[idx]))

        return np.array(normals)

In [5]:
def custom_collate_fn(batch):
    # Concatenate data samples along the first dimension (window_count)
    # Assumes each sample is a tuple (uid, data_sample)
    uids, data_samples = zip(*batch)
    concatenated_data = torch.stack(data_samples, dim=0)
    return uids, concatenated_data

In [6]:
def PLV(sample, verbose=False):
        
        EEG_data = sample[0]
        threshold = 0.3
        G = nx.Graph()

        # Add nodes (brain regions)
        G.add_nodes_from(range(21))  # Assuming 21 brain regions

        # Add edges (based on functional connectivity)
        for i in range(21):
            for j in range(i + 1, 21):
                # Calculate functional connectivity strength between nodes i and j
                # (e.g., using correlation coefficients from EEG signals)

                #phase locking value
                connectivity_strength = np.abs(np.mean(np.exp(1j * np.angle(EEG_data[i] * np.conj(EEG_data[j])))))  # Replace with actual method

                if connectivity_strength > threshold:
                    G.add_edge(i, j, weight=connectivity_strength)

        # Calculate graph metrics
        local_efficiency = nx.local_efficiency(G)
        global_efficiency = nx.global_efficiency(G)

        if (verbose):
                
            print(f"Local efficiency: {local_efficiency}")
            print(f"Global efficiency: {global_efficiency}")

        return local_efficiency, global_efficiency

In [7]:
def extract_eeg_features(eeg_data):
    eeg_data = np.array(eeg_data)
    num_channels = eeg_data.shape[0]

    # Initialize arrays to store features
    mean_values = np.zeros(num_channels)
    std_values = np.zeros(num_channels)
    min_values = np.zeros(num_channels)
    max_values = np.zeros(num_channels)
    skewness_values = np.zeros(num_channels)
    kurtosis_values = np.zeros(num_channels)
    p2p_values = np.zeros(num_channels)
    p2rms_values = np.zeros(num_channels)
    rss_values = np.zeros(num_channels)
    amplitude = np.zeros(num_channels)
    rms = np.zeros(num_channels)
    zero_crossing_rate = np.zeros(num_channels)
    delta_power = np.zeros(num_channels)
    theta_power = np.zeros(num_channels)
    alpha_power = np.zeros(num_channels)
    beta_power = np.zeros(num_channels)
    gamma_power = np.zeros(num_channels)
    spectral_entropy = np.zeros(num_channels)

    for channel in range(num_channels):
        # Extract data for the current channel
        channel_data = eeg_data[channel]

        # Compute mean, STD, min, max
        mean_values[channel] = np.mean(channel_data)
        std_values[channel] = np.std(channel_data)
        min_values[channel] = np.min(channel_data)
        max_values[channel] = np.max(channel_data)

        # Compute skewness and kurtosis
        skewness_values[channel] = skew(channel_data)
        kurtosis_values[channel] = kurtosis(channel_data)

        # Compute peak-to-peak (P2P)
        p2p_values[channel] = max_values[channel] - min_values[channel]

        # Compute peak-to-root sum square (P2RMS)
        p2rms_values[channel] = np.sqrt(np.sum(channel_data**2))

        # Compute root sum square (RSS)
        rss_values[channel] = np.sqrt(np.sum(channel_data**2))

        # Compute amplitude (peak-to-peak divided by 2)
        amplitude[channel] = p2p_values[channel] / 2

        # Compute root mean square (RMS)
        rms[channel] = np.sqrt(np.mean(channel_data**2))

        # Compute zero-crossing rate
        zero_crossing_rate[channel] = np.mean(np.diff(np.sign(channel_data)) != 0)

        # Compute power spectral density using Welch method
        f, psd = welch(channel_data, fs=1000, nperseg=256)
        delta_power[channel] = np.sum(psd[(f >= 1) & (f <= 4)])
        theta_power[channel] = np.sum(psd[(f >= 4) & (f <= 8)])
        alpha_power[channel] = np.sum(psd[(f >= 8) & (f <= 14)])
        beta_power[channel] = np.sum(psd[(f >= 14) & (f <= 30)])
        gamma_power[channel] = np.sum(psd[f > 30])

        # Compute spectral entropy
        spectral_entropy[channel] = -np.sum(psd * np.log2(psd))

    # Organize features into a dictionary
    features = np.array([mean_values,std_values,min_values,max_values,skewness_values,kurtosis_values,p2p_values,p2rms_values, rss_values,amplitude, rms, zero_crossing_rate,delta_power, theta_power, alpha_power,beta_power,gamma_power,spectral_entropy])

    return features.T

#extracted_features = extract_eeg_features(sample[0])
#extracted_features.shape

In [12]:
def Load_Data(path, verbose = False):
    dataset = FPG_Dataset(root_dir=path)

    batch_size = 1  # Set your desired batch size
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, collate_fn=custom_collate_fn)

    label = []
    feature_space = []
    
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        for patient in inputs:
            
            for epoch in patient:
                feature_space.append(extract_eeg_features(epoch))
                label.append(targets)
    
    feature_space = np.squeeze(feature_space)
    labels = np.array([tensor.numpy() for tensor in label])
    flat_features = feature_space.reshape(feature_space.shape[0], -1)

    if (verbose):
        print(f"X size: {flat_features.shape}")
        print(f"y size: {labels.shape}")
    return flat_features, labels.reshape(-1)

In [9]:
X_train, y_train = Load_Data(r"D:\20second_MNE_3CLASS", verbose= True)

X size: (38619, 378)
y size: (38619, 1)


In [13]:
X_test, y_test = Load_Data(r"D:\20second_MNE_3CLASS_TEST", verbose= True)

X size: (4599, 378)
y size: (4599, 1)


In [34]:
start = time.time()

rf_classifier = RandomForestClassifier(n_estimators=80, random_state=42)
knn_classifier = KNeighborsClassifier(n_neighbors=100)
svm_classifier = SVC()
ovr_classifier = OneVsRestClassifier(rf_classifier)
ovr_classifier.fit(X_train, y_train)
y_pred = ovr_classifier.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.2f}")
print(f"training time: {time.time() - start}")