In [24]:
import numpy as np
import os
import pandas as pd

from scipy.io import loadmat
import scipy.io as sio
import scipy.stats
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
import torch.nn as nn

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold


In [25]:
device = (
    "cuda"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

print(f"Is CUDA supported by this system? {torch.cuda.is_available()}")
print(f"CUDA version: {torch.version.cuda}")
# Storing ID of current CUDA device
cuda_id = torch.cuda.current_device()
print(f"ID of current CUDA device: {torch.cuda.current_device()}")
print(f"Name of current CUDA device: {torch.cuda.get_device_name(cuda_id)}")

Using cpu device
Is CUDA supported by this system? True
CUDA version: 11.7
ID of current CUDA device: 0
Name of current CUDA device: NVIDIA GeForce GTX 1650


In [26]:
from sklearn.preprocessing import MinMaxScaler, RobustScaler
def preprocess_data(eeg_data):

    #standardization
    scaler = StandardScaler()
    preprocessed_data = scaler.fit_transform(eeg_data)
    # plt.figure(figsize=(10, 6))
    # plt.plot(preprocessed_data)
    # plt.xlabel('Time')
    # plt.ylabel('Amplitude')
    # plt.title('Preprocessed EEG Data')
    # plt.show()

    # Reshape the data into a 2D array for feature scaling
    reshaped_data = np.reshape(preprocessed_data, (12*7*5120, 5))

    # Perform feature scaling using MinMaxScaler
    minmax_scaler = MinMaxScaler()
    minmax_scaled_data = minmax_scaler.fit_transform(reshaped_data)

    # Perform feature scaling using RobustScaler
    robust_scaler = RobustScaler()
    robust_scaled_data = robust_scaler.fit_transform(reshaped_data)

    # Reshape the scaled data back to its original shape
    minmax_scaled_data = np.reshape(minmax_scaled_data, (12, 7, 5120, 5))
    robust_scaled_data = np.reshape(robust_scaled_data, (12, 7, 5120, 5))

    # Verify the shape of the scaled data
    print("MinMax scaled data shape:", minmax_scaled_data.shape)
    print("Robust scaled data shape:", robust_scaled_data.shape)

    return minmax_scaled_data, robust_scaled_data

In [27]:
def extract_features(eeg_data):
    mean_features = np.mean(eeg_data, axis=1)
    std_features = np.std(eeg_data, axis=1)
    variance_features = np.var(eeg_data, axis=1)
    skewness_features = scipy.stats.skew(eeg_data, axis=1)
    kurtosis_features = scipy.stats.kurtosis(eeg_data, axis=1)

    # tsallis_entropy_features = scipy.stats.entropy(eeg_data, axis=1)
    # renyi_entropy_features = scipy.stats.entropy(eeg_data, axis=1, base=2)
    # shannon_entropy_features = scipy.stats.entropy(eeg_data, axis=1, base=np.exp(1))
    # log_energy_features = np.log(np.sum(np.square(eeg_data), axis=1))
    
    extracted_features = np.column_stack((mean_features, std_features, variance_features, skewness_features, kurtosis_features))
    return extracted_features


In [28]:
def read_mat_file(file_name):
    mat = sio.loadmat(file_name)
    mat = mat["ssvep"]
    data = []
    for stimuli in mat:
        stimuli_name = stimuli[1][0]
        stimuli_data = np.array(stimuli[0])
        stimuli_data = np.delete(stimuli_data, -1, axis=1)
        stimuli_data = preprocess_data(stimuli_data)
        stimuli_features = extract_features(stimuli_data)
        data.append(stimuli_features)
    return data

In [29]:
def split_data(features, labels, test_size=0.2):
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=test_size, stratify=labels)
    return X_train, X_test, y_train, y_test

In [30]:
if __name__ == "__main__":
    volunteers = ["1", "2", "3", "4", "5", "6"]
    labels = ["A", "B"]
    data = []
    data_labels = []
    for volunteer in volunteers:
        for label in labels:
            file_name = volunteer + "_" + label + ".mat"
            stimuli_features = read_mat_file(file_name)
            data.append(stimuli_features)
            if label == "A":
                data_labels.append(1)
            else:
                data_labels.append(0)
    print("Preprocessed data format:")
    print(np.array(data).shape)
    print(data_labels)
    print("")
    X = np.array(data)
    y = np.array(data_labels)
    X = np.reshape(X, (X.shape[0], X.shape[1], -1))

ValueError: cannot reshape array of size 112640 into shape (430080,5)

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, inputs):
        lstm_out, _ = self.lstm(inputs)
        output = self.fc(lstm_out[:, -1, :])
        output = self.sigmoid(output)
        return output

In [None]:
input_size = X.shape[2]
hidden_size = 64
output_size = 1

model = LSTMModel(input_size, hidden_size, output_size)

criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters())

k = 5
kf = KFold(n_splits=k)
accuracies = []

In [None]:
for train_index, val_index in kf.split(X):
    print("Training on fold:", train_index, "Validating on fold:", val_index)
    X_train_fold, X_val_fold = torch.from_numpy(X[train_index]), torch.from_numpy(X[val_index])
    y_train_fold, y_val_fold = torch.from_numpy(y[train_index]), torch.from_numpy(y[val_index])

    model.train()
    optimizer.zero_grad()

    outputs = model(X_train_fold.float())
    loss = criterion(outputs.squeeze(), y_train_fold.float())
    loss.backward()
    optimizer.step()

    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_fold.float())
        val_predictions = (val_outputs.squeeze() > 0.5).float()
        accuracy = accuracy_score(y_val_fold, val_predictions.numpy())
        accuracies.append(accuracy)

