## Imports and label setup

In [None]:
!pip install mne

In [None]:
%%capture
import sklearn
from sklearn import datasets
import matplotlib.pyplot as plt
import numpy as np
import scipy.io as sio
from scipy import signal
import pandas as pd
from sklearn import metrics
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import mne
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def epoch_signal(signal, epoch_length, overlap, sampling_rate):
    """Split a continuous recording into fixed-length, overlapping epochs.

    Args:
        signal: 2D array indexed as (channels, samples).
        epoch_length: Epoch duration in seconds.
        overlap: Fraction of an epoch shared with the next one, in [0, 1).
        sampling_rate: Sampling frequency of ``signal`` in Hz.

    Returns:
        3D array of shape (n_epochs, channels, samples_per_epoch). When the
        recording is shorter than one epoch, n_epochs is 0 but the array keeps
        its three dimensions so callers can still unpack ``.shape``.

    Raises:
        ValueError: If the epoch window or the step between epochs would be
            shorter than one sample (e.g. ``overlap >= 1``).
    """
    signal = np.asarray(signal)

    # Get number of samples per epoch and step size for overlap
    samples_per_epoch = int(epoch_length * sampling_rate)
    step_size = int(samples_per_epoch * (1 - overlap))
    if samples_per_epoch < 1 or step_size < 1:
        raise ValueError(
            "epoch_length, overlap and sampling_rate must give a window and "
            f"step of at least one sample (got window={samples_per_epoch}, "
            f"step={step_size})"
        )

    starts = range(0, signal.shape[1] - samples_per_epoch + 1, step_size)
    if len(starts) == 0:
        # np.array([]) would collapse to shape (0,); keep the documented rank
        return np.empty((0, signal.shape[0], samples_per_epoch), dtype=signal.dtype)

    return np.stack([signal[:, start:start + samples_per_epoch] for start in starts])

def get_signal(file_path):
    """Load one EEGLAB recording and return the preprocessed signal array.

    The recording is band-pass filtered to 0.5-49 Hz, resampled to 120 Hz and
    re-referenced to the average of the EEG channels.

    Args:
        file_path: Path to the EEGLAB ``.set`` file to read.

    Returns:
        The array produced by ``get_data()`` on the preprocessed recording.

    Note:
        Sets MNE's log level to WARNING as a side effect.
    """
    mne.set_log_level('WARNING')
    # preload=True: filter() and resample() need the samples in memory, which
    # is not guaranteed when the data lives in a separate binary file
    raw_data = mne.io.read_raw_eeglab(file_path, preload=True)
    #bandpass filter between 0.5 and 49 Hz
    raw_data.filter(l_freq=0.5, h_freq=49.0)
    #Resample to 120 Hz: Nyquist (60 Hz) stays above the 49 Hz cutoff, preventing aliasing
    raw_data.resample(sfreq=120)
    #set reference
    raw_data.set_eeg_reference(ref_channels='average')
    return raw_data.get_data()

In [None]:
#Makes list of participant group types from participants.tsv file
# A: Alzheimer group; C: Healthy (Control) group; F: Frontotemporal Dementia group
subj_types = []
participants_path = "/content/drive/Shared drives/NeurotechX Shared Drive/Alzheimer's Dataset/participants.tsv"
with open(participants_path) as file:
    #keep the fourth tab-separated column of every row (header row included)
    subj_types.extend(line.split('\t')[3] for line in file)
#drop the header entry; as the cell's last expression this also displays it
subj_types.pop(0)

'Group'

## Create and store coefficient data in Drive

In [None]:
#Generate CWT data for all epochs from one patient -- for use when memory is not limiting factor
def generate_cwt_data(eeg_data, fs=120):
    """Compute Morlet CWT coefficients for every channel of every epoch.

    Args:
        eeg_data: 3D array (epochs, channels, samples).
        fs: Sampling frequency of the data in Hz. Not used by the computation;
            kept so existing callers keep working.

    Returns:
        4D array (epochs, channels, scales, samples) holding the coefficient
        matrix returned by ``pywt.cwt`` for each channel of each epoch.
    """
    # Imported here because the notebook only imports pywt in a later cell
    import pywt

    # Get the number of epochs and channels (sample count is not needed)
    n_epochs, n_channels, _ = eeg_data.shape

    # The scale grid and wavelet are identical for every call, so build them once
    num_scales = 90
    min_scale = 1
    max_scale = 128
    scales = np.logspace(np.log10(min_scale), np.log10(max_scale), num=num_scales)
    wavelet = 'morl'  # Morlet wavelet
    # wavelet = 'mexh'  # mexican hat wavelet

    cwts = []
    for i in tqdm(range(n_epochs)):
        # pywt.cwt returns (coefficients, frequencies); only coefficients are kept
        epoch_cwt = [
            pywt.cwt(eeg_data[i, j, :], scales, wavelet)[0]
            for j in range(n_channels)
        ]
        cwts.append(epoch_cwt)

    # Convert the nested lists to a 4D numpy array and return it
    return np.array(cwts)

def generate_labels(sgram_array, patient_label):
    """Return a list holding ``patient_label`` once per entry of ``sgram_array``."""
    return [patient_label for _ in range(len(sgram_array))]

In [None]:
import pywt

# Compute the cwt of all channels for current epoch
def get_cwt_per_epoch(epoch_data):
    """Return the Morlet CWT coefficients of the first 19 channels of one epoch.

    Args:
        epoch_data: 2D array indexed as (channel, sample) with at least 19 channels.

    Returns:
        float32 array stacking one ``pywt.cwt`` coefficient matrix per channel;
        256 linearly spaced scales from 1 to 256 are used.
    """
    scale_count = 256
    first_scale = 1
    last_scale = 256
    #decrease scales for faster computation
    scales = np.linspace(first_scale, last_scale, scale_count)
    wavelet = 'morl'  # Morlet wavelet is likely best for these purposes

    # pywt.cwt returns (coefficients, frequencies); only the coefficients are kept
    per_channel = [
        pywt.cwt(epoch_data[ch, :], scales, wavelet)[0]
        for ch in range(19)
    ]
    return np.array(per_channel).astype('float32')


