### Import data

In [None]:
import pickle
import numpy as np

def read_data(filename):
    """Unpickle one recording file and return its contents.

    Parameters
    ----------
    filename : str or path-like
        Path of the pickle file to read.

    Returns
    -------
    object
        Whatever object the pickle contains (callers in this notebook index
        it with ``'labels'`` and ``'data'``).

    Notes
    -----
    Strings are decoded as ``latin1`` (presumably because the files were
    written by Python 2 -- TODO confirm).
    """
    # NOTE(review): unpickling can execute arbitrary code; only load files
    # from a trusted source.
    # The context manager guarantees the handle is closed even if loading fails.
    with open(filename, 'rb') as fh:
        return pickle.load(fh, encoding='latin1')

# Two-character, zero-padded identifiers "01" .. "32", one per input file.
files = [f"{number:02d}" for number in range(1, 33)]

# Collect the 'labels' and 'data' entries of every file, in file order.
labels, data = [], []
for file_id in files:
    recording = read_data(f"archive/data_preprocessed_python/s{file_id}.dat")
    labels.append(recording['labels'])
    data.append(recording['data'])



#### Reshape data and select EEG channels

In [None]:

# Stack the per-file lists into single arrays with one row per trial.
labels = np.asarray(labels).reshape((1280, 4))
data = np.asarray(data).reshape((1280, 40, 8064))

# Keep only the first 32 entries of axis 1 (the EEG channels).
eeg_data = data[:, 0:32, :]

for stacked in (eeg_data, labels):
    print(stacked.shape)


#### Visualize data

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Overlay the distributions of label columns 0 and 1 on a single set of axes.
fig, ax = plt.subplots(figsize=(12, 6))

for column, colour, name in ((0, 'blue', 'Valence'), (1, 'green', 'Arousal')):
    sns.histplot(labels[:, column], bins=20, color=colour, alpha=0.7, label=name, ax=ax)

ax.set_title('Label histogram')
ax.set_xlabel('Value')
ax.set_ylabel('Frequency')
ax.legend()

fig.tight_layout()
plt.show()


#### Compute Power Spectral Density of brain signals for each brain wave band

In [None]:
from scipy.signal import welch

# `simps` was removed from recent SciPy releases; `simpson` is the supported
# name. Fall back to the legacy name only on old installations.
try:
    from scipy.integrate import simpson
except ImportError:
    from scipy.integrate import simps as simpson

# Keep the legacy name defined for any cell that still refers to it.
simps = simpson


def bandpower(data, sf, band):
    """Integrate the Welch power spectral density of a signal over a band.

    Parameters
    ----------
    data : array-like
        Signal samples (presumably 1-D; the boolean indexing of the PSD below
        assumes a single spectrum -- TODO confirm for other shapes).
    sf : float
        Sampling frequency passed to ``scipy.signal.welch``.
    band : sequence of two numbers
        ``(low, high)`` frequency limits, inclusive on both ends.

    Returns
    -------
    float
        Simpson-rule integral of the PSD over the frequency bins inside
        ``[low, high]``.

    Raises
    ------
    ValueError
        If ``low`` is not strictly positive (the window length is derived
        from ``1 / low``).
    """
    low, high = np.asarray(band)
    if low <= 0:
        raise ValueError("band lower edge must be > 0, got {!r}".format(low))

    # Window length: two periods of the lowest frequency in the band.
    nperseg = int((2 / low) * sf)
    freqs, psd = welch(data, sf, nperseg=nperseg)

    freq_res = freqs[1] - freqs[0]
    idx_band = np.logical_and(freqs >= low, freqs <= high)
    return simpson(psd[idx_band], dx=freq_res)

