In [None]:
import numpy as np
import scipy.signal as signal
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, confusion_matrix
import matplotlib.pyplot as plt

# Sampling rate in Hz, used below for filter design and epoch length.
fs = 240

# Arrays saved by Notebook-1.
eeg = np.load("../data/eeg.npy")
stim_code = np.load("../data/stim_code.npy")
stim_type = np.load("../data/stim_type.npy")

# Quick sanity check on what was loaded.
print(eeg.shape, stim_code.shape, stim_type.shape)


In [None]:
def bandpass_filter(data, fs, low=0.1, high=30):
    """Zero-phase Butterworth band-pass filter applied along axis 0.

    The filter is designed as second-order sections (SOS) instead of
    transfer-function (b, a) coefficients: with a pass-band edge that is a
    tiny fraction of the Nyquist frequency (0.1 Hz by default) the (b, a)
    polynomials of this band-pass are badly conditioned and can produce
    inaccurate or unstable output.

    Parameters
    ----------
    data : array_like
        Signal to filter; filtering runs along the first axis.
    fs : float
        Sampling frequency in Hz.
    low, high : float
        Lower and upper pass-band edges in Hz.

    Returns
    -------
    numpy.ndarray
        Filtered signal with the same shape as ``data``.
    """
    nyquist = fs / 2
    sos = signal.butter(
        4,
        [low / nyquist, high / nyquist],
        btype="band",
        output="sos",
    )
    # Forward-backward filtering cancels the phase delay of the filter.
    return signal.sosfiltfilt(sos, data, axis=0)

# Filter the loaded recording with the function's default pass-band edges.
eeg_filt = bandpass_filter(data=eeg, fs=fs)
print("Filtered EEG shape:", eeg_filt.shape)


In [None]:
# Cut one fixed-length window out of the filtered EEG per stimulus sample.
epoch_len = int(0.8 * fs)  # 800 ms
n_samples = eeg_filt.shape[0]

# A window starts at every index whose stimulus code is non-zero. Windows whose
# end index is not strictly below the number of samples are skipped.
# NOTE(review): the strict `<` also skips a window that ends exactly on the
# last sample — confirm whether that is intended.
onsets = [
    idx
    for idx in range(len(stim_code))
    if stim_code[idx] != 0 and idx + epoch_len < n_samples
]

epochs = np.array([eeg_filt[idx:idx + epoch_len, :] for idx in onsets])
labels = np.array([stim_type[idx] for idx in onsets])

print("Epochs:", epochs.shape)
print("Labels:", labels.shape)


In [None]:
# Swap the last two axes, (epochs, time, channels) -> (epochs, channels, time),
# then append a trailing singleton axis: (epochs, channels, time, 1).
X = np.expand_dims(np.transpose(epochs, axes=(0, 2, 1)), axis=-1)
y = labels

print("X shape:", X.shape)
print("y shape:", y.shape)


In [None]:
# One stimulus code per extracted epoch. The epoch loop only ever skips
# trailing events (windows that run up to or past the end of the recording),
# so the kept epochs correspond to the first len(y) non-zero codes. Without the
# truncation the code array can be longer than X / y, and train_test_split
# rejects inputs of inconsistent length.
epoch_codes = stim_code[stim_code != 0][:len(y)]
assert len(epoch_codes) == len(X) == len(y), \
    "epochs, labels and stimulus codes are misaligned"

# Stratified 80/20 split with a fixed seed so the split is reproducible.
X_train, X_test, y_train, y_test, stim_train, stim_test = train_test_split(
    X, y, epoch_codes,
    test_size=0.2,
    random_state=42,
    stratify=y
)

print(X_test.shape, stim_test.shape)


In [None]:
from tensorflow.keras.layers import (
    Input, Conv2D, BatchNormalization,
    Activation, AveragePooling2D,
    Dropout, Flatten, Dense
)
from tensorflow.keras.models import Model

