## Importing Libraries

In [None]:
import pandas as pd
import numpy as np                                      # for dealing with data
from scipy.signal import butter, sosfiltfilt, sosfreqz  # for filtering
import matplotlib.pyplot as plt                         # for plotting
import seaborn as sns
from sklearn.model_selection import cross_val_score
from sklearn.metrics import roc_curve, auc
import os
from os import listdir
from os.path import isfile, join, isdir

## Load data

In [None]:
# Load the epoched EEG arrays. The 4 axes are unpacked further down as
# (subject, trial, channel, sample).
# NOTE(review): earlier comments in this cell recorded both 56 and 30 channels
# for these arrays; trust the shapes printed below and confirm which is right.
train_data_list = np.load('./data/train_data.npy')
test_data_list = np.load('./data/test_data.npy')
print('Epoched training data shape: ' + str(train_data_list.shape))
print('Epoched training data: ' , train_data_list)
print('Epoched testing data shape: ' + str(test_data_list.shape))
print('Epoched testing data: ' , test_data_list)

# Flatten the header-less label table into a 1-D vector. The length is taken
# from the file instead of a hard-coded 3400 so a different test set still
# loads; the length is validated against X_test below.
Y_true_Labels_for_test = pd.read_csv('./data/true_labels.csv', header=None).values.reshape(-1)
print('Y_true_Labels_for_test: ', Y_true_Labels_for_test.shape)
unique, counts = np.unique(Y_true_Labels_for_test, return_counts=True)
print("Class distribution:")
for label, count in zip(unique, counts):
    print(f"Class {label}: {count}")

# Merge the subject and trial axes and insert a singleton axis, giving
# arrays of shape (subjects * trials, 1, numChannel, sample).
subj, trial, numChannel, sample = train_data_list.shape
X_train_valid = np.reshape(train_data_list, (-1, 1, numChannel, sample))
X_test = np.reshape(test_data_list, (-1, 1, numChannel, sample))

# Fail early, with a readable message, if the label file does not line up
# with the test epochs (this replaces the implicit check the fixed-size
# reshape used to provide).
if Y_true_Labels_for_test.shape[0] != X_test.shape[0]:
    raise ValueError(
        f"true_labels.csv holds {Y_true_Labels_for_test.shape[0]} labels "
        f"but the test set has {X_test.shape[0]} epochs"
    )

# Training labels come from the 'Prediction' column of the label CSV.
Y_train_valid = pd.read_csv('data/TrainLabels.csv')['Prediction'].values

print('subject: ', subj)
print('trial: ', trial)
print('numChannel: ', numChannel)
print('sample: ', sample)
print('X_train_valid: ', X_train_valid.shape)
print('X_test: ', X_test.shape)
print('Y_train_valid: ', Y_train_valid.shape)

## EEGNet

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Activation, Dropout
from tensorflow.keras.layers import Conv2D, AveragePooling2D
from tensorflow.keras.layers import SeparableConv2D, DepthwiseConv2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import SpatialDropout2D
from tensorflow.keras.layers import Input, Flatten
from tensorflow.keras.constraints import max_norm