def get_band_power(data, sf, channel, band):
    """Return the band power of one channel for a named frequency band.

    Parameters
    ----------
    data : indexable
        Container indexed by ``channel`` to obtain the signal.
    sf : float
        Sampling frequency forwarded to ``bandpower``.
    channel : index
        Index into ``data`` selecting the signal.
    band : str
        One of ``"delta"``, ``"theta"``, ``"alpha"``, ``"beta"``, ``"gamma"``;
        any other key raises ``KeyError``.
    """
    # Frequency limits (low, high) for each named band.
    band_limits = dict(
        delta=(0.5, 4),
        theta=(4, 8),
        alpha=(8, 12),
        beta=(12, 30),
        gamma=(30, 64),
    )
    return bandpower(data[channel], sf, band_limits[band])

sf = 128  # Sampling frequency
bands = ["delta", "theta", "alpha", "beta", "gamma"]

n_trials, n_channels = eeg_data.shape[0], eeg_data.shape[1]
n_bands = len(bands)

# One row per trial; columns are laid out channel-major, band-minor:
# column = channel * n_bands + band_index.
eeg_band = np.zeros((n_trials, n_channels * n_bands))

for trial_idx, trial in enumerate(eeg_data):
    for channel_idx in range(n_channels):
        first_col = channel_idx * n_bands
        for band_idx, band_name in enumerate(bands):
            eeg_band[trial_idx, first_col + band_idx] = get_band_power(
                trial, sf, channel_idx, band_name
            )

print(eeg_band.shape)


In [None]:
import pandas as pd
# Show the first ten rows of the band-power feature matrix.
eeg_band_preview = pd.DataFrame(eeg_band)
eeg_band_preview.head(10)

#### Encode labels

In [None]:
label_name = ["valence", "arousal"]

# Alternative three-level binning per axis (9 combined classes), kept for reference:
#labels_valence = ["LOW" if la[0] < 3 else "MEDIUM" if la[0] < 6 else "HIGH" for la in labels] 
#labels_arousal = ["LOW" if la[1] < 3 else "MEDIUM" if la[1] < 6 else "HIGH" for la in labels]

# Two-level binning per axis (4 combined classes): values below 5 are "LOW".
labels_valence = ["LOW" if rating < 5 else "HIGH" for rating in labels[:, 0]]
labels_arousal = ["LOW" if rating < 5 else "HIGH" for rating in labels[:, 1]]

# Join both axes into one class name per trial, as a column vector for the encoder.
combined_labels = np.array(
    [f"{valence}_{arousal}" for valence, arousal in zip(labels_valence, labels_arousal)]
).reshape(-1, 1)

from sklearn.preprocessing import OrdinalEncoder

# Map each combined class name to a numeric code; enc.categories_ holds the names.
enc = OrdinalEncoder()
yData_enc = enc.fit_transform(combined_labels).reshape(-1)

num_categories = len(np.unique(yData_enc))
num_categories

In [None]:
# Side-by-side view of the per-axis bins, the combined class and its numeric code.
label_preview = pd.DataFrame(
    {
        'Valence': np.asarray(labels_valence).ravel(),
        'Arousal': np.asarray(labels_arousal).ravel(),
        'combined_labels': combined_labels.ravel(),
        'yData_enc': yData_enc.ravel(),
    }
)
label_preview.head(10)

#### Extract polynomial features and keep the principal components that explain the most variance (PCA) - generates `xData_eeg_band` (TODO: decide whether to include this in the report)

In [None]:
from sklearn.decomposition import PCA
from sklearn import preprocessing

# Degree-2 polynomial expansion of the band-power features, reduced to 500
# principal components and then standardised column-wise.
poly = preprocessing.PolynomialFeatures(degree=2)

# random_state is fixed because, for an input this large, PCA's automatic
# solver choice can select the randomized SVD, which would otherwise make the
# features differ between runs. The seed matches the train/test split seed.
pca = PCA(n_components=500, random_state=42)

poly_features = poly.fit_transform(eeg_band)
pca_features = pca.fit_transform(poly_features)

# NOTE(review): PCA and scaling are fitted on all rows before any train/test
# split, so held-out rows influence the transform -- consider fitting on the
# training split only.
xData_eeg_band = preprocessing.scale(pca_features)
xData_eeg_band.shape

#### K-nearest neighbors classifier

In [None]:
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier

# Hold out 20% of the rows for testing (fixed seed for a repeatable split).
X_train, X_test, y_train, y_test = train_test_split(
    xData_eeg_band, yData_enc, test_size=0.2, random_state=42
)

# Fit one KNN classifier per candidate k and record both accuracies.
neighbors = range(1, 21)
train_scores, test_scores = [], []

for k in neighbors:
    modelKNN = KNeighborsClassifier(n_neighbors=k).fit(X_train, y_train)

    acc_train = modelKNN.score(X_train, y_train)
    acc_test = modelKNN.score(X_test, y_test)
    train_scores.append(acc_train)
    test_scores.append(acc_test)

    print(f"n_neighbors: {k}, train_score: {acc_train:.4f}, test_score: {acc_test:.4f}")

# NOTE(review): k is chosen on the same test split that is later reported.
print("Best number of neighbors: ", neighbors[np.argmax(test_scores)])

# Train vs. test accuracy as a function of k.
fig, ax = plt.subplots(figsize=(10, 5))
for curve, curve_label in ((train_scores, 'Train Accuracy'), (test_scores, 'Test Accuracy')):
    ax.plot(neighbors, curve, label=curve_label, marker='o')
ax.set_xlabel('Number of Neighbors')
ax.set_ylabel('Accuracy')
ax.set_title('KNN Accuracy for Different n_neighbors')
ax.legend()
ax.grid(True)
plt.show()


# Refit KNN with the k that scored highest in the sweep and start the
# per-model score table with its accuracy.
best_k = neighbors[np.argmax(test_scores)]

modelKNN = KNeighborsClassifier(n_neighbors=best_k)
modelKNN.fit(X_train, y_train)

test_score = {'KNN': modelKNN.score(X_test, y_test)}
    

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import classification_report

# Predict the labels for the test set
y_pred = modelKNN.predict(X_test)

# Compute the confusion matrix
cm = confusion_matrix(y_test, y_pred)

# Display the confusion matrix with the class names rather than the numeric
# codes, matching the classification report below and the other models' plots.
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=enc.categories_[0])
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix for KNN Classifier')
plt.show()

# Generate a classification report
report = classification_report(y_test, y_pred, target_names=enc.categories_[0])
print(report)

#### Define Neural Network models

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import torch.nn.functional as F

# Define the neural network model
class LinearNeuralNet(nn.Module):
    """Fully connected classifier with five hidden layers.

    The first four hidden layers are each followed by batch normalisation,
    ReLU and dropout (p=0.3); the fifth is followed by ReLU only. The output
    layer returns raw class scores (no softmax).

    Each normalised layer has its own ``BatchNorm1d``. A single instance was
    previously reused after ``fc2``, ``fc3`` and ``fc4``, which made those
    layers share affine parameters and mix their running statistics.
    Checkpoints saved before this change lack the ``batchnorm3`` /
    ``batchnorm4`` entries and need ``strict=False`` to load.

    Parameters
    ----------
    input_size : int
        Number of input features per sample.
    hidden_size : int
        Width of every hidden layer.
    num_classes : int
        Number of output scores.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        # Hidden fully connected layers
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, hidden_size)
        self.fc5 = nn.Linear(hidden_size, hidden_size)

        # Output layer
        self.fcFinal = nn.Linear(hidden_size, num_classes)

        # Activation and dropout are stateless and can be shared.
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.3)  # Dropout with 30% probability

        # One batch-norm layer per normalised hidden layer.
        self.batchnorm1 = nn.BatchNorm1d(hidden_size)
        self.batchnorm2 = nn.BatchNorm1d(hidden_size)
        self.batchnorm3 = nn.BatchNorm1d(hidden_size)
        self.batchnorm4 = nn.BatchNorm1d(hidden_size)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for ``x``."""
        normalised_blocks = (
            (self.fc1, self.batchnorm1),
            (self.fc2, self.batchnorm2),
            (self.fc3, self.batchnorm3),
            (self.fc4, self.batchnorm4),
        )
        out = x
        for linear, norm in normalised_blocks:
            out = self.dropout(self.relu(norm(linear(out))))

        # Last hidden layer: ReLU only, no normalisation or dropout.
        out = self.relu(self.fc5(out))
        return self.fcFinal(out)