def EEGNet(input_shape):
    """Build a small convolutional binary classifier.

    Architecture: one 2-D convolution (16 filters, 1x64 kernel, "same"
    padding), batch normalisation, ELU activation, 1x4 average pooling and
    dropout (rate 0.5); the result is flattened into a single sigmoid unit.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input sample, without the batch axis.

    Returns
    -------
    tensorflow.keras.models.Model
        The uncompiled model.
    """
    inputs = Input(shape=input_shape)

    # Feature-extraction stack, applied in order.
    feature_layers = [
        Conv2D(16, (1, 64), padding="same"),
        BatchNormalization(),
        Activation("elu"),
        AveragePooling2D((1, 4)),
        Dropout(0.5),
        Flatten(),
    ]

    x = inputs
    for layer in feature_layers:
        x = layer(x)

    # Single sigmoid unit: one probability per input sample.
    outputs = Dense(1, activation="sigmoid")(x)

    return Model(inputs, outputs)

# Shape of a single training sample (batch axis dropped).
sample_shape = X_train.shape[1:]
model = EEGNet(sample_shape)
model.summary()


In [None]:
# Binary cross-entropy loss with the Adam optimizer; accuracy is tracked as a metric.
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

# 20% of the training arrays is held out for validation during fitting.
history = model.fit(
    x=X_train,
    y=y_train,
    batch_size=64,
    epochs=20,
    validation_split=0.2,
    verbose=1,
)


We trained for 20 epochs because the model had converged by that point: validation accuracy had saturated, and further training risked overfitting without improving performance.
In future work, we would use early stopping to determine the optimal number of epochs automatically.

In [None]:
# Loss and accuracy on the held-out test split.
test_loss, test_acc = model.evaluate(X_test, y_test)
print("Test accuracy:", test_acc)

# Flatten the model output to a 1-D array of probabilities.
y_prob = np.ravel(model.predict(X_test))
auc = roc_auc_score(y_test, y_prob)
print("Test AUC:", auc)

# Threshold at 0.5 to obtain hard class labels for the confusion matrix.
y_pred = (y_prob > 0.5).astype(int)
cm = confusion_matrix(y_test, y_pred)
print(cm)


In [None]:
import os

# Make sure the target directory exists first; nothing earlier in the notebook
# creates "../models", and saving into a missing directory can fail.
os.makedirs("../models", exist_ok=True)

model.save("../models/p300_cnn.keras")
print("Model saved successfully in Keras format")


In [None]:
# Continuous model outputs for the test set (no thresholding), flattened to 1-D.
y_prob = np.ravel(model.predict(X_test))


In [None]:
# Re-extract the epochs, this time also recording each epoch's stimulus code.
epoch_len = int(0.8 * fs)
n_samples = eeg_filt.shape[0]

# Keep every index with a non-zero stimulus code whose window end index stays
# strictly below the number of samples.
# NOTE(review): the strict `<` also drops a window that ends exactly on the
# last sample — confirm whether that is intended.
kept = [
    idx
    for idx in range(len(stim_code))
    if stim_code[idx] != 0 and idx + epoch_len < n_samples
]

epochs = np.array([eeg_filt[idx:idx + epoch_len, :] for idx in kept])
labels = np.array([stim_type[idx] for idx in kept])
epoch_stim_codes = np.array([stim_code[idx] for idx in kept])

print("Epochs:", epochs.shape)
print("Labels:", labels.shape)
print("Stim codes:", epoch_stim_codes.shape)


In [None]:
from sklearn.model_selection import train_test_split

# Rebuild the model inputs: (epochs, time, channels) -> (epochs, channels, time, 1).
y = labels
stim_codes = epoch_stim_codes
X = np.expand_dims(np.transpose(epochs, axes=(0, 2, 1)), axis=-1)

# Same test fraction, seed and stratification as the earlier split, now
# carrying the per-epoch stimulus codes along.
split = train_test_split(
    X, y, stim_codes,
    test_size=0.2,
    random_state=42,
    stratify=y,
)
X_train, X_test, y_train, y_test, stim_train, stim_test = split

print(X_test.shape, stim_test.shape)


In [None]:
import numpy as np