def EEGNet(nb_classes, Chans = 56, Samples = 128, 
             dropoutRate = 0.5, kernLength = 64, F1 = 8, 
             D = 2, F2 = 16, norm_rate = 0.25, dropoutType = 'Dropout'):
    """Build an EEGNet-style Keras model for inputs of shape (1, Chans, Samples).

    Every layer is run in ``channels_first`` layout so that axis 1 is the
    feature-map axis, axis 2 the EEG-channel axis and axis 3 the time axis.

    Args:
        nb_classes: Number of units in the final dense/softmax layer.
        Chans: Size of the second input axis; also the height of the
            depthwise kernel, which therefore spans that whole axis.
        Samples: Size of the last input axis.
        dropoutRate: Rate passed to both dropout layers.
        kernLength: Width of the first convolution kernel.
        F1: Number of filters in the first convolution.
        D: Depth multiplier of the depthwise convolution.
        F2: Number of filters in the separable convolution.
        norm_rate: Max-norm constraint applied to the dense kernel.
        dropoutType: Either ``'Dropout'`` or ``'SpatialDropout2D'`` (string).

    Returns:
        An uncompiled ``Model`` mapping the input to softmax probabilities.

    Raises:
        ValueError: If ``dropoutType`` is not one of the two accepted strings.
    """
    # The input tensor, BatchNormalization(axis=1), the depthwise convolution
    # and the pooling layers all assume channels_first. Conv2D and
    # SeparableConv2D used to fall back to the global default layout, which
    # made them convolve over the wrong axes; the layout is now explicit on
    # every layer that accepts it.
    # NOTE(review): some CPU-only TensorFlow builds may not support
    # channels_first convolutions -- confirm on the target hardware.
    data_format = 'channels_first'

    if dropoutType == 'SpatialDropout2D':
        # Spatial dropout drops whole feature maps, so it needs the layout too.
        def make_dropout(rate):
            return SpatialDropout2D(rate, data_format = data_format)
    elif dropoutType == 'Dropout':
        make_dropout = Dropout
    else:
        raise ValueError('dropoutType must be one of SpatialDropout2D '
                         'or Dropout, passed as a string.')
            
    input1   = Input(shape = (1, Chans, Samples))
    
    ##################################################################
    # Block 1: convolution along the last axis, then a depthwise convolution
    # whose kernel covers the full Chans axis.
    block1       = Conv2D(F1, (1, kernLength), padding = 'same',
                                   use_bias = False,
                                   data_format = data_format)(input1)
    block1       = BatchNormalization(axis = 1)(block1)
    block1       = DepthwiseConv2D((Chans, 1), use_bias = False, 
                                   depth_multiplier = D,
                                   depthwise_constraint = max_norm(1.),
                                   data_format = data_format)(block1)
    block1       = BatchNormalization(axis = 1)(block1)
    block1       = Activation('elu')(block1)
    block1       = AveragePooling2D((1, 4), data_format = data_format)(block1)
    block1       = make_dropout(dropoutRate)(block1)
    
    # Block 2: separable convolution followed by pooling along the last axis.
    block2       = SeparableConv2D(F2, (1, 16),
                                   use_bias = False, padding = 'same',
                                   data_format = data_format)(block1)
    block2       = BatchNormalization(axis = 1)(block2)
    block2       = Activation('elu')(block2)
    block2       = AveragePooling2D((1, 8), data_format = data_format)(block2)
    block2       = make_dropout(dropoutRate)(block2)
        
    flatten      = Flatten(name = 'flatten')(block2)
    
    # Classification head: max-norm constrained dense layer + softmax.
    dense        = Dense(nb_classes, name = 'dense', 
                         kernel_constraint = max_norm(norm_rate))(flatten)
    softmax      = Activation('softmax', name = 'softmax')(dense)
    
    return Model(inputs=input1, outputs=softmax)


## z-score normalization

In [None]:
import numpy as np
def z_score_normalize(X):
    """Standardize X along its last axis.

    Each slice along the last axis has its own mean subtracted and is then
    divided by its own standard deviation plus 1e-6, which keeps the
    division finite for constant slices. The output has the same shape as X.
    """
    last_axis = {"axis": -1, "keepdims": True}
    centered = X - np.mean(X, **last_axis)
    scale = np.std(X, **last_axis) + 1e-6
    return centered / scale

## Classification

In [None]:
from sklearn.model_selection import train_test_split
import keras
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import matplotlib.pyplot as plt
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.keras.callbacks import ReduceLROnPlateau
from sklearn.metrics import f1_score, balanced_accuracy_score, accuracy_score, precision_score, recall_score, roc_auc_score

import tensorflow as tf
import pandas as pd
import numpy as np

# Per-run metric values; one entry is appended to each list per training run.
accuracy_results = []
accuracy_results_balanced = []
precision_results = []
recall_results = []
f1_results = []
auc_result = []

# Number of independent train/evaluate repetitions
n_runs = 10

# Z-score normalization on the full training+validation data.
# z_score_normalize works slice-by-slice along the last axis, so no
# statistics are shared between the training and test arrays.
X_train_valid = z_score_normalize(X_train_valid)
# Normalize the test set (separately!)
X_test = z_score_normalize(X_test)