average_accuracy = np.mean(accuracies)
print("Average Accuracy:", average_accuracy*100)

Training on fold: [ 3  4  5  6  7  8  9 10 11] Validating on fold: [0 1 2]
Training on fold: [ 0  1  2  6  7  8  9 10 11] Validating on fold: [3 4 5]
Training on fold: [ 0  1  2  3  4  5  8  9 10 11] Validating on fold: [6 7]
Training on fold: [ 0  1  2  3  4  5  6  7 10 11] Validating on fold: [8 9]
Training on fold: [0 1 2 3 4 5 6 7 8 9] Validating on fold: [10 11]
Average Accuracy: 86.66666666666666


In [None]:
class EEGClassifierCNN(nn.Module):
    def __init__(self, input_shape, num_classes):
        super(EEGClassifierCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(32 * (input_shape[0] // 2 - 1) * (input_shape[1] // 2 - 1), 64)
        self.relu2 = nn.ReLU()
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu2(x)
        x = self.fc2(x)
        return x

    def train(self, train_data, train_labels, batch_size, epochs):
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(self.parameters(), lr=0.001)

        train_data = torch.tensor(train_data, dtype=torch.float32)
        train_labels = torch.tensor(train_labels, dtype=torch.long)

        dataset = torch.utils.data.TensorDataset(train_data, train_labels)
        dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(device)

        for epoch in range(epochs):
            running_loss = 0.0
            for inputs, labels in dataloader:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()

                outputs = self(inputs.unsqueeze(1))
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()

            print(f"Epoch {epoch+1} Loss: {running_loss / len(dataloader):.4f}")

    def evaluate(self, test_data, test_labels):
        test_data = torch.tensor(test_data, dtype=torch.float32)
        test_labels = torch.tensor(test_labels, dtype=torch.long)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(device)
        test_data = test_data.to(device)
        test_labels = test_labels.to(device)

        with torch.no_grad():
            outputs = self(test_data.unsqueeze(1))
            _, predicted = torch.max(outputs.data, 1)
            correct = (predicted == test_labels).sum().item()
            accuracy = correct / test_labels.size(0)

        print(f"Test Accuracy: {accuracy * 100:.2f}%")

    def predict(self, data):
        data = torch.tensor(data, dtype=torch.float32)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(device)
        data = data.to(device)

        with torch.no_grad():
            outputs = self(data.unsqueeze(1))
            _, predicted = torch.max(outputs.data, 1)

        return predicted.cpu().numpy()


In [None]:
# Assuming you have your EEG data and labels stored in train_data, train_labels, test_data, and test_labels

# Create an instance of the EEGClassifierCNN
input_shape = (7, 5120)  # Specify the input shape of your EEG data
num_classes = 2  # Specify the number of classes (e.g., normal vs. caffeinated)
model = EEGClassifierCNN(input_shape, num_classes)

# Train the model
batch_size = 16
epochs = 10
model.train(train_data, train_labels, batch_size, epochs)

# Evaluate the model 
model.evaluate(test_data, test_labels)

# Make predictions
predictions = model.predict(test_data)


NameError: name 'train_data' is not defined

In [None]:
# with open('/Anne/Facultate/licenta/ScienceDirect_files_16Dec2022_10-38-19/CSV\sig10Hz_V1_A.csv') as f:
#     train = pd.read_csv(f)
# train = train.drop('ECG', axis=1)
# train.head(5120)

# with open('/Anne/Facultate/licenta/ScienceDirect_files_16Dec2022_10-38-19/CSV\sig10Hz_V1_B.csv') as f:
#     test = pd.read_csv(f)
# # test = test.drop('ECG', axis=1)
# test.head(5120)


In [None]:
# new_train = list(train.columns)
# train.columns = new_train

# fig, axes = plt.subplots(nrows=22, ncols=1, figsize=(20, 40), sharey = True)

# for i, col in enumerate(train.columns):
#   ax = axes[i]
#   ax.plot(train[col])
#   ax.set_title(col)

# plt.tight_layout()
# # plt.show
# plt.savefig("pictures/EEG1.jpg")

In [None]:
directory = '/Anne/Facultate/licenta/Dataset/CSV'
for filename in os.listdir(directory):
    if filename.endswith('.csv'):
        with open(os.path.join(directory, filename)) as f:
            file_name = os.path.join(directory, filename)
            # print (file_name)
#5120 DATA POINTS - 20s duration of 7 photic stimulus and resting period of 10s in between stimuli freq
column_names = ["FP1", "FP2", "PG1", "PG2", "F7", "F3", "F2", "F4", "F8", "C3", "C2", "C4", "P3", "P4", "P2", "T3", "T4", "T5", "T6", "O1", "OZ", "O2", "ECG"]


In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout1 = nn.Dropout2d(p=0.25)
        self.fc1 = nn.Linear(4608, 128)
        self.dropout2 = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = self.conv1(x)
        x = nn.functional.relu(x)
        x = self.pool(x)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        x = nn.functional.sigmoid(x)
        return x