In [None]:
import numpy as np
import pandas as pd
import os
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Dropout, LSTM, Dense, BatchNormalization
from keras.utils import to_categorical
import tensorflow as tf

# --- SETTINGS ---
base_dir = r"C:\Users\zezom\PycharmProjects\HorusEye"
dl_ready_dir = os.path.join(base_dir, "Data", "Processed", "DL_ready")
labels_path = os.path.join(base_dir, "train.csv")
ssvep_labels = ['Forward', 'Backward', 'Left', 'Right']  # Adjust as needed

# --- LOAD LABELS CSV & CLEAN COLUMN HEADERS ---
labels_df = pd.read_csv(labels_path)
labels_df.columns = labels_df.columns.str.strip()  # Remove any trailing/leading whitespace
print("CSV columns:", labels_df.columns.tolist())

# --- SELECT ONLY SSVEP DATA ---
if 'paradigm' in labels_df.columns:
    # Filter by paradigm and label
    ssvep_df = labels_df[
        (labels_df['paradigm'].str.strip() == 'SSVEP') &
        (labels_df['label'].isin(ssvep_labels))
    ]
else:
    # No paradigm column: filter by label only
    ssvep_df = labels_df[labels_df['label'].isin(ssvep_labels)]

print(f"Total SSVEP trials found: {len(ssvep_df)}")

# --- LOAD EEG DATA EPOCHS ---
X = []
y = []
missing_files = []

for i, row in ssvep_df.iterrows():
    subject = str(row['subject_id']).strip()
    session = str(row['trial_session']).strip()
    trial_num = int(row['trial'])  # 1-based index
    label = row['label'].strip()

    npy_path = os.path.join(dl_ready_dir, f"{subject}_{session}_EEGdata_preprocessed_DLready.npy")
    if not os.path.exists(npy_path):
        missing_files.append(npy_path)
        continue

    epochs = np.load(npy_path)  # (n_trials, n_channels, n_samples)
    if trial_num - 1 >= epochs.shape[0]:
        print(f"Warning: trial {trial_num} out of range in file {npy_path}")
        continue

    X.append(epochs[trial_num - 1])
    y.append(label)

if len(X) == 0:
    raise RuntimeError("No SSVEP samples found! Check your preprocessed .npy files and label filtering.")

X = np.stack(X)  # (num_trials, n_channels, n_samples)
label_encoder = LabelEncoder()
y_enc = label_encoder.fit_transform(y)
num_classes = len(label_encoder.classes_)

# --- PREPARE FOR DEEP LEARNING ---
# Transpose to (samples, timesteps, channels) for Conv1D/LSTM
X = np.transpose(X, (0, 2, 1))
y_cat = to_categorical(y_enc, num_classes=num_classes)

# --- TRAIN/VAL SPLIT ---
X_train, X_val, y_train, y_val = train_test_split(
    X, y_cat, test_size=0.2, random_state=42, stratify=y_enc
)

print("X_train:", X_train.shape, "X_val:", X_val.shape)
print("Classes:", label_encoder.classes_)

# --- MODEL ---
model = Sequential()
model.add(Conv1D(filters=64, kernel_size=7, activation='relu', input_shape=(X.shape[1], X.shape[2])))
model.add(BatchNormalization())
model.add(MaxPooling1D(pool_size=2))
model.add(Dropout(0.3))

model.add(Conv1D(filters=128, kernel_size=5, activation='relu'))
model.add(BatchNormalization())
model.add(MaxPooling1D(pool_size=2))
model.add(Dropout(0.3))

model.add(Conv1D(filters=256, kernel_size=3, activation='relu'))
model.add(BatchNormalization())
model.add(MaxPooling1D(pool_size=2))
model.add(Dropout(0.4))

model.add(LSTM(128, return_sequences=False))
model.add(Dropout(0.5))

model.add(Dense(64, activation='relu'))
model.add(Dropout(0.4))
model.add(Dense(num_classes, activation='softmax'))

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()

# --- TRAINING ---
history = model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    batch_size=32,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=8, restore_best_weights=True)
    ]
)

# --- RESULTS ---
if missing_files:
    print(f"Warning: {len(missing_files)} .npy files were missing, e.g. {missing_files[:3]}")

val_loss, val_acc = model.evaluate(X_val, y_val, verbose=0)
print(f"Validation Accuracy: {val_acc:.4f}")

train_loss, train_acc = model.evaluate(X_train, y_train, verbose=0)
print(f"Training Accuracy: {train_acc:.4f}")


In [2]:
import os
import pickle

# --- Define save paths ---
ssvep_model_path = os.path.join(base_dir, "ssvep_model.keras")
ssvep_encoder_path = os.path.join(base_dir, "ssvep_label_encoder.pkl")

# --- Save the trained SSVEP model (Keras format) ---
model.save(ssvep_model_path)
print(f"SSVEP model saved to: {ssvep_model_path}")

# --- Save the LabelEncoder object for later decoding ---
with open(ssvep_encoder_path, "wb") as f:
    pickle.dump(label_encoder, f)
print(f"SSVEP label encoder saved to: {ssvep_encoder_path}")


SSVEP model saved to: C:\Users\zezom\PycharmProjects\HorusEye\ssvep_model.keras
SSVEP label encoder saved to: C:\Users\zezom\PycharmProjects\HorusEye\ssvep_label_encoder.pkl