def decode_row_col(stim_codes, probs):
    """
    Pick the highest-scoring speller row and column from per-flash scores.

    stim_codes: array of stimulus codes (1–12). Codes 1–6 are counted as rows,
        7–12 as columns; anything else is ignored. Float-valued codes such as
        3.0 are accepted and converted to integers for indexing.
    probs: P300 probabilities for those flashes

    Returns (row, col, row_scores, col_scores): row and col are 0-based argmax
    indices, and each score array (length 6) holds the summed probability of
    every row / column.
    """
    codes = np.asarray(stim_codes).ravel()
    scores = np.asarray(probs, dtype=float).ravel()

    # Mirror zip(): if the inputs differ in length, use the common prefix.
    n = min(codes.size, scores.size)
    codes, scores = codes[:n], scores[:n]

    is_row = (codes >= 1) & (codes <= 6)
    is_col = (codes >= 7) & (codes <= 12)

    # Cast to int only after range selection: float codes cannot index an array.
    row_scores = np.bincount(
        codes[is_row].astype(int) - 1, weights=scores[is_row], minlength=6
    )
    col_scores = np.bincount(
        codes[is_col].astype(int) - 7, weights=scores[is_col], minlength=6
    )

    row = np.argmax(row_scores)
    col = np.argmax(col_scores)

    return row, col, row_scores, col_scores


In [None]:
# Decode one character from the first `trial_size` test flashes.
# NOTE(review): stim_test and y_prob come from a shuffled train/test split, so
# these are presumably not consecutive flashes of a single character — confirm
# that this grouping is meaningful.
trial_size = 120
trial = slice(0, trial_size)

stim_trial, prob_trial = stim_test[trial], y_prob[trial]

row, col, r_scores, c_scores = decode_row_col(stim_trial, prob_trial)

print("Predicted row:", row)
print("Predicted column:", col)
print("Row scores:", r_scores)
print("Column scores:", c_scores)


In [None]:
# 6x6 character grid, indexed as speller[row, col].
speller = np.asarray([
    ['A', 'B', 'C', 'D', 'E', 'F'],
    ['G', 'H', 'I', 'J', 'K', 'L'],
    ['M', 'N', 'O', 'P', 'Q', 'R'],
    ['S', 'T', 'U', 'V', 'W', 'X'],
    ['Y', 'Z', '1', '2', '3', '4'],
    ['5', '6', '7', '8', '9', '_'],
])

# Look up the character at the decoded row / column.
predicted_char = speller[row][col]
print("Predicted character:", predicted_char)


In [None]:
def decode_character_from_epochs(stim_codes, probs):
    """Return the 0-based (row, col) with the highest summed flash score.

    Parameters
    ----------
    stim_codes : array_like
        Stimulus code per flash. Codes 1–6 are counted as rows and 7–12 as
        columns; any other value is ignored. Float-valued codes such as 3.0
        are accepted and converted to integers for indexing.
    probs : array_like
        Score per flash, paired element-wise with ``stim_codes``.

    Returns
    -------
    tuple
        ``(row, col)`` argmax indices into the six row and six column scores.
        With no usable flashes both are 0.
    """
    codes = np.asarray(stim_codes).ravel()
    scores = np.asarray(probs, dtype=float).ravel()

    # Mirror zip(): if the inputs differ in length, use the common prefix.
    n = min(codes.size, scores.size)
    codes, scores = codes[:n], scores[:n]

    row_mask = (codes >= 1) & (codes <= 6)
    col_mask = (codes >= 7) & (codes <= 12)

    # Cast to int only after range selection: float codes cannot index an array.
    row_scores = np.bincount(
        codes[row_mask].astype(int) - 1, weights=scores[row_mask], minlength=6
    )
    col_scores = np.bincount(
        codes[col_mask].astype(int) - 7, weights=scores[col_mask], minlength=6
    )

    return np.argmax(row_scores), np.argmax(col_scores)


