# In [None]:
model_name = 'conformer'
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import Adam

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import numpy as np

from braindecode.models import EEGConformer

from tensorflow.keras import utils as np_utils
from torch.utils.data import DataLoader, TensorDataset, random_split
from processing_eeg_methods.data_utils import (
    get_dataset_basic_info,
)
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
from processing_eeg_methods.data_loaders import load_data_labels_based_on_dataset
from processing_eeg_methods.share import datasets_basic_infos
from collections import Counter

# ---------------------------------------------------------------------------
# Experiment configuration
# ---------------------------------------------------------------------------
# The dataset name selects which metadata entry is looked up below.
dataset_name = "braincommand"  # Only two things I should be able to change
dataset_info = get_dataset_basic_info(datasets_basic_infos, dataset_name)

# Machine-specific absolute path handed to the data loader for every subject.
data_path = r"C:\Users\rosit\Documents\workprojects\bci_complete\EEG-Classifiers-Ensemble\Datasets\braincommand_dataset"

# Subject identifiers 1 through 26 (the upper bound of range() is exclusive).
subject_ids = range(1, 27)

# Starts empty; the per-subject loop adds one row of metrics per subject.
result_columns = ['subject_id', 'test_accuracy', 'f1_score_macro']
results_df = pd.DataFrame(columns=result_columns)