In [None]:
import os
import gc
#Create and store coefficient data in Drive

epoch_length = 5  # 5 seconds per epoch
overlap = 0.75  # 75% overlap
sampling_rate = 120  # Signal is sampled at 120 Hz

#process in batches to preserve memory
batch_size = 100

#Get data for all subjects
for subject in range(1,89):
    print("Starting subject #", subject)
    #get label for current subject
    subject_label = subj_types[subject-1]
    #set path to the EEG data file for this subject
    file_path = f"/content/drive/Shared drives/NeurotechX Shared Drive/Alzheimer's Dataset/derivatives/sub-{subject:03}/eeg/sub-{subject:03}_task-eyesclosed_eeg.set"
    #set base directory
    base_dir = f"/content/drive/Shared drives/NeurotechX Shared Drive/CWT Coeffs/{subject_label}/"
    #np.save does not create missing folders, so make sure the label folder exists
    os.makedirs(base_dir, exist_ok=True)

    #load in EEG data
    data = get_signal(file_path)
    #split data into epochs
    subject_epochs = epoch_signal(data, epoch_length, overlap, sampling_rate)

    num_batches = int(np.ceil(subject_epochs.shape[0] / batch_size))

    for batch in range(num_batches):
        start = batch * batch_size
        end = min(start + batch_size, subject_epochs.shape[0])

        #one float32 coefficient array per epoch in this batch
        cwts_for_subject = [
            get_cwt_per_epoch(subject_epochs[epoch,:,:])
            for epoch in tqdm(range(start, end))
        ]

        filename = f"patient_{subject:03}_batch{batch+1}.npy"
        #separate name so the input path above is not overwritten
        batch_path = os.path.join(base_dir, filename)
        np.save(batch_path, cwts_for_subject)

        #manually clear memory before the next batch is computed
        del cwts_for_subject
        gc.collect()


In [None]:
#Parallelize cwt since it's computationally expensive
from multiprocessing import Pool

def layout_from_epoch(epoch):
    """Compute the per-channel CWT of one epoch and tile it into a single image."""
    return create_layout(get_cwt_per_epoch(epoch))


#Create pool and map layout_from_epoch over every item of `epochs` using
#worker processes; the pool is closed when the with-block exits.
#NOTE(review): `epochs` is not assigned in the visible cells, and create_layout
#(called by layout_from_epoch) is defined in a later cell -- confirm both exist
#before running this cell.
with Pool() as pool:
    results = pool.map(layout_from_epoch, epochs)



## Create images from cwt data and store in Drive if necessary for space

In [None]:
#Plot channel cwt data in spatially accurate orientation

#Can also try changing the layout from 4,3,5,3,4 to 2,5,5,5,2 to avoid too many
#"edges" for convolutions

def create_layout(coeffs_per_channel):
    """Tile the per-channel CWT maps of one epoch into a single 2D image.

    The 19 channel maps are reordered and placed on five rows holding
    4, 3, 5, 3 and 4 maps. Rows with fewer than five maps are centred with zero
    padding so that every row is exactly five maps wide.

    Args:
        coeffs_per_channel: 3D array of coefficients per channel for one epoch
            (at least 19 channels; every channel map has the same 2D shape).

    Returns:
        2D array of absolute coefficient values, five maps tall and five maps
        wide.
    """
    #Correct order of channels before plotting
    order = [10,0,1,11,2,16,3,12,4,17,5,13,6,18,7,14,8,9,15]
    reordered = [np.asarray(coeffs_per_channel[i]) for i in order]

    #define grid structure
    grid_structure = [4, 3, 5, 3, 4]
    total_columns = 5  # total number of columns in the grid

    #take the tile size from the data instead of assuming (256, 600)
    tile_height, tile_width = reordered[0].shape
    total_width = total_columns * tile_width

    final_array_rows = []
    plots_done = 0

    for num_plots in grid_structure:
        #split the unused width between both sides; the right side takes the
        #extra column when the spare width is odd so all rows stay equally wide
        spare_width = total_width - num_plots * tile_width
        left_width = spare_width // 2
        right_width = spare_width - left_width

        plots = reordered[plots_done:plots_done + num_plots]
        row = np.concatenate(
            [np.zeros((tile_height, left_width))]
            + plots
            + [np.zeros((tile_height, right_width))],
            axis=1,
        )

        final_array_rows.append(row)
        plots_done += num_plots

    #stack the rows vertically and keep coefficient magnitudes only
    return np.abs(np.concatenate(final_array_rows, axis=0))

def create_layouts_all_epochs(all_coeffs):
    """Build the combined channel image for every epoch.

    Args:
        all_coeffs: 4D array of coefficients, indexed by epoch first.

    Returns:
        3D float32 array holding one ``create_layout`` image per epoch along
        the first axis.
    """
    #cast each image as it is produced so the float64 copies do not pile up
    all_images_list = [
        create_layout(all_coeffs[epoch,:,:,:]).astype('float32')
        for epoch in tqdm(range(all_coeffs.shape[0]))
    ]

    #stack (not concatenate) so images stay separate along a new epoch axis
    return np.stack(all_images_list, axis=0)


#Make image combining all channels per epoch - useful for checking code but not
#used for results
def make_image(final_array):
    """Display a combined channel image on a log-compressed 0-1 colour scale.

    Args:
        final_array: 2D array of non-negative values, e.g. the output of
            ``create_layout``.

    Returns:
        None. The figure is shown with matplotlib.
    """
    import matplotlib.pyplot as plt

    # Log-compress, then normalize the array to the range 0-1
    log_array = np.log1p(final_array)
    value_range = log_array.max() - log_array.min()
    if value_range > 0:
        normalized_array = (log_array - log_array.min()) / value_range
    else:
        # constant image: avoid dividing by zero
        normalized_array = np.zeros_like(log_array)

    # TODO: use log normalization with the max across all samples, not within individuals

    # Plot the normalized array (previously the raw array was shown by mistake)
    plt.figure(figsize=(10,10))
    plt.imshow(normalized_array, cmap='jet')
    plt.colorbar()
    plt.axis('off')
    plt.show()