In [None]:
# 6x6 character grid, indexed as speller[row, col].
speller = np.asarray([
    ['A', 'B', 'C', 'D', 'E', 'F'],
    ['G', 'H', 'I', 'J', 'K', 'L'],
    ['M', 'N', 'O', 'P', 'Q', 'R'],
    ['S', 'T', 'U', 'V', 'W', 'X'],
    ['Y', 'Z', '1', '2', '3', '4'],
    ['5', '6', '7', '8', '9', '_'],
])


In [None]:
def decode_word(stim_codes, probs, chars=5, flashes_per_char=120):
    """Decode up to ``chars`` characters from consecutive blocks of flashes.

    Parameters
    ----------
    stim_codes : sequence
        Stimulus code per flash.
    probs : sequence
        Score per flash, aligned with ``stim_codes``.
    chars : int
        Maximum number of characters to decode.
    flashes_per_char : int
        Number of consecutive flashes grouped into one character.

    Returns
    -------
    str
        The decoded characters joined together. Shorter than ``chars`` when
        the inputs run out of flashes.
    """
    decoded = []

    for char_idx in range(chars):
        start = char_idx * flashes_per_char
        stop = start + flashes_per_char
        stim_chunk = stim_codes[start:stop]
        prob_chunk = probs[start:stop]

        # Stop once the data is exhausted: an empty chunk leaves every score at
        # zero, argmax then returns (0, 0) and a spurious 'A' would be emitted.
        if len(stim_chunk) == 0 or len(prob_chunk) == 0:
            break

        row, col = decode_character_from_epochs(stim_chunk, prob_chunk)
        decoded.append(speller[row, col])

    return ''.join(decoded)


In [None]:
# Decode five characters from the test flashes, 120 flashes per character.
decoded_word = decode_word(stim_test, y_prob, chars=5, flashes_per_char=120)

print("Decoded word:", decoded_word)


In [None]:
import time
def live_p300_demo(stim_codes, probs, flashes_per_char=120, delay=0.05, max_chars=5):
    """Replay flashes one at a time and print each decoded character.

    Scores are accumulated per row (codes 1–6) and per column (codes 7–12).
    After every ``flashes_per_char`` flashes the best row/column pair is looked
    up in the module-level ``speller`` grid and printed, and the scores reset.

    Parameters
    ----------
    stim_codes : iterable
        Stimulus code per flash; values outside 1–12 are counted as flashes
        but add no score. Float-valued codes such as 3.0 are accepted.
    probs : iterable
        Score per flash, paired element-wise with ``stim_codes``.
    flashes_per_char : int
        Number of flashes accumulated before a character is emitted.
    delay : float
        Seconds to sleep before processing each flash.
    max_chars : int
        Stop after this many characters have been printed.

    Returns
    -------
    None
        Output is printed only.
    """
    row_scores = np.zeros(6)
    col_scores = np.zeros(6)
    flash_count = 0
    char_count = 0

    for code, p in zip(stim_codes, probs):
        time.sleep(delay)

        # int(): float codes pass the range check but cannot index an array.
        if 1 <= code <= 6:
            row_scores[int(code) - 1] += p
        elif 7 <= code <= 12:
            col_scores[int(code) - 7] += p

        flash_count += 1

        # Keep accumulating until a full character's worth of flashes is seen.
        if flash_count != flashes_per_char:
            continue

        row = np.argmax(row_scores)
        col = np.argmax(col_scores)
        char = speller[row, col]

        print("➡️ Spelled character:", char)

        # Reset the accumulators for the next character.
        row_scores[:] = 0
        col_scores[:] = 0
        flash_count = 0
        char_count += 1

        if char_count == max_chars:
            print("✅ Demo finished")
            break


In [None]:
# Replay the test flashes and stop after at most ten decoded characters.
live_p300_demo(stim_codes=stim_test, probs=y_prob, max_chars=10)


In [None]:
# Write the test split to disk, one .npy file per array.
for out_path, array in (
    ("../data/X_test.npy", X_test),
    ("../data/y_test.npy", y_test),
    ("../data/stim_test.npy", stim_test),
):
    np.save(out_path, array)