# Loop through each subject
for subject_id in subject_ids:

    # Load both recording sessions of this subject, calibration first and
    # singleplayer second (same call order as loading them one by one).
    session_data = {
        mode: load_data_labels_based_on_dataset(
            dataset_info,
            subject_id,
            data_path,
            game_mode=mode,
        )
        for mode in ("calibration", "singleplayer")
    }
    epochs_calibration, X_calibration, y_calibration_original = session_data["calibration"]
    epochs_singleplayer, X_singleplayer, y_singleplayer_original = session_data["singleplayer"]

    # The classification target is the session a trial came from, not the
    # label stored with the trial: calibration -> 0, singleplayer -> 1.
    y_calibration = [0 for _ in y_calibration_original]
    y_singleplayer = [1 for _ in y_singleplayer_original]

    # Stack the trials of both sessions along the first axis and build the
    # matching label list in the same order.
    X = np.concatenate([X_calibration, X_singleplayer], axis=0)
    y = [*y_calibration, *y_singleplayer]

    # First split: half of the trials for training, half held out.
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.5, random_state=42
    )

    # Second split: the held-out half is divided evenly into test and
    # validation, i.e. a quarter of all trials each.
    X_test, X_val, y_test, y_val = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42
    )

    print(y_train)
    num_classess = len(set(y_train))
    print(f'There are {num_classess} unique classes in the dataset')

    # Input geometry read from the dataset metadata. `kernels` is the size-1
    # leading dimension used when building the dummy probe input later on.
    kernels = 1
    chans = dataset_info["#_channels"]
    samples = dataset_info["samples"]

    # Disabled preprocessing, kept for reference:
    # y_train = y_train - 1
    # y_val = y_val - 1
    # y_test = y_test - 1

    # X_train      = X_train.reshape(X_train.shape[0], chans, samples)
    # X_val   = X_val.reshape(X_val.shape[0], chans, samples)
    # X_test       = X_test.reshape(X_test.shape[0], chans, samples)

    print('X_train shape:', X_train.shape)
    print(X_train.shape[0], 'train samples')
    print(X_test.shape[0], 'test samples')

    # Every split goes through the same pipeline: float32 inputs and int64
    # labels -> TensorDataset -> DataLoader. Only the training loader shuffles.
    batch_size = 64

    # Training split
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)  # integer class ids
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Validation split
    X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.long)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Test split
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    train_loader_size = len(train_loader)
    print(f"Number of batches in train_loader: {train_loader_size}")

    # Per-class weights total / (num_classes * count), computed over all labels
    # of this subject (train, validation and test together).
    counts = Counter(y)
    total_samples = sum(counts.values())
    num_classes = len(counts)

    # Key the weights by the label itself. The labels built in this loop are
    # already zero-based (0 = calibration, 1 = singleplayer), so shifting them
    # by -1 attributed the weights to classes -1 and 0.
    class_weights = {
        class_label: total_samples / (num_classes * count)
        for class_label, count in counts.items()
    }

    # NOTE(review): the weights are only printed; the loss created below is
    # unweighted. Passing them to the criterion would change training results.
    print(class_weights)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # device = 'cpu'

    # NOTE(review): the output size comes from the dataset metadata, while the
    # labels built in this loop are binary -- confirm that the two agree.
    n_classes = dataset_info["#_class"]
    classes = list(range(n_classes))

    # EEGConformer hyper-parameters.
    n_outputs = dataset_info["#_class"]
    n_chans = dataset_info["#_channels"]
    n_filters_time = 40
    filter_time_length = 25
    pool_time_length = 75
    pool_time_stride = 15
    drop_prob = 0.5
    att_depth = 6
    att_heads = 10
    att_drop_prob = 0.5
    final_fc_length = 640
    return_features = False
    n_times = dataset_info["samples"]
    chs_info = None
    input_window_seconds = None
    sfreq = dataset_info["sample_rate"]
    # NOTE(review): presumably makes the model emit log-probabilities, which
    # CrossEntropyLoss then normalises again -- confirm against the installed
    # braindecode version whether this flag is still wanted.
    add_log_softmax = True

    # Initialize the EEGConformer model (a fresh one for every subject)
    conformer = EEGConformer(
        n_outputs=n_outputs,
        n_chans=n_chans,
        n_filters_time=n_filters_time,
        filter_time_length=filter_time_length,
        pool_time_length=pool_time_length,
        pool_time_stride=pool_time_stride,
        drop_prob=drop_prob,
        att_depth=att_depth,
        att_heads=att_heads,
        att_drop_prob=att_drop_prob,
        final_fc_length=final_fc_length,
        return_features=return_features,
        n_times=n_times,
        chs_info=chs_info,
        input_window_seconds=input_window_seconds,
        sfreq=sfreq,
        add_log_softmax=add_log_softmax,
    )
    conformer = conformer.to(device)

    print(conformer)

    # Loss and optimiser; betas is passed as a tuple, the documented type.
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(conformer.parameters(), lr=0.0002, betas=(0.5, 0.999))

    # Diagnostic only: push one random input through the patch embedding and
    # the transformer to report the flattened feature size that a matching
    # `final_fc_length` would need. Nothing computed here feeds the training.
    conformer.to(device)

    # Shape (kernels, chans, samples). The random draw happens on the CPU and
    # is moved afterwards, which keeps the CPU random stream as it was.
    dummy_input = torch.randn(kernels, chans, samples).to(device)

    # Best effort: a failure is reported and the run carries on.
    try:
        with torch.no_grad():  # no gradients needed for a shape check
            probe = dummy_input.unsqueeze(1)  # insert a size-1 axis at position 1
            probe = conformer.patch_embedding(probe)  # Patch Embedding stage
            probe = conformer.transformer(probe)  # Transformer Encoder stage

            # Flattened size = product of the two trailing dimensions.
            print(f"Output shape after transformer: {probe.shape}")
            final_fc_length_calculated = probe.shape[1] * probe.shape[2]
            print(f"Calculated `final_fc_length`: {final_fc_length_calculated}")

    except Exception as e:
        print("Error during partial forward pass:", str(e))

    num_epochs = 250
    ## TRAIN
    for epoch in range(num_epochs):
        # ---- optimisation pass over the training batches ----
        conformer.train()

        running_loss = 0.0
        correct_predictions = 0
        total_predictions = 0

        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()

            logits = conformer(inputs)
            batch_loss = criterion(logits, targets)

            batch_loss.backward()
            optimizer.step()

            running_loss += batch_loss.item()

            # Predicted class = index of the largest output along dim 1.
            predicted = logits.max(dim=1).indices
            correct_predictions += (predicted == targets).sum().item()
            total_predictions += targets.size(0)

        # Loss is averaged over batches, accuracy over individual samples.
        epoch_loss = running_loss / len(train_loader)
        epoch_accuracy = correct_predictions / total_predictions

        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

        ## Validation
        # ---- evaluation pass, no parameter updates ----
        conformer.eval()
        val_loss = 0.0
        correct_predictions = 0
        total_predictions = 0

        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs = inputs.to(device)
                targets = targets.to(device)

                logits = conformer(inputs)
                val_loss += criterion(logits, targets).item()

                # Same accuracy bookkeeping as in the training pass.
                predicted = logits.max(dim=1).indices
                correct_predictions += (predicted == targets).sum().item()
                total_predictions += targets.size(0)

        val_loss = val_loss / len(val_loader)
        val_accuracy = correct_predictions / total_predictions

        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    ## TEST
    # Switch to evaluation mode explicitly rather than relying on the mode left
    # behind by the last validation pass (which never runs if num_epochs is 0).
    conformer.eval()

    test_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    all_labels = []
    all_predictions = []

    # The criterion used for training is reused here, so the test loss is
    # measured the same way as the training and validation losses.
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            outputs = conformer(X_batch)

            loss = criterion(outputs, y_batch)
            test_loss += loss.item()

            _, predicted = torch.max(outputs, 1)  # Get the class with the highest score
            correct_predictions += (predicted == y_batch).sum().item()
            total_predictions += y_batch.size(0)

            # Keep every label and prediction for the F1 computation below.
            all_labels.extend(y_batch.cpu().tolist())
            all_predictions.extend(predicted.cpu().tolist())

    # Loss is averaged over batches, accuracy over individual samples.
    test_loss /= len(test_loader)
    test_accuracy = correct_predictions / total_predictions
    f1 = f1_score(all_labels, all_predictions, average='macro')

    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"F1 Score (Macro): {f1:.4f}")

    temp_df = pd.DataFrame({'subject_id': [subject_id], 'test_accuracy': [test_accuracy], 'f1_score_macro': [f1]})

    # Add this subject's row to the results. Concatenating onto a still-empty
    # frame is deprecated in pandas and turns the columns into object dtype,
    # so the first row simply becomes the results frame.
    if results_df.empty:
        results_df = temp_df
    else:
        results_df = pd.concat([results_df, temp_df], ignore_index=True)