In [None]:
import os
from collections import defaultdict

def get_filepaths(directory):
    """Group the ``.npy`` files of a directory by patient number.

    File names are expected to look like ``patient_<number>_...npy``; the text
    between the first and second underscore is used as the patient key.

    Args:
        directory: Folder to scan (not recursive).

    Returns:
        defaultdict mapping patient number (str) to a list of file paths. Each
        list is ordered by the number that ends the file stem (batch1, batch2,
        ..., batch10), because ``os.listdir`` returns names in arbitrary order.
    """
    npy_files = [name for name in os.listdir(directory) if name.endswith('.npy')]
    npy_files.sort(key=lambda name: (_trailing_number(name), name))

    subject_filepaths = defaultdict(list)  # use a default dict to automatically handle new keys
    for npy_file in npy_files:
        patient_number = npy_file.split('_')[1]  # extract patient number from filename
        subject_filepaths[patient_number].append(os.path.join(directory, npy_file))

    return subject_filepaths


def _trailing_number(filename):
    """Return the integer that ends the file stem, or -1 when there is none."""
    stem = os.path.splitext(filename)[0]
    digit_count = len(stem) - len(stem.rstrip('0123456789'))
    return int(stem[-digit_count:]) if digit_count else -1

In [None]:
#Create full images for each subject and upload to Drive folder
import gc

batch_size = 64
subject_label = 'A'
directory = f"/content/drive/Shared drives/NeurotechX Shared Drive/CWT Coeffs/{subject_label}/"
subject_filepaths = get_filepaths(directory)

for patient, patient_files in subject_filepaths.items():
    patient_data = []
    for file_path in patient_files:
        # NOTE(review): allow_pickle=True lets np.load unpickle arbitrary
        # objects; only use it on files written by this notebook
        file_data = np.load(file_path, allow_pickle=True)  # load one file at a time
        patient_data.append(file_data)
    patient_data = np.concatenate(patient_data, axis=0)

    #calculate the number of batches (rounding up)
    num_batches = len(patient_data) // batch_size + (len(patient_data) % batch_size != 0)

    for batch in range(num_batches):
        #determine the start and end index of the batch
        start = batch * batch_size
        end = min(start + batch_size, len(patient_data))

        #process the batch
        batch_data = patient_data[start:end]
        batch_result = create_layouts_all_epochs(batch_data)

        #save batch result to google drive
        result_path = f"/content/drive/Shared drives/NeurotechX Shared Drive/CWT Coeffs/Combined Images/patient_{patient}_{subject_label}_batch_{batch}.npy"
        np.save(result_path, batch_result)

        # release the batch before the next one is built; np.load keeps no
        # cache, so dropping the references and collecting is all that is needed
        del batch_data
        del batch_result
        gc.collect()

    del patient_data  #clear memory



In [None]:
# #Testing: create full images for one subject and upload to Drive folder
# subject_label = 'A'
# directory = f"/content/drive/Shared drives/NeurotechX Shared Drive/CWT Coeffs/{subject_label}/"
# subject_filepaths = get_filepaths(directory)

# print(subject_filepaths["001"])

In [None]:
import gc

batch_size = 64
subject_label = 'A'
patient = "001"
patient_data = []

for file_path in subject_filepaths[patient]:
    print(file_path)
    # NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects;
    # only use it on files written by this notebook
    file_data = np.load(file_path, allow_pickle=True)  # load one file at a time
    patient_data.append(file_data)
patient_data = np.concatenate(patient_data, axis=0)

# calculate the number of batches (rounding up)
num_batches = len(patient_data) // batch_size + (len(patient_data) % batch_size != 0)

for batch in range(num_batches):
    # determine the start and end index of the batch
    start = batch * batch_size
    end = min(start + batch_size, len(patient_data))

    #process the batch
    batch_data = patient_data[start:end]
    batch_result = create_layouts_all_epochs(batch_data)

    #save batch result to Google Drive
    result_path = f"/content/drive/Shared drives/NeurotechX Shared Drive/CWT Coeffs/Combined Images/patient_{patient}_{subject_label}_batch_{batch}.npy"
    np.save(result_path, batch_result)

    # release the batch before the next one is built; np.load keeps no cache,
    # so dropping the references and collecting is all that is needed
    del batch_data
    del batch_result
    gc.collect()

del patient_data  # clear the memory by deleting the variable


## Spectrograms -- don't provide enough resolution

In [None]:
#testing spectrogram and label generation
# Smoke test: build spectrograms and a matching label list for one subject.
# NOTE(review): `epochs` and `subject` must already exist in the session, and
# generate_spectrograms is defined in the next cell -- run that cell first.
test_spectrograms = generate_spectrograms(epochs)
patient_label = subj_types[subject-1]
test_labels = generate_labels(test_spectrograms, patient_label)
# last expression of the cell, so the notebook displays the array shape
test_spectrograms.shape

(104, 19, 129, 15)

In [None]:
#Generate set of spectrograms from preprocessed data without ICA // TO DO: Add array for labels for each epoch
def generate_spectrograms(eeg_data, fs=120):
    """Compute a spectrogram for every channel of every epoch.

    Args:
        eeg_data: 3D array (epochs, channels, samples).
        fs: Sampling frequency passed to ``scipy.signal.spectrogram``.

    Returns:
        4D array (epochs, channels, frequencies, time) -- formatted to feed
        into a dataloader once a labels array has been made.
    """
    n_epochs, n_channels, _ = eeg_data.shape

    # signal.spectrogram returns (f, t, Sxx); only the spectrogram Sxx is kept
    per_epoch = [
        [
            signal.spectrogram(eeg_data[epoch_idx, channel_idx, :], fs=fs)[2]
            for channel_idx in range(n_channels)
        ]
        for epoch_idx in range(n_epochs)
    ]

    return np.array(per_epoch)

