In [None]:
import os
import numpy as np
import pickle
from sklearn.preprocessing import OneHotEncoder
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
import torch.optim as optim 
import torch


In [None]:
# Directory that holds the extracted-feature .npz files.
# Set the EEG_DATA_DIR environment variable to read from another location;
# the original hard-coded folder is kept as the fallback.
file_path = os.environ.get("EEG_DATA_DIR", "/Users/hrakol/Desktop/Thesis EEG")

Get features

In [None]:
# Keep only npz files. Sorting makes the insertion order of `all_data` (and of
# everything derived from it) independent of the OS directory listing order.
files = sorted(f for f in os.listdir(file_path) if f.endswith(".npz"))
all_data = {}

for filename in files:
    session_name = os.path.splitext(filename)[0]  # removes the ".npz" extension

    # np.load on an .npz keeps the file open until closed, so read every array
    # we need inside the context manager and let it release the handle.
    with np.load(os.path.join(file_path, filename)) as data:
        features = data["features"]
        user_id = data["user_id"]
        stimuli_sequence = data["stimuli_sequence"]

    all_data[session_name] = {
        "features": features,
        "user_id": int(user_id),
        "stimuli_sequence": stimuli_sequence,
        # Feature-extraction model id: last character of the file name (sans extension)
        "extraction_model": int(session_name[-1]),
    }

Cosine Similarity

Compute the average feature vector of each session, and the average of the features that belong to the same acoustic stimulus.

In [None]:
# We need to compute the averages to use later for the cosine similarity model


def _mean_per_stimulus(session_features, session_stimuli):
    """Average the feature rows of each stimulus label over all given sessions.

    Row ``j`` of ``session_features[i]`` is paired with ``session_stimuli[i][j]``.

    Returns:
        dict mapping every label that occurs in any session to the mean of its
        feature rows (an empty dict when no sessions are given).
    """
    rows_by_label = {}
    for features, stimuli in zip(session_features, session_stimuli):
        for label, row in zip(stimuli, features):
            rows_by_label.setdefault(label, []).append(row)
    return {label: np.mean(rows, axis = 0) for label, rows in rows_by_label.items()}


# Set so only unique values stay
user_ids = set(info["user_id"] for info in all_data.values())
av_per_user = {}
each_freq_av = {}

for user_id in user_ids:
    user_freq_av = {}

    for model_id in (1, 2):
        # All sessions of this user produced by this feature-extraction model
        sessions = [
            info
            for info in all_data.values()
            if info["user_id"] == user_id and info["extraction_model"] == model_id
        ]
        if not sessions:
            # Nothing to average; avoids indexing an empty list / NaN means
            continue

        session_features = [np.array(info["features"]) for info in sessions]
        session_stimuli = [info["stimuli_sequence"] for info in sessions]

        # Average per session, then the average of those per user and model
        session_avgs = [np.mean(features, axis = 0) for features in session_features]
        av_per_user[(user_id, model_id)] = np.mean(session_avgs, axis = 0)

        # Average per stimulus label. The labels are read from this user's own
        # sessions (not from whichever file happened to be loaded last).
        stimulus_means = _mean_per_stimulus(session_features, session_stimuli)
        for cl_num, mean_vector in stimulus_means.items():
            user_freq_av[(cl_num, model_id)] = mean_vector

    # NOTE(review): the keys are (stimulus, model) without the user id and the
    # dict is replaced on every iteration, so after the loop `each_freq_av`
    # only holds the values of the last user visited. Kept as-is because the
    # key layout is what gets pickled; confirm whether per-user keys were intended.
    each_freq_av = user_freq_av

In [None]:
# Save as pickle files to preserve the dictionary structure.
# The mode must be its own argument: without the comma the two adjacent string
# literals are concatenated into a single (non-existent) file name opened for reading.
with open("each_freq_av.pkl", "wb") as f:
    pickle.dump(each_freq_av, f)

with open("av_per_user.pkl", "wb") as f:
    pickle.dump(av_per_user, f)

Neural Network

In [None]:
# Classification layer for the data without the labels
class UserClassifier(nn.Module):
    """A single linear layer producing one output score per user."""

    def __init__(self, embedding_dim, n_users):
        super(UserClassifier, self).__init__()
        # The attribute name `fc` determines the keys of the saved state_dict.
        self.fc = nn.Linear(in_features = embedding_dim, out_features = n_users)

    def forward(self, x):
        """Apply the linear layer to ``x`` and return its raw output."""
        scores = self.fc(x)
        return scores