# Aggregate the per-subject metrics across all subjects.
accuracy_column = results_df['test_accuracy']
f1_column = results_df['f1_score_macro']

mean_accuracy, std_accuracy = accuracy_column.mean(), accuracy_column.std()
min_accuracy, max_accuracy = accuracy_column.min(), accuracy_column.max()

mean_f1, std_f1 = f1_column.mean(), f1_column.std()
min_f1, max_f1 = f1_column.min(), f1_column.max()

# Report the results as a fixed-width banner (40 characters wide).
banner = '=' * 40
report_lines = [
    f"\n{banner}",
    f"{'Overall Performance':^40}",
    f"{banner}",
    f"Mean Test Accuracy: {mean_accuracy:.4f}",
    f"Test Accuracy Standard Deviation: {std_accuracy:.4f}",
    f"Minimum Test Accuracy: {min_accuracy:.4f}",
    f"Maximum Test Accuracy: {max_accuracy:.4f}",
    f"Mean F1 Score (Macro): {mean_f1:.4f}",
    f"F1 Score (Macro) Standard Deviation: {std_f1:.4f}",
    f"Minimum F1 Score (Macro): {min_f1:.4f}",
    f"Maximum F1 Score (Macro): {max_f1:.4f}",
    f"{banner}",
]
print("\n".join(report_lines))

# Save the results to a CSV file
results_csv_path = r"C:\Users\rosit\Documents\workprojects\bci_complete\EEG-Classifiers-Ensemble\Results\braincommand\FINAL\conformer_subject_results_cleaned.csv"
results_df.to_csv(results_csv_path, index=False)

# Report the path that was actually written; the message used to name a
# different file (without the "_cleaned" suffix).
print(f"\nResults saved to {results_csv_path}")

# Use the "Set2" Seaborn palette for the plots below.
sns.set_palette("Set2")

# One figure with two side-by-side panels: test accuracy on the left,
# macro F1 on the right. Each box summarises the per-subject values.
fig, (accuracy_ax, f1_ax) = plt.subplots(1, 2, figsize=(12, 6))

# Create a boxplot for test accuracies
sns.boxplot(y=results_df['test_accuracy'], ax=accuracy_ax)
accuracy_ax.set_title('Boxplot of Test Accuracies per Subject')
accuracy_ax.set_ylabel('Test Accuracy')

# Create a boxplot for F1 scores
sns.boxplot(y=results_df['f1_score_macro'], ax=f1_ax)
f1_ax.set_title('Boxplot of Macro F1 Scores per Subject')
f1_ax.set_ylabel('Macro F1 Score')

fig.tight_layout()
plt.show()