def generate_labels(sgram_array, patient_label):
    """Build the labels vector: ``patient_label`` repeated ``len(sgram_array)`` times."""
    return [patient_label] * len(sgram_array)


In [None]:
#Create arrays for all subjects: spectrograms and one label per epoch.
#The names assigned here stay in the notebook namespace; all_spectrograms and
#all_labels are read again by the CNN data-preparation cell.
all_spectrograms = []
all_labels = []

epoch_length = 4  # 4 seconds per epoch
overlap = 0.75  # 75% overlap
sampling_rate = 120  # Signal is sampled at 120 Hz

for subject in tqdm(range(1, 89)):
  #set file path to EEG data for current subject
  file_path = f"/content/drive/Shared drives/NeurotechX Shared Drive/Alzheimer's Dataset/derivatives/sub-{subject:03}/eeg/sub-{subject:03}_task-eyesclosed_eeg.set"
  #get signal and split into epochs
  subject_data = get_signal(file_path)
  subject_epochs = epoch_signal(subject_data, epoch_length, overlap, sampling_rate)
  #create spectrograms and add to running list
  subjects_spectrograms = generate_spectrograms(subject_epochs)
  all_spectrograms.append(subjects_spectrograms)
  #get label for subject (subj_types is 0-indexed, subjects start at 1) and add to list
  stored_label = subj_types[subject-1]
  subject_labels = generate_labels(subjects_spectrograms, stored_label)
  all_labels.append(subject_labels)

#merge the per-subject pieces along the epoch axis
all_spectrograms = np.concatenate(all_spectrograms, axis=0)
all_labels = np.concatenate(all_labels, axis=0)

#save data to google drive
#NOTE(review): only the spectrograms are written; all_labels exists in memory only
np.save('/content/drive/Shared drives/NeurotechX Shared Drive/spectrograms/larger_spectrograms.npy', all_spectrograms)

## Create new train/test splits

In [None]:
from sklearn.preprocessing import StandardScaler

def split_data(epochs_by_patient, subj_types, test_fraction, random_state=None):
    """Split subjects into train/test sets, then flatten and standardise their epochs.

    The split is made per subject (stratified by group), so all epochs of one
    subject end up on the same side of the split.

    Args:
        epochs_by_patient: Sequence with one entry per subject; each entry
            holds the feature rows of that subject's epochs.
        subj_types: Sequence with one group label per subject, in the same
            order as ``epochs_by_patient``.
        test_fraction: Fraction of subjects assigned to the test set.
        random_state: Optional seed forwarded to ``train_test_split`` to make
            the split reproducible. ``None`` keeps the previous random behaviour.

    Returns:
        Tuple ``(scaled_X_train, scaled_X_test, flat_y_train, flat_y_test)``:
        the standardised feature arrays and one label per epoch as lists.
    """
    # Splitting data to train-test using stratified split (by classification group)
    X_train, X_test, y_train, y_test = train_test_split(
        epochs_by_patient,
        subj_types,
        test_size=test_fraction,
        stratify=subj_types,
        random_state=random_state,
    )

    # Separate epochs from their subject-specific containers
    # (no longer need to know which subject each epoch comes from)
    flat_X_train = [epoch for subject_epochs in X_train for epoch in subject_epochs]
    flat_X_test = [epoch for subject_epochs in X_test for epoch in subject_epochs]

    # Repeat each subject's label once per epoch of that subject
    flat_y_train = [
        label
        for label, subject_epochs in zip(y_train, X_train)
        for _ in range(len(subject_epochs))
    ]
    flat_y_test = [
        label
        for label, subject_epochs in zip(y_test, X_test)
        for _ in range(len(subject_epochs))
    ]

    # Fit the scaler on training data only so no test statistics leak in
    scaler = StandardScaler()
    scaled_X_train = scaler.fit_transform(flat_X_train)
    scaled_X_test = scaler.transform(flat_X_test)

    return scaled_X_train, scaled_X_test, flat_y_train, flat_y_test


In [None]:
# Import epochs_by_patient and subj_types from shared drive
import pickle

# NOTE(review): pickle.load can execute arbitrary code; only load these files
# from a trusted location.
# Context managers close the file handles that the bare open() calls leaked.
with open("/content/drive/Shared drives/NeurotechX Shared Drive/epochs_by_patient.p", "rb") as epochs_file:
    epochs_by_patient = pickle.load(epochs_file)
with open("/content/drive/Shared drives/NeurotechX Shared Drive/subj_types.p", "rb") as types_file:
    subj_types = pickle.load(types_file)

In [None]:
import torch
from torch import nn, optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

#Prepare training and testing labels - fitting encoder ensures one-hot encodings
#are consistently assigned to same class labels between train and test sets
#since encoding is done after splitting
def prep_data(train_features, test_features, train_labels, test_labels):
    """Convert features and labels into float tensors with one-hot targets.

    The label encoder is fitted on the training labels only, so train and test
    sets share the same class-to-column mapping.

    Args:
        train_features: Training feature rows.
        test_features: Test feature rows.
        train_labels: One class label per training row.
        test_labels: One class label per test row; every label must also occur
            in ``train_labels`` (``LabelEncoder.transform`` raises otherwise).

    Returns:
        Tuple ``(train_features, test_features, train_labels, test_labels)`` of
        float tensors; the label tensors hold one-hot rows with one column per
        class seen in training.
    """
    le = LabelEncoder()     #make and fit the label encoder
    train_classes = le.fit_transform(train_labels)    #integer classes for training labels
    test_classes = le.transform(test_labels)

    # Row i of the identity matrix is the one-hot vector of class i. This gives
    # the same result as a OneHotEncoder fitted on train_classes, without
    # relying on its `sparse` argument, which newer scikit-learn rejects.
    identity = np.eye(len(le.classes_))
    train_one_hot = identity[train_classes]
    test_one_hot = identity[test_classes]

    train_labels = torch.tensor(train_one_hot).float()
    test_labels = torch.tensor(test_one_hot).float()

    # Prepare features
    train_features = torch.tensor(train_features).float()
    test_features = torch.tensor(test_features).float()

    return train_features, test_features, train_labels, test_labels