In [None]:
# NN with classification layer for features with stimuli label
class EEGUserClassifier(nn.Module):
    """Linear -> ReLU -> Dropout -> Linear classifier with one output per user.

    Args:
        input_dim: size of each input vector.
        n_users: number of output scores.
        hidden_dim: width of the hidden layer (default 128, the previous fixed value).
        dropout: dropout probability after the ReLU (default 0.3, the previous fixed value).
    """

    def __init__(self, input_dim, n_users, hidden_dim = 128, dropout = 0.3):
        super().__init__()
        # Layer positions inside the Sequential are unchanged, so state_dict
        # keys stay "net.0.*" and "net.3.*".
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, n_users)
        )

    def forward(self, x):
        """Run ``x`` through the network and return its raw output."""
        return self.net(x)

In [None]:
# Check if the system has a GPU
cuda = torch.cuda.is_available()
if cuda:
    device = "cuda"
    # Turn on the cuDNN benchmark flag only when a GPU is present
    torch.backends.cudnn.benchmark = True
else:
    device = "cpu"

In [None]:
# Training model for both Classifiers
def train_classifier(model, train_loader, n_epochs = 10, lr = 1e-3, device = "cpu"):
    """Train ``model`` with Adam and cross-entropy loss, printing stats per epoch.

    Args:
        model: module whose output for a batch is fed to ``nn.CrossEntropyLoss``.
        train_loader: iterable yielding ``(x_batch, y_batch)`` tensor pairs.
        n_epochs: number of passes over ``train_loader``.
        lr: learning rate passed to ``optim.Adam``.
        device: device the model and every batch are moved to.

    Returns:
        The trained model (the result of ``model.to(device)``).
    """
    model = model.to(device)
    criterion = nn.CrossEntropyLoss() #loss for classification
    optimizer = optim.Adam(model.parameters(), lr = lr) #learning rate(weight updates)

    for epoch in range(n_epochs):
        model.train()
        # total_loss is the sum of the per-batch losses, not a per-sample average
        total_loss, correct, total = 0, 0, 0

        for x_batch, y_batch in train_loader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            preds = outputs.argmax(dim = 1)
            correct += (preds == y_batch).sum().item()
            total += y_batch.size(0)

        # An empty loader yields no samples; report NaN instead of dividing by zero
        acc = correct / total if total else float("nan")
        print(f"Epoch {epoch+1}/{n_epochs} | Loss={total_loss:.4f} | Acc={acc:.4f}")

    return model

In [None]:
# Get the features and the user_ids (plus the stimuli sequence to use later)
def _collect_model_sessions(sessions, model_id):
    """Gather per-session arrays for one feature-extraction model.

    Args:
        sessions: dict of session records, each with the keys "features",
            "user_id", "stimuli_sequence" and "extraction_model".
        model_id: value of "extraction_model" to keep.

    Returns:
        Three parallel lists with one entry per matching session: the feature
        arrays, the stimuli arrays, and label arrays that repeat the session's
        user id once per feature row.
    """
    features, stimuli, labels = [], [], []
    for info in sessions.values():
        if info["extraction_model"] != model_id:
            continue
        session_features = np.array(info["features"])
        features.append(session_features)
        stimuli.append(np.array(info["stimuli_sequence"]))
        # The features are stacked row-wise later, so the target vector needs
        # one user id per row, not one per session.
        n_rows = np.atleast_2d(session_features).shape[0]
        labels.append(np.full(n_rows, info["user_id"]))
    return features, stimuli, labels


features_1, stimuli_1, user_ids_1 = _collect_model_sessions(all_data, 1)

# Now for the second model
features_2, stimuli_2, user_ids_2 = _collect_model_sessions(all_data, 2)

In [None]:
# Only features Datasets
# CrossEntropyLoss takes class indices in [0, n_classes) as targets, so the raw
# user ids are mapped onto contiguous indices. user_classes_N[k] is the user id
# behind class k; for ids that already are 0..n-1 the mapping is the identity.
x_1 = torch.tensor(np.vstack(features_1), dtype = torch.float32)
user_classes_1, class_idx_1 = np.unique(np.hstack(user_ids_1), return_inverse = True)
y_1 = torch.tensor(class_idx_1.reshape(-1), dtype = torch.long)

x_2 = torch.tensor(np.vstack(features_2), dtype = torch.float32)
user_classes_2, class_idx_2 = np.unique(np.hstack(user_ids_2), return_inverse = True)
y_2 = torch.tensor(class_idx_2.reshape(-1), dtype = torch.long)