class ConvNeuralNet(nn.Module):
    """1-D convolutional classifier over ``(batch, channels, samples)`` input.

    Three Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(2) stages are followed by
    two hidden fully connected layers and an output layer that returns raw
    class scores.

    Parameters
    ----------
    num_classes : int
        Number of output scores.
    num_channels : int, optional
        Number of input channels (default 32, the previously hard-coded value).
    num_samples : int, optional
        Length of the time axis (default 8064, the previously hard-coded
        value). Must be at least 8 so that something is left after the three
        pooling stages.

    Raises
    ------
    ValueError
        If ``num_samples`` is smaller than 8.
    """

    def __init__(self, num_classes, num_channels=32, num_samples=8064):
        super().__init__()
        if num_samples < 8:
            raise ValueError("num_samples must be >= 8, got {!r}".format(num_samples))

        # The convolutions use 'same'-style padding for their odd kernels, so
        # only the three stride-2 poolings shorten the time axis.
        self.conv1 = nn.Conv1d(in_channels=num_channels, out_channels=64, kernel_size=7, stride=1, padding=3)
        self.bn1 = nn.BatchNorm1d(64)
        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=5, stride=1, padding=2)
        self.bn2 = nn.BatchNorm1d(128)
        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm1d(256)
        self.pool3 = nn.MaxPool1d(kernel_size=2, stride=2)

        # Three floor-halvings of the length equal one floor-division by 8.
        self.fc1 = nn.Linear(256 * (num_samples // 8), 512)
        self.fc2 = nn.Linear(512, 256)
        self.dropout = nn.Dropout(0.5)
        self.fc3 = nn.Linear(256, num_classes)  # Output layer

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for ``x``."""
        x = self.pool1(torch.relu(self.bn1(self.conv1(x))))
        x = self.pool2(torch.relu(self.bn2(self.conv2(x))))
        x = self.pool3(torch.relu(self.bn3(self.conv3(x))))

        # Flatten (channels, time) for the fully connected layers.
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)  # dropout is applied after the first FC layer only
        x = torch.relu(self.fc2(x))
        return self.fc3(x)


class EEGNet(nn.Module):
    """2-D convolutional classifier over ``(batch, 1, channels, samples)`` input.

    Blocks, in order: a temporal convolution along the sample axis followed by
    squaring; a grouped convolution spanning all channels; two further
    convolutions; and a final linear layer returning raw class scores.

    Parameters
    ----------
    num_channels : int
        Height of the input (number of signal channels).
    num_samples : int
        Width of the input (number of time samples).
    num_classes : int
        Number of output scores.
    dropout_rate : float
        Dropout probability used after both pooling stages.
    """

    def __init__(self, num_channels=32, num_samples=8064, num_classes=9, dropout_rate=0.5):
        super().__init__()

        # First block: Temporal Convolution
        self.temporal_conv = nn.Conv2d(1, 8, (1, 64), padding=(0, 32), bias=False)
        self.batchnorm1 = nn.BatchNorm2d(8)

        # Second block: Depthwise Convolution (collapses the channel axis to 1)
        self.depthwise_conv = nn.Conv2d(8, 16, (num_channels, 1), groups=8, bias=False)
        self.batchnorm2 = nn.BatchNorm2d(16)
        self.activation = nn.ELU()
        self.avgpool1 = nn.AvgPool2d((1, 4))
        self.dropout1 = nn.Dropout(dropout_rate)

        # Third block: Separable Convolutions
        # NOTE(review): separable_conv1 has no `groups=` argument, so it is a
        # dense convolution rather than a depthwise one.
        self.separable_conv1 = nn.Conv2d(16, 16, (1, 16), padding=(0, 8), bias=False)
        self.batchnorm3 = nn.BatchNorm2d(16)
        self.separable_conv2 = nn.Conv2d(16, 16, (1, 1), bias=False)
        self.batchnorm4 = nn.BatchNorm2d(16)
        self.avgpool2 = nn.AvgPool2d((1, 8))
        self.dropout2 = nn.Dropout(dropout_rate)

        # Final Fully Connected Layer.
        # Track the time-axis length through the network: each even-kernel
        # convolution with padding = kernel // 2 lengthens it by one sample,
        # and each pooling floor-divides it. `num_samples // 32` ignores the
        # +1 terms and is only correct for some lengths (8064 among them).
        width = num_samples + 2 * 32 - 64 + 1   # temporal_conv
        width = width // 4                      # avgpool1
        width = width + 2 * 8 - 16 + 1          # separable_conv1
        width = width // 8                      # avgpool2
        self.fc = nn.Linear(16 * width, num_classes)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for ``x``."""
        # Temporal Convolution
        x = self.temporal_conv(x)
        x = self.batchnorm1(x)
        x = x ** 2  # Squaring non-linearity

        # Depthwise Convolution
        x = self.depthwise_conv(x)
        x = self.batchnorm2(x)
        x = self.activation(x)
        x = self.avgpool1(x)
        x = self.dropout1(x)

        # Separable Convolutions
        x = self.separable_conv1(x)
        x = self.batchnorm3(x)
        x = self.separable_conv2(x)
        x = self.batchnorm4(x)
        x = self.activation(x)
        x = self.avgpool2(x)
        x = self.dropout2(x)

        # Flatten for Fully Connected layer
        x = x.view(x.size(0), -1)
        return self.fc(x)


def calculate_accuracy(outputs, targets):
    """Return the fraction of rows whose highest-scoring class equals the target.

    Parameters
    ----------
    outputs : torch.Tensor
        Class scores, reduced over dimension 1 to pick the predicted class.
    targets : torch.Tensor
        Expected class index for each row.

    Returns
    -------
    float
        Number of matching rows divided by ``targets.size(0)``.
    """
    predicted = outputs.argmax(dim=1)
    n_correct = torch.eq(predicted, targets).sum().item()
    n_total = targets.shape[0]
    return n_correct / n_total


In [None]:
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import StepLR

# Hyper-parameters shared by the neural-network training cells.
batch_size, num_epochs = 32, 50
learning_rate = 0.01
hidden_size = 128

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


In [None]:
# 80/20 train/validation split of the PCA features (fixed seed).
X_train, X_val, y_train, y_val = train_test_split(
    xData_eeg_band, yData_enc, test_size=0.2, random_state=42
)

# Wrap each split as float32 inputs / int64 targets.
train_dataset = TensorDataset(
    torch.tensor(X_train, dtype=torch.float32),
    torch.tensor(y_train, dtype=torch.long),
)
val_dataset = TensorDataset(
    torch.tensor(X_val, dtype=torch.float32),
    torch.tensor(y_val, dtype=torch.long),
)

# Only the training batches are shuffled; validation order stays fixed so
# predictions line up with y_val.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Linear Neural Network
# Sizes come from the data and the config cell instead of repeated literals.
model_linear = LinearNeuralNet(
    input_size=X_train.shape[1],
    hidden_size=hidden_size,
    num_classes=num_categories,
).to(device)
optimizer_linear = optim.Adam(model_linear.parameters(), lr=learning_rate)
# NOTE(review): with step_size=50 and num_epochs=50 the decay only takes
# effect after the final epoch.
scheduler_linear = StepLR(optimizer_linear, step_size=50, gamma=0.1)
criterion = nn.CrossEntropyLoss()

print("Training LinearNN...")
for epoch in range(num_epochs):
    # ---- training pass ----
    model_linear.train()
    train_loss = 0.0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model_linear(batch_X)
        loss = criterion(outputs, batch_y)
        optimizer_linear.zero_grad()
        loss.backward()
        optimizer_linear.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    scheduler_linear.step()

    # ---- validation pass ----
    model_linear.eval()
    val_loss = 0.0
    val_correct = 0
    val_seen = 0
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            outputs = model_linear(batch_X)
            val_loss += criterion(outputs, batch_y).item()
            # Count hits per sample so a smaller last batch is not over-weighted.
            val_correct += (outputs.argmax(dim=1) == batch_y).sum().item()
            val_seen += batch_y.size(0)
    val_loss /= len(val_loader)
    val_accuracy = val_correct / val_seen

    # "Loss" is the mean training loss of the epoch; it used to print the
    # last validation batch loss because `loss` was overwritten above.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Learning Rate: {scheduler_linear.get_last_lr()[0]:.6f}')
print("Finished training LinearNN.\n")
test_score["LinearNN"] = val_accuracy


In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# LinearNN Confusion Matrix
# This cell follows the LinearNN training but used to re-plot the KNN matrix;
# it now evaluates model_linear on the validation loader, like the ConvNN and
# EEGNet cells do for their models.
model_linear.eval()
y_pred_linear = []
with torch.no_grad():
    for batch_X, batch_y in val_loader:
        batch_X = batch_X.to(device)
        outputs = model_linear(batch_X)
        _, predicted = torch.max(outputs, 1)
        y_pred_linear.extend(predicted.cpu().numpy())

# val_loader is not shuffled, so predictions are in the same order as y_val.
cm_linear = confusion_matrix(y_val, y_pred_linear)
disp_linear = ConfusionMatrixDisplay(confusion_matrix=cm_linear, display_labels=enc.categories_[0])
disp_linear.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix for LinearNN Classifier')
plt.show()

In [None]:
# 80/20 train/validation split of the raw EEG signals (fixed seed).
# This rebinds X_train / X_val and the loaders used by the cells below.
X_train, X_val, y_train, y_val = train_test_split(
    eeg_data, yData_enc, test_size=0.2, random_state=42
)

# Wrap each split as float32 inputs / int64 targets.
train_dataset = TensorDataset(
    torch.tensor(X_train, dtype=torch.float32),
    torch.tensor(y_train, dtype=torch.long),
)
val_dataset = TensorDataset(
    torch.tensor(X_val, dtype=torch.float32),
    torch.tensor(y_val, dtype=torch.long),
)

# Only the training batches are shuffled; validation order stays fixed so
# predictions line up with y_val.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Conv Neural Network
# The class count comes from the encoded labels, as for the linear model.
model_conv = ConvNeuralNet(num_categories).to(device)
optimizer_conv = optim.Adam(model_conv.parameters(), lr=learning_rate)
# NOTE(review): with step_size=50 and num_epochs=50 the decay only takes
# effect after the final epoch.
scheduler_conv = StepLR(optimizer_conv, step_size=50, gamma=0.1)

print("Training ConvNN...")
for epoch in range(num_epochs):
    # ---- training pass ----
    model_conv.train()
    train_loss = 0.0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model_conv(batch_X)
        loss = criterion(outputs, batch_y)
        optimizer_conv.zero_grad()
        loss.backward()
        optimizer_conv.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    scheduler_conv.step()

    # ---- validation pass ----
    model_conv.eval()
    val_loss = 0.0
    val_correct = 0
    val_seen = 0
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            outputs = model_conv(batch_X)
            val_loss += criterion(outputs, batch_y).item()
            # Count hits per sample so a smaller last batch is not over-weighted.
            val_correct += (outputs.argmax(dim=1) == batch_y).sum().item()
            val_seen += batch_y.size(0)
    val_loss /= len(val_loader)
    val_accuracy = val_correct / val_seen

    # "Loss" is the mean training loss of the epoch; it used to print the
    # last validation batch loss because `loss` was overwritten above.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Learning Rate: {scheduler_conv.get_last_lr()[0]:.6f}')
print("Finished training ConvNN.\n")
test_score["ConvNN"] = val_accuracy


In [None]:
# ConvNN Confusion Matrix
# Collect the predicted class of every validation sample, batch by batch.
model_conv.eval()
y_pred_conv = []
with torch.no_grad():
    for batch_X, batch_y in val_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        scores = model_conv(batch_X)
        y_pred_conv.extend(scores.argmax(dim=1).cpu().numpy())

cm_conv = confusion_matrix(y_val, y_pred_conv)
disp_conv = ConfusionMatrixDisplay(
    confusion_matrix=cm_conv,
    display_labels=enc.categories_[0],
)
disp_conv.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix for ConvNN Classifier')
plt.show()


In [None]:
# EEGNet
# Input dimensions and class count are taken from the data rather than
# repeated as literals (X_train is the raw-EEG split: trials x channels x samples).
model_eegnet = EEGNet(X_train.shape[1], X_train.shape[2], num_categories, 0.5).to(device)
optimizer_eegnet = optim.Adam(model_eegnet.parameters(), lr=learning_rate)
# NOTE(review): with step_size=50 and num_epochs=50 the decay only takes
# effect after the final epoch.
scheduler_eegnet = StepLR(optimizer_eegnet, step_size=50, gamma=0.1)

print("Training EEGNet...")
for epoch in range(num_epochs):
    # ---- training pass ----
    model_eegnet.train()
    train_loss = 0.0
    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        # Adjust input shape for EEGNet
        batch_X = batch_X.unsqueeze(1)  # Add channel dimension
        outputs = model_eegnet(batch_X)
        loss = criterion(outputs, batch_y)
        optimizer_eegnet.zero_grad()
        loss.backward()
        optimizer_eegnet.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)
    scheduler_eegnet.step()

    # ---- validation pass ----
    model_eegnet.eval()
    val_loss = 0.0
    val_correct = 0
    val_seen = 0
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            batch_X = batch_X.unsqueeze(1)  # Add channel dimension
            outputs = model_eegnet(batch_X)
            val_loss += criterion(outputs, batch_y).item()
            # Count hits per sample so a smaller last batch is not over-weighted.
            val_correct += (outputs.argmax(dim=1) == batch_y).sum().item()
            val_seen += batch_y.size(0)
    val_loss /= len(val_loader)
    val_accuracy = val_correct / val_seen

    # "Loss" is the mean training loss of the epoch; it used to print the
    # last validation batch loss because `loss` was overwritten above.
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Learning Rate: {scheduler_eegnet.get_last_lr()[0]:.6f}')
print("Finished training EEGNet.\n")
test_score["EEGNet"] = val_accuracy

# Persist the learned parameters (state_dict) of each network to its own file.
for trained_model, checkpoint_path in (
    (model_linear, 'LinearNN_model.pth'),
    (model_conv, 'ConvNN_model.pth'),
    (model_eegnet, 'EEGNet_model.pth'),
):
    torch.save(trained_model.state_dict(), checkpoint_path)

In [None]:

# EEGNet Confusion Matrix
# Collect the predicted class of every validation sample, batch by batch.
model_eegnet.eval()
y_pred_eegnet = []
with torch.no_grad():
    for batch_X, batch_y in val_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        # EEGNet expects an extra leading "image channel" dimension.
        scores = model_eegnet(batch_X.unsqueeze(1))
        y_pred_eegnet.extend(scores.argmax(dim=1).cpu().numpy())

cm_eegnet = confusion_matrix(y_val, y_pred_eegnet)
disp_eegnet = ConfusionMatrixDisplay(
    confusion_matrix=cm_eegnet,
    display_labels=enc.categories_[0],
)
disp_eegnet.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix for EEGNet Classifier')
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Extract model names and their corresponding test scores
model_names = list(test_score.keys())
scores = list(test_score.values())

# One distinct colour per model. The first three match the original palette;
# with only three colours listed, a fourth bar would reuse the first colour.
palette = ['blue', 'green', 'red', 'orange', 'purple', 'brown']
bar_colors = [palette[idx % len(palette)] for idx in range(len(model_names))]

# Create a bar plot
plt.figure(figsize=(10, 6))
plt.bar(model_names, scores, color=bar_colors)

# Add title and labels
plt.title('Test Score of Each Model')
plt.xlabel('Model')
plt.ylabel('Test Score')

# Show the plot
plt.show()