In [None]:
# Hold out 30% of the subjects for testing, then convert both sets to tensors
# with one-hot labels.
train_features, test_features, train_labels, test_labels = split_data(epochs_by_patient, subj_types, 0.3)
train_features, test_features, train_labels, test_labels = prep_data(train_features, test_features, train_labels, test_labels)

In [None]:
#Incorporate class weights -- didn't help in practice
from sklearn.utils.class_weight import compute_class_weight

# train_labels holds one-hot rows; recover the integer class index of each row
train_labels_class_indices = np.asarray(train_labels).argmax(axis=1)

# Compute class weights
class_weights = compute_class_weight('balanced', classes=np.unique(train_labels_class_indices), y=train_labels_class_indices)

# Convert class weights to a PyTorch tensor
class_weights = torch.tensor(class_weights).float()


## Implement MLP


In [None]:
# Create a TensorDataset and a DataLoader for each split
train_dataset = TensorDataset(train_features, train_labels)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataset = TensorDataset(test_features, test_labels)
# evaluation does not depend on sample order, so keep the test set unshuffled
# to make runs comparable
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Define the MLP model
class MLP(nn.Module):
    """Three-layer perceptron mapping 152 input features to 3 class logits."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(152, 80)
        self.fc2 = nn.Linear(80, 20)
        # fc3 must accept the 20 activations that fc2 produces
        self.fc3 = nn.Linear(20, 3)

    def forward(self, x):
        """Return raw logits (no softmax) for a batch of feature rows."""
        x = torch.tanh(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

In [None]:
import matplotlib.pyplot as plt

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


def evaluate_accuracy(model, dataloader, device):
    """Return the percentage of samples in ``dataloader`` classified correctly.

    Labels are expected as one-hot rows; the predicted class is the index of
    the largest model output.
    """
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_features, batch_labels in dataloader:
            batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
            output = model(batch_features)
            _, predicted = torch.max(output.data, 1)
            total += batch_labels.size(0)
            correct += (predicted == torch.argmax(batch_labels, dim=1)).sum().item()
    return 100 * correct / total


model = MLP()
model.to(device)
criterion = nn.CrossEntropyLoss()
#option to include class weights - doesn't help in practice
# criterion = nn.CrossEntropyLoss(weight = class_weights)
optimizer = optim.SGD(model.parameters(), lr=0.001)

train_accuracy_list = []
test_accuracy_list = []

for epoch in range(20):
    model.train()

    #training loop
    for batch_features, batch_labels in train_dataloader: # loop through batches
        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
        optimizer.zero_grad()
        output = model(batch_features)
        loss = criterion(output, batch_labels)
        loss.backward()
        optimizer.step()

    #track train and test accuracy every 5 epochs
    if epoch % 5 == 0:
        model.eval()
        train_accuracy = evaluate_accuracy(model, train_dataloader, device)
        train_accuracy_list.append(train_accuracy)

        test_accuracy = evaluate_accuracy(model, test_dataloader, device)
        test_accuracy_list.append(test_accuracy)
        #loss shown is that of the last training batch of this epoch
        print(f'Epoch {epoch}, Loss: {loss.item()}, Train Accuracy: {train_accuracy}, Test Accuracy: {test_accuracy}')

#plotting train/test accuracy (one point every 5 epochs)
plt.figure(figsize=(10, 5))
plt.plot(range(0, len(train_accuracy_list)*5, 5), train_accuracy_list, label='Train')
plt.plot(range(0, len(test_accuracy_list)*5, 5), test_accuracy_list, label='Test')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()


In [None]:
import torch.nn.functional as F
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Collect predicted and true class indices over the whole test loader, then
# print a classification report and draw the confusion matrix.
model.eval()
predictions = []
true_labels = []

with torch.no_grad():
    for batch_features, batch_labels in test_dataloader:
        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
        output = model(batch_features)

        # Apply softmax to output.
        pred_probabilities = F.softmax(output, dim=1)

        # Take the class with the highest probability from the output as prediction
        _, predicted = torch.max(pred_probabilities.data, 1)

        # labels are one-hot rows, so argmax recovers the class index
        predictions.extend(predicted.cpu().numpy().tolist())
        true_labels.extend(torch.argmax(batch_labels, dim=1).cpu().numpy().tolist())

#Classification report
from sklearn.metrics import classification_report
print(classification_report(true_labels, predictions))

#Confusion matrix
conf_mat = confusion_matrix(true_labels, predictions)

plt.figure(figsize=(10, 10))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', cbar = False)
plt.ylabel('Actual')
plt.xlabel('Predicted')
# NOTE(review): the tick names assume class indices 0, 1, 2 correspond to the
# groups A, C, F in that order -- confirm against the label encoder's classes
class_names = ["Alzheimer's", "Control", 'FT Dementia']
# +.5 centres each tick label on its heatmap cell
plt.xticks(ticks=np.arange(len(class_names))+.5, labels=class_names, ha='center')
plt.yticks(ticks=np.arange(len(class_names))+.5, labels=class_names)
plt.show()

## Try grid search for oversimplified hyperparameter tuning

In [None]:
import torch
from torch import nn, optim
from sklearn.metrics import accuracy_score

#MLP model
class MLP(nn.Module):
    """Configurable perceptron: ReLU hidden layers followed by a linear output layer.

    ``self.layers`` holds an input layer (input_size -> hidden_size),
    ``num_layers - 1`` hidden layers (hidden_size -> hidden_size) and an output
    layer (hidden_size -> output_size).
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        stack = [nn.Linear(input_size, hidden_size)]
        stack += [nn.Linear(hidden_size, hidden_size) for _ in range(num_layers - 1)]
        stack.append(nn.Linear(hidden_size, output_size))
        self.layers = nn.ModuleList(stack)

    def forward(self, x):
        """Return raw logits; ReLU is applied after every layer except the last."""
        *hidden_layers, output_layer = self.layers
        for hidden in hidden_layers:
            x = torch.relu(hidden(x))
        return output_layer(x)

#define hyperparameters for grid search
learning_rates = [0.1, 0.01, 0.001]
num_layers = [1, 2, 3]
hidden_sizes = [32, 64, 128]

#to hold best model and parameters
best_model = None
best_accuracy = 0
best_params = {}

#grid search over hyperparameters: every combination is trained from scratch
#NOTE(review): models are ranked by accuracy on test_dataloader, so the reported
#best accuracy is not an unbiased estimate -- a separate validation split would
#be needed for that
for lr in learning_rates:
    print('checking learning rate = ', lr)
    for layers in num_layers:
        print('checking num layers = ', layers)
        for hidden_size in hidden_sizes:
            print('checking hidden size = ', hidden_size)
            #model stays on the default device here (no .to(device) call), as do the batches
            model = MLP(input_size=152, hidden_size=hidden_size, num_layers=layers, output_size=3)
            criterion = nn.CrossEntropyLoss()
            optimizer = optim.SGD(model.parameters(), lr=lr)

            #training loop
            for epoch in range(40):  #set appropriate number of epochs
                for batch_features, batch_labels in train_dataloader:
                    optimizer.zero_grad()
                    output = model(batch_features)
                    #one-hot labels are converted to class indices for the loss
                    loss = criterion(output, torch.argmax(batch_labels, dim=1))
                    loss.backward()
                    optimizer.step()

            #evaluation
            model.eval()
            with torch.no_grad():
                correct = 0
                total = 0
                for batch_features, batch_labels in test_dataloader:
                    output = model(batch_features)
                    _, predicted = torch.max(output.data, 1)
                    total += batch_labels.size(0)
                    correct += (predicted == torch.argmax(batch_labels, dim=1)).sum().item()

            #fraction in [0, 1] (not a percentage)
            accuracy = correct / total
            print(f'Learning rate: {lr}, Hidden layers: {layers}, Hidden size: {hidden_size}, Accuracy: {accuracy}')

            #save model if it has the best accuracy so far
            if accuracy > best_accuracy:
                best_model = model
                best_accuracy = accuracy
                best_params = {'Learning rate': lr, 'Hidden layers': layers, 'Hidden size': hidden_size}

print(f'Best model parameters: {best_params}, Best model accuracy: {best_accuracy}')


## Implement Gradient Boosted Decision Trees with XGBoost

In [None]:
!pip install XGBoost



In [None]:
import xgboost as xgb
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# New subject-level split with 20% of subjects held out, then tensor conversion
train_features, test_features, train_labels, test_labels = split_data(epochs_by_patient, subj_types, 0.2)
train_features, test_features, train_labels, test_labels = prep_data(train_features, test_features, train_labels, test_labels)

# Convert data to DMatrix format for XGBoost
# (prep_data returns one-hot labels; argmax turns them back into class indices)
train_labels = np.argmax(train_labels, axis=1)
test_labels = np.argmax(test_labels, axis=1)
dtrain = xgb.DMatrix(train_features, label=train_labels)
dtest = xgb.DMatrix(test_features, label=test_labels)

#specify parameters
# NOTE(review): 'gpu_hist' needs a GPU runtime; confirm it is still accepted by
# the installed xgboost version
param = {
    'max_depth': 11,  # maximum tree depth
    'eta': 0.3,  #learning rate
    'objective': 'multi:softprob',  #loss function
    'num_class': 3,
    'tree_method': 'gpu_hist'}  #uses gpu

#train model
num_round = 30 #number of training rounds
bst = xgb.train(param, dtrain, num_round)

#make prediction: one probability per class and row, argmax picks the class
preds = bst.predict(dtest)
preds_class = np.argmax(preds, axis=1)

#calculate accuracy
accuracy = accuracy_score(test_labels, preds_class)
print("Accuracy: %.2f%%" % (accuracy * 100.0))


In [None]:
from sklearn.metrics import classification_report
# Per-class precision/recall/F1 for the XGBoost predictions made in the previous cell
print(classification_report(test_labels, preds_class))

              precision    recall  f1-score   support

           0       0.48      0.59      0.53      2509
           1       0.65      0.71      0.68      2635
           2       0.55      0.30      0.39      1739

    accuracy                           0.56      6883
   macro avg       0.56      0.53      0.53      6883
weighted avg       0.57      0.56      0.55      6883



## Hyperparameter tuning with grid search

In [None]:
from sklearn.model_selection import GridSearchCV
import xgboost as xgb
from sklearn.metrics import accuracy_score

# New subject-level split with 10% of subjects held out, then tensor conversion
train_features, test_features, train_labels, test_labels = split_data(epochs_by_patient, subj_types, 0.1)
train_features, test_features, train_labels, test_labels = prep_data(train_features, test_features, train_labels, test_labels)

#convert data to DMatrix format for XGBoost
#(one-hot labels are turned back into class indices first)
#NOTE(review): dtrain and dtest are not used again in this cell; the grid search
#below is fitted on train_features/train_labels directly
train_labels = np.argmax(train_labels, axis=1)
test_labels = np.argmax(test_labels, axis=1)
dtrain = xgb.DMatrix(train_features, label=train_labels)
dtest = xgb.DMatrix(test_features, label=test_labels)

#create a dictionary of hyperparameters
param_grid = {
    'max_depth': range(1, 10),
    'n_estimators': range(1, 50, 5)  # number of rounds/trees
}

#create base model
xgb_model = xgb.XGBClassifier(objective='multi:softprob', num_class=3, tree_method='gpu_hist')

#instantiate grid search model (3-fold cross-validation on the training data)
grid_search = GridSearchCV(estimator=xgb_model, param_grid=param_grid, cv=3, scoring='accuracy', verbose=3)


#fit the grid search to the data
grid_search.fit(train_features, train_labels)

#print the best parameters
print("Best parameters: ", grid_search.best_params_)

#print the performance of each combination
cv_results = grid_search.cv_results_
for mean_score, params in zip(cv_results["mean_test_score"], cv_results["params"]):
    print(params, " Mean accuracy: ", mean_score)