# Every feature row needs exactly one label
assert len(x_1) == len(y_1), f"model 1: {len(x_1)} feature rows vs {len(y_1)} labels"
assert len(x_2) == len(y_2), f"model 2: {len(x_2)} feature rows vs {len(y_2)} labels"

dataset_1 = TensorDataset(x_1, y_1)
loader_1 = DataLoader(dataset_1, batch_size = 32, shuffle = True)

dataset_2 = TensorDataset(x_2, y_2)
loader_2 = DataLoader(dataset_2, batch_size = 32, shuffle = True)

In [None]:
# Train the feature-only model
# NOTE(review): the `device` string computed earlier in the notebook is not
# passed here, so train_classifier uses its "cpu" default - confirm this is intended.
model1 = UserClassifier(x_1.shape[1], len(torch.unique(y_1)))
trained_model1 = train_classifier(model1, loader_1, n_epochs = 10)

model2 = UserClassifier(x_2.shape[1], len(torch.unique(y_2)))
# Bound to its own name so the first trained model is not overwritten
trained_model2 = train_classifier(model2, loader_2, n_epochs = 10)

In [None]:
# Add the stimuli labels to each feature with OneHotEncoder
# NOTE: this cell overwrites stimuli_* and features_*, so it must run exactly
# once after the cell that collects them.
stimuli_1 = np.hstack(stimuli_1).reshape(-1, 1)
stimuli_2 = np.hstack(stimuli_2).reshape(-1, 1)

encoder = OneHotEncoder(sparse_output = False)
stimuli_1 = encoder.fit_transform(stimuli_1)
# NOTE(review): the same encoder is re-fitted here, so afterwards `encoder`
# only reflects the categories seen in stimuli_2.
stimuli_2 = encoder.fit_transform(stimuli_2)

# Add the encoded stimuli labels to the corresponding feature.
# features_* are lists of per-session arrays, so they are stacked into one
# 2-D array first; the one-hot columns can then be appended row by row.
features_1 = np.hstack([np.vstack(features_1), stimuli_1])
features_2 = np.hstack([np.vstack(features_2), stimuli_2])

In [None]:

# Datasets for the features with the one-hot stimuli labels appended.
# CrossEntropyLoss takes class indices in [0, n_classes) as targets, so the raw
# user ids are mapped onto contiguous indices. user_classes_N[k] is the user id
# behind class k; for ids that already are 0..n-1 the mapping is the identity.
x_1 = torch.tensor(np.vstack(features_1), dtype = torch.float32)
user_classes_1, class_idx_1 = np.unique(np.hstack(user_ids_1), return_inverse = True)
y_1 = torch.tensor(class_idx_1.reshape(-1), dtype = torch.long)

x_2 = torch.tensor(np.vstack(features_2), dtype = torch.float32)
user_classes_2, class_idx_2 = np.unique(np.hstack(user_ids_2), return_inverse = True)
y_2 = torch.tensor(class_idx_2.reshape(-1), dtype = torch.long)

# Every feature row needs exactly one label
assert len(x_1) == len(y_1), f"model 1: {len(x_1)} feature rows vs {len(y_1)} labels"
assert len(x_2) == len(y_2), f"model 2: {len(x_2)} feature rows vs {len(y_2)} labels"

dataset_1 = TensorDataset(x_1, y_1)
loader_1 = DataLoader(dataset_1, batch_size = 32, shuffle = True)

dataset_2 = TensorDataset(x_2, y_2)
loader_2 = DataLoader(dataset_2, batch_size = 32, shuffle = True)

In [None]:
# Train the feature-label model (one classifier per feature-extraction model)
input_dim_1, n_users_1 = x_1.shape[1], len(torch.unique(y_1))
model3 = EEGUserClassifier(input_dim_1, n_users_1)
trained_model3 = train_classifier(model3, loader_1, n_epochs = 10)

# The second model is only created after the first has finished training
input_dim_2, n_users_2 = x_2.shape[1], len(torch.unique(y_2))
model4 = EEGUserClassifier(input_dim_2, n_users_2)
trained_model4 = train_classifier(model4, loader_2, n_epochs = 10)

Save all the models

In [None]:
# Write the state_dict of each classifier to its own file, in the same order as before
checkpoints = (
    (model1, "user_classifier_1.pth"),
    (model2, "user_classifier_2.pth"),
    (model3, "user_classifier_w_labels_1.pth"),
    (model4, "user_classifier_w_labels_2.pth"),
)
for classifier, checkpoint_path in checkpoints:
    torch.save(classifier.state_dict(), checkpoint_path)