for run in range(n_runs):
    print(f"Running iteration {run + 1}/{n_runs}...")
    
    # Re-split training/validation set; random_state=run gives each run a
    # different but reproducible stratified split.
    print(X_train_valid.shape, Y_train_valid.shape)
    X_train, X_valid, Y_train, Y_valid = train_test_split(
        X_train_valid, Y_train_valid, test_size=0.25, stratify=Y_train_valid, random_state=run
    )

    # Balanced class weights computed from this run's training labels.
    class_weights = compute_class_weight(
        class_weight='balanced', classes=np.unique(Y_train), y=Y_train
    )
    # enumerate() keys the weights 0..k-1 in sorted-label order, which equals
    # the labels themselves only when they are already 0..k-1.
    class_weights = dict(enumerate(class_weights))  # Convert to dictionary format
    print("Class Weights:", class_weights)

    # Initialize and compile a fresh model for every run
    model = EEGNet(nb_classes=2, Chans=numChannel, Samples=sample, dropoutRate=0.5)
    model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    
    # One path shared by the checkpoint callback and the reload below.
    checkpoint_path = f'/tmp/checkpoint_{run}.keras'
    checkpointer = ModelCheckpoint(filepath=checkpoint_path, verbose=0, save_best_only=True, monitor='val_loss', mode='min')
    # Early stopping watches val_accuracy; the weights it restores are later
    # replaced by the best-val_loss checkpoint loaded after fit().
    early_stopping = EarlyStopping(monitor='val_accuracy', patience=10, restore_best_weights=True, verbose=1)
    
    # NOTE(review): a ReduceLROnPlateau(monitor='val_loss', factor=0.3,
    # patience=10, min_lr=1e-6, verbose=1) callback used to be constructed
    # here but was never passed to fit(). The dead object was removed so that
    # training behaviour is unchanged; add it to `callbacks` below if
    # learning-rate scheduling is actually wanted.
    
    fittedModel = model.fit(X_train, Y_train, batch_size=64, epochs=150, verbose=2,
            validation_data=(X_valid, Y_valid),
            callbacks=[checkpointer, early_stopping], class_weight=class_weights)

    # Load best (lowest val_loss) weights saved by the checkpoint callback
    model.load_weights(checkpoint_path)

    # Output layer: softmax. Run inference once and derive both the
    # positive-class probability and the hard prediction from the same output.
    print(X_test.shape, Y_true_Labels_for_test.shape) 
    test_probs = model.predict(X_test)
    y_probs = test_probs[:, 1]  # probability of class 1
    y_pred = test_probs.argmax(axis=-1)

    # Calculate metrics
    accuracy = accuracy_score(Y_true_Labels_for_test, y_pred)
    accuracy_balanced = balanced_accuracy_score(Y_true_Labels_for_test, y_pred)
    precision = precision_score(Y_true_Labels_for_test, y_pred)
    recall = recall_score(Y_true_Labels_for_test, y_pred)
    f1 = f1_score(Y_true_Labels_for_test, y_pred)
    # Named auc_value so the `auc` function imported from sklearn.metrics is
    # not overwritten by a float.
    auc_value = roc_auc_score(Y_true_Labels_for_test, y_probs)  # Use probability of positive class
    
    # Save results
    accuracy_results.append(accuracy)
    accuracy_results_balanced.append(accuracy_balanced)
    precision_results.append(precision)
    recall_results.append(recall)
    f1_results.append(f1)
    auc_result.append(auc_value)

# Calculate mean and std across runs
accuracy_mean, accuracy_std = np.mean(accuracy_results), np.std(accuracy_results)
accuracy_mean_balanced, accuracy_std_balanced = np.mean(accuracy_results_balanced), np.std(accuracy_results_balanced)
precision_mean, precision_std = np.mean(precision_results), np.std(precision_results)
recall_mean, recall_std = np.mean(recall_results), np.std(recall_results)
f1_mean, f1_std = np.mean(f1_results), np.std(f1_results)
auc_mean, auc_std = np.mean(auc_result), np.std(auc_result)

# now get min/max directly from each list
max_accuracy             = max(accuracy_results)
min_accuracy             = min(accuracy_results)
max_balanced_accuracy    = max(accuracy_results_balanced)
min_balanced_accuracy    = min(accuracy_results_balanced)
max_precision            = max(precision_results)
min_precision            = min(precision_results)
max_recall               = max(recall_results)
min_recall               = min(recall_results)
max_f1                   = max(f1_results)
min_f1                   = min(f1_results)
max_auc                  = max(auc_result)
min_auc                  = min(auc_result)

# Display results
print('---------------------------------------------------------------')
print(accuracy_results)
print(accuracy_results_balanced)
print(precision_results)
print(recall_results)
print(f1_results)
print(auc_result)
print('---------------------------------------------------------------')
print(f"Accuracy: Mean = {accuracy_mean:.4f}, Std = {accuracy_std:.4f}")
print(f"Balanced Accuracy: Mean = {accuracy_mean_balanced:.4f}, Std = {accuracy_std_balanced:.4f}")
print(f"Precision: Mean = {precision_mean:.4f}, Std = {precision_std:.4f}")
print(f"Recall: Mean = {recall_mean:.4f}, Std = {recall_std:.4f}")
print(f"F1: Mean = {f1_mean:.4f}, Std = {f1_std:.4f}")
print(f"AUC: Mean = {auc_mean:.4f}, Std = {auc_std:.4f}")
print('---------------------------------------------------------------')
print(f"Max Accuracy:             {max_accuracy:.4f}")
print(f"Max Balanced Accuracy:    {max_balanced_accuracy:.4f}")
print(f"Max Precision:            {max_precision:.4f}")
print(f"Max Recall:               {max_recall:.4f}")
print(f"Max F1:                   {max_f1:.4f}")
print(f"Max AUC:                  {max_auc:.4f}")
print('---------------------------------------------------------------')
print(f"Min Accuracy:             {min_accuracy:.4f}")
print(f"Min Balanced Accuracy:    {min_balanced_accuracy:.4f}")
print(f"Min Precision:            {min_precision:.4f}")
print(f"Min Recall:               {min_recall:.4f}")
print(f"Min F1:                   {min_f1:.4f}")
print(f"Min AUC:                  {min_auc:.4f}")
print('---------------------------------------------------------------')