#use the best model to make predictions on the held-out test set
best_model = grid_search.best_estimator_
preds = best_model.predict(test_features)

#calculate accuracy
accuracy = accuracy_score(test_labels, preds)
print("Accuracy: %.2f%%" % (accuracy * 100.0))


In [None]:
import torch.nn.functional as F
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Same report/confusion-matrix code as the MLP evaluation cell.
# NOTE(review): this uses `model` and `test_dataloader`, not the XGBoost
# estimator or split created in the cells above. The most recent visible
# assignment to `model` is inside the MLP grid search, where it is never moved
# to `device` -- confirm which model is meant to be evaluated here.
model.eval()
predictions = []
true_labels = []

with torch.no_grad():
    for batch_features, batch_labels in test_dataloader:
        batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
        output = model(batch_features)

        #get prediction: class with the highest softmax probability
        pred_probabilities = F.softmax(output, dim=1)
        _, predicted = torch.max(pred_probabilities.data, 1)

        #store predictions and the class index of each one-hot label
        predictions.extend(predicted.cpu().numpy().tolist())
        true_labels.extend(torch.argmax(batch_labels, dim=1).cpu().numpy().tolist())

#Classification report
from sklearn.metrics import classification_report
print(classification_report(true_labels, predictions))

#Confusion matrix
conf_mat = confusion_matrix(true_labels, predictions)

plt.figure(figsize=(10, 10))
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', cbar = False)
plt.ylabel('Actual')
plt.xlabel('Predicted')
# NOTE(review): tick names assume class indices 0, 1, 2 map to A, C, F in that order
class_names = ["Alzheimer's", "Control", 'FT Dementia']
# +.5 centres each tick label on its heatmap cell
plt.xticks(ticks=np.arange(len(class_names))+.5, labels=class_names, ha='center')
plt.yticks(ticks=np.arange(len(class_names))+.5, labels=class_names)
plt.show()

## Implement CNN


In [None]:
#Create dataset class
class EEGDataset(Dataset):
    """Index-based dataset over ``data`` with optional labels and per-sample transform."""

    def __init__(self, data, labels=None, transform=None):
        self.data, self.labels, self.transform = data, labels, transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``sample`` or ``(sample, label)`` depending on whether labels were given."""
        item = self.data[idx]

        # a falsy transform (e.g. None) means "leave the sample as stored"
        if self.transform:
            item = self.transform(item)

        if self.labels is None:
            return item
        return item, self.labels[idx]

#Create dataloader for spectrograms

def create_dataloader(data, labels=None, transform=None, batch_size=32, shuffle=True):
    """Wrap ``data`` (and optional ``labels``/``transform``) in an EEGDataset and return a DataLoader over it."""
    return DataLoader(
        EEGDataset(data, labels, transform),
        batch_size=batch_size,
        shuffle=shuffle,
    )


In [None]:
from sklearn.preprocessing import LabelEncoder
# Cast the spectrograms to float32 and turn the group labels into integer codes
all_spectrograms = all_spectrograms.astype('float32')
labelcoder = LabelEncoder()
encoded_labels = labelcoder.fit_transform(all_labels)

#To convert back after predictions
# y_pred_labels = labelcoder.inverse_transform(y_pred)

# NOTE(review): the loader covers all spectrograms; no train/test split is made in this cell
dataloader = create_dataloader(all_spectrograms, encoded_labels, batch_size=32, shuffle=True)

In [None]:
import torch
import torch.nn as nn

class AlexNet(nn.Module):
    """AlexNet-style CNN over 19-channel 2-D inputs.

    Args:
        num_classes: Size of the final linear layer (number of output logits).
    """

    def __init__(self, num_classes=3):
        super().__init__()

        # Convolutional feature extractor; layer order fixes the
        # ``features.<idx>`` parameter names.
        conv_stack = [
            nn.Conv2d(19, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        ]
        self.features = nn.Sequential(*conv_stack)

        # Adaptive pooling pins the spatial size to 6x6 regardless of the
        # input resolution, so the first linear layer always sees 256*6*6.
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))

        dense_head = [
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*dense_head)

    def forward(self, x):
        """Return unnormalised class scores of shape (batch, num_classes)."""
        pooled = self.avgpool(self.features(x))
        return self.classifier(torch.flatten(pooled, 1))


In [None]:
#Check for device
# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Training hyperparameters
num_epochs = 15
# NOTE(review): batch_size is not read anywhere in this cell; the `dataloader`
# iterated below was built earlier with batch_size=32 -- confirm which is intended.
batch_size = 128
learning_rate = 1e-5

model = AlexNet()
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()
model.train()
# One slot per epoch; filled with the loss of that epoch's final mini-batch
loss_tracker = torch.zeros(num_epochs)

for i in tqdm(range(num_epochs)):
  for (data, label) in dataloader:
    label = torch.flatten(label)
    data, label = data.to(device), label.to(device)
    optimizer.zero_grad()
    # Call the module itself rather than .forward() so that any registered
    # hooks run as part of the forward pass.
    outputs = model(data)
    loss = criterion(outputs, label)
    loss.backward()
    optimizer.step()

  # Loss of the last mini-batch of this epoch (not an epoch average)
  epoch_loss = loss.item()
  loss_tracker[i] = epoch_loss
  print(epoch_loss)

In [None]:
# Fraction of samples in `dataloader` whose arg-max prediction equals the label.
# NOTE(review): `dataloader` is the same loader used for training, so this is
# training-set accuracy rather than held-out accuracy.
size_dataset = len(dataloader.dataset)
correct = 0
# Switch to inference mode: the model was left in train mode by the training
# cell, which keeps Dropout active and makes the measured accuracy stochastic.
model.eval()
with torch.no_grad():
    #Iterate through all datapoints in dataloader
    for (data, label) in dataloader:
        label = torch.flatten(label)
        data, label = data.to(device), label.to(device)
        outputs = model(data)
        correct += (outputs.argmax(dim=1) == label).sum().item()

accuracy = correct / size_dataset

In [None]:
class CNN(nn.Module):
    """Pooling-free convolutional classifier over 19-channel 2-D inputs.

    Five stride-1 3x3 convolutions (padding 1, so spatial size is preserved)
    are followed by adaptive average pooling to 6x6 and a three-layer fully
    connected classifier.

    Args:
        num_classes: Size of the final linear layer (number of output logits).
    """

    def __init__(self, num_classes=3):
        # Zero-argument super(): naming another class here (AlexNet) raises
        # TypeError because a CNN instance is not an AlexNet.
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(19, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 192, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(192, 384, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
        )
        # Fixes the feature map at 6x6 so the classifier input is always 256*6*6
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        """Return unnormalised class scores of shape (batch, num_classes)."""
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x


In [None]:
import os
import numpy as np
import random
from itertools import cycle
from sklearn.model_selection import train_test_split

# Load all files
# Sorted because os.listdir returns entries in arbitrary order; a fixed order
# keeps the per-class file lists (and hence batch order) stable between runs.
all_files = sorted(os.listdir('/path/to/your/directory'))
# Get unique subjects (second '_'-separated field of each file name)
subjects = {file.split('_')[1] for file in all_files}

# Split subjects into train and test
# The set is sorted before splitting: iteration order of a set of strings can
# differ between interpreter runs, which would make random_state=42 pick a
# different split each time.
train_subjects, test_subjects = train_test_split(sorted(subjects), test_size=0.2, random_state=42)
# Set copy for O(1) membership tests in the loop below
train_subject_lookup = set(train_subjects)

# Group files by subject and label
train_files = {label: [] for label in ('class1', 'class2', 'class3')}
test_files = {label: [] for label in ('class1', 'class2', 'class3')}
for file in all_files:
    name_parts = file.split('_')
    subject = name_parts[1]
    label = name_parts[2]  # assuming the label is the third element when splitting by '_'
    if subject in train_subject_lookup:
        train_files[label].append(file)
    else:
        test_files[label].append(file)

# Calculate class proportions (share of training files belonging to each class)
total_train_files = sum(len(files) for files in train_files.values())
train_class_proportions = {label: len(files) / total_train_files for label, files in train_files.items()}

def shuffle_in_unison(a, b):
    """Shuffle two equal-length sequences with one shared random permutation.

    Args:
        a: Sequence or array, shuffled along its first axis.
        b: Sequence or array with the same length as ``a``.

    Returns:
        Tuple ``(a_shuffled, b_shuffled)`` of NumPy arrays. Position ``i`` of
        both outputs comes from the same input position, and each array keeps
        its own dtype and trailing dimensions.

    Raises:
        ValueError: If ``a`` and ``b`` differ in length.
    """
    if len(a) != len(b):
        raise ValueError(
            f"cannot shuffle in unison: lengths differ ({len(a)} vs {len(b)})"
        )
    first, second = np.asarray(a), np.asarray(b)
    # Index both arrays with one permutation instead of stacking them into a
    # single array: stacking coerces data and labels to a common dtype and
    # only works for 1-D inputs.
    order = np.random.permutation(len(first))
    return first[order], second[order]

def load_data(file, data_dir=None):
    """Load one array file and build a matching per-sample label array.

    Args:
        file: File name or path. The label is the third '_'-separated field
            of its base name.
        data_dir: Optional directory joined in front of ``file`` before
            loading, for bare names such as those returned by ``os.listdir``.

    Returns:
        Tuple ``(data, labels)`` where ``data`` is whatever ``np.load``
        returns and ``labels`` has ``len(data)`` entries, all equal to the
        parsed label.
    """
    # Parse the base name only, so underscores in directory names cannot
    # shift the position of the label field.
    # NOTE(review): if the label is the last field it will include the file
    # extension -- confirm the naming scheme.
    label = os.path.basename(file).split('_')[2]  # assuming the label is the third element when splitting by '_'

    # Load data from file
    path = file if data_dir is None else os.path.join(data_dir, file)
    data = np.load(path)

    # Create a label array of the same length as the data
    labels = np.full(len(data), label)

    return data, labels

def data_generator(label_groups, class_proportions, batch_size):
    """Yield shuffled ``(data, labels)`` batches forever, balanced by class share.

    For every class, the next file of that class (cycling endlessly) is
    loaded and its first ``round(proportion * batch_size)`` samples are added
    to the batch.

    Args:
        label_groups: Mapping of class label to a list of files.
        class_proportions: Mapping of class label to its share of each batch.
        batch_size: Target number of samples per batch.

    Yields:
        Tuple ``(batch_data, batch_labels)`` of NumPy arrays, shuffled with
        the same permutation.

    Raises:
        ValueError: If a class is asked to contribute samples but has no files.
    """
    # One endless iterator per class. Classes without files get none, so an
    # empty list cannot raise StopIteration inside the generator.
    file_cycles = {
        label: cycle(files) for label, files in label_groups.items() if files
    }

    while True:
        batch_data = []
        batch_labels = []
        # For each class, sample proportional to its representation
        for label, proportion in class_proportions.items():
            num_samples = round(proportion * batch_size)
            if num_samples <= 0:
                # Nothing requested from this class; skip the file load
                continue
            if label not in file_cycles:
                raise ValueError(f"no files available for class {label!r}")

            data, labels = load_data(next(file_cycles[label]))

            # Add a proportional amount of data and labels to the batch
            batch_data.extend(data[:num_samples])
            batch_labels.extend(labels[:num_samples])

        # Shuffle data and labels with the same permutation to prevent
        # ordering bias; the shuffled copies (not the lists) are yielded.
        yield shuffle_in_unison(batch_data, batch_labels)

# Use the data generator to train the model
# NOTE(review): `.fit(generator, steps_per_epoch=..., epochs=...)` is the
# Keras training API. If `model` is still the torch `AlexNet` instance created
# earlier in this notebook, it has no `fit` method and this line raises
# AttributeError -- confirm which model this cell is meant to train.
# The generator also yields string class labels, not integer indices.
model.fit(data_generator(train_files, train_class_proportions, batch_size=64), steps_per_epoch=100, epochs